001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.graph;
022
023import java.util.Collection;
024import java.util.Collections;
025import java.util.List;
026import java.util.Set;
027import java.util.TreeSet;
028
029import cascading.flow.FlowElement;
030import cascading.flow.FlowNode;
031import cascading.flow.FlowProcess;
032import cascading.flow.planner.Scope;
033import cascading.flow.planner.graph.AnnotatedGraph;
034import cascading.flow.planner.graph.ElementGraph;
035import cascading.flow.planner.graph.Extent;
036import cascading.flow.stream.annotations.BlockingMode;
037import cascading.flow.stream.duct.Duct;
038import cascading.flow.stream.duct.Gate;
039import cascading.flow.stream.element.AggregatorEveryStage;
040import cascading.flow.stream.element.BufferEveryWindow;
041import cascading.flow.stream.element.ElementDuct;
042import cascading.flow.stream.element.ElementFlowProcess;
043import cascading.flow.stream.element.FilterEachStage;
044import cascading.flow.stream.element.FunctionEachStage;
045import cascading.flow.stream.element.GroupAssertionEveryStage;
046import cascading.flow.stream.element.GroupingSpliceGate;
047import cascading.flow.stream.element.MemoryCoGroupGate;
048import cascading.flow.stream.element.MemoryHashJoinGate;
049import cascading.flow.stream.element.MergeStage;
050import cascading.flow.stream.element.SinkStage;
051import cascading.flow.stream.element.SourceStage;
052import cascading.flow.stream.element.TrapHandler;
053import cascading.flow.stream.element.ValueAssertionEachStage;
054import cascading.pipe.Boundary;
055import cascading.pipe.CoGroup;
056import cascading.pipe.Each;
057import cascading.pipe.Every;
058import cascading.pipe.GroupBy;
059import cascading.pipe.HashJoin;
060import cascading.pipe.Merge;
061import cascading.pipe.Pipe;
062import cascading.pipe.Splice;
063import cascading.tap.Tap;
064import cascading.util.Util;
065
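/**
 * Class NodeStreamGraph is the base {@link StreamGraph} used to bind the elements of a single
 * {@link FlowNode} into a graph of {@link Duct} instances.
 * <p>
 * Starting from a given element, the successors found in the node's element graph are walked, a duct is
 * created for each Each, Every, Boundary, Splice or Tap encountered, and the ducts are connected in the
 * same order as the underlying elements. Sub-classes must provide the gates used for GroupBy and CoGroup
 * elements and may override the remaining {@code create*} factory methods.
 */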
069public abstract class NodeStreamGraph extends StreamGraph
070  {
071  protected FlowProcess flowProcess;
072  protected final FlowNode node;
073  protected FlowElement streamedSource;
074  protected final ElementGraph elementGraph;
075
076  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node )
077    {
078    this.flowProcess = flowProcess;
079    this.node = node;
080    this.elementGraph = node.getElementGraph();
081    }
082
083  public NodeStreamGraph( FlowProcess flowProcess, FlowNode node, FlowElement streamedSource )
084    {
085    this.flowProcess = flowProcess;
086    this.node = node;
087    this.elementGraph = streamedSource == null ? node.getElementGraph() : node.getPipelineGraphFor( streamedSource );
088    this.streamedSource = streamedSource;
089    }
090
091  protected Object getProperty( String name )
092    {
093    return flowProcess.getProperty( name );
094    }
095
096  protected void handleDuct( FlowElement lhsElement, Duct lhsDuct )
097    {
098    List<FlowElement> successors = elementGraph.successorListOf( lhsElement );
099
100    if( successors.contains( Extent.tail ) )
101      addTail( lhsDuct );
102    else
103      handleSuccessors( lhsDuct, successors );
104    }
105
106  private void handleSuccessors( Duct lhsDuct, List<FlowElement> successors )
107    {
108    for( FlowElement rhsElement : successors )
109      {
110      if( rhsElement instanceof Extent )
111        continue;
112
113      boolean isSink = elementGraph.successorListOf( rhsElement ).contains( Extent.tail );
114      boolean isSource = elementGraph.predecessorListOf( rhsElement ).contains( Extent.head );
115
116      IORole role = IORole.pass;
117
118      if( isSource && !isSink )
119        role = IORole.source;
120      else if( !isSource && isSink )
121        role = IORole.sink;
122      else if( isSource && isSink )
123        role = IORole.both;
124
125      Duct newRhsDuct = createDuctFor( rhsElement, role );
126      Duct rhsDuct = findExisting( newRhsDuct );
127
128      int ordinal = findEdgeOrdinal( lhsDuct, rhsDuct );
129
130      addPath( lhsDuct, ordinal, rhsDuct );
131
132      if( rhsDuct != newRhsDuct ) // don't keep going if we have already seen rhs
133        continue;
134
135      handleDuct( rhsElement, rhsDuct );
136      }
137    }
138
139  private int findEdgeOrdinal( Duct lhsDuct, Duct rhsDuct )
140    {
141    if( !( rhsDuct instanceof GroupingSpliceGate ) )
142      return 0;
143
144    FlowElement lhsElement = ( (ElementDuct) lhsDuct ).getFlowElement();
145    FlowElement rhsElement = ( (ElementDuct) rhsDuct ).getFlowElement();
146
147    Set<Scope> allEdges = elementGraph.getAllEdges( lhsElement, rhsElement );
148
149    if( allEdges.size() == 1 )
150      return Util.getFirst( allEdges ).getOrdinal();
151
152    throw new IllegalStateException( "could not find ordinal, too many edges between elements" );
153    }
154
155  private Duct createDuctFor( FlowElement element, IORole role )
156    {
157    Duct rhsDuct;
158
159    if( element instanceof Each )
160      {
161      Each eachElement = (Each) element;
162
163      if( eachElement.isFunction() )
164        rhsDuct = new FunctionEachStage( flowProcess, eachElement );
165      else if( eachElement.isFilter() )
166        rhsDuct = new FilterEachStage( flowProcess, eachElement );
167      else if( eachElement.isValueAssertion() )
168        rhsDuct = new ValueAssertionEachStage( flowProcess, eachElement );
169      else
170        throw new IllegalStateException( "unknown operation: " + eachElement.getOperation().getClass().getCanonicalName() );
171      }
172    else if( element instanceof Every )
173      {
174      Every everyElement = (Every) element;
175
176      if( everyElement.isBuffer() )
177        rhsDuct = new BufferEveryWindow( flowProcess, everyElement );
178      else if( everyElement.isAggregator() )
179        rhsDuct = new AggregatorEveryStage( flowProcess, everyElement );
180      else if( everyElement.isGroupAssertion() )
181        rhsDuct = new GroupAssertionEveryStage( flowProcess, everyElement );
182      else
183        throw new IllegalStateException( "unknown operation: " + everyElement.getOperation().getClass().getCanonicalName() );
184      }
185    else if( element instanceof Boundary )
186      {
187      rhsDuct = createBoundaryStage( (Boundary) element, role );
188      }
189    else if( element instanceof Splice )
190      {
191      Splice spliceElement = (Splice) element;
192
193      if( spliceElement.isGroupBy() )
194        rhsDuct = createGroupByGate( (GroupBy) spliceElement, role );
195      else if( spliceElement.isCoGroup() )
196        rhsDuct = createCoGroupGate( (CoGroup) spliceElement, role );
197      else if( spliceElement.isMerge() )
198        rhsDuct = createMergeStage( (Merge) element, role );
199      else
200        rhsDuct = createHashJoinGate( (HashJoin) element );
201      }
202    else if( element instanceof Tap )
203      {
204      rhsDuct = createSinkStage( (Tap) element );
205      }
206    else
207      throw new IllegalStateException( "unknown element type: " + element.getClass().getName() );
208
209    return rhsDuct;
210    }
211
212  protected Duct createBoundaryStage( Boundary element, IORole role )
213    {
214    // could return MergeStage at this point as they are roughly equivalent
215    throw new UnsupportedOperationException( "boundary not supported by planner" );
216    }
217
218  protected SinkStage createSinkStage( Tap element )
219    {
220    return new SinkStage( flowProcess, element );
221    }
222
  /**
   * Creates the gate that represents the given CoGroup in this stream graph.
   *
   * @param element the CoGroup to create a gate for
   * @param role    the IO role of the element, derived from its adjacency to the head and tail extents
   * @return the gate to bind into the stream graph
   */
  protected abstract Gate createCoGroupGate( CoGroup element, IORole role );
224
  /**
   * Creates the gate that represents the given GroupBy in this stream graph.
   *
   * @param element the GroupBy to create a gate for
   * @param role    the IO role of the element, derived from its adjacency to the head and tail extents
   * @return the gate to bind into the stream graph
   */
  protected abstract Gate createGroupByGate( GroupBy element, IORole role );
226
227  protected Duct createMergeStage( Merge merge, IORole both )
228    {
229    return new MergeStage( flowProcess, merge );
230    }
231
232  protected Gate createHashJoinGate( HashJoin join )
233    {
234    if( join.getNumSelfJoins() != 0 )
235      return createBlockingJoinGate( join );
236
237    // lets not block the streamed side unless it will cause a deadlock
238    if( hasElementAnnotation( BlockingMode.Blocked, join ) )
239      return createBlockingJoinGate( join );
240
241    return createNonBlockingJoinGate( join );
242    }
243
244  private boolean hasElementAnnotation( Enum annotation, FlowElement flowElement )
245    {
246    if( !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
247      return false;
248
249    return ( (AnnotatedGraph) elementGraph ).getAnnotations().hadKey( annotation, flowElement );
250    }
251
252  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
253    {
254    return new MemoryHashJoinGate( flowProcess, join );
255    }
256
257  protected MemoryCoGroupGate createBlockingJoinGate( HashJoin join )
258    {
259    return new MemoryCoGroupGate( flowProcess, join );
260    }
261
262  protected Duct findExisting( Duct current )
263    {
264    Collection<Duct> allDucts = getAllDucts();
265
266    for( Duct duct : allDucts )
267      {
268      if( duct.equals( current ) )
269        return duct;
270      }
271
272    return current;
273    }
274
275  protected void setTraps()
276    {
277    Collection<Duct> ducts = getAllDucts();
278
279    for( Duct duct : ducts )
280      {
281      if( !( duct instanceof ElementDuct ) )
282        continue;
283
284      ElementDuct elementDuct = (ElementDuct) duct;
285      FlowElement flowElement = elementDuct.getFlowElement();
286
287      Set<String> branchNames = new TreeSet<String>();
288
289      if( flowElement instanceof Pipe )
290        branchNames.add( ( (Pipe) flowElement ).getName() );
291      else if( flowElement instanceof Tap )
292        branchNames.addAll( getTapBranchNamesFor( duct ) );
293      else
294        throw new IllegalStateException( "unexpected duct type" + duct.getClass().getCanonicalName() );
295
296      elementDuct.setBranchNames( branchNames );
297
298      for( String branchName : branchNames )
299        {
300        Tap trap = node.getTrap( branchName );
301
302        if( trap != null )
303          {
304          FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() );
305          elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) );
306          break;
307          }
308        }
309
310      if( !elementDuct.hasTrapHandler() )
311        elementDuct.setTrapHandler( new TrapHandler( flowProcess ) );
312      }
313    }
314
315  /**
316   * Returns a Set as a given tap may be bound to multiple branches
317   *
318   * @param duct
319   * @return
320   */
321  private Set<String> getTapBranchNamesFor( Duct duct )
322    {
323    if( ( (Tap) ( (ElementDuct) duct ).getFlowElement() ).isTemporary() )
324      return Collections.emptySet();
325
326    if( duct instanceof SourceStage )
327      return node.getSourceTapNames( (Tap) ( (SourceStage) duct ).getFlowElement() );
328    else if( duct instanceof SinkStage )
329      return node.getSinkTapNames( (Tap) ( (SinkStage) duct ).getFlowElement() );
330    else
331      throw new IllegalStateException( "duct does not wrap a Tap: " + duct.getClass().getCanonicalName() );
332    }
333
334  protected void setScopes()
335    {
336    Collection<Duct> ducts = getAllDucts();
337
338    for( Duct duct : ducts )
339      {
340      if( !( duct instanceof ElementDuct ) )
341        continue;
342
343      ElementDuct elementDuct = (ElementDuct) duct;
344
345      // get the actual incoming/outgoing scopes for the full node as we need the total number of branches
346      elementDuct.getIncomingScopes().addAll( node.getPreviousScopes( elementDuct.getFlowElement() ) );
347      elementDuct.getOutgoingScopes().addAll( node.getNextScopes( elementDuct.getFlowElement() ) );
348      }
349    }
350  }