001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.File;
024    import java.io.FileWriter;
025    import java.io.IOException;
026    import java.io.Writer;
027    import java.util.ArrayList;
028    import java.util.Collection;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.LinkedList;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Set;
035    
036    import cascading.flow.FlowElement;
037    import cascading.operation.PlannedOperation;
038    import cascading.operation.PlannerLevel;
039    import cascading.pipe.Checkpoint;
040    import cascading.pipe.CoGroup;
041    import cascading.pipe.Each;
042    import cascading.pipe.Every;
043    import cascading.pipe.Group;
044    import cascading.pipe.Operator;
045    import cascading.pipe.Pipe;
046    import cascading.pipe.Splice;
047    import cascading.pipe.SubAssembly;
048    import cascading.property.AppProps;
049    import cascading.tap.Tap;
050    import cascading.util.Util;
051    import cascading.util.Version;
052    import org.jgrapht.GraphPath;
053    import org.jgrapht.Graphs;
054    import org.jgrapht.alg.KShortestPaths;
055    import org.jgrapht.ext.EdgeNameProvider;
056    import org.jgrapht.ext.IntegerNameProvider;
057    import org.jgrapht.ext.VertexNameProvider;
058    import org.jgrapht.graph.SimpleDirectedGraph;
059    import org.jgrapht.traverse.DepthFirstIterator;
060    import org.jgrapht.traverse.TopologicalOrderIterator;
061    import org.slf4j.Logger;
062    import org.slf4j.LoggerFactory;
063    
064    /** Class ElementGraph represents the executable FlowElement graph. */
065    public class ElementGraph extends SimpleDirectedGraph<FlowElement, Scope>
066      {
067      /** Field LOG */
068      private static final Logger LOG = LoggerFactory.getLogger( ElementGraph.class );
069    
070      /** Field head */
071      public static final Extent head = new Extent( "head" );
072      /** Field tail */
073      public static final Extent tail = new Extent( "tail" );
074      /** Field resolved */
075      private boolean resolved;
076    
077      private PlatformInfo platformInfo;
078      /** Field sources */
079      private Map<String, Tap> sources;
080      /** Field sinks */
081      private Map<String, Tap> sinks;
082      /** Field traps */
083      private Map<String, Tap> traps;
084      /** Field checkpoints */
085      private Map<String, Tap> checkpoints;
086      /** Field requireUniqueCheckpoints */
087      private boolean requireUniqueCheckpoints;
088      /** Field assertionLevel */
089      private PlannerLevel[] plannerLevels;
090    
091      ElementGraph()
092        {
093        super( Scope.class );
094        }
095    
096      public ElementGraph( ElementGraph elementGraph )
097        {
098        this();
099        this.platformInfo = elementGraph.platformInfo;
100        this.sources = elementGraph.sources;
101        this.sinks = elementGraph.sinks;
102        this.traps = elementGraph.traps;
103        this.checkpoints = elementGraph.checkpoints;
104        this.plannerLevels = elementGraph.plannerLevels;
105        this.requireUniqueCheckpoints = elementGraph.requireUniqueCheckpoints;
106    
107        Graphs.addAllVertices( this, elementGraph.vertexSet() );
108        Graphs.addAllEdges( this, elementGraph, elementGraph.edgeSet() );
109        }
110    
111      /**
112       * Constructor ElementGraph creates a new ElementGraph instance.
113       *
114       * @param pipes   of type Pipe[]
115       * @param sources of type Map<String, Tap>
116       * @param sinks   of type Map<String, Tap>
117       */
118      public ElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints, PlannerLevel... plannerLevels )
119        {
120        super( Scope.class );
121        this.platformInfo = platformInfo;
122        this.sources = sources;
123        this.sinks = sinks;
124        this.traps = traps;
125        this.checkpoints = checkpoints;
126        this.requireUniqueCheckpoints = requireUniqueCheckpoints;
127        this.plannerLevels = plannerLevels;
128    
129        assembleGraph( pipes, sources, sinks );
130    
131        verifyGraph();
132        }
133    
134      public Map<String, Tap> getSourceMap()
135        {
136        return sources;
137        }
138    
139      public Map<String, Tap> getSinkMap()
140        {
141        return sinks;
142        }
143    
144      public Map<String, Tap> getTrapMap()
145        {
146        return traps;
147        }
148    
149      public Map<String, Tap> getCheckpointsMap()
150        {
151        return checkpoints;
152        }
153    
154      public Collection<Tap> getSources()
155        {
156        return sources.values();
157        }
158    
159      public Collection<Tap> getSinks()
160        {
161        return sinks.values();
162        }
163    
164      public Collection<Tap> getTraps()
165        {
166        return traps.values();
167        }
168    
169      private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
170        {
171        HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources );
172        HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks );
173    
174        for( Pipe pipe : pipes )
175          makeGraph( pipe, sourcesCopy, sinksCopy );
176    
177        addExtents( sources, sinks );
178        }
179    
180      /** Method verifyGraphConnections ... */
181      private void verifyGraph()
182        {
183        if( vertexSet().isEmpty() )
184          return;
185    
186        Set<String> checkpointNames = new HashSet<String>();
187    
188        // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected
189        TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
190    
191        FlowElement flowElement = null;
192    
193        while( iterator.hasNext() )
194          {
195          try
196            {
197            flowElement = iterator.next();
198            }
199          catch( IllegalArgumentException exception )
200            {
201            if( flowElement == null )
202              throw new ElementGraphException( "unable to traverse to the first element" );
203    
204            throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement );
205            }
206    
207          if( requireUniqueCheckpoints && flowElement instanceof Checkpoint )
208            {
209            String name = ( (Checkpoint) flowElement ).getName();
210    
211            if( checkpointNames.contains( name ) )
212              throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name );
213    
214            checkpointNames.add( name );
215            }
216    
217          if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 )
218            continue;
219    
220          if( flowElement instanceof Extent )
221            continue;
222    
223          if( flowElement instanceof Pipe )
224            {
225            if( incomingEdgesOf( flowElement ).size() == 0 )
226              throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming heads" );
227            else
228              throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
229            }
230    
231          if( flowElement instanceof Tap )
232            throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement );
233          else
234            throw new ElementGraphException( flowElement, "unknown element type: " + flowElement );
235          }
236        }
237    
238      /**
239       * Method copyGraph returns a partial copy of the current ElementGraph. Only Vertices and Edges are copied.
240       *
241       * @return ElementGraph
242       */
243      public ElementGraph copyElementGraph()
244        {
245        ElementGraph copy = new ElementGraph();
246        Graphs.addGraph( copy, this );
247    
248        copy.traps = new HashMap<String, Tap>( this.traps );
249    
250        return copy;
251        }
252    
253      /**
254       * created to support the ability to generate all paths between the head and tail of the process.
255       *
256       * @param sources
257       * @param sinks
258       */
259      private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks )
260        {
261        addVertex( head );
262    
263        for( String source : sources.keySet() )
264          {
265          Scope scope = addEdge( head, sources.get( source ) );
266    
267          // edge may already exist, if so, above returns null
268          if( scope != null )
269            scope.setName( source );
270          }
271    
272        addVertex( tail );
273    
274        for( String sink : sinks.keySet() )
275          {
276          Scope scope;
277    
278          try
279            {
280            scope = addEdge( sinks.get( sink ), tail );
281            }
282          catch( IllegalArgumentException exception )
283            {
284            throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" );
285            }
286    
287          if( scope == null )
288            throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" );
289    
290          scope.setName( sink );
291          }
292        }
293    
294      /**
295       * Performs one rule check, verifies group does not join duplicate tap resources.
296       * <p/>
297       * Scopes are always named after the source side of the source -> target relationship
298       */
299      private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks )
300        {
301        if( LOG.isDebugEnabled() )
302          LOG.debug( "adding pipe: " + current );
303    
304        if( current instanceof SubAssembly )
305          {
306          for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) )
307            makeGraph( pipe, sources, sinks );
308    
309          return;
310          }
311    
312        if( containsVertex( current ) )
313          return;
314    
315        addVertex( current );
316    
317        Tap sink = sinks.remove( current.getName() );
318    
319        if( sink != null )
320          {
321          if( LOG.isDebugEnabled() )
322            LOG.debug( "adding sink: " + sink );
323    
324          addVertex( sink );
325    
326          if( LOG.isDebugEnabled() )
327            LOG.debug( "adding edge: " + current + " -> " + sink );
328    
329          addEdge( current, sink ).setName( current.getName() ); // name scope after sink
330          }
331    
332        // PipeAssemblies should always have a previous
333        if( SubAssembly.unwind( current.getPrevious() ).length == 0 )
334          {
335          Tap source = sources.remove( current.getName() );
336    
337          if( source != null )
338            {
339            if( LOG.isDebugEnabled() )
340              LOG.debug( "adding source: " + source );
341    
342            addVertex( source );
343    
344            if( LOG.isDebugEnabled() )
345              LOG.debug( "adding edge: " + source + " -> " + current );
346    
347            addEdge( source, current ).setName( current.getName() ); // name scope after source
348            }
349          }
350    
351        for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) )
352          {
353          makeGraph( previous, sources, sinks );
354    
355          if( LOG.isDebugEnabled() )
356            LOG.debug( "adding edge: " + previous + " -> " + current );
357    
358          if( getEdge( previous, current ) != null )
359            throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous );
360    
361          addEdge( previous, current ).setName( previous.getName() ); // name scope after previous pipe
362          }
363        }
364    
365      /**
366       * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object.
367       *
368       * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object.
369       */
370      public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator()
371        {
372        return new TopologicalOrderIterator<FlowElement, Scope>( this );
373        }
374    
375      /**
376       * Method getAllShortestPathsFrom ...
377       *
378       * @param flowElement of type FlowElement
379       * @return List<GraphPath<FlowElement, Scope>>
380       */
381      public List<GraphPath<FlowElement, Scope>> getAllShortestPathsFrom( FlowElement flowElement )
382        {
383        return ElementGraphs.getAllShortestPathsBetween( this, flowElement, tail );
384        }
385    
386      /**
387       * Method getAllShortestPathsTo ...
388       *
389       * @param flowElement of type FlowElement
390       * @return List<GraphPath<FlowElement, Scope>>
391       */
392      public List<GraphPath<FlowElement, Scope>> getAllShortestPathsTo( FlowElement flowElement )
393        {
394        return ElementGraphs.getAllShortestPathsBetween( this, head, flowElement );
395        }
396    
397      /**
398       * Method getAllShortestPathsBetweenExtents returns the allShortestPathsBetweenExtents of this ElementGraph object.
399       *
400       * @return the allShortestPathsBetweenExtents (type List<GraphPath<FlowElement, Scope>>) of this ElementGraph object.
401       */
402      public List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetweenExtents()
403        {
404        List<GraphPath<FlowElement, Scope>> paths = new KShortestPaths<FlowElement, Scope>( this, head, Integer.MAX_VALUE ).getPaths( tail );
405    
406        if( paths == null )
407          return new ArrayList<GraphPath<FlowElement, Scope>>();
408    
409        return paths;
410        }
411    
412      /**
413       * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object.
414       *
415       * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object.
416       */
417      public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator()
418        {
419        return new DepthFirstIterator<FlowElement, Scope>( this, head );
420        }
421    
422      private SimpleDirectedGraph<FlowElement, Scope> copyWithTraps()
423        {
424        ElementGraph copy = this.copyElementGraph();
425    
426        copy.addTraps();
427    
428        return copy;
429        }
430    
431      private void addTraps()
432        {
433        DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();
434    
435        while( iterator.hasNext() )
436          {
437          FlowElement element = iterator.next();
438    
439          if( !( element instanceof Pipe ) )
440            continue;
441    
442          Pipe pipe = (Pipe) element;
443          Tap trap = traps.get( pipe.getName() );
444    
445          if( trap == null )
446            continue;
447    
448          addVertex( trap );
449    
450          if( LOG.isDebugEnabled() )
451            LOG.debug( "adding trap edge: " + pipe + " -> " + trap );
452    
453          if( getEdge( pipe, trap ) != null )
454            continue;
455    
456          addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe
457          }
458        }
459    
460      /**
461       * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
462       *
463       * @param filename of type String
464       */
465      public void writeDOT( String filename )
466        {
467        printElementGraph( filename, this.copyWithTraps() );
468        }
469    
470      protected void printElementGraph( String filename, final SimpleDirectedGraph<FlowElement, Scope> graph )
471        {
472        try
473          {
474          File parentFile = new File( filename ).getParentFile();
475    
476          if( parentFile != null && !parentFile.exists() )
477            parentFile.mkdirs();
478    
479          Writer writer = new FileWriter( filename );
480    
481          Util.writeDOT( writer, graph, new IntegerNameProvider<FlowElement>(), new VertexNameProvider<FlowElement>()
482            {
483            public String getVertexName( FlowElement object )
484              {
485              if( graph.incomingEdgesOf( object ).isEmpty() )
486                {
487                String result = object.toString().replaceAll( "\"", "\'" );
488                String versionString = Version.getRelease();
489    
490                if( platformInfo != null )
491                  versionString = ( versionString == null ? "" : versionString + "\\n" ) + platformInfo;
492    
493                result = versionString == null ? result : result + "\\n" + versionString;
494    
495                return result + "\\napp id: " + AppProps.getApplicationID( null );
496                }
497    
498              if( object instanceof Tap || object instanceof Extent )
499                return object.toString().replaceAll( "\"", "\'" );
500    
501              Scope scope = graph.outgoingEdgesOf( object ).iterator().next();
502    
503              return ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" );
504              }
505            }, new EdgeNameProvider<Scope>()
506            {
507            public String getEdgeName( Scope object )
508              {
509              return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
510              }
511            }
512          );
513    
514          writer.close();
515          }
516        catch( IOException exception )
517          {
518          LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
519          }
520        }
521    
522      /** Method removeEmptyPipes performs a depth first traversal and removes instance of {@link cascading.pipe.Pipe} or {@link cascading.pipe.SubAssembly}. */
523      public void removeUnnecessaryPipes()
524        {
525        while( !internalRemoveUnnecessaryPipes() )
526          ;
527    
528        int numPipes = 0;
529        for( FlowElement flowElement : vertexSet() )
530          {
531          if( flowElement instanceof Pipe )
532            numPipes++;
533          }
534    
535        if( numPipes == 0 )
536          throw new ElementGraphException( "resulting graph has no pipe elements after removing empty Pipe, assertions, and SubAssembly containers" );
537        }
538    
539      private boolean internalRemoveUnnecessaryPipes()
540        {
541        DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();
542    
543        while( iterator.hasNext() )
544          {
545          FlowElement flowElement = iterator.next();
546    
547          if( flowElement.getClass() == Pipe.class || flowElement.getClass() == Checkpoint.class ||
548            flowElement instanceof SubAssembly || testPlannerLevel( flowElement ) )
549            {
550            // Pipe class is guaranteed to have one input
551            removeElement( flowElement );
552    
553            return false;
554            }
555          }
556    
557        return true;
558        }
559    
560      private void removeElement( FlowElement flowElement )
561        {
562        LOG.debug( "removing: " + flowElement );
563    
564        Set<Scope> incomingScopes = incomingEdgesOf( flowElement );
565    
566        if( incomingScopes.size() != 1 )
567          throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );
568    
569        Scope incoming = incomingScopes.iterator().next();
570        Set<Scope> outgoingScopes = outgoingEdgesOf( flowElement );
571    
572        // source -> incoming -> flowElement -> outgoing -> target
573        FlowElement source = getEdgeSource( incoming );
574    
575        for( Scope outgoing : outgoingScopes )
576          {
577          FlowElement target = getEdgeTarget( outgoing );
578    
579          addEdge( source, target, new Scope( outgoing ) );
580          }
581    
582        removeVertex( flowElement );
583        }
584    
585      private boolean testPlannerLevel( FlowElement flowElement )
586        {
587        if( !( flowElement instanceof Operator ) )
588          return false;
589    
590        Operator operator = (Operator) flowElement;
591    
592        if( !operator.hasPlannerLevel() )
593          return false;
594    
595        for( PlannerLevel plannerLevel : plannerLevels )
596          {
597          if( ( (PlannedOperation) operator.getOperation() ).supportsPlannerLevel( plannerLevel ) )
598            return operator.getPlannerLevel().isStricterThan( plannerLevel );
599          }
600    
601        throw new IllegalStateException( "encountered unsupported planner level: " + operator.getPlannerLevel().getClass().getName() );
602        }
603    
604      /** Method resolveFields performs a breadth first traversal and resolves the tuple fields between each Pipe instance. */
605      public void resolveFields()
606        {
607        if( resolved )
608          throw new IllegalStateException( "element graph already resolved" );
609    
610        TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();
611    
612        while( iterator.hasNext() )
613          resolveFields( iterator.next() );
614    
615        resolved = true;
616        }
617    
618      private void resolveFields( FlowElement source )
619        {
620        if( source instanceof Extent )
621          return;
622    
623        Set<Scope> incomingScopes = incomingEdgesOf( source );
624        Set<Scope> outgoingScopes = outgoingEdgesOf( source );
625    
626        List<FlowElement> flowElements = Graphs.successorListOf( this, source );
627    
628        if( flowElements.size() == 0 )
629          throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() );
630    
631        Scope outgoingScope = source.outgoingScopeFor( incomingScopes );
632    
633        if( LOG.isDebugEnabled() && outgoingScope != null )
634          {
635          LOG.debug( "for modifier: " + source );
636          if( outgoingScope.getArgumentsSelector() != null )
637            LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() );
638          if( outgoingScope.getOperationDeclaredFields() != null )
639            LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() );
640          if( outgoingScope.getKeySelectors() != null )
641            LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() );
642          if( outgoingScope.getOutValuesSelector() != null )
643            LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() );
644          }
645    
646        for( Scope scope : outgoingScopes )
647          scope.copyFields( outgoingScope );
648        }
649    
650      /**
651       * Finds all groups that merge/join streams. returned in topological order.
652       *
653       * @return a List fo Group instances
654       */
655      public List<Group> findAllMergeJoinGroups()
656        {
657        return findAllOfType( 2, 1, Group.class, new LinkedList<Group>() );
658        }
659    
660      /**
661       * Finds all splices that merge/join streams. returned in topological order.
662       *
663       * @return a List fo Group instances
664       */
665      public List<Splice> findAllMergeJoinSplices()
666        {
667        return findAllOfType( 2, 1, Splice.class, new LinkedList<Splice>() );
668        }
669    
670      public List<CoGroup> findAllCoGroups()
671        {
672        return findAllOfType( 2, 1, CoGroup.class, new LinkedList<CoGroup>() );
673        }
674    
675      /**
676       * Method findAllGroups ...
677       *
678       * @return List<Group>
679       */
680      public List<Group> findAllGroups()
681        {
682        return findAllOfType( 1, 1, Group.class, new LinkedList<Group>() );
683        }
684    
685      /**
686       * Method findAllEveries ...
687       *
688       * @return List<Every>
689       */
690      public List<Every> findAllEveries()
691        {
692        return findAllOfType( 1, 1, Every.class, new LinkedList<Every>() );
693        }
694    
695      /**
696       * Method findAllTaps ...
697       *
698       * @return List<Tap>
699       */
700      public List<Tap> findAllTaps()
701        {
702        return findAllOfType( 1, 1, Tap.class, new LinkedList<Tap>() );
703        }
704    
705      /**
706       * Method findAllSplits ...
707       *
708       * @return List<FlowElement>
709       */
710      public List<Each> findAllEachSplits()
711        {
712        return findAllOfType( 1, 2, Each.class, new LinkedList<Each>() );
713        }
714    
715      public List<Pipe> findAllPipeSplits()
716        {
717        return findAllOfType( 1, 2, Pipe.class, new LinkedList<Pipe>() );
718        }
719    
720      /**
721       * Method findAllOfType ...
722       *
723       * @param minInDegree  of type int
724       * @param minOutDegree
725       * @param type         of type Class<P>
726       * @param results      of type List<P>   @return List<P>
727       */
728      public <P> List<P> findAllOfType( int minInDegree, int minOutDegree, Class<P> type, List<P> results )
729        {
730        TopologicalOrderIterator<FlowElement, Scope> topoIterator = getTopologicalIterator();
731    
732        while( topoIterator.hasNext() )
733          {
734          FlowElement flowElement = topoIterator.next();
735    
736          if( type.isInstance( flowElement ) && inDegreeOf( flowElement ) >= minInDegree && outDegreeOf( flowElement ) >= minOutDegree )
737            results.add( (P) flowElement );
738          }
739    
740        return results;
741        }
742    
743      public void insertFlowElementAfter( FlowElement previousElement, FlowElement flowElement )
744        {
745        Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( previousElement ) );
746    
747        addVertex( flowElement );
748    
749        String name = previousElement.toString();
750    
751        if( previousElement instanceof Pipe )
752          name = ( (Pipe) previousElement ).getName();
753    
754        addEdge( previousElement, flowElement, new Scope( name ) );
755    
756        for( Scope scope : outgoing )
757          {
758          FlowElement target = getEdgeTarget( scope );
759          removeEdge( previousElement, target ); // remove scope
760          addEdge( flowElement, target, scope ); // add scope back
761          }
762        }
763    
764      /** Simple class that acts in as the root of the graph */
765      /**
766       * Method makeTapGraph returns a directed graph of all taps in the current element graph.
767       *
768       * @return SimpleDirectedGraph<Tap, Integer>
769       */
770      public SimpleDirectedGraph<Tap, Integer> makeTapGraph()
771        {
772        SimpleDirectedGraph<Tap, Integer> tapGraph = new SimpleDirectedGraph<Tap, Integer>( Integer.class );
773        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetweenExtents();
774        int count = 0;
775    
776        if( LOG.isDebugEnabled() )
777          LOG.debug( "found num paths: " + paths.size() );
778    
779        for( GraphPath<FlowElement, Scope> element : paths )
780          {
781          List<Scope> path = element.getEdgeList();
782          Tap lastTap = null;
783    
784          for( Scope scope : path )
785            {
786            FlowElement target = getEdgeTarget( scope );
787    
788            if( target instanceof Extent )
789              continue;
790    
791            if( !( target instanceof Tap ) )
792              continue;
793    
794            tapGraph.addVertex( (Tap) target );
795    
796            if( lastTap != null )
797              {
798              if( LOG.isDebugEnabled() )
799                LOG.debug( "adding tap edge: " + lastTap + " -> " + target );
800    
801              if( tapGraph.getEdge( lastTap, (Tap) target ) == null && !tapGraph.addEdge( lastTap, (Tap) target, count++ ) )
802                throw new ElementGraphException( "could not add graph edge: " + lastTap + " -> " + target );
803              }
804    
805            lastTap = (Tap) target;
806            }
807          }
808    
809        return tapGraph;
810        }
811    
812      public int getMaxNumPathsBetweenElementAndGroupingMergeJoin( FlowElement flowElement )
813        {
814        List<Group> groups = findAllMergeJoinGroups();
815    
816        int maxPaths = 0;
817    
818        if( groups == null )
819          return 0;
820    
821        for( Group group : groups )
822          {
823          if( flowElement != group )
824            {
825            List<GraphPath<FlowElement, Scope>> paths = ElementGraphs.getAllShortestPathsBetween( this, flowElement, group );
826    
827            if( paths != null )
828              maxPaths = Math.max( maxPaths, paths.size() );
829            }
830          }
831    
832        return maxPaths;
833        }
834    
835      public List<FlowElement> getAllSuccessors( FlowElement element )
836        {
837        return Graphs.successorListOf( this, element );
838        }
839    
840      public void replaceElementWith( FlowElement element, FlowElement replacement )
841        {
842        Set<Scope> incoming = new HashSet<Scope>( incomingEdgesOf( element ) );
843        Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( element ) );
844    
845        if( !containsVertex( replacement ) )
846          addVertex( replacement );
847    
848        for( Scope scope : incoming )
849          {
850          FlowElement source = getEdgeSource( scope );
851          removeEdge( source, element ); // remove scope
852    
853          // drop edge between, if any
854          if( source != replacement )
855            addEdge( source, replacement, scope ); // add scope back
856          }
857    
858        for( Scope scope : outgoing )
859          {
860          FlowElement target = getEdgeTarget( scope );
861          removeEdge( element, target ); // remove scope
862    
863          // drop edge between, if any
864          if( target != replacement )
865            addEdge( replacement, target, scope ); // add scope back
866          }
867    
868        removeVertex( element );
869        }
870    
871      public <A extends FlowElement> Set<A> getAllChildrenOfType( FlowElement flowElement, Class<A> type )
872        {
873        Set<A> allChildren = new HashSet<A>();
874    
875        getAllChildrenOfType( allChildren, flowElement, type );
876    
877        return allChildren;
878        }
879    
880      private <A extends FlowElement> void getAllChildrenOfType( Set<A> allSuccessors, FlowElement flowElement, Class<A> type )
881        {
882        List<FlowElement> successors = getAllSuccessors( flowElement );
883    
884        for( FlowElement successor : successors )
885          {
886          if( type.isInstance( successor ) )
887            allSuccessors.add( (A) successor );
888          else
889            getAllChildrenOfType( allSuccessors, successor, type );
890          }
891        }
892    
893      public Set<FlowElement> getAllChildrenNotExactlyType( FlowElement flowElement, Class<? extends FlowElement> type )
894        {
895        Set<FlowElement> allChildren = new HashSet<FlowElement>();
896    
897        getAllChildrenNotExactlyType( allChildren, flowElement, type );
898    
899        return allChildren;
900        }
901    
902      private void getAllChildrenNotExactlyType( Set<FlowElement> allSuccessors, FlowElement flowElement, Class<? extends FlowElement> type )
903        {
904        List<FlowElement> successors = getAllSuccessors( flowElement );
905    
906        for( FlowElement successor : successors )
907          {
908          if( type != successor.getClass() )
909            allSuccessors.add( successor );
910          else
911            getAllChildrenNotExactlyType( allSuccessors, successor, type );
912          }
913        }
914    
915      public static class Extent extends Pipe
916        {
917    
918        /** @see cascading.pipe.Pipe#Pipe(String) */
919        public Extent( String name )
920          {
921          super( name );
922          }
923    
924        @Override
925        public Scope outgoingScopeFor( Set<Scope> scopes )
926          {
927          return new Scope();
928          }
929    
930        @Override
931        public String toString()
932          {
933          return "[" + getName() + "]";
934          }
935    
936        public boolean equals( Object object )
937          {
938          if( object == null )
939            return false;
940    
941          if( this == object )
942            return true;
943    
944          if( object.getClass() != this.getClass() )
945            return false;
946    
947          return this.getName().equals( ( (Pipe) object ).getName() );
948          }
949        }
950      }