001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop.planner;
022    
023    import java.util.LinkedHashMap;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    
028    import cascading.flow.FlowElement;
029    import cascading.flow.FlowStep;
030    import cascading.flow.hadoop.HadoopFlowStep;
031    import cascading.flow.planner.ElementGraph;
032    import cascading.flow.planner.FlowStepGraph;
033    import cascading.flow.planner.PlannerException;
034    import cascading.flow.planner.Scope;
035    import cascading.pipe.Group;
036    import cascading.pipe.HashJoin;
037    import cascading.pipe.Pipe;
038    import cascading.pipe.Splice;
039    import cascading.tap.Tap;
040    import org.apache.hadoop.mapred.JobConf;
041    import org.jgrapht.GraphPath;
042    import org.jgrapht.Graphs;
043    import org.jgrapht.graph.SimpleDirectedGraph;
044    import org.jgrapht.traverse.TopologicalOrderIterator;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    import static cascading.flow.planner.ElementGraphs.*;
049    
050    /**
051     *
052     */
053    public class HadoopStepGraph extends FlowStepGraph<JobConf>
054      {
055      /** Field LOG */
056      private static final Logger LOG = LoggerFactory.getLogger( HadoopStepGraph.class );
057    
058      public HadoopStepGraph()
059        {
060        }
061    
062      public HadoopStepGraph( String flowName, ElementGraph elementGraph )
063        {
064        super( flowName, elementGraph );
065        }
066    
067      protected FlowStep<JobConf> createFlowStep( String stepName, int stepNum )
068        {
069        return new HadoopFlowStep( stepName, stepNum );
070        }
071    
072      /**
073       * Creates the map reduce step graph.
074       *
075       * @param flowName
076       * @param elementGraph
077       */
078      protected void makeStepGraph( String flowName, ElementGraph elementGraph )
079        {
080        SimpleDirectedGraph<Tap, Integer> tapGraph = elementGraph.makeTapGraph();
081    
082        int numJobs = countNumJobs( tapGraph );
083    
084        Map<Tap, FlowStep<JobConf>> steps = new LinkedHashMap<Tap, FlowStep<JobConf>>();
085        TopologicalOrderIterator<Tap, Integer> iterator = new TopologicalOrderIterator<Tap, Integer>( tapGraph );
086        int count = 0;
087    
088        while( iterator.hasNext() )
089          {
090          Tap source = iterator.next();
091    
092          LOG.debug( "handling source: {}", source );
093    
094          List<Tap> sinks = Graphs.successorListOf( tapGraph, source );
095    
096          for( Tap sink : sinks )
097            {
098            LOG.debug( "handling path: {} -> {}", source, sink );
099    
100            HadoopFlowStep step = (HadoopFlowStep) getCreateFlowStep( steps, sink, numJobs );
101    
102            addVertex( step );
103    
104            if( steps.containsKey( source ) )
105              addEdge( steps.get( source ), step, count++ );
106    
107            populateStep( elementGraph, source, sink, step );
108            }
109          }
110        }
111    
112      private void populateStep( ElementGraph elementGraph, Tap source, Tap sink, HadoopFlowStep step )
113        {
114        Map<String, Tap> traps = elementGraph.getTrapMap();
115    
116        // support multiple paths from source to sink
117        // this allows for self joins on groups, even with different operation stacks between them
118        // note we must ignore paths with intermediate taps
119        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( elementGraph, source, sink );
120    
121        for( GraphPath<FlowElement, Scope> path : paths )
122          {
123          if( pathContainsTap( path ) )
124            continue;
125    
126          List<Scope> scopes = path.getEdgeList();
127          String sourceName = scopes.get( 0 ).getName(); // root node of the shortest path
128          String sinkName = scopes.get( scopes.size() - 1 ).getName();
129    
130          step.addSource( sourceName, source );
131          step.addSink( sinkName, sink );
132    
133          FlowElement lhs = source;
134    
135          step.getGraph().addVertex( lhs );
136    
137          boolean onMapSide = true;
138    
139          for( Scope scope : scopes )
140            {
141            FlowElement rhs = elementGraph.getEdgeTarget( scope );
142    
143            step.getGraph().addVertex( rhs );
144            step.getGraph().addEdge( lhs, rhs, scope );
145    
146            if( rhs instanceof Group )
147              {
148              step.addGroup( (Group) rhs );
149              onMapSide = false;
150              }
151            else if( rhs instanceof HashJoin )
152              {
153              if( !onMapSide )
154                throw new PlannerException( "joins must not present Reduce side" );
155    
156              Map<Integer, Integer> sourcePaths = countOrderedDirectPathsBetween( elementGraph, source, (Splice) rhs );
157    
158              boolean isStreamed = isOnlyStreamedPath( sourcePaths );
159              boolean isAccumulated = isOnlyAccumulatedPath( sourcePaths );
160              boolean isBoth = isBothAccumulatedAndStreamedPath( sourcePaths );
161    
162              if( isStreamed || isBoth )
163                step.addStreamedSourceFor( (HashJoin) rhs, source );
164    
165              if( isAccumulated || isBoth )
166                step.addAccumulatedSourceFor( (HashJoin) rhs, source );
167              }
168            else if( rhs instanceof Pipe ) // add relevant traps to step
169              {
170              String name = ( (Pipe) rhs ).getName();
171    
172              // this is legacy, can probably now collapse into one collection safely
173              if( traps.containsKey( name ) )
174                {
175                if( onMapSide )
176                  step.getMapperTraps().put( name, traps.get( name ) );
177                else
178                  step.getReducerTraps().put( name, traps.get( name ) );
179                }
180              }
181    
182            lhs = rhs;
183            }
184          }
185        }
186    
187      private int countNumJobs( SimpleDirectedGraph<Tap, Integer> tapGraph )
188        {
189        Set<Tap> vertices = tapGraph.vertexSet();
190        int count = 0;
191    
192        for( Tap vertex : vertices )
193          {
194          if( tapGraph.inDegreeOf( vertex ) != 0 )
195            count++;
196          }
197    
198        return count;
199        }
200      }