001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import java.util.Collection; 024 import java.util.List; 025 import java.util.Set; 026 import java.util.TreeSet; 027 028 import cascading.flow.FlowElement; 029 import cascading.flow.FlowProcess; 030 import cascading.flow.planner.BaseFlowStep; 031 import cascading.flow.planner.Scope; 032 import cascading.pipe.CoGroup; 033 import cascading.pipe.Each; 034 import cascading.pipe.Every; 035 import cascading.pipe.GroupBy; 036 import cascading.pipe.HashJoin; 037 import cascading.pipe.Merge; 038 import cascading.pipe.Pipe; 039 import cascading.pipe.Splice; 040 import cascading.tap.Tap; 041 import org.jgrapht.GraphPath; 042 043 import static cascading.flow.planner.ElementGraphs.getAllShortestPathsBetween; 044 045 /** 046 * 047 */ 048 public abstract class StepStreamGraph extends StreamGraph 049 { 050 protected FlowProcess flowProcess; 051 protected final BaseFlowStep step; 052 053 public StepStreamGraph( FlowProcess flowProcess, BaseFlowStep step ) 054 { 055 this.flowProcess = flowProcess; 056 this.step = step; 057 } 058 059 protected Object getProperty( String name ) 060 { 061 return flowProcess.getProperty( name ); 062 } 063 064 protected void handleDuct( FlowElement lhsElement, Duct lhsDuct ) 065 { 066 List<FlowElement> successors = step.getSuccessors( lhsElement ); 067 068 if( !stopOnElement( lhsElement, successors ) ) 069 handleSuccessors( lhsDuct, successors ); 070 else 071 addTail( lhsDuct ); 072 } 073 074 protected abstract boolean stopOnElement( FlowElement lhsElement, List<FlowElement> successors ); 075 076 private void handleSuccessors( Duct lhsDuct, List<FlowElement> successors ) 077 { 078 for( FlowElement rhsElement : successors ) 079 { 080 Duct newRhsDuct = createDuctFor( rhsElement ); 081 Duct rhsDuct = findExisting( newRhsDuct ); 082 083 int ordinal = findEdgeOrdinal( lhsDuct, rhsDuct ); 084 085 addPath( lhsDuct, ordinal, rhsDuct ); 086 087 if( rhsDuct != newRhsDuct ) // don't keep going if we have already seen rhs 088 continue; 089 090 handleDuct( rhsElement, rhsDuct ); 091 } 092 } 093 094 private int findEdgeOrdinal( Duct lhsDuct, Duct rhsDuct ) 095 { 096 if( !( rhsDuct instanceof SpliceGate ) ) 097 return 0; 098 099 FlowElement lhsElement = ( (ElementDuct) lhsDuct ).getFlowElement(); 100 Splice rhsElement = (Splice) ( (SpliceGate) rhsDuct ).getFlowElement(); 101 102 List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( step.getGraph(), lhsElement, rhsElement ); 103 104 for( GraphPath<FlowElement, Scope> path : paths ) 105 { 106 if( path.getEdgeList().size() == 1 ) 107 return rhsElement.getPipePos().get( path.getEdgeList().get( 0 ).getName() ); 108 } 109 110 throw new IllegalStateException( "could not find ordinal" ); 111 } 112 113 private Duct createDuctFor( FlowElement element ) 114 { 115 Duct rhsDuct; 116 117 if( element instanceof Each ) 118 { 119 Each eachElement = (Each) element; 120 121 if( eachElement.isFunction() ) 122 rhsDuct = new FunctionEachStage( flowProcess, eachElement ); 123 else if( eachElement.isFilter() ) 124 rhsDuct = new FilterEachStage( flowProcess, eachElement ); 125 else if( eachElement.isValueAssertion() ) 126 rhsDuct = new ValueAssertionEachStage( flowProcess, eachElement ); 127 else 128 throw new IllegalStateException( "unknown operation: " + eachElement.getOperation().getClass().getCanonicalName() ); 129 } 130 else if( element instanceof Every ) 131 { 132 Every everyElement = (Every) element; 133 134 if( everyElement.isBuffer() ) 135 rhsDuct = new BufferEveryWindow( flowProcess, everyElement ); 136 else if( everyElement.isAggregator() ) 137 rhsDuct = new AggregatorEveryStage( flowProcess, everyElement ); 138 else if( everyElement.isGroupAssertion() ) 139 rhsDuct = new GroupAssertionEveryStage( flowProcess, everyElement ); 140 else 141 throw new IllegalStateException( "unknown operation: " + everyElement.getOperation().getClass().getCanonicalName() ); 142 } 143 else if( element instanceof Splice ) 144 { 145 Splice spliceElement = (Splice) element; 146 147 if( spliceElement.isGroupBy() ) 148 rhsDuct = createGroupByGate( (GroupBy) spliceElement ); 149 else if( spliceElement.isCoGroup() ) 150 rhsDuct = createCoGroupGate( (CoGroup) spliceElement ); 151 else if( spliceElement.isMerge() ) 152 rhsDuct = createMergeStage( (Merge) element ); 153 else 154 rhsDuct = createHashJoinGate( (HashJoin) element ); 155 } 156 else if( element instanceof Tap ) 157 { 158 rhsDuct = createSinkStage( (Tap) element ); 159 } 160 else 161 throw new IllegalStateException( "unknown element type: " + element.getClass().getName() ); 162 163 return rhsDuct; 164 } 165 166 protected SinkStage createSinkStage( Tap element ) 167 { 168 return new SinkStage( flowProcess, element ); 169 } 170 171 protected abstract Gate createCoGroupGate( CoGroup element ); 172 173 protected abstract Gate createGroupByGate( GroupBy element ); 174 175 protected Duct createMergeStage( Merge merge ) 176 { 177 return new MergeStage( flowProcess, merge ); 178 } 179 180 protected Gate createHashJoinGate( HashJoin join ) 181 { 182 if( join.getNumSelfJoins() != 0 ) 183 return createBlockingJoinGate( join ); 184 185 // lets not block the streamed side unless it will cause a deadlock 186 if( joinHasSameStreamedSource( join ) ) 187 return createBlockingJoinGate( join ); 188 189 return createNonBlockingJoinGate( join ); 190 } 191 192 protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join ) 193 { 194 return new MemoryHashJoinGate( flowProcess, join ); 195 } 196 197 protected MemoryCoGroupGate createBlockingJoinGate( HashJoin join ) 198 { 199 return new MemoryCoGroupGate( flowProcess, join ); 200 } 201 202 private boolean joinHasSameStreamedSource( HashJoin join ) 203 { 204 if( !step.getStreamedSourceByJoin().isEmpty() ) 205 { 206 // if streamed source is multi path 207 Object tap = step.getStreamedSourceByJoin().get( join ); 208 209 return getNumImmediateBranches( (FlowElement) tap, join ) > 1; 210 } 211 212 // means we are in local mode if joins is empty 213 for( Object tap : step.getSources() ) 214 { 215 if( getNumImmediateBranches( (FlowElement) tap, join ) > 1 ) 216 return true; 217 } 218 219 return false; 220 } 221 222 private int getNumImmediateBranches( FlowElement tap, HashJoin join ) 223 { 224 return getAllShortestPathsBetween( step.getGraph(), tap, join ).size(); 225 } 226 227 protected Duct findExisting( Duct current ) 228 { 229 Collection<Duct> allDucts = getAllDucts(); 230 231 for( Duct duct : allDucts ) 232 { 233 if( duct.equals( current ) ) 234 return duct; 235 } 236 237 return current; 238 } 239 240 protected void setTraps() 241 { 242 Collection<Duct> ducts = getAllDucts(); 243 244 for( Duct duct : ducts ) 245 { 246 if( !( duct instanceof ElementDuct ) ) 247 continue; 248 249 ElementDuct elementDuct = (ElementDuct) duct; 250 FlowElement flowElement = elementDuct.getFlowElement(); 251 252 Set<String> branchNames = new TreeSet<String>(); 253 254 if( flowElement instanceof Pipe ) 255 branchNames.add( ( (Pipe) flowElement ).getName() ); 256 else if( flowElement instanceof Tap ) 257 branchNames.addAll( getTapBranchNamesFor( duct ) ); 258 else 259 throw new IllegalStateException( "unexpected duct type" + duct.getClass().getCanonicalName() ); 260 261 elementDuct.setBranchNames( branchNames ); 262 263 for( String branchName : branchNames ) 264 { 265 Tap trap = step.getTrap( branchName ); 266 267 if( trap != null ) 268 { 269 FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() ); 270 elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) ); 271 break; 272 } 273 } 274 275 if( !elementDuct.hasTrapHandler() ) 276 elementDuct.setTrapHandler( new TrapHandler( flowProcess ) ); 277 } 278 } 279 280 /** 281 * Returns a Set as a given tap may be bound to multiple branches 282 * 283 * @param duct 284 * @return 285 */ 286 private Set<String> getTapBranchNamesFor( Duct duct ) 287 { 288 if( duct instanceof SourceStage ) 289 return step.getSourceName( (Tap) ( (SourceStage) duct ).getFlowElement() ); 290 else if( duct instanceof SinkStage ) 291 return step.getSinkName( (Tap) ( (SinkStage) duct ).getFlowElement() ); 292 else 293 throw new IllegalStateException( "duct does not wrap a Tap: " + duct.getClass().getCanonicalName() ); 294 } 295 296 protected void setScopes() 297 { 298 Collection<Duct> ducts = getAllDucts(); 299 300 for( Duct duct : ducts ) 301 { 302 if( !( duct instanceof ElementDuct ) ) 303 continue; 304 305 ElementDuct elementDuct = (ElementDuct) duct; 306 307 elementDuct.getIncomingScopes().addAll( step.getPreviousScopes( elementDuct.getFlowElement() ) ); 308 elementDuct.getOutgoingScopes().addAll( step.getNextScopes( elementDuct.getFlowElement() ) ); 309 } 310 } 311 }