001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.graph;
022
023import java.io.File;
024import java.io.FileWriter;
025import java.io.IOException;
026import java.io.Writer;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.IdentityHashMap;
033import java.util.Iterator;
034import java.util.LinkedHashSet;
035import java.util.LinkedList;
036import java.util.List;
037import java.util.ListIterator;
038import java.util.Map;
039import java.util.Set;
040
041import cascading.flow.FlowElement;
042import cascading.flow.FlowElements;
043import cascading.flow.planner.PlatformInfo;
044import cascading.flow.planner.Scope;
045import cascading.flow.planner.iso.expression.ElementCapture;
046import cascading.flow.planner.iso.expression.ExpressionGraph;
047import cascading.flow.planner.iso.expression.FlowElementExpression;
048import cascading.flow.planner.iso.expression.TypeExpression;
049import cascading.flow.planner.iso.finder.SearchOrder;
050import cascading.flow.planner.iso.subgraph.SubGraphIterator;
051import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator;
052import cascading.flow.planner.process.ProcessGraph;
053import cascading.flow.planner.process.ProcessModel;
054import cascading.pipe.Group;
055import cascading.pipe.Operator;
056import cascading.pipe.Pipe;
057import cascading.pipe.Splice;
058import cascading.tap.Tap;
059import cascading.util.DOTProcessGraphWriter;
060import cascading.util.Murmur3;
061import cascading.util.Pair;
062import cascading.util.Util;
063import cascading.util.Version;
064import cascading.util.jgrapht.ComponentAttributeProvider;
065import cascading.util.jgrapht.EdgeNameProvider;
066import cascading.util.jgrapht.IntegerNameProvider;
067import cascading.util.jgrapht.VertexNameProvider;
068import org.jgrapht.DirectedGraph;
069import org.jgrapht.Graph;
070import org.jgrapht.GraphPath;
071import org.jgrapht.Graphs;
072import org.jgrapht.alg.DijkstraShortestPath;
073import org.jgrapht.alg.FloydWarshallShortestPaths;
074import org.jgrapht.alg.KShortestPaths;
075import org.jgrapht.graph.AbstractGraph;
076import org.jgrapht.graph.EdgeReversedGraph;
077import org.jgrapht.graph.SimpleDirectedGraph;
078import org.jgrapht.traverse.TopologicalOrderIterator;
079import org.jgrapht.util.TypeUtil;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import static cascading.util.Util.*;
084import static java.lang.Double.POSITIVE_INFINITY;
085
086/**
087 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}.
088 */
089public class ElementGraphs
090  {
091  private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class );
092
  /**
   * Private constructor; this class only exposes static members and is not for instantiation.
   */
  private ElementGraphs()
    {
    }
097
098  private static class IdentityDirectedGraph<V, E> extends SimpleDirectedGraph<V, E>
099    {
100    public IdentityDirectedGraph( Class<? extends E> edgeClass )
101      {
102      super( edgeClass );
103      }
104
105    @Override
106    protected DirectedSpecifics createDirectedSpecifics()
107      {
108      return new DirectedSpecifics( new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
109      }
110    }
111
112  public static DirectedGraph<FlowElement, Scope> directed( ElementGraph elementGraph )
113    {
114    if( elementGraph == null )
115      return null;
116
117    if( elementGraph instanceof DecoratedElementGraph )
118      return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() );
119
120    return ( (BaseElementGraph) elementGraph ).graph;
121    }
122
123  public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph )
124    {
125    return hashCodeIgnoreAnnotations( directed( elementGraph ) );
126    }
127
128  public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph )
129    {
130    int hash = graph.vertexSet().hashCode();
131
132    for( E e : graph.edgeSet() )
133      {
134      int part = e.hashCode();
135
136      int source = graph.getEdgeSource( e ).hashCode();
137      int target = graph.getEdgeTarget( e ).hashCode();
138
139      int pairing = pair( source, target );
140
141      part = ( 27 * part ) + pairing;
142
143      long weight = (long) graph.getEdgeWeight( e );
144      part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) );
145
146      hash += part;
147      }
148
149    return hash;
150    }
151
152  public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs )
153    {
154    return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) );
155    }
156
157  public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs )
158    {
159    if( lhs == rhs )
160      return true;
161
162    TypeUtil<Graph<V, E>> typeDecl = null;
163    Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl );
164    Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl );
165
166    if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) )
167      return false;
168
169    if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() )
170      return false;
171
172    for( E e : lhsGraph.edgeSet() )
173      {
174      V source = lhsGraph.getEdgeSource( e );
175      V target = lhsGraph.getEdgeTarget( e );
176
177      if( !rhsGraph.containsEdge( e ) )
178        return false;
179
180      if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) )
181        return false;
182
183      if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 )
184        return false;
185      }
186
187    return true;
188    }
189
190  public static boolean equals( ElementGraph lhs, ElementGraph rhs )
191    {
192    if( !equalsIgnoreAnnotations( lhs, rhs ) )
193      return false;
194
195    if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) )
196      return true;
197
198    if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) )
199      return false;
200
201    AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs;
202    AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs;
203
204    if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() )
205      return false;
206
207    if( !lhsAnnotated.hasAnnotations() )
208      return true;
209
210    return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() );
211    }
212
213  public static String canonicalHash( ElementGraph graph )
214    {
215    return canonicalHash( directed( graph ) );
216    }
217
218  public static String canonicalHash( Graph<FlowElement, Scope> graph )
219    {
220    int hash = Murmur3.SEED;
221
222    int edges = 0;
223    boolean hasExtents = false;
224
225    for( Scope e : graph.edgeSet() )
226      {
227      FlowElement edgeSource = graph.getEdgeSource( e );
228      FlowElement edgeTarget = graph.getEdgeTarget( e );
229
230      // simpler to ignore extents here
231      if( edgeSource instanceof Extent || edgeTarget instanceof Extent )
232        {
233        hasExtents = true;
234        continue;
235        }
236
237      int source = hash( edgeSource );
238      int target = hash( edgeTarget );
239      int pairing = pair( source, target );
240
241      hash += pairing; // don't make edge traversal order significant
242      edges++;
243      }
244
245    int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 );
246
247    hash = Murmur3.fmix( hash, vertexes * edges );
248
249    return Util.getHex( Util.intToByteArray( hash ) );
250    }
251
252  private static int hash( FlowElement flowElement )
253    {
254    int lhs = flowElement.getClass().getName().hashCode();
255    int rhs = 0;
256
257    if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null )
258      rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode();
259    else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null )
260      rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode();
261    else if( flowElement instanceof Splice )
262      rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins();
263
264    return pair( lhs, rhs );
265    }
266
267  protected static int pair( int lhs, int rhs )
268    {
269    if( rhs == 0 )
270      return lhs;
271
272    // see http://en.wikipedia.org/wiki/Pairing_function
273    return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs;
274    }
275
276  public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph )
277    {
278    if( graph == null )
279      return Collections.emptyIterator();
280
281    return new TopologicalOrderIterator<>( directed( graph ) );
282    }
283
284  public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph )
285    {
286    return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) );
287    }
288
289  public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to )
290    {
291    return getAllShortestPathsBetween( directed( graph ), from, to );
292    }
293
294  public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( DirectedGraph<V, E> graph, V from, V to )
295    {
296    List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, from, Integer.MAX_VALUE ).getPaths( to );
297
298    if( paths == null )
299      return new ArrayList<>();
300
301    return paths;
302    }
303
304  public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes )
305    {
306    elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded
307
308    Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes );
309    Set<FlowElement> vertices = pair.getLhs();
310    Set<Scope> excludeEdges = pair.getRhs();
311
312    Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() );
313    scopes.removeAll( excludeEdges );
314
315    return new ElementSubGraph( elementGraph, vertices, scopes );
316    }
317
318  /**
319   * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent}
320   * head or tail instances.
321   * <p/>
322   * If the given ElementGraph does not contain head or tail, it will be returned unchanged.
323   *
324   * @param elementGraph
325   * @return
326   */
327  public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph )
328    {
329    if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) )
330      return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail );
331
332    return elementGraph;
333    }
334
335  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted )
336    {
337    return findClosureViaFloydWarshall( full, contracted, null );
338    }
339
340  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted, Set<V> excludes )
341    {
342    Set<V> vertices = createIdentitySet( contracted.vertexSet() );
343    LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() );
344
345    allVertices.removeAll( vertices );
346
347    Set<E> excludeEdges = new HashSet<>();
348
349    // prevent distinguished elements from being included inside the sub-graph
350    if( excludes != null )
351      {
352      for( V v : excludes )
353        {
354        if( !full.containsVertex( v ) )
355          continue;
356
357        excludeEdges.addAll( full.incomingEdgesOf( v ) );
358        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
359        }
360      }
361
362    for( V v : contracted.vertexSet() )
363      {
364      if( contracted.inDegreeOf( v ) == 0 )
365        excludeEdges.addAll( full.incomingEdgesOf( v ) );
366      }
367
368    for( V v : contracted.vertexSet() )
369      {
370      if( contracted.outDegreeOf( v ) == 0 )
371        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
372      }
373
374    DirectedGraph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges );
375
376    FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected );
377
378    for( E edge : contracted.edgeSet() )
379      {
380      V edgeSource = contracted.getEdgeSource( edge );
381      V edgeTarget = contracted.getEdgeTarget( edge );
382
383      ListIterator<V> iterator = allVertices.listIterator();
384      while( iterator.hasNext() )
385        {
386        V vertex = iterator.next();
387
388        if( !isBetween( paths, edgeSource, edgeTarget, vertex ) )
389          continue;
390
391        vertices.add( vertex );
392        iterator.remove();
393        }
394      }
395
396    return new Pair<>( vertices, excludeEdges );
397    }
398
399  private static <V, E> DirectedGraph<V, E> disconnectExtentsAndExclude( DirectedGraph<V, E> full, Set<E> withoutEdges )
400    {
401    IdentityDirectedGraph<V, E> copy = (IdentityDirectedGraph<V, E>) new IdentityDirectedGraph<>( Object.class );
402
403    Graphs.addAllVertices( copy, full.vertexSet() );
404
405    copy.removeVertex( (V) Extent.head );
406    copy.removeVertex( (V) Extent.tail );
407
408    Set<E> edges = full.edgeSet();
409
410    if( !withoutEdges.isEmpty() )
411      {
412      edges = new HashSet<>( edges );
413      edges.removeAll( withoutEdges );
414      }
415
416    Graphs.addAllEdges( copy, full, edges );
417
418    return copy;
419    }
420
421  private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex )
422    {
423    return paths.shortestDistance( edgeSource, vertex ) != POSITIVE_INFINITY && paths.shortestDistance( vertex, edgeTarget ) != POSITIVE_INFINITY;
424    }
425
426  public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement )
427    {
428    LOG.debug( "removing element, contracting edge for: {}", flowElement );
429
430    Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement );
431
432    boolean contractIncoming = true;
433
434    if( !contractIncoming )
435      {
436      if( incomingScopes.size() != 1 )
437        throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );
438      }
439
440    boolean isJoin = flowElement instanceof Splice && ( (Splice) flowElement ).isJoin();
441
442    for( Scope incoming : incomingScopes )
443      {
444      Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement );
445
446      // source -> incoming -> flowElement -> outgoing -> target
447      FlowElement source = elementGraph.getEdgeSource( incoming );
448
449      for( Scope outgoing : outgoingScopes )
450        {
451        FlowElement target = elementGraph.getEdgeTarget( outgoing );
452
453        boolean isNonBlocking = outgoing.isNonBlocking();
454
455        if( isJoin )
456          isNonBlocking = isNonBlocking && incoming.isNonBlocking();
457
458        Scope scope = new Scope( outgoing );
459
460        // unsure if necessary since we track blocking independently
461        // when removing a pipe, pull ordinal up to tap
462        // when removing a Splice retain ordinal
463        if( flowElement instanceof Splice )
464          scope.setOrdinal( incoming.getOrdinal() );
465        else
466          scope.setOrdinal( outgoing.getOrdinal() );
467
468        scope.setNonBlocking( isNonBlocking );
469        scope.addPriorNames( incoming, outgoing ); // not copied
470        elementGraph.addEdge( source, target, scope );
471        }
472      }
473
474    elementGraph.removeVertex( flowElement );
475    }
476
477  public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo )
478    {
479    try
480      {
481      File parentFile = new File( filename ).getParentFile();
482
483      if( parentFile != null && !parentFile.exists() )
484        parentFile.mkdirs();
485
486      Writer writer = new FileWriter( filename );
487
488      Util.writeDOT( writer, ElementGraphs.directed( graph ),
489        new IntegerNameProvider<FlowElement>(),
490        new FlowElementVertexNameProvider( graph, platformInfo ),
491        new ScopeEdgeNameProvider(),
492        new VertexAttributeProvider(), new EdgeAttributeProvider() );
493
494      writer.close();
495      return true;
496      }
497    catch( NullPointerException | IOException exception )
498      {
499      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
500      }
501
502    return false;
503    }
504
505  public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph )
506    {
507    try
508      {
509      File parentFile = new File( filename ).getParentFile();
510
511      if( parentFile != null && !parentFile.exists() )
512        parentFile.mkdirs();
513
514      Writer writer = new FileWriter( filename );
515
516      DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter(
517        new IntegerNameProvider<Pair<ElementGraph, FlowElement>>(),
518        new FlowElementVertexNameProvider( graph, null ),
519        new ScopeEdgeNameProvider(),
520        new VertexAttributeProvider(), new EdgeAttributeProvider(),
521        new ProcessGraphNameProvider(), new ProcessGraphLabelProvider()
522      );
523
524      graphWriter.writeGraph( writer, graph, processGraph );
525
526      writer.close();
527      return true;
528      }
529    catch( IOException exception )
530      {
531      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
532      }
533
534    return false;
535    }
536
537  public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement )
538    {
539    Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) );
540
541    elementGraph.addVertex( flowElement );
542
543    String name = previousElement.toString();
544
545    if( previousElement instanceof Pipe )
546      name = ( (Pipe) previousElement ).getName();
547
548    elementGraph.addEdge( previousElement, flowElement, new Scope( name ) );
549
550    for( Scope scope : outgoing )
551      {
552      FlowElement target = elementGraph.getEdgeTarget( scope );
553      Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope
554
555      if( foundScope != scope )
556        throw new IllegalStateException( "did not remove proper scope" );
557
558      elementGraph.addEdge( flowElement, target, scope ); // add scope back
559      }
560    }
561
562  public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement )
563    {
564    Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) );
565
566    graph.addVertex( flowElement );
567
568    String name = nextElement.toString();
569
570    if( nextElement instanceof Pipe )
571      name = ( (Pipe) nextElement ).getName();
572
573    graph.addEdge( flowElement, nextElement, new Scope( name ) );
574
575    for( Scope scope : incoming )
576      {
577      FlowElement target = graph.getEdgeSource( scope );
578      Scope foundScope = graph.removeEdge( target, nextElement ); // remove scope
579
580      if( foundScope != scope )
581        throw new IllegalStateException( "did not remove proper scope" );
582
583      graph.addEdge( target, flowElement, scope ); // add scope back
584      }
585    }
586
587  public static void insertFlowElementBetweenEdge( ElementGraph elementGraph, Scope previousEdge, FlowElement newElement )
588    {
589    FlowElement previousElement = elementGraph.getEdgeSource( previousEdge );
590    FlowElement nextElement = elementGraph.getEdgeTarget( previousEdge );
591
592    elementGraph.addVertex( newElement );
593
594    // add edge between previous and new
595    elementGraph.addEdge( previousElement, newElement, new Scope( previousEdge ) );
596
597    // add edge between new and next
598    Scope scope = new Scope( previousEdge );
599
600    scope.setOrdinal( previousEdge.getOrdinal() );
601
602    if( nextElement instanceof Splice && ( (Splice) nextElement ).isJoin() && previousEdge.getOrdinal() != 0 )
603      scope.setNonBlocking( false );
604
605    elementGraph.addEdge( newElement, nextElement, scope );
606
607    // remove previous edge
608    elementGraph.removeEdge( previousElement, nextElement );
609    }
610
611  public static Set<Tap> findSources( ElementGraph elementGraph )
612    {
613    return findSources( elementGraph, Tap.class );
614    }
615
616  public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type )
617    {
618    if( elementGraph == null )
619      return Collections.emptySet();
620
621    if( elementGraph.containsVertex( Extent.head ) )
622      return narrowIdentitySet( type, elementGraph.successorListOf( Extent.head ) );
623
624    SubGraphIterator iterator = new ExpressionSubGraphIterator(
625      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ),
626      elementGraph
627    );
628
629    return narrowIdentitySet( type, getAllVertices( iterator ) );
630    }
631
632  public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type )
633    {
634    if( elementGraph == null )
635      return Collections.emptySet();
636
637    if( elementGraph.containsVertex( Extent.tail ) )
638      return narrowIdentitySet( type, elementGraph.predecessorListOf( Extent.tail ) );
639
640    SubGraphIterator iterator = new ExpressionSubGraphIterator(
641      new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ),
642      elementGraph
643    );
644
645    return narrowIdentitySet( type, getAllVertices( iterator ) );
646    }
647
648  public static Set<Tap> findSinks( ElementGraph elementGraph )
649    {
650    return findSinks( elementGraph, Tap.class );
651    }
652
653  public static Set<Group> findAllGroups( ElementGraph elementGraph )
654    {
655    SubGraphIterator iterator = new ExpressionSubGraphIterator(
656      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ),
657      elementGraph
658    );
659
660    return narrowIdentitySet( Group.class, getAllVertices( iterator ) );
661    }
662
663  private static Set<FlowElement> getAllVertices( SubGraphIterator iterator )
664    {
665    Set<FlowElement> vertices = createIdentitySet();
666
667    while( iterator.hasNext() )
668      vertices.addAll( iterator.next().vertexSet() );
669
670    return vertices;
671    }
672
673  public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith )
674    {
675    Set<Scope> incoming = new HashSet<Scope>( elementGraph.incomingEdgesOf( replace ) );
676    Set<Scope> outgoing = new HashSet<Scope>( elementGraph.outgoingEdgesOf( replace ) );
677
678    if( !elementGraph.containsVertex( replaceWith ) )
679      elementGraph.addVertex( replaceWith );
680
681    for( Scope scope : incoming )
682      {
683      FlowElement source = elementGraph.getEdgeSource( scope );
684      elementGraph.removeEdge( source, replace ); // remove scope
685
686      // drop edge between, if any
687      if( source != replaceWith )
688        elementGraph.addEdge( source, replaceWith, scope ); // add scope back
689      }
690
691    for( Scope scope : outgoing )
692      {
693      FlowElement target = elementGraph.getEdgeTarget( scope );
694      elementGraph.removeEdge( replace, target ); // remove scope
695
696      // drop edge between, if any
697      if( target != replaceWith )
698        elementGraph.addEdge( replaceWith, target, scope ); // add scope back
699      }
700
701    elementGraph.removeVertex( replace );
702    }
703
704  public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name )
705    {
706    Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph );
707
708    return find( name, iterator );
709    }
710
711  public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name )
712    {
713    Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph );
714
715    return find( name, iterator );
716    }
717
718  private static Pipe find( String name, Iterator<FlowElement> iterator )
719    {
720    while( iterator.hasNext() )
721      {
722      FlowElement flowElement = iterator.next();
723
724      if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) )
725        return (Pipe) flowElement;
726      }
727
728    return null;
729    }
730
731  public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement )
732    {
733    Set<FlowElement> branch = new LinkedHashSet<>();
734
735    walkUp( branch, elementGraph, flowElement );
736
737    walkDown( branch, elementGraph, flowElement );
738
739    if( branch.isEmpty() )
740      return false;
741
742    for( FlowElement element : branch )
743      elementGraph.removeVertex( element );
744
745    return true;
746    }
747
748  public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive )
749    {
750    Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) );
751
752    walkDown( branch, elementGraph, first );
753
754    if( !inclusive )
755      {
756      branch.remove( first );
757      branch.remove( second );
758      }
759
760    if( branch.isEmpty() )
761      return false;
762
763    for( FlowElement element : branch )
764      elementGraph.removeVertex( element );
765
766    return true;
767    }
768
769  private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
770    {
771    FlowElement current;
772    current = flowElement;
773
774    while( true )
775      {
776      if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) )
777        break;
778
779      branch.add( current );
780
781      FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) );
782
783      if( element instanceof Extent || branch.contains( element ) )
784        break;
785
786      current = element;
787      }
788    }
789
790  private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
791    {
792    FlowElement current = flowElement;
793
794    while( true )
795      {
796      if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 )
797        break;
798
799      branch.add( current );
800
801      FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) );
802
803      if( element instanceof Extent || branch.contains( element ) )
804        break;
805
806      current = element;
807      }
808    }
809
810  /**
811   * Returns the number of edges found on the shortest distance between the lhs and rhs.
812   */
813  public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs )
814    {
815    return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).size();
816    }
817
818  private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement>
819    {
820    private final ElementGraph elementGraph;
821    private final PlatformInfo platformInfo;
822
823    public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo )
824      {
825      this.elementGraph = elementGraph;
826      this.platformInfo = platformInfo;
827      }
828
829    public String getVertexName( FlowElement object )
830      {
831      if( object instanceof Extent ) // is head/tail
832        {
833        String result = object.toString().replaceAll( "\"", "\'" );
834
835        if( object == Extent.tail )
836          return result;
837
838        result = result + "|hash: " + canonicalHash( elementGraph );
839
840        String versionString = Version.getRelease();
841
842        if( platformInfo != null )
843          versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo;
844
845        return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}";
846        }
847
848      String label;
849
850      Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator();
851
852      if( !( object instanceof Pipe ) || !iterator.hasNext() )
853        {
854        label = object.toString().replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
855        }
856      else
857        {
858        Scope scope = iterator.next();
859
860        label = ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
861        }
862
863      label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id
864
865      label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( ">", "\\\\>" ) + "}";
866
867      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
868        return label;
869
870      Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object );
871
872      if( !annotations.isEmpty() )
873        label += "|{" + Util.join( annotations, "|" ) + "}";
874
875      return label;
876      }
877
878    protected String getID( FlowElement object )
879      {
880      return FlowElements.id( object ).substring( 0, 5 );
881      }
882    }
883
884  private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope>
885    {
886    public String getEdgeName( Scope object )
887      {
888      return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
889      }
890    }
891
892  private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement>
893    {
894    static Map<String, String> defaultNode = new HashMap<String, String>()
895    {
896    {put( "shape", "Mrecord" );}
897    };
898
899    public VertexAttributeProvider()
900      {
901      }
902
903    @Override
904    public Map<String, String> getComponentAttributes( FlowElement object )
905      {
906      return defaultNode;
907      }
908    }
909
910  private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope>
911    {
912    static Map<String, String> attributes = new HashMap<String, String>()
913    {
914    {put( "style", "dotted" );}
915
916    {put( "arrowhead", "dot" );}
917    };
918
919    @Override
920    public Map<String, String> getComponentAttributes( Scope scope )
921      {
922      if( scope.isNonBlocking() )
923        return null;
924
925      return attributes;
926      }
927    }
928
929  private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel>
930    {
931    @Override
932    public String getVertexName( ProcessModel processModel )
933      {
934      return "" + processModel.getOrdinal();
935      }
936    }
937
938  private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel>
939    {
940    @Override
941    public String getVertexName( ProcessModel processModel )
942      {
943      return "ordinal: " + processModel.getOrdinal() +
944        "\\nid: " + processModel.getID() +
945        "\\nhash: " + canonicalHash( processModel.getElementGraph() );
946      }
947    }
948
949  static void injectIdentityMap( AbstractGraph graph )
950    {
951    // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap
952    // vertex not found errors will be thrown if this fails
953    Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" );
954
955    if( specifics == null )
956      {
957      LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" );
958      return;
959      }
960
961    boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() );
962
963    if( !success )
964      LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" );
965    }
966  }