001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.Collection;
024    import java.util.List;
025    import java.util.Set;
026    import java.util.TreeSet;
027    
028    import cascading.flow.FlowElement;
029    import cascading.flow.FlowProcess;
030    import cascading.flow.planner.BaseFlowStep;
031    import cascading.flow.planner.Scope;
032    import cascading.pipe.CoGroup;
033    import cascading.pipe.Each;
034    import cascading.pipe.Every;
035    import cascading.pipe.GroupBy;
036    import cascading.pipe.HashJoin;
037    import cascading.pipe.Merge;
038    import cascading.pipe.Pipe;
039    import cascading.pipe.Splice;
040    import cascading.tap.Tap;
041    import org.jgrapht.GraphPath;
042    
043    import static cascading.flow.planner.ElementGraphs.getAllShortestPathsBetween;
044    
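/**
 * Base {@link StreamGraph} for a single {@link BaseFlowStep}.
 * <p/>
 * Starting from a given element it walks the successors reported by the step, creates a {@link Duct} for
 * each {@link FlowElement} reached and connects the ducts with paths. It also offers helpers that bind trap
 * handlers and incoming/outgoing scopes to the ducts. Subclasses supply the CoGroup and GroupBy gates and
 * decide on which elements the walk stops.
 */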
048    public abstract class StepStreamGraph extends StreamGraph
049      {
050      protected FlowProcess flowProcess;
051      protected final BaseFlowStep step;
052    
053      public StepStreamGraph( FlowProcess flowProcess, BaseFlowStep step )
054        {
055        this.flowProcess = flowProcess;
056        this.step = step;
057        }
058    
059      protected Object getProperty( String name )
060        {
061        return flowProcess.getProperty( name );
062        }
063    
064      protected void handleDuct( FlowElement lhsElement, Duct lhsDuct )
065        {
066        List<FlowElement> successors = step.getSuccessors( lhsElement );
067    
068        if( !stopOnElement( lhsElement, successors ) )
069          handleSuccessors( lhsDuct, successors );
070        else
071          addTail( lhsDuct );
072        }
073    
074      protected abstract boolean stopOnElement( FlowElement lhsElement, List<FlowElement> successors );
075    
076      private void handleSuccessors( Duct lhsDuct, List<FlowElement> successors )
077        {
078        for( FlowElement rhsElement : successors )
079          {
080          Duct newRhsDuct = createDuctFor( rhsElement );
081          Duct rhsDuct = findExisting( newRhsDuct );
082    
083          int ordinal = findEdgeOrdinal( lhsDuct, rhsDuct );
084    
085          addPath( lhsDuct, ordinal, rhsDuct );
086    
087          if( rhsDuct != newRhsDuct ) // don't keep going if we have already seen rhs
088            continue;
089    
090          handleDuct( rhsElement, rhsDuct );
091          }
092        }
093    
094      private int findEdgeOrdinal( Duct lhsDuct, Duct rhsDuct )
095        {
096        if( !( rhsDuct instanceof SpliceGate ) )
097          return 0;
098    
099        FlowElement lhsElement = ( (ElementDuct) lhsDuct ).getFlowElement();
100        Splice rhsElement = (Splice) ( (SpliceGate) rhsDuct ).getFlowElement();
101    
102        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( step.getGraph(), lhsElement, rhsElement );
103    
104        for( GraphPath<FlowElement, Scope> path : paths )
105          {
106          if( path.getEdgeList().size() == 1 )
107            return rhsElement.getPipePos().get( path.getEdgeList().get( 0 ).getName() );
108          }
109    
110        throw new IllegalStateException( "could not find ordinal" );
111        }
112    
113      private Duct createDuctFor( FlowElement element )
114        {
115        Duct rhsDuct;
116    
117        if( element instanceof Each )
118          {
119          Each eachElement = (Each) element;
120    
121          if( eachElement.isFunction() )
122            rhsDuct = new FunctionEachStage( flowProcess, eachElement );
123          else if( eachElement.isFilter() )
124            rhsDuct = new FilterEachStage( flowProcess, eachElement );
125          else if( eachElement.isValueAssertion() )
126            rhsDuct = new ValueAssertionEachStage( flowProcess, eachElement );
127          else
128            throw new IllegalStateException( "unknown operation: " + eachElement.getOperation().getClass().getCanonicalName() );
129          }
130        else if( element instanceof Every )
131          {
132          Every everyElement = (Every) element;
133    
134          if( everyElement.isBuffer() )
135            rhsDuct = new BufferEveryWindow( flowProcess, everyElement );
136          else if( everyElement.isAggregator() )
137            rhsDuct = new AggregatorEveryStage( flowProcess, everyElement );
138          else if( everyElement.isGroupAssertion() )
139            rhsDuct = new GroupAssertionEveryStage( flowProcess, everyElement );
140          else
141            throw new IllegalStateException( "unknown operation: " + everyElement.getOperation().getClass().getCanonicalName() );
142          }
143        else if( element instanceof Splice )
144          {
145          Splice spliceElement = (Splice) element;
146    
147          if( spliceElement.isGroupBy() )
148            rhsDuct = createGroupByGate( (GroupBy) spliceElement );
149          else if( spliceElement.isCoGroup() )
150            rhsDuct = createCoGroupGate( (CoGroup) spliceElement );
151          else if( spliceElement.isMerge() )
152            rhsDuct = createMergeStage( (Merge) element );
153          else
154            rhsDuct = createHashJoinGate( (HashJoin) element );
155          }
156        else if( element instanceof Tap )
157          {
158          rhsDuct = createSinkStage( (Tap) element );
159          }
160        else
161          throw new IllegalStateException( "unknown element type: " + element.getClass().getName() );
162    
163        return rhsDuct;
164        }
165    
166      protected SinkStage createSinkStage( Tap element )
167        {
168        return new SinkStage( flowProcess, element );
169        }
170    
171      protected abstract Gate createCoGroupGate( CoGroup element );
172    
173      protected abstract Gate createGroupByGate( GroupBy element );
174    
175      protected Duct createMergeStage( Merge merge )
176        {
177        return new MergeStage( flowProcess, merge );
178        }
179    
180      protected Gate createHashJoinGate( HashJoin join )
181        {
182        if( join.getNumSelfJoins() != 0 )
183          return createBlockingJoinGate( join );
184    
185        // lets not block the streamed side unless it will cause a deadlock
186        if( joinHasSameStreamedSource( join ) )
187          return createBlockingJoinGate( join );
188    
189        return createNonBlockingJoinGate( join );
190        }
191    
192      protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join )
193        {
194        return new MemoryHashJoinGate( flowProcess, join );
195        }
196    
197      protected MemoryCoGroupGate createBlockingJoinGate( HashJoin join )
198        {
199        return new MemoryCoGroupGate( flowProcess, join );
200        }
201    
202      private boolean joinHasSameStreamedSource( HashJoin join )
203        {
204        if( !step.getStreamedSourceByJoin().isEmpty() )
205          {
206          // if streamed source is multi path
207          Object tap = step.getStreamedSourceByJoin().get( join );
208    
209          return getNumImmediateBranches( (FlowElement) tap, join ) > 1;
210          }
211    
212        // means we are in local mode if joins is empty
213        for( Object tap : step.getSources() )
214          {
215          if( getNumImmediateBranches( (FlowElement) tap, join ) > 1 )
216            return true;
217          }
218    
219        return false;
220        }
221    
222      private int getNumImmediateBranches( FlowElement tap, HashJoin join )
223        {
224        return getAllShortestPathsBetween( step.getGraph(), tap, join ).size();
225        }
226    
227      protected Duct findExisting( Duct current )
228        {
229        Collection<Duct> allDucts = getAllDucts();
230    
231        for( Duct duct : allDucts )
232          {
233          if( duct.equals( current ) )
234            return duct;
235          }
236    
237        return current;
238        }
239    
240      protected void setTraps()
241        {
242        Collection<Duct> ducts = getAllDucts();
243    
244        for( Duct duct : ducts )
245          {
246          if( !( duct instanceof ElementDuct ) )
247            continue;
248    
249          ElementDuct elementDuct = (ElementDuct) duct;
250          FlowElement flowElement = elementDuct.getFlowElement();
251    
252          Set<String> branchNames = new TreeSet<String>();
253    
254          if( flowElement instanceof Pipe )
255            branchNames.add( ( (Pipe) flowElement ).getName() );
256          else if( flowElement instanceof Tap )
257            branchNames.addAll( getTapBranchNamesFor( duct ) );
258          else
259            throw new IllegalStateException( "unexpected duct type" + duct.getClass().getCanonicalName() );
260    
261          elementDuct.setBranchNames( branchNames );
262    
263          for( String branchName : branchNames )
264            {
265            Tap trap = step.getTrap( branchName );
266    
267            if( trap != null )
268              {
269              FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() );
270              elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) );
271              break;
272              }
273            }
274    
275          if( !elementDuct.hasTrapHandler() )
276            elementDuct.setTrapHandler( new TrapHandler( flowProcess ) );
277          }
278        }
279    
280      /**
281       * Returns a Set as a given tap may be bound to multiple branches
282       *
283       * @param duct
284       * @return
285       */
286      private Set<String> getTapBranchNamesFor( Duct duct )
287        {
288        if( duct instanceof SourceStage )
289          return step.getSourceName( (Tap) ( (SourceStage) duct ).getFlowElement() );
290        else if( duct instanceof SinkStage )
291          return step.getSinkName( (Tap) ( (SinkStage) duct ).getFlowElement() );
292        else
293          throw new IllegalStateException( "duct does not wrap a Tap: " + duct.getClass().getCanonicalName() );
294        }
295    
296      protected void setScopes()
297        {
298        Collection<Duct> ducts = getAllDucts();
299    
300        for( Duct duct : ducts )
301          {
302          if( !( duct instanceof ElementDuct ) )
303            continue;
304    
305          ElementDuct elementDuct = (ElementDuct) duct;
306    
307          elementDuct.getIncomingScopes().addAll( step.getPreviousScopes( elementDuct.getFlowElement() ) );
308          elementDuct.getOutgoingScopes().addAll( step.getNextScopes( elementDuct.getFlowElement() ) );
309          }
310        }
311      }