001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.graph;
023
024import java.util.Collection;
025import java.util.Collections;
026import java.util.List;
027import java.util.Set;
028import java.util.TreeSet;
029
030import cascading.flow.FlowElement;
031import cascading.flow.FlowNode;
032import cascading.flow.FlowProcess;
033import cascading.flow.planner.Scope;
034import cascading.flow.planner.graph.AnnotatedGraph;
035import cascading.flow.planner.graph.ElementGraph;
036import cascading.flow.planner.graph.Extent;
037import cascading.flow.stream.annotations.BlockingMode;
038import cascading.flow.stream.duct.Duct;
039import cascading.flow.stream.duct.Gate;
040import cascading.flow.stream.element.AggregatorEveryStage;
041import cascading.flow.stream.element.BufferEveryWindow;
042import cascading.flow.stream.element.ElementDuct;
043import cascading.flow.stream.element.ElementFlowProcess;
044import cascading.flow.stream.element.FilterEachStage;
045import cascading.flow.stream.element.FunctionEachStage;
046import cascading.flow.stream.element.GroupAssertionEveryStage;
047import cascading.flow.stream.element.GroupingSpliceGate;
048import cascading.flow.stream.element.MemoryCoGroupGate;
049import cascading.flow.stream.element.MemoryHashJoinGate;
050import cascading.flow.stream.element.MergeStage;
051import cascading.flow.stream.element.SinkStage;
052import cascading.flow.stream.element.SourceStage;
053import cascading.flow.stream.element.TrapHandler;
054import cascading.flow.stream.element.ValueAssertionEachStage;
055import cascading.pipe.Boundary;
056import cascading.pipe.CoGroup;
057import cascading.pipe.Each;
058import cascading.pipe.Every;
059import cascading.pipe.GroupBy;
060import cascading.pipe.HashJoin;
061import cascading.pipe.Merge;
062import cascading.pipe.Pipe;
063import cascading.pipe.Splice;
064import cascading.tap.Tap;
065import cascading.util.Util;
066
067/**
068 *
069 */
070public abstract class NodeStreamGraph extends StreamGraph
071  {
072  protected FlowProcess flowProcess;
073  protected final FlowNode node;
074  protected FlowElement streamedSource;
075  protected final ElementGraph elementGraph;
076
077  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node )
078    {
079    this.flowProcess = flowProcess;
080    this.node = node;
081    this.elementGraph = node.getElementGraph();
082    }
083
084  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node, FlowElement streamedSource )
085    {
086    this.flowProcess = flowProcess;
087    this.node = node;
088    this.elementGraph = streamedSource == null ? node.getElementGraph() : node.getPipelineGraphFor( streamedSource );
089    this.streamedSource = streamedSource;
090    }
091
092  protected Object getProperty( String name )
093    {
094    return flowProcess.getProperty( name );
095    }
096
097  protected void handleDuct( FlowElement lhsElement, Duct lhsDuct )
098    {
099    List<FlowElement> successors = elementGraph.successorListOf( lhsElement );
100
101    if( successors.contains( Extent.tail ) )
102      addTail( lhsDuct );
103    else
104      handleSuccessors( lhsElement, lhsDuct, successors );
105    }
106
107  private void handleSuccessors( FlowElement lhsElement, Duct lhsDuct, List<FlowElement> successors )
108    {
109    for( FlowElement rhsElement : Util.createIdentitySet( successors ) )
110      {
111      if( rhsElement instanceof Extent )
112        continue;
113
114      boolean isSink = elementGraph.successorListOf( rhsElement ).contains( Extent.tail );
115      boolean isSource = elementGraph.predecessorListOf( rhsElement ).contains( Extent.head );
116
117      IORole role = IORole.pass;
118
119      if( isSource && !isSink )
120        role = IORole.source;
121      else if( !isSource && isSink )
122        role = IORole.sink;
123      else if( isSource && isSink )
124        role = IORole.both;
125
126      Duct newRhsDuct = createDuctFor( rhsElement, role );
127      Duct rhsDuct = findExisting( newRhsDuct );
128
129      Set<Scope> allEdges = elementGraph.getAllEdges( lhsElement, rhsElement );
130
131      for( Scope scope : allEdges )
132        {
133        int ordinal = scope.getOrdinal();
134
135        addPath( lhsDuct, ordinal, rhsDuct );
136        }
137
138      if( rhsDuct != newRhsDuct ) // don't keep going if we have already seen rhs
139        continue;
140
141      handleDuct( rhsElement, rhsDuct );
142      }
143    }
144
145  private Duct createDuctFor( FlowElement element, IORole role )
146    {
147    Duct rhsDuct;
148
149    if( element instanceof Each )
150      {
151      Each eachElement = (Each) element;
152
153      if( eachElement.isFunction() )
154        rhsDuct = new FunctionEachStage( flowProcess, eachElement );
155      else if( eachElement.isFilter() )
156        rhsDuct = new FilterEachStage( flowProcess, eachElement );
157      else if( eachElement.isValueAssertion() )
158        rhsDuct = new ValueAssertionEachStage( flowProcess, eachElement );
159      else
160        throw new IllegalStateException( "unknown operation: " + eachElement.getOperation().getClass().getCanonicalName() );
161      }
162    else if( element instanceof Every )
163      {
164      Every everyElement = (Every) element;
165
166      if( everyElement.isBuffer() )
167        rhsDuct = new BufferEveryWindow( flowProcess, everyElement );
168      else if( everyElement.isAggregator() )
169        rhsDuct = new AggregatorEveryStage( flowProcess, everyElement );
170      else if( everyElement.isGroupAssertion() )
171        rhsDuct = new GroupAssertionEveryStage( flowProcess, everyElement );
172      else
173        throw new IllegalStateException( "unknown operation: " + everyElement.getOperation().getClass().getCanonicalName() );
174      }
175    else if( element instanceof Boundary )
176      {
177      rhsDuct = createBoundaryStage( (Boundary) element, role );
178      }
179    else if( element instanceof Splice )
180      {
181      Splice spliceElement = (Splice) element;
182
183      if( spliceElement.isGroupBy() )
184        rhsDuct = createGroupByGate( (GroupBy) spliceElement, role );
185      else if( spliceElement.isCoGroup() )
186        rhsDuct = createCoGroupGate( (CoGroup) spliceElement, role );
187      else if( spliceElement.isMerge() )
188        rhsDuct = createMergeStage( (Merge) element, role );
189      else
190        rhsDuct = createHashJoinGate( (HashJoin) element );
191      }
192    else if( element instanceof Tap )
193      {
194      rhsDuct = createSinkStage( (Tap) element );
195      }
196    else
197      throw new IllegalStateException( "unknown element type: " + element.getClass().getName() );
198
199    return rhsDuct;
200    }
201
202  protected Duct createBoundaryStage( Boundary element, IORole role )
203    {
204    // could return MergeStage at this point as they are roughly equivalent
205    throw new UnsupportedOperationException( "boundary not supported by planner" );
206    }
207
208  protected SinkStage createSinkStage( Tap element )
209    {
210    return new SinkStage( flowProcess, element );
211    }
212
213  protected abstract Gate createCoGroupGate( CoGroup element, IORole role );
214
215  protected abstract Gate createGroupByGate( GroupBy element, IORole role );
216
217  protected Duct createMergeStage( Merge merge, IORole both )
218    {
219    return new MergeStage( flowProcess, merge );
220    }
221
222  protected Gate createHashJoinGate( HashJoin join )
223    {
224    if( join.getNumSelfJoins() != 0 )
225      return createBlockingJoinGate( join );
226
227    // lets not block the streamed side unless it will cause a deadlock
228    if( hasElementAnnotation( BlockingMode.Blocked, join ) )
229      return createBlockingJoinGate( join );
230
231    return createNonBlockingJoinGate( join );
232    }
233
234  private boolean hasElementAnnotation( Enum annotation, FlowElement flowElement )
235    {
236    if( !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
237      return false;
238
239    return ( (AnnotatedGraph) elementGraph ).getAnnotations().hadKey( annotation, flowElement );
240    }
241
242  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
243    {
244    return new MemoryHashJoinGate( flowProcess, join );
245    }
246
247  protected MemoryCoGroupGate createBlockingJoinGate( HashJoin join )
248    {
249    return new MemoryCoGroupGate( flowProcess, join );
250    }
251
252  protected Duct findExisting( Duct current )
253    {
254    Collection<Duct> allDucts = getAllDucts();
255
256    for( Duct duct : allDucts )
257      {
258      if( duct.equals( current ) )
259        return duct;
260      }
261
262    return current;
263    }
264
265  protected void setTraps()
266    {
267    Collection<Duct> ducts = getAllDucts();
268
269    for( Duct duct : ducts )
270      {
271      if( !( duct instanceof ElementDuct ) )
272        continue;
273
274      ElementDuct elementDuct = (ElementDuct) duct;
275      FlowElement flowElement = elementDuct.getFlowElement();
276
277      Set<String> branchNames = new TreeSet<String>();
278
279      if( flowElement instanceof Pipe )
280        branchNames.add( ( (Pipe) flowElement ).getName() );
281      else if( flowElement instanceof Tap )
282        branchNames.addAll( getTapBranchNamesFor( duct ) );
283      else
284        throw new IllegalStateException( "unexpected duct type" + duct.getClass().getCanonicalName() );
285
286      elementDuct.setBranchNames( branchNames );
287
288      for( String branchName : branchNames )
289        {
290        Tap trap = node.getTrap( branchName );
291
292        if( trap != null )
293          {
294          FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() );
295          elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) );
296          break;
297          }
298        }
299
300      if( !elementDuct.hasTrapHandler() )
301        elementDuct.setTrapHandler( new TrapHandler( flowProcess ) );
302      }
303    }
304
305  /**
306   * Returns a Set as a given tap may be bound to multiple branches
307   *
308   * @param duct
309   * @return
310   */
311  private Set<String> getTapBranchNamesFor( Duct duct )
312    {
313    if( ( (Tap) ( (ElementDuct) duct ).getFlowElement() ).isTemporary() )
314      return Collections.emptySet();
315
316    if( duct instanceof SourceStage )
317      return node.getSourceTapNames( (Tap) ( (SourceStage) duct ).getFlowElement() );
318    else if( duct instanceof SinkStage )
319      return node.getSinkTapNames( (Tap) ( (SinkStage) duct ).getFlowElement() );
320    else
321      throw new IllegalStateException( "duct does not wrap a Tap: " + duct.getClass().getCanonicalName() );
322    }
323
324  protected void setScopes()
325    {
326    Collection<Duct> ducts = getAllDucts();
327
328    for( Duct duct : ducts )
329      {
330      if( !( duct instanceof ElementDuct ) )
331        continue;
332
333      ElementDuct elementDuct = (ElementDuct) duct;
334
335      // get the actual incoming/outgoing scopes for the full node as we need the total number of branches
336      elementDuct.getIncomingScopes().addAll( node.getPreviousScopes( elementDuct.getFlowElement() ) );
337      elementDuct.getOutgoingScopes().addAll( node.getNextScopes( elementDuct.getFlowElement() ) );
338      }
339    }
340  }