/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.ElementGraph;
import cascading.flow.planner.FlowStepGraph;
import cascading.flow.planner.PlannerException;
import cascading.flow.planner.Scope;
import cascading.pipe.Group;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.planner.ElementGraphs.*;

/**
 * HadoopStepGraph translates an {@link ElementGraph} into a graph of
 * {@link HadoopFlowStep} instances, where each step corresponds to one
 * Hadoop MapReduce job. Steps are keyed by their sink {@link Tap}; an edge
 * between two steps means the downstream step reads a tap the upstream
 * step writes.
 */
public class HadoopStepGraph extends FlowStepGraph<JobConf>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepGraph.class );

  public HadoopStepGraph()
    {
    }

  public HadoopStepGraph( String flowName, ElementGraph elementGraph )
    {
    super( flowName, elementGraph );
    }

  protected FlowStep<JobConf> createFlowStep( String stepName, int stepNum )
    {
    return new HadoopFlowStep( stepName, stepNum );
    }

  /**
   * Creates the map reduce step graph.
   * <p/>
   * Walks the tap graph in topological order and creates one step per sink
   * tap, adding an edge between two steps whenever one step's sink tap is
   * another step's source tap.
   *
   * @param flowName     name of the current flow, for step naming
   * @param elementGraph the full pipe assembly graph to partition into steps
   */
  protected void makeStepGraph( String flowName, ElementGraph elementGraph )
    {
    SimpleDirectedGraph<Tap, Integer> tapGraph = elementGraph.makeTapGraph();

    int numJobs = countNumJobs( tapGraph );

    // LinkedHashMap preserves creation order so step numbering is deterministic
    Map<Tap, FlowStep<JobConf>> steps = new LinkedHashMap<Tap, FlowStep<JobConf>>();
    TopologicalOrderIterator<Tap, Integer> iterator = new TopologicalOrderIterator<Tap, Integer>( tapGraph );
    int count = 0; // monotonically increasing edge id, edges must be unique in a SimpleDirectedGraph

    while( iterator.hasNext() )
      {
      Tap source = iterator.next();

      LOG.debug( "handling source: {}", source );

      List<Tap> sinks = Graphs.successorListOf( tapGraph, source );

      for( Tap sink : sinks )
        {
        LOG.debug( "handling path: {} -> {}", source, sink );

        // one step per sink tap; multiple sources feeding the same sink share a step
        HadoopFlowStep step = (HadoopFlowStep) getCreateFlowStep( steps, sink, numJobs );

        addVertex( step );

        // if the current source is itself a prior step's sink, the steps are dependent
        if( steps.containsKey( source ) )
          addEdge( steps.get( source ), step, count++ );

        populateStep( elementGraph, source, sink, step );
        }
      }
    }

  /**
   * Populates the given step with the element sub-graph lying between the
   * given source and sink taps, registering groups, hash join sources, and
   * traps along the way.
   *
   * @param elementGraph the full pipe assembly graph
   * @param source       the source tap bounding this step
   * @param sink         the sink tap bounding this step
   * @param step         the step to populate
   */
  private void populateStep( ElementGraph elementGraph, Tap source, Tap sink, HadoopFlowStep step )
    {
    Map<String, Tap> traps = elementGraph.getTrapMap();

    // support multiple paths from source to sink
    // this allows for self joins on groups, even with different operation stacks between them
    // note we must ignore paths with intermediate taps
    List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( elementGraph, source, sink );

    for( GraphPath<FlowElement, Scope> path : paths )
      {
      // paths passing through an intermediate tap belong to a different step
      if( pathContainsTap( path ) )
        continue;

      List<Scope> scopes = path.getEdgeList();
      String sourceName = scopes.get( 0 ).getName(); // root node of the shortest path
      String sinkName = scopes.get( scopes.size() - 1 ).getName();

      step.addSource( sourceName, source );
      step.addSink( sinkName, sink );

      FlowElement lhs = source;

      step.getGraph().addVertex( lhs );

      // everything before the first Group executes in the mapper, everything after in the reducer
      boolean onMapSide = true;

      for( Scope scope : scopes )
        {
        FlowElement rhs = elementGraph.getEdgeTarget( scope );

        step.getGraph().addVertex( rhs );
        step.getGraph().addEdge( lhs, rhs, scope );

        if( rhs instanceof Group )
          {
          step.addGroup( (Group) rhs );
          onMapSide = false; // a Group marks the map/reduce boundary
          }
        else if( rhs instanceof HashJoin )
          {
          // HashJoin is a map side only operation in the MapReduce model
          if( !onMapSide )
            throw new PlannerException( "joins must not be present on the Reduce side" );

          Map<Integer, Integer> sourcePaths = countOrderedDirectPathsBetween( elementGraph, source, (Splice) rhs );

          boolean isStreamed = isOnlyStreamedPath( sourcePaths );
          boolean isAccumulated = isOnlyAccumulatedPath( sourcePaths );
          boolean isBoth = isBothAccumulatedAndStreamedPath( sourcePaths );

          // a source may feed the join on its streamed side, its accumulated side, or both
          if( isStreamed || isBoth )
            step.addStreamedSourceFor( (HashJoin) rhs, source );

          if( isAccumulated || isBoth )
            step.addAccumulatedSourceFor( (HashJoin) rhs, source );
          }
        else if( rhs instanceof Pipe ) // add relevant traps to step
          {
          String name = ( (Pipe) rhs ).getName();

          // this is legacy, can probably now collapse into one collection safely
          if( traps.containsKey( name ) )
            {
            // route the trap to the task phase the trapped pipe executes in
            if( onMapSide )
              step.getMapperTraps().put( name, traps.get( name ) );
            else
              step.getReducerTraps().put( name, traps.get( name ) );
            }
          }

        lhs = rhs;
        }
      }
    }

  /**
   * Returns the number of MapReduce jobs the tap graph implies.
   * <p/>
   * Every tap with at least one incoming edge is the sink of exactly one
   * job, so the job count equals the number of non-source taps.
   *
   * @param tapGraph the tap dependency graph
   * @return the number of jobs to be created
   */
  private int countNumJobs( SimpleDirectedGraph<Tap, Integer> tapGraph )
    {
    Set<Tap> vertices = tapGraph.vertexSet();
    int count = 0;

    for( Tap vertex : vertices )
      {
      if( tapGraph.inDegreeOf( vertex ) != 0 )
        count++;
      }

    return count;
    }
  }