/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.graph;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.FlowElement;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.Extent;
import cascading.flow.stream.annotations.BlockingMode;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Gate;
import cascading.flow.stream.element.AggregatorEveryStage;
import cascading.flow.stream.element.BufferEveryWindow;
import cascading.flow.stream.element.ElementDuct;
import cascading.flow.stream.element.ElementFlowProcess;
import cascading.flow.stream.element.FilterEachStage;
import cascading.flow.stream.element.FunctionEachStage;
import cascading.flow.stream.element.GroupAssertionEveryStage;
import cascading.flow.stream.element.GroupingSpliceGate;
import cascading.flow.stream.element.MemoryCoGroupGate;
import cascading.flow.stream.element.MemoryHashJoinGate;
import cascading.flow.stream.element.MergeStage;
import cascading.flow.stream.element.SinkStage;
import cascading.flow.stream.element.SourceStage;
import cascading.flow.stream.element.TrapHandler;
import cascading.flow.stream.element.ValueAssertionEachStage;
import cascading.pipe.Boundary;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import cascading.util.Util;

/**
 * A {@link StreamGraph} that is populated by walking the element graph of a single {@link FlowNode}.
 * <p>
 * Starting from a given element and its duct, {@link #handleDuct(FlowElement, Duct)} visits the successors
 * found in {@link #elementGraph}, creates one {@link Duct} per element and connects the ducts with one path
 * per connecting {@link Scope}, using that scope's ordinal.
 * <p>
 * Subclasses must supply the gates used for {@link GroupBy} and {@link CoGroup} elements, and may override the
 * remaining {@code create*} factory methods.
 */
public abstract class NodeStreamGraph extends StreamGraph
  {
  /** Process context handed to every duct created here and queried by {@link #getProperty(String)}. */
  protected FlowProcess flowProcess;
  /** The node whose elements, traps and scopes are bound into this graph. */
  protected final FlowNode node;
  /** The streamed source given at construction, or {@code null} when none was given. */
  protected FlowElement streamedSource;
  /** The graph traversed when creating ducts; chosen in the constructor. */
  protected final ElementGraph elementGraph;

  /**
   * Creates a stream graph over the full element graph of the given node, with no streamed source.
   *
   * @param flowProcess the process context passed to created ducts
   * @param node        the node to build ducts for
   */
  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node )
    {
    // a null streamedSource selects node.getElementGraph() in the constructor below
    this( flowProcess, node, null );
    }

  /**
   * Creates a stream graph for the given node.
   * <p>
   * When {@code streamedSource} is {@code null} the node's full element graph is traversed, otherwise the
   * graph returned by {@code node.getPipelineGraphFor( streamedSource )} is used.
   *
   * @param flowProcess    the process context passed to created ducts
   * @param node           the node to build ducts for
   * @param streamedSource the source element selecting the pipeline graph, may be {@code null}
   */
  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node, FlowElement streamedSource )
    {
    this.flowProcess = flowProcess;
    this.node = node;
    this.elementGraph = streamedSource == null ? node.getElementGraph() : node.getPipelineGraphFor( streamedSource );
    this.streamedSource = streamedSource;
    }

  /**
   * Looks up a property on the current {@link FlowProcess}.
   *
   * @param name the property name
   * @return whatever {@code flowProcess.getProperty( name )} returns
   */
  protected Object getProperty( String name )
    {
    return flowProcess.getProperty( name );
    }

  /**
   * Binds the given duct to whatever follows its element in {@link #elementGraph}.
   * <p>
   * If the element's successors include {@link Extent#tail} the duct is registered through {@code addTail}
   * and traversal stops there; otherwise ducts are created for the successors and traversal continues
   * recursively.
   *
   * @param lhsElement the element already represented by {@code lhsDuct}
   * @param lhsDuct    the duct for {@code lhsElement}
   */
  protected void handleDuct( FlowElement lhsElement, Duct lhsDuct )
    {
    List<FlowElement> successors = elementGraph.successorListOf( lhsElement );

    if( successors.contains( Extent.tail ) )
      addTail( lhsDuct );
    else
      handleSuccessors( lhsElement, lhsDuct, successors );
    }

  /**
   * Creates (or reuses) a duct for every successor, adds one path per connecting edge, and recurses into
   * successors that have not been seen before.
   *
   * @param lhsElement the upstream element
   * @param lhsDuct    the duct for the upstream element
   * @param successors the successors of {@code lhsElement} in {@link #elementGraph}
   */
  private void handleSuccessors( FlowElement lhsElement, Duct lhsDuct, List<FlowElement> successors )
    {
    // iterate an identity set built from the list so a successor reached by several edges is visited once
    // NOTE(review): presumably de-duplicates by reference -- confirm against Util.createIdentitySet
    for( FlowElement rhsElement : Util.createIdentitySet( successors ) )
      {
      if( rhsElement instanceof Extent )
        continue;

      boolean isSink = elementGraph.successorListOf( rhsElement ).contains( Extent.tail );
      boolean isSource = elementGraph.predecessorListOf( rhsElement ).contains( Extent.head );

      IORole role = IORole.pass;

      if( isSource && !isSink )
        role = IORole.source;
      else if( !isSource && isSink )
        role = IORole.sink;
      else if( isSource && isSink )
        role = IORole.both;

      // always build a candidate, then swap it for an equal duct already in the graph, if any
      Duct newRhsDuct = createDuctFor( rhsElement, role );
      Duct rhsDuct = findExisting( newRhsDuct );

      Set<Scope> allEdges = elementGraph.getAllEdges( lhsElement, rhsElement );

      for( Scope scope : allEdges )
        {
        int ordinal = scope.getOrdinal();

        addPath( lhsDuct, ordinal, rhsDuct );
        }

      if( rhsDuct != newRhsDuct ) // don't keep going if we have already seen rhs
        continue;

      handleDuct( rhsElement, rhsDuct );
      }
    }

  /**
   * Creates the duct matching the concrete type of the given element.
   *
   * @param element the element to wrap
   * @param role    the role computed for the element; forwarded to the boundary, group-by, co-group and
   *                merge factories
   * @return a newly created duct
   * @throws IllegalStateException if the element, or the operation of an {@link Each}/{@link Every}, is of
   *                               an unhandled kind
   */
  private Duct createDuctFor( FlowElement element, IORole role )
    {
    Duct rhsDuct;

    // error messages use getName() rather than getCanonicalName(): the latter is null for anonymous and
    // local classes, which would hide the offending type

    if( element instanceof Each )
      {
      Each eachElement = (Each) element;

      if( eachElement.isFunction() )
        rhsDuct = new FunctionEachStage( flowProcess, eachElement );
      else if( eachElement.isFilter() )
        rhsDuct = new FilterEachStage( flowProcess, eachElement );
      else if( eachElement.isValueAssertion() )
        rhsDuct = new ValueAssertionEachStage( flowProcess, eachElement );
      else
        throw new IllegalStateException( "unknown operation: " + eachElement.getOperation().getClass().getName() );
      }
    else if( element instanceof Every )
      {
      Every everyElement = (Every) element;

      if( everyElement.isBuffer() )
        rhsDuct = new BufferEveryWindow( flowProcess, everyElement );
      else if( everyElement.isAggregator() )
        rhsDuct = new AggregatorEveryStage( flowProcess, everyElement );
      else if( everyElement.isGroupAssertion() )
        rhsDuct = new GroupAssertionEveryStage( flowProcess, everyElement );
      else
        throw new IllegalStateException( "unknown operation: " + everyElement.getOperation().getClass().getName() );
      }
    else if( element instanceof Boundary )
      {
      rhsDuct = createBoundaryStage( (Boundary) element, role );
      }
    else if( element instanceof Splice )
      {
      Splice spliceElement = (Splice) element;

      if( spliceElement.isGroupBy() )
        rhsDuct = createGroupByGate( (GroupBy) spliceElement, role );
      else if( spliceElement.isCoGroup() )
        rhsDuct = createCoGroupGate( (CoGroup) spliceElement, role );
      else if( spliceElement.isMerge() )
        rhsDuct = createMergeStage( (Merge) element, role );
      else
        rhsDuct = createHashJoinGate( (HashJoin) element ); // any remaining splice is treated as a HashJoin
      }
    else if( element instanceof Tap )
      {
      rhsDuct = createSinkStage( (Tap) element );
      }
    else
      throw new IllegalStateException( "unknown element type: " + element.getClass().getName() );

    return rhsDuct;
    }

  /**
   * Creates the duct for a {@link Boundary} element. This base implementation always throws; subclasses
   * that support boundaries must override it.
   *
   * @param element the boundary element
   * @param role    the role computed for the element
   * @return never returns normally in this implementation
   * @throws UnsupportedOperationException always
   */
  protected Duct createBoundaryStage( Boundary element, IORole role )
    {
    // could return MergeStage at this point as they are roughly equivalent
    throw new UnsupportedOperationException( "boundary not supported by planner" );
    }

  /**
   * Creates the stage used when a {@link Tap} is reached as a successor.
   *
   * @param element the tap to wrap
   * @return a new {@link SinkStage} for the tap
   */
  protected SinkStage createSinkStage( Tap element )
    {
    return new SinkStage( flowProcess, element );
    }

  /**
   * Creates the gate for a {@link CoGroup} element.
   *
   * @param element the co-group element
   * @param role    the role computed for the element
   * @return the gate to bind into the graph
   */
  protected abstract Gate createCoGroupGate( CoGroup element, IORole role );

  /**
   * Creates the gate for a {@link GroupBy} element.
   *
   * @param element the group-by element
   * @param role    the role computed for the element
   * @return the gate to bind into the graph
   */
  protected abstract Gate createGroupByGate( GroupBy element, IORole role );

  /**
   * Creates the stage for a {@link Merge} element.
   *
   * @param merge the merge element
   * @param both  the role computed for the element; not used by this implementation
   * @return a new {@link MergeStage}
   */
  protected Duct createMergeStage( Merge merge, IORole both )
    {
    return new MergeStage( flowProcess, merge );
    }

  /**
   * Creates the gate for a {@link HashJoin} element.
   * <p>
   * A blocking gate is returned when the join reports self joins or carries the
   * {@link BlockingMode#Blocked} annotation in the element graph; otherwise a non-blocking gate is returned.
   *
   * @param join the hash join element
   * @return the gate to bind into the graph
   */
  protected Gate createHashJoinGate( HashJoin join )
    {
    if( join.getNumSelfJoins() != 0 )
      return createBlockingJoinGate( join );

    // lets not block the streamed side unless it will cause a deadlock
    if( hasElementAnnotation( BlockingMode.Blocked, join ) )
      return createBlockingJoinGate( join );

    return createNonBlockingJoinGate( join );
    }

  /**
   * Tests whether the element graph holds the given annotation for the given element.
   *
   * @param annotation  the annotation key
   * @param flowElement the element to look up
   * @return {@code false} if the graph has no annotations, otherwise the result of the annotation lookup
   * @throws ClassCastException if {@link #elementGraph} is not an {@link AnnotatedGraph}
   */
  private boolean hasElementAnnotation( Enum annotation, FlowElement flowElement )
    {
    AnnotatedGraph annotatedGraph = (AnnotatedGraph) elementGraph;

    if( !annotatedGraph.hasAnnotations() )
      return false;

    return annotatedGraph.getAnnotations().hadKey( annotation, flowElement );
    }

  /**
   * Creates the gate used for a {@link HashJoin} that does not need to block.
   *
   * @param join the hash join element
   * @return a new {@link MemoryHashJoinGate}
   */
  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
    {
    return new MemoryHashJoinGate( flowProcess, join );
    }

  /**
   * Creates the gate used for a {@link HashJoin} that must block.
   *
   * @param join the hash join element
   * @return a new {@link MemoryCoGroupGate}
   */
  protected MemoryCoGroupGate createBlockingJoinGate( HashJoin join )
    {
    return new MemoryCoGroupGate( flowProcess, join );
    }

  /**
   * Returns the duct already in this graph that equals the given one, so an element reached along several
   * branches is represented by a single duct.
   *
   * @param current the candidate duct
   * @return the first duct from {@code getAllDucts()} for which {@code equals( current )} holds, or
   * {@code current} itself when there is none
   */
  protected Duct findExisting( Duct current )
    {
    Collection<Duct> allDucts = getAllDucts();

    for( Duct duct : allDucts )
      {
      if( duct.equals( current ) )
        return duct;
      }

    return current;
    }

  /**
   * Assigns branch names and a {@link TrapHandler} to every {@link ElementDuct} in this graph.
   * <p>
   * A {@link Pipe} contributes its own name, a {@link Tap} contributes the names resolved by
   * {@link #getTapBranchNamesFor(Duct)}. The first branch name, in sorted order, for which the node has a
   * trap determines the handler; if no handler is present afterwards, one without a trap is installed.
   *
   * @throws IllegalStateException if a duct wraps an element that is neither a {@link Pipe} nor a {@link Tap}
   */
  protected void setTraps()
    {
    Collection<Duct> ducts = getAllDucts();

    for( Duct duct : ducts )
      {
      if( !( duct instanceof ElementDuct ) )
        continue;

      ElementDuct elementDuct = (ElementDuct) duct;
      FlowElement flowElement = elementDuct.getFlowElement();

      Set<String> branchNames = new TreeSet<String>();

      if( flowElement instanceof Pipe )
        branchNames.add( ( (Pipe) flowElement ).getName() );
      else if( flowElement instanceof Tap )
        branchNames.addAll( getTapBranchNamesFor( duct ) );
      else
        throw new IllegalStateException( "unexpected duct type: " + duct.getClass().getName() );

      elementDuct.setBranchNames( branchNames );

      for( String branchName : branchNames )
        {
        Tap trap = node.getTrap( branchName );

        if( trap != null )
          {
          // the trap gets its own process view built from the trap's ConfigDef
          FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() );
          elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) );
          break; // only the first matching trap is used
          }
        }

      if( !elementDuct.hasTrapHandler() )
        elementDuct.setTrapHandler( new TrapHandler( flowProcess ) );
      }
    }

  /**
   * Returns a Set as a given tap may be bound to multiple branches
   *
   * @param duct a duct whose flow element is a {@link Tap}
   * @return an empty set for a temporary tap, otherwise the source or sink tap names the node reports for
   * the tap, depending on whether the duct is a {@link SourceStage} or a {@link SinkStage}
   * @throws IllegalStateException if the duct is neither a {@link SourceStage} nor a {@link SinkStage}
   */
  private Set<String> getTapBranchNamesFor( Duct duct )
    {
    if( ( (Tap) ( (ElementDuct) duct ).getFlowElement() ).isTemporary() )
      return Collections.emptySet();

    if( duct instanceof SourceStage )
      return node.getSourceTapNames( (Tap) ( (SourceStage) duct ).getFlowElement() );
    else if( duct instanceof SinkStage )
      return node.getSinkTapNames( (Tap) ( (SinkStage) duct ).getFlowElement() );
    else
      throw new IllegalStateException( "duct does not wrap a Tap: " + duct.getClass().getName() );
    }

  /**
   * Adds the incoming and outgoing scopes reported by the node to every {@link ElementDuct} in this graph.
   */
  protected void setScopes()
    {
    Collection<Duct> ducts = getAllDucts();

    for( Duct duct : ducts )
      {
      if( !( duct instanceof ElementDuct ) )
        continue;

      ElementDuct elementDuct = (ElementDuct) duct;

      // get the actual incoming/outgoing scopes for the full node as we need the total number of branches
      elementDuct.getIncomingScopes().addAll( node.getPreviousScopes( elementDuct.getFlowElement() ) );
      elementDuct.getOutgoingScopes().addAll( node.getNextScopes( elementDuct.getFlowElement() ) );
      }
    }
  }