/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.graph;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.FlowElement;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.Extent;
import cascading.flow.stream.annotations.BlockingMode;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Gate;
import cascading.flow.stream.element.AggregatorEveryStage;
import cascading.flow.stream.element.BufferEveryWindow;
import cascading.flow.stream.element.ElementDuct;
import cascading.flow.stream.element.ElementFlowProcess;
import cascading.flow.stream.element.FilterEachStage;
import cascading.flow.stream.element.FunctionEachStage;
import cascading.flow.stream.element.GroupAssertionEveryStage;
import cascading.flow.stream.element.GroupingSpliceGate;
import cascading.flow.stream.element.MemoryCoGroupGate;
import cascading.flow.stream.element.MemoryHashJoinGate;
import cascading.flow.stream.element.MergeStage;
import cascading.flow.stream.element.SinkStage;
import cascading.flow.stream.element.SourceStage;
import cascading.flow.stream.element.TrapHandler;
import cascading.flow.stream.element.ValueAssertionEachStage;
import cascading.pipe.Boundary;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import cascading.util.Util;

/**
 * Base class for stream graphs that are built from the element graph of a single {@link FlowNode}.
 * <p>
 * Starting from a given element, {@link #handleDuct(FlowElement, Duct)} walks the successors found in the
 * element graph, creates a {@link Duct} for each one and links it to its predecessor. Subclasses supply the
 * grouping gates via {@link #createGroupByGate(GroupBy, IORole)} and {@link #createCoGroupGate(CoGroup, IORole)},
 * and may override the remaining {@code create*} factory methods.
 */
public abstract class NodeStreamGraph extends StreamGraph
  {
  /** Process handed to every duct created by this graph and used for property lookups. */
  protected FlowProcess flowProcess;
  /** Node whose elements, traps and scopes are used to build this graph. */
  protected final FlowNode node;
  /** Source the element graph was narrowed to, or {@code null} when the full node graph is used. */
  protected FlowElement streamedSource;
  /** Graph that is walked when creating and linking ducts. */
  protected final ElementGraph elementGraph;

  /**
   * Creates a stream graph over the complete element graph of the given node.
   *
   * @param flowProcess the process passed to the created ducts
   * @param node        the node to build the stream graph for
   */
  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node )
    {
    // a null streamedSource selects node.getElementGraph() in the delegate constructor
    this( flowProcess, node, null );
    }

  /**
   * Creates a stream graph over the pipeline graph of the given streamed source, or over the complete
   * element graph of the node when {@code streamedSource} is {@code null}.
   *
   * @param flowProcess    the process passed to the created ducts
   * @param node           the node to build the stream graph for
   * @param streamedSource the source whose pipeline graph should be used, may be {@code null}
   */
  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node, FlowElement streamedSource )
    {
    this.flowProcess = flowProcess;
    this.node = node;
    this.elementGraph = streamedSource == null ? node.getElementGraph() : node.getPipelineGraphFor( streamedSource );
    this.streamedSource = streamedSource;
    }

  /**
   * Looks up a property on the underlying {@link FlowProcess}.
   *
   * @param name the property name
   * @return whatever {@code flowProcess.getProperty( name )} returns
   */
  protected Object getProperty( String name )
    {
    return flowProcess.getProperty( name );
    }

  /**
   * Connects the given duct to whatever follows its element in the element graph.
   * <p>
   * If the element is directly followed by {@link Extent#tail} the duct is registered as a tail, otherwise
   * a duct is created (or reused) for every successor and linked to {@code lhsDuct}.
   *
   * @param lhsElement the element wrapped by {@code lhsDuct}
   * @param lhsDuct    the duct to connect to its successors
   */
  protected void handleDuct( FlowElement lhsElement, Duct lhsDuct )
    {
    List<FlowElement> successors = elementGraph.successorListOf( lhsElement );

    if( successors.contains( Extent.tail ) )
      addTail( lhsDuct );
    else
      handleSuccessors( lhsDuct, successors );
    }

  /**
   * Creates or reuses a duct for every successor, links it to {@code lhsDuct} and recurses into
   * successors that have not been visited yet.
   *
   * @param lhsDuct    the upstream duct
   * @param successors the elements following the element wrapped by {@code lhsDuct}
   */
  private void handleSuccessors( Duct lhsDuct, List<FlowElement> successors )
    {
    for( FlowElement rhsElement : successors )
      {
      if( rhsElement instanceof Extent )
        continue;

      // an element next to the tail/head extents sits on the boundary of this graph
      boolean isSink = elementGraph.successorListOf( rhsElement ).contains( Extent.tail );
      boolean isSource = elementGraph.predecessorListOf( rhsElement ).contains( Extent.head );

      IORole role = IORole.pass;

      if( isSource && !isSink )
        role = IORole.source;
      else if( !isSource && isSink )
        role = IORole.sink;
      else if( isSource && isSink )
        role = IORole.both;

      Duct newRhsDuct = createDuctFor( rhsElement, role );
      Duct rhsDuct = findExisting( newRhsDuct ); // returns an already registered equal duct, if any

      int ordinal = findEdgeOrdinal( lhsDuct, rhsDuct );

      addPath( lhsDuct, ordinal, rhsDuct );

      if( rhsDuct != newRhsDuct ) // don't keep going if we have already seen rhs
        continue;

      handleDuct( rhsElement, rhsDuct );
      }
    }

  /**
   * Returns the ordinal of the edge connecting the two ducts.
   * <p>
   * Only edges leading into a {@link GroupingSpliceGate} are looked up, every other edge uses ordinal
   * {@code 0}.
   *
   * @param lhsDuct the upstream duct
   * @param rhsDuct the downstream duct
   * @return the ordinal of the single edge between the wrapped elements, or {@code 0}
   * @throws IllegalStateException if there is not exactly one edge between the wrapped elements
   */
  private int findEdgeOrdinal( Duct lhsDuct, Duct rhsDuct )
    {
    if( !( rhsDuct instanceof GroupingSpliceGate ) )
      return 0;

    FlowElement lhsElement = ( (ElementDuct) lhsDuct ).getFlowElement();
    FlowElement rhsElement = ( (ElementDuct) rhsDuct ).getFlowElement();

    Set<Scope> allEdges = elementGraph.getAllEdges( lhsElement, rhsElement );

    // defensive: a null result is treated like an empty one so the failure is reported uniformly
    int edgeCount = allEdges == null ? 0 : allEdges.size();

    if( edgeCount == 1 )
      return Util.getFirst( allEdges ).getOrdinal();

    if( edgeCount == 0 )
      throw new IllegalStateException( "could not find ordinal, no edges between elements" );

    throw new IllegalStateException( "could not find ordinal, too many edges between elements" );
    }

  /**
   * Creates a new duct matching the concrete type of the given element.
   *
   * @param element the element to wrap
   * @param role    the position of the element relative to the head and tail of the element graph
   * @return a newly created duct, never an existing one
   * @throws IllegalStateException if the element or its operation is of an unknown kind
   */
  private Duct createDuctFor( FlowElement element, IORole role )
    {
    Duct rhsDuct;

    if( element instanceof Each )
      {
      Each eachElement = (Each) element;

      if( eachElement.isFunction() )
        rhsDuct = new FunctionEachStage( flowProcess, eachElement );
      else if( eachElement.isFilter() )
        rhsDuct = new FilterEachStage( flowProcess, eachElement );
      else if( eachElement.isValueAssertion() )
        rhsDuct = new ValueAssertionEachStage( flowProcess, eachElement );
      else
        throw new IllegalStateException( "unknown operation: " + eachElement.getOperation().getClass().getCanonicalName() );
      }
    else if( element instanceof Every )
      {
      Every everyElement = (Every) element;

      if( everyElement.isBuffer() )
        rhsDuct = new BufferEveryWindow( flowProcess, everyElement );
      else if( everyElement.isAggregator() )
        rhsDuct = new AggregatorEveryStage( flowProcess, everyElement );
      else if( everyElement.isGroupAssertion() )
        rhsDuct = new GroupAssertionEveryStage( flowProcess, everyElement );
      else
        throw new IllegalStateException( "unknown operation: " + everyElement.getOperation().getClass().getCanonicalName() );
      }
    else if( element instanceof Boundary )
      {
      rhsDuct = createBoundaryStage( (Boundary) element, role );
      }
    else if( element instanceof Splice )
      {
      Splice spliceElement = (Splice) element;

      if( spliceElement.isGroupBy() )
        rhsDuct = createGroupByGate( (GroupBy) spliceElement, role );
      else if( spliceElement.isCoGroup() )
        rhsDuct = createCoGroupGate( (CoGroup) spliceElement, role );
      else if( spliceElement.isMerge() )
        rhsDuct = createMergeStage( (Merge) element, role );
      else
        rhsDuct = createHashJoinGate( (HashJoin) element ); // any remaining Splice is cast to HashJoin
      }
    else if( element instanceof Tap )
      {
      // a Tap reached as a successor is always wrapped as a sink
      rhsDuct = createSinkStage( (Tap) element );
      }
    else
      throw new IllegalStateException( "unknown element type: " + element.getClass().getName() );

    return rhsDuct;
    }

  /**
   * Creates the duct for a {@link Boundary}. This implementation always fails, subclasses supporting
   * boundaries must override it.
   *
   * @param element the boundary element
   * @param role    the position of the element relative to the head and tail of the element graph
   * @return never returns normally in this implementation
   * @throws UnsupportedOperationException always
   */
  protected Duct createBoundaryStage( Boundary element, IORole role )
    {
    // could return MergeStage at this point as they are roughly equivalent
    throw new UnsupportedOperationException( "boundary not supported by planner" );
    }

  /**
   * Creates the sink stage for the given tap.
   *
   * @param element the tap to wrap
   * @return a new {@link SinkStage}
   */
  protected SinkStage createSinkStage( Tap element )
    {
    return new SinkStage( flowProcess, element );
    }

  /**
   * Creates the gate for a {@link CoGroup}.
   *
   * @param element the CoGroup to wrap
   * @param role    the position of the element relative to the head and tail of the element graph
   * @return the gate to use for the element
   */
  protected abstract Gate createCoGroupGate( CoGroup element, IORole role );

  /**
   * Creates the gate for a {@link GroupBy}.
   *
   * @param element the GroupBy to wrap
   * @param role    the position of the element relative to the head and tail of the element graph
   * @return the gate to use for the element
   */
  protected abstract Gate createGroupByGate( GroupBy element, IORole role );

  /**
   * Creates the stage for a {@link Merge}.
   *
   * @param merge the Merge to wrap
   * @param both  the role of the element, not used by this implementation
   * @return a new {@link MergeStage}
   */
  protected Duct createMergeStage( Merge merge, IORole both )
    {
    return new MergeStage( flowProcess, merge );
    }

  /**
   * Creates the gate for a {@link HashJoin}, choosing between the blocking and non-blocking variant.
   * <p>
   * The blocking gate is used when the join reports self joins or when the element carries the
   * {@link BlockingMode#Blocked} annotation.
   *
   * @param join the HashJoin to wrap
   * @return the gate to use for the join
   */
  protected Gate createHashJoinGate( HashJoin join )
    {
    if( join.getNumSelfJoins() != 0 )
      return createBlockingJoinGate( join );

    // lets not block the streamed side unless it will cause a deadlock
    if( hasElementAnnotation( BlockingMode.Blocked, join ) )
      return createBlockingJoinGate( join );

    return createNonBlockingJoinGate( join );
    }

  /**
   * Tests whether the element graph holds the given annotation for the given element.
   * <p>
   * NOTE(review): the element graph is cast to {@link AnnotatedGraph} unconditionally, a graph of any
   * other type fails with a ClassCastException.
   *
   * @param annotation  the annotation to look for
   * @param flowElement the element to test
   * @return {@code false} if the graph has no annotations, otherwise the result of the key lookup
   */
  private boolean hasElementAnnotation( Enum annotation, FlowElement flowElement )
    {
    if( !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
      return false;

    return ( (AnnotatedGraph) elementGraph ).getAnnotations().hadKey( annotation, flowElement );
    }

  /**
   * Creates the gate used for a {@link HashJoin} that does not need to block.
   *
   * @param join the HashJoin to wrap
   * @return a new {@link MemoryHashJoinGate}
   */
  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
    {
    return new MemoryHashJoinGate( flowProcess, join );
    }

  /**
   * Creates the gate used for a {@link HashJoin} that must block.
   *
   * @param join the HashJoin to wrap
   * @return a new {@link MemoryCoGroupGate}
   */
  protected MemoryCoGroupGate createBlockingJoinGate( HashJoin join )
    {
    return new MemoryCoGroupGate( flowProcess, join );
    }

  /**
   * Returns the already registered duct that is equal to the given one, if there is any.
   *
   * @param current the candidate duct
   * @return the first registered duct equal to {@code current}, or {@code current} itself when none is found
   */
  protected Duct findExisting( Duct current )
    {
    Collection<Duct> allDucts = getAllDucts();

    for( Duct duct : allDucts )
      {
      if( duct.equals( current ) )
        return duct;
      }

    return current;
    }

  /**
   * Assigns branch names and a {@link TrapHandler} to every {@link ElementDuct} in this graph.
   * <p>
   * The first branch name the node has a trap for determines the trap, ducts without a matching trap get
   * a handler created from the flow process alone.
   *
   * @throws IllegalStateException if a duct wraps an element that is neither a {@link Pipe} nor a {@link Tap}
   */
  protected void setTraps()
    {
    Collection<Duct> ducts = getAllDucts();

    for( Duct duct : ducts )
      {
      if( !( duct instanceof ElementDuct ) )
        continue;

      ElementDuct elementDuct = (ElementDuct) duct;
      FlowElement flowElement = elementDuct.getFlowElement();

      Set<String> branchNames = new TreeSet<String>();

      if( flowElement instanceof Pipe )
        branchNames.add( ( (Pipe) flowElement ).getName() );
      else if( flowElement instanceof Tap )
        branchNames.addAll( getTapBranchNamesFor( duct ) );
      else
        throw new IllegalStateException( "unexpected duct type: " + duct.getClass().getCanonicalName() );

      elementDuct.setBranchNames( branchNames );

      for( String branchName : branchNames )
        {
        Tap trap = node.getTrap( branchName );

        if( trap != null )
          {
          FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() );
          elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) );
          break; // only the first trap found is used
          }
        }

      if( !elementDuct.hasTrapHandler() )
        elementDuct.setTrapHandler( new TrapHandler( flowProcess ) );
      }
    }

  /**
   * Returns a Set as a given tap may be bound to multiple branches
   *
   * @param duct a duct wrapping a {@link Tap}
   * @return an empty set for a temporary tap, otherwise the source or sink names the node reports for the tap
   * @throws IllegalStateException if the duct is neither a {@link SourceStage} nor a {@link SinkStage}
   */
  private Set<String> getTapBranchNamesFor( Duct duct )
    {
    if( ( (Tap) ( (ElementDuct) duct ).getFlowElement() ).isTemporary() )
      return Collections.emptySet();

    if( duct instanceof SourceStage )
      return node.getSourceTapNames( (Tap) ( (SourceStage) duct ).getFlowElement() );
    else if( duct instanceof SinkStage )
      return node.getSinkTapNames( (Tap) ( (SinkStage) duct ).getFlowElement() );
    else
      throw new IllegalStateException( "duct does not wrap a Tap: " + duct.getClass().getCanonicalName() );
    }

  /**
   * Adds the incoming and outgoing scopes known to the node to every {@link ElementDuct} in this graph.
   */
  protected void setScopes()
    {
    Collection<Duct> ducts = getAllDucts();

    for( Duct duct : ducts )
      {
      if( !( duct instanceof ElementDuct ) )
        continue;

      ElementDuct elementDuct = (ElementDuct) duct;

      // get the actual incoming/outgoing scopes for the full node as we need the total number of branches
      elementDuct.getIncomingScopes().addAll( node.getPreviousScopes( elementDuct.getFlowElement() ) );
      elementDuct.getOutgoingScopes().addAll( node.getNextScopes( elementDuct.getFlowElement() ) );
      }
    }
  }