001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop.stream;
022    
023    import java.io.IOException;
024    import java.util.HashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import cascading.flow.FlowElement;
030    import cascading.flow.FlowException;
031    import cascading.flow.FlowProcess;
032    import cascading.flow.hadoop.HadoopFlowProcess;
033    import cascading.flow.hadoop.HadoopFlowStep;
034    import cascading.flow.hadoop.util.HadoopUtil;
035    import cascading.flow.stream.Gate;
036    import cascading.flow.stream.MemoryHashJoinGate;
037    import cascading.flow.stream.SinkStage;
038    import cascading.flow.stream.SourceStage;
039    import cascading.flow.stream.SpliceGate;
040    import cascading.flow.stream.StepStreamGraph;
041    import cascading.pipe.CoGroup;
042    import cascading.pipe.Group;
043    import cascading.pipe.GroupBy;
044    import cascading.pipe.HashJoin;
045    import cascading.tap.Tap;
046    import org.apache.hadoop.mapred.JobConf;
047    
048    /**
049     *
050     */
051    public class HadoopMapStreamGraph extends StepStreamGraph
052      {
053      private final Tap source;
054      private SourceStage streamedHead;
055    
056      public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, HadoopFlowStep step, Tap source )
057        {
058        super( flowProcess, step );
059        this.source = source;
060    
061        buildGraph();
062    
063        setTraps();
064        setScopes();
065    
066        printGraph( step.getID(), "map", flowProcess.getCurrentSliceNum() );
067        bind();
068        }
069    
070      public SourceStage getStreamedHead()
071        {
072        return streamedHead;
073        }
074    
075      protected void buildGraph()
076        {
077        streamedHead = handleHead( this.source, flowProcess );
078    
079        FlowElement tail = step.getGroup() != null ? step.getGroup() : step.getSink();
080        Set<Tap> tributaries = step.getJoinTributariesBetween( this.source, tail );
081    
082        tributaries.remove( this.source ); // we cannot stream and accumulate the same source
083    
084        // accumulated paths
085        for( Object source : tributaries )
086          {
087          HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
088          JobConf conf = hadoopProcess.getJobConf();
089    
090          // allows client side config to be used cluster side
091          String property = conf.getRaw( "cascading.step.accumulated.source.conf." + Tap.id( (Tap) source ) );
092    
093          if( property == null )
094            throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
095    
096          conf = getSourceConf( hadoopProcess, conf, property );
097          flowProcess = new HadoopFlowProcess( hadoopProcess, conf );
098    
099          handleHead( (Tap) source, flowProcess );
100          }
101        }
102    
103      private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property )
104        {
105        Map<String, String> priorConf;
106        try
107          {
108          priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true );
109          }
110        catch( IOException exception )
111          {
112          throw new FlowException( "unable to deserialize properties", exception );
113          }
114    
115        return flowProcess.mergeMapIntoConfig( conf, priorConf );
116        }
117    
118      private SourceStage handleHead( Tap source, FlowProcess flowProcess )
119        {
120        SourceStage sourceDuct = new SourceStage( flowProcess, source );
121    
122        addHead( sourceDuct );
123    
124        handleDuct( source, sourceDuct );
125    
126        return sourceDuct;
127        }
128    
129      @Override
130      protected SinkStage createSinkStage( Tap element )
131        {
132        return new HadoopSinkStage( flowProcess, element );
133        }
134    
135      protected Gate createCoGroupGate( CoGroup element )
136        {
137        return new HadoopCoGroupGate( flowProcess, element, SpliceGate.Role.sink );
138        }
139    
140      protected Gate createGroupByGate( GroupBy element )
141        {
142        return new HadoopGroupByGate( flowProcess, element, SpliceGate.Role.sink );
143        }
144    
145      @Override
146      protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join )
147        {
148        return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch
149        }
150    
151      protected boolean stopOnElement( FlowElement lhsElement, List<FlowElement> successors )
152        {
153        if( lhsElement instanceof Group )
154          return true;
155    
156        if( successors.isEmpty() )
157          {
158          if( !( lhsElement instanceof Tap ) )
159            throw new IllegalStateException( "expected a Tap instance" );
160    
161          return true;
162          }
163    
164        return false;
165        }
166      }