001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.graph;
023
024import java.io.File;
025import java.io.FileWriter;
026import java.io.IOException;
027import java.io.Writer;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.IdentityHashMap;
035import java.util.Iterator;
036import java.util.LinkedHashSet;
037import java.util.LinkedList;
038import java.util.List;
039import java.util.ListIterator;
040import java.util.Map;
041import java.util.Set;
042
043import cascading.flow.FlowElement;
044import cascading.flow.FlowElements;
045import cascading.flow.planner.PlatformInfo;
046import cascading.flow.planner.Scope;
047import cascading.flow.planner.iso.expression.ElementCapture;
048import cascading.flow.planner.iso.expression.ExpressionGraph;
049import cascading.flow.planner.iso.expression.FlowElementExpression;
050import cascading.flow.planner.iso.expression.TypeExpression;
051import cascading.flow.planner.iso.finder.SearchOrder;
052import cascading.flow.planner.iso.subgraph.SubGraphIterator;
053import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator;
054import cascading.flow.planner.process.FlowStepGraph;
055import cascading.flow.planner.process.ProcessGraph;
056import cascading.flow.planner.process.ProcessModel;
057import cascading.pipe.Group;
058import cascading.pipe.Operator;
059import cascading.pipe.Pipe;
060import cascading.pipe.Splice;
061import cascading.tap.Tap;
062import cascading.util.DOTProcessGraphWriter;
063import cascading.util.EnumMultiMap;
064import cascading.util.Murmur3;
065import cascading.util.Pair;
066import cascading.util.Util;
067import cascading.util.Version;
068import cascading.util.jgrapht.ComponentAttributeProvider;
069import cascading.util.jgrapht.EdgeNameProvider;
070import cascading.util.jgrapht.IntegerNameProvider;
071import cascading.util.jgrapht.VertexNameProvider;
072import org.jgrapht.DirectedGraph;
073import org.jgrapht.Graph;
074import org.jgrapht.GraphPath;
075import org.jgrapht.Graphs;
076import org.jgrapht.alg.DijkstraShortestPath;
077import org.jgrapht.alg.FloydWarshallShortestPaths;
078import org.jgrapht.alg.KShortestPaths;
079import org.jgrapht.graph.AbstractGraph;
080import org.jgrapht.graph.DirectedMultigraph;
081import org.jgrapht.graph.EdgeReversedGraph;
082import org.jgrapht.graph.SimpleDirectedGraph;
083import org.jgrapht.traverse.TopologicalOrderIterator;
084import org.jgrapht.util.TypeUtil;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import static cascading.util.Util.*;
089import static java.lang.Double.POSITIVE_INFINITY;
090
091/**
092 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}.
093 */
094public class ElementGraphs
095  {
/** Logger shared by the static helpers declared in this class. */
private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class );

/** Private so this holder of static utilities cannot be instantiated. */
private ElementGraphs()
  {
  }
102
103  public static FlowElementGraph asFlowElementGraph( PlatformInfo platformInfo, FlowStepGraph flowStepGraph )
104    {
105    ElementMultiGraph elementDirectedGraph = asElementMultiGraph( flowStepGraph.getElementGraphs() );
106
107    Map<String, Tap> sources = asSourceMap( elementDirectedGraph, findSources( elementDirectedGraph ) );
108    Map<String, Tap> sinks = asSinkMap( elementDirectedGraph, findSinks( elementDirectedGraph ) );
109
110    return new FlowElementGraph( platformInfo, elementDirectedGraph, sources, sinks, null, null );
111    }
112
113  public static ElementDirectedGraph asElementDirectedGraph( Collection<ElementGraph> elementGraphs )
114    {
115    ElementDirectedGraph elementDirectedGraph = new ElementDirectedGraph();
116
117    for( ElementGraph elementGraph : elementGraphs )
118      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );
119
120    return elementDirectedGraph;
121    }
122
123  public static ElementMultiGraph asElementMultiGraph( Collection<ElementGraph> elementGraphs )
124    {
125    ElementMultiGraph elementDirectedGraph = new ElementMultiGraph();
126
127    for( ElementGraph elementGraph : elementGraphs )
128      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );
129
130    return elementDirectedGraph;
131    }
132
133  public static boolean isEmpty( ElementGraph elementGraph )
134    {
135    int size = elementGraph.vertexSet().size();
136
137    if( size == 0 )
138      return true;
139
140    return size == 2 && elementGraph.containsVertex( Extent.head ) && elementGraph.containsVertex( Extent.tail );
141    }
142
143  private static class IdentityDirectedGraph<V, E> extends SimpleDirectedGraph<V, E>
144    {
145    public IdentityDirectedGraph( Class<? extends E> edgeClass )
146      {
147      super( edgeClass );
148      }
149
150    @Override
151    protected DirectedSpecifics createDirectedSpecifics()
152      {
153      return new DirectedSpecifics( new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
154      }
155    }
156
157  private static class IdentityMultiGraphGraph<V, E> extends DirectedMultigraph<V, E>
158    {
159    public IdentityMultiGraphGraph( Class<? extends E> edgeClass )
160      {
161      super( edgeClass );
162      }
163
164    @Override
165    protected DirectedSpecifics createDirectedSpecifics()
166      {
167      return new DirectedSpecifics( new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
168      }
169    }
170
171  public static DirectedGraph<FlowElement, Scope> directed( ElementGraph elementGraph )
172    {
173    if( elementGraph == null )
174      return null;
175
176    if( elementGraph instanceof DecoratedElementGraph )
177      return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() );
178
179    return ( (BaseElementGraph) elementGraph ).graph;
180    }
181
182  public static EnumMultiMap<FlowElement> annotations( ElementGraph elementGraph )
183    {
184    if( elementGraph == null )
185      return null;
186
187    if( elementGraph instanceof DecoratedElementGraph )
188      return annotations( ( (DecoratedElementGraph) elementGraph ).getDecorated() );
189
190    if( elementGraph instanceof AnnotatedGraph )
191      return ( (AnnotatedGraph) elementGraph ).getAnnotations();
192
193    return null;
194    }
195
196  public static boolean isMultiGraph( ElementGraph elementGraph )
197    {
198    return directed( elementGraph ) instanceof DirectedMultigraph;
199    }
200
201  public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph )
202    {
203    return hashCodeIgnoreAnnotations( directed( elementGraph ) );
204    }
205
206  public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph )
207    {
208    int hash = graph.vertexSet().hashCode();
209
210    for( E e : graph.edgeSet() )
211      {
212      int part = e.hashCode();
213
214      int source = graph.getEdgeSource( e ).hashCode();
215      int target = graph.getEdgeTarget( e ).hashCode();
216
217      int pairing = pair( source, target );
218
219      part = ( 27 * part ) + pairing;
220
221      long weight = (long) graph.getEdgeWeight( e );
222      part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) );
223
224      hash += part;
225      }
226
227    return hash;
228    }
229
230  public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs )
231    {
232    return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) );
233    }
234
235  public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs )
236    {
237    if( lhs == rhs )
238      return true;
239
240    TypeUtil<Graph<V, E>> typeDecl = null;
241    Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl );
242    Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl );
243
244    if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) )
245      return false;
246
247    if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() )
248      return false;
249
250    for( E e : lhsGraph.edgeSet() )
251      {
252      V source = lhsGraph.getEdgeSource( e );
253      V target = lhsGraph.getEdgeTarget( e );
254
255      if( !rhsGraph.containsEdge( e ) )
256        return false;
257
258      if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) )
259        return false;
260
261      if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 )
262        return false;
263      }
264
265    return true;
266    }
267
268  public static boolean equals( ElementGraph lhs, ElementGraph rhs )
269    {
270    if( !equalsIgnoreAnnotations( lhs, rhs ) )
271      return false;
272
273    if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) )
274      return true;
275
276    if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) )
277      return false;
278
279    AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs;
280    AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs;
281
282    if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() )
283      return false;
284
285    if( !lhsAnnotated.hasAnnotations() )
286      return true;
287
288    return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() );
289    }
290
291  public static String canonicalHash( ElementGraph graph )
292    {
293    return canonicalHash( directed( graph ) );
294    }
295
296  public static String canonicalHash( Graph<FlowElement, Scope> graph )
297    {
298    int hash = Murmur3.SEED;
299
300    int edges = 0;
301    boolean hasExtents = false;
302
303    for( Scope e : graph.edgeSet() )
304      {
305      FlowElement edgeSource = graph.getEdgeSource( e );
306      FlowElement edgeTarget = graph.getEdgeTarget( e );
307
308      // simpler to ignore extents here
309      if( edgeSource instanceof Extent || edgeTarget instanceof Extent )
310        {
311        hasExtents = true;
312        continue;
313        }
314
315      int source = hash( edgeSource );
316      int target = hash( edgeTarget );
317      int pairing = pair( source, target );
318
319      hash += pairing; // don't make edge traversal order significant
320      edges++;
321      }
322
323    int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 );
324
325    hash = Murmur3.fmix( hash, vertexes * edges );
326
327    return Util.getHex( Util.intToByteArray( hash ) );
328    }
329
330  private static int hash( FlowElement flowElement )
331    {
332    int lhs = flowElement.getClass().getName().hashCode();
333    int rhs = 0;
334
335    if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null )
336      rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode();
337    else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null )
338      rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode();
339    else if( flowElement instanceof Splice )
340      rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins();
341
342    return pair( lhs, rhs );
343    }
344
345  protected static int pair( int lhs, int rhs )
346    {
347    if( rhs == 0 )
348      return lhs;
349
350    // see http://en.wikipedia.org/wiki/Pairing_function
351    return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs;
352    }
353
354  public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph )
355    {
356    if( graph == null )
357      return Collections.emptyIterator();
358
359    return new TopologicalOrderIterator<>( directed( graph ) );
360    }
361
362  public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph )
363    {
364    return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) );
365    }
366
367  public static Collection<FlowElement> getAllElementsBetweenExclusive( ElementGraph graph, FlowElement from, FlowElement to )
368    {
369    Collection<FlowElement> results = getAllElementsBetweenInclusive( graph, from, to );
370
371    results.remove( from );
372    results.remove( to );
373
374    return results;
375    }
376
377  public static Collection<FlowElement> getAllElementsBetweenInclusive( ElementGraph graph, FlowElement from, FlowElement to )
378    {
379    Set<FlowElement> results = Util.createIdentitySet();
380
381    List<GraphPath<FlowElement, Scope>> pathsBetween = getAllShortestPathsBetween( graph, from, to );
382
383    for( GraphPath<FlowElement, Scope> graphPath : pathsBetween )
384      results.addAll( Graphs.getPathVertexList( graphPath ) );
385
386    return results;
387    }
388
389  public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to )
390    {
391    return getAllShortestPathsBetween( directed( graph ), from, to );
392    }
393
394  public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( DirectedGraph<V, E> graph, V from, V to )
395    {
396    List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, from, Integer.MAX_VALUE ).getPaths( to );
397
398    if( paths == null )
399      return new ArrayList<>();
400
401    return paths;
402    }
403
404  public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes )
405    {
406    elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded
407
408    Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes );
409    Set<FlowElement> vertices = pair.getLhs();
410    Set<Scope> excludeEdges = pair.getRhs();
411
412    Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() );
413    scopes.removeAll( excludeEdges );
414
415    return new ElementSubGraph( elementGraph, vertices, scopes );
416    }
417
418  /**
419   * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent}
420   * head or tail instances.
421   * <p/>
422   * If the given ElementGraph does not contain head or tail, it will be returned unchanged.
423   *
424   * @param elementGraph
425   * @return
426   */
427  public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph )
428    {
429    if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) )
430      return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail );
431
432    return elementGraph;
433    }
434
435  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted )
436    {
437    return findClosureViaFloydWarshall( full, contracted, null );
438    }
439
440  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted, Set<V> excludes )
441    {
442    Set<V> vertices = createIdentitySet( contracted.vertexSet() );
443    LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() );
444
445    allVertices.removeAll( vertices );
446
447    Set<E> excludeEdges = new HashSet<>();
448
449    // prevent distinguished elements from being included inside the sub-graph
450    if( excludes != null )
451      {
452      for( V v : excludes )
453        {
454        if( !full.containsVertex( v ) )
455          continue;
456
457        excludeEdges.addAll( full.incomingEdgesOf( v ) );
458        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
459        }
460      }
461
462    for( V v : contracted.vertexSet() )
463      {
464      if( contracted.inDegreeOf( v ) == 0 )
465        excludeEdges.addAll( full.incomingEdgesOf( v ) );
466
467      if( contracted.outDegreeOf( v ) == 0 )
468        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
469      }
470
471    // exclude edged between nodes in full that are not in the contracted graph
472    for( V outer : contracted.vertexSet() )
473      {
474      for( V inner : contracted.vertexSet() )
475        {
476        if( outer == inner )
477          continue;
478
479        if( contracted.getAllEdges( inner, outer ).isEmpty() )
480          excludeEdges.addAll( full.getAllEdges( inner, outer ) );
481        }
482      }
483
484    DirectedGraph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges );
485
486    FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected );
487
488    for( E edge : contracted.edgeSet() )
489      {
490      V edgeSource = contracted.getEdgeSource( edge );
491      V edgeTarget = contracted.getEdgeTarget( edge );
492
493      ListIterator<V> iterator = allVertices.listIterator();
494      while( iterator.hasNext() )
495        {
496        V vertex = iterator.next();
497
498        if( !isBetween( paths, edgeSource, edgeTarget, vertex ) )
499          continue;
500
501        vertices.add( vertex );
502        iterator.remove();
503        }
504      }
505
506    return new Pair<>( vertices, excludeEdges );
507    }
508
509  private static <V, E> DirectedGraph<V, E> disconnectExtentsAndExclude( DirectedGraph<V, E> full, Set<E> withoutEdges )
510    {
511    IdentityMultiGraphGraph<V, E> copy = (IdentityMultiGraphGraph<V, E>) new IdentityMultiGraphGraph<>( Object.class );
512
513    Graphs.addAllVertices( copy, full.vertexSet() );
514
515    copy.removeVertex( (V) Extent.head );
516    copy.removeVertex( (V) Extent.tail );
517
518    Set<E> edges = full.edgeSet();
519
520    if( !withoutEdges.isEmpty() )
521      {
522      edges = new HashSet<>( edges );
523      edges.removeAll( withoutEdges );
524      }
525
526    Graphs.addAllEdges( copy, full, edges );
527
528    return copy;
529    }
530
531  private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex )
532    {
533    return paths.shortestDistance( edgeSource, vertex ) != POSITIVE_INFINITY && paths.shortestDistance( vertex, edgeTarget ) != POSITIVE_INFINITY;
534    }
535
536  public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement )
537    {
538    LOG.debug( "removing element, contracting edge for: {}", flowElement );
539
540    Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement );
541
542    boolean contractIncoming = true;
543
544    if( !contractIncoming )
545      {
546      if( incomingScopes.size() != 1 )
547        throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );
548      }
549
550    boolean isJoin = flowElement instanceof Splice && ( (Splice) flowElement ).isJoin();
551
552    for( Scope incoming : incomingScopes )
553      {
554      Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement );
555
556      // source -> incoming -> flowElement -> outgoing -> target
557      FlowElement source = elementGraph.getEdgeSource( incoming );
558
559      for( Scope outgoing : outgoingScopes )
560        {
561        FlowElement target = elementGraph.getEdgeTarget( outgoing );
562
563        boolean isNonBlocking = outgoing.isNonBlocking();
564
565        if( isJoin )
566          isNonBlocking = isNonBlocking && incoming.isNonBlocking();
567
568        Scope scope = new Scope( outgoing );
569
570        // unsure if necessary since we track blocking independently
571        // when removing a pipe, pull ordinal up to tap
572        // when removing a Splice retain ordinal
573        if( flowElement instanceof Splice )
574          scope.setOrdinal( incoming.getOrdinal() );
575        else
576          scope.setOrdinal( outgoing.getOrdinal() );
577
578        scope.setNonBlocking( isNonBlocking );
579        scope.addPriorNames( incoming, outgoing ); // not copied
580
581        boolean success = elementGraph.addEdge( source, target, scope );
582
583        if( !success )
584          throw new IllegalStateException( "during graph contraction, unable to add new edge between: " + source + " and " + target );
585        }
586      }
587
588    elementGraph.removeVertex( flowElement );
589    }
590
591  public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo )
592    {
593    try
594      {
595      File parentFile = new File( filename ).getParentFile();
596
597      if( parentFile != null && !parentFile.exists() )
598        parentFile.mkdirs();
599
600      Writer writer = new FileWriter( filename );
601
602      Util.writeDOT( writer, ElementGraphs.directed( graph ),
603        new IntegerNameProvider<FlowElement>(),
604        new FlowElementVertexNameProvider( graph, platformInfo ),
605        new ScopeEdgeNameProvider(),
606        new VertexAttributeProvider(), new EdgeAttributeProvider() );
607
608      writer.close();
609      return true;
610      }
611    catch( NullPointerException | IOException exception )
612      {
613      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
614      }
615
616    return false;
617    }
618
619  public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph )
620    {
621    try
622      {
623      File parentFile = new File( filename ).getParentFile();
624
625      if( parentFile != null && !parentFile.exists() )
626        parentFile.mkdirs();
627
628      Writer writer = new FileWriter( filename );
629
630      DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter(
631        new IntegerNameProvider<Pair<ElementGraph, FlowElement>>(),
632        new FlowElementVertexNameProvider( graph, null ),
633        new ScopeEdgeNameProvider(),
634        new VertexAttributeProvider(), new EdgeAttributeProvider(),
635        new ProcessGraphNameProvider(), new ProcessGraphLabelProvider()
636      );
637
638      graphWriter.writeGraph( writer, graph, processGraph );
639
640      writer.close();
641      return true;
642      }
643    catch( IOException exception )
644      {
645      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
646      }
647
648    return false;
649    }
650
651  public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement )
652    {
653    Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) );
654
655    elementGraph.addVertex( flowElement );
656
657    String name = previousElement.toString();
658
659    if( previousElement instanceof Pipe )
660      name = ( (Pipe) previousElement ).getName();
661
662    elementGraph.addEdge( previousElement, flowElement, new Scope( name ) );
663
664    for( Scope scope : outgoing )
665      {
666      FlowElement target = elementGraph.getEdgeTarget( scope );
667      Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope
668
669      if( foundScope != scope )
670        throw new IllegalStateException( "did not remove proper scope" );
671
672      elementGraph.addEdge( flowElement, target, scope ); // add scope back
673      }
674    }
675
676  public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement )
677    {
678    Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) );
679
680    graph.addVertex( flowElement );
681
682    String name = nextElement.toString();
683
684    if( nextElement instanceof Pipe )
685      name = ( (Pipe) nextElement ).getName();
686
687    graph.addEdge( flowElement, nextElement, new Scope( name ) );
688
689    for( Scope scope : incoming )
690      {
691      FlowElement target = graph.getEdgeSource( scope );
692      Scope foundScope = graph.removeEdge( target, nextElement ); // remove scope
693
694      if( foundScope != scope )
695        throw new IllegalStateException( "did not remove proper scope" );
696
697      graph.addEdge( target, flowElement, scope ); // add scope back
698      }
699    }
700
701  public static void insertFlowElementBetweenEdge( ElementGraph elementGraph, Scope previousEdge, FlowElement newElement )
702    {
703    FlowElement previousElement = elementGraph.getEdgeSource( previousEdge );
704    FlowElement nextElement = elementGraph.getEdgeTarget( previousEdge );
705
706    elementGraph.addVertex( newElement );
707
708    // add edge between previous and new
709    elementGraph.addEdge( previousElement, newElement, new Scope( previousEdge ) );
710
711    // add edge between new and next
712    Scope scope = new Scope( previousEdge );
713
714    scope.setOrdinal( previousEdge.getOrdinal() );
715
716    if( nextElement instanceof Splice && ( (Splice) nextElement ).isJoin() && previousEdge.getOrdinal() != 0 )
717      scope.setNonBlocking( false );
718
719    elementGraph.addEdge( newElement, nextElement, scope );
720
721    // remove previous edge
722    elementGraph.removeEdge( previousElement, nextElement );
723    }
724
725  public static <F extends FlowElement> Map<String, F> asSourceMap( ElementGraph elementGraph, Set<F> elements )
726    {
727    Map<String, F> map = new HashMap<>();
728
729    for( F element : elements )
730      {
731      for( Scope scope : elementGraph.outgoingEdgesOf( element ) )
732        map.put( scope.getName(), element );
733      }
734
735    return map;
736    }
737
738  public static Set<Tap> findSources( ElementGraph elementGraph )
739    {
740    return findSources( elementGraph, Tap.class );
741    }
742
743  public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type )
744    {
745    if( elementGraph == null )
746      return Collections.emptySet();
747
748    if( elementGraph.containsVertex( Extent.head ) )
749      return narrowIdentitySet( type, elementGraph.successorListOf( Extent.head ) );
750
751    SubGraphIterator iterator = new ExpressionSubGraphIterator(
752      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ),
753      elementGraph
754    );
755
756    return narrowIdentitySet( type, getAllVertices( iterator ) );
757    }
758
759  public static <F extends FlowElement> Map<String, F> asSinkMap( ElementGraph elementGraph, Set<F> elements )
760    {
761    Map<String, F> map = new HashMap<>();
762
763    for( F element : elements )
764      {
765      for( Scope scope : elementGraph.incomingEdgesOf( element ) )
766        map.put( scope.getName(), element );
767      }
768
769    return map;
770    }
771
772  public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type )
773    {
774    if( elementGraph == null )
775      return Collections.emptySet();
776
777    if( elementGraph.containsVertex( Extent.tail ) )
778      return narrowIdentitySet( type, elementGraph.predecessorListOf( Extent.tail ) );
779
780    SubGraphIterator iterator = new ExpressionSubGraphIterator(
781      new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ),
782      elementGraph
783    );
784
785    return narrowIdentitySet( type, getAllVertices( iterator ) );
786    }
787
788  public static Set<Tap> findSinks( ElementGraph elementGraph )
789    {
790    return findSinks( elementGraph, Tap.class );
791    }
792
793  public static Set<Group> findAllGroups( ElementGraph elementGraph )
794    {
795    SubGraphIterator iterator = new ExpressionSubGraphIterator(
796      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ),
797      elementGraph
798    );
799
800    return narrowIdentitySet( Group.class, getAllVertices( iterator ) );
801    }
802
803  private static Set<FlowElement> getAllVertices( SubGraphIterator iterator )
804    {
805    Set<FlowElement> vertices = createIdentitySet();
806
807    while( iterator.hasNext() )
808      vertices.addAll( iterator.next().vertexSet() );
809
810    return vertices;
811    }
812
813  public static Set<Scope> getAllMultiEdgesBetween( Collection<Scope> edgeList, ElementGraph current )
814    {
815    Set<Scope> allEdgesBetween = new HashSet<>();
816
817    for( Scope scope : edgeList )
818      {
819      FlowElement edgeSource = current.getEdgeSource( scope );
820      FlowElement edgeTarget = current.getEdgeTarget( scope );
821
822      allEdgesBetween.addAll( current.getAllEdges( edgeSource, edgeTarget ) );
823      }
824
825    return allEdgesBetween;
826    }
827
828  public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith )
829    {
830    Set<Scope> incoming = new HashSet<Scope>( elementGraph.incomingEdgesOf( replace ) );
831    Set<Scope> outgoing = new HashSet<Scope>( elementGraph.outgoingEdgesOf( replace ) );
832
833    if( !elementGraph.containsVertex( replaceWith ) )
834      elementGraph.addVertex( replaceWith );
835
836    for( Scope scope : incoming )
837      {
838      FlowElement source = elementGraph.getEdgeSource( scope );
839      elementGraph.removeEdge( source, replace ); // remove scope
840
841      // drop edge between, if any
842      if( source != replaceWith )
843        elementGraph.addEdge( source, replaceWith, scope ); // add scope back
844      }
845
846    for( Scope scope : outgoing )
847      {
848      FlowElement target = elementGraph.getEdgeTarget( scope );
849      elementGraph.removeEdge( replace, target ); // remove scope
850
851      // drop edge between, if any
852      if( target != replaceWith )
853        elementGraph.addEdge( replaceWith, target, scope ); // add scope back
854      }
855
856    elementGraph.removeVertex( replace );
857    }
858
859  public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name )
860    {
861    Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph );
862
863    return find( name, iterator );
864    }
865
866  public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name )
867    {
868    Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph );
869
870    return find( name, iterator );
871    }
872
873  private static Pipe find( String name, Iterator<FlowElement> iterator )
874    {
875    while( iterator.hasNext() )
876      {
877      FlowElement flowElement = iterator.next();
878
879      if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) )
880        return (Pipe) flowElement;
881      }
882
883    return null;
884    }
885
886  public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement )
887    {
888    Set<FlowElement> branch = new LinkedHashSet<>();
889
890    walkUp( branch, elementGraph, flowElement );
891
892    walkDown( branch, elementGraph, flowElement );
893
894    if( branch.isEmpty() )
895      return false;
896
897    for( FlowElement element : branch )
898      elementGraph.removeVertex( element );
899
900    return true;
901    }
902
903  public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive )
904    {
905    Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) );
906
907    walkDown( branch, elementGraph, first );
908
909    if( !inclusive )
910      {
911      branch.remove( first );
912      branch.remove( second );
913      }
914
915    if( branch.isEmpty() )
916      return false;
917
918    for( FlowElement element : branch )
919      elementGraph.removeVertex( element );
920
921    return true;
922    }
923
924  private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
925    {
926    FlowElement current;
927    current = flowElement;
928
929    while( true )
930      {
931      if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) )
932        break;
933
934      branch.add( current );
935
936      FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) );
937
938      if( element instanceof Extent || branch.contains( element ) )
939        break;
940
941      current = element;
942      }
943    }
944
945  private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
946    {
947    FlowElement current = flowElement;
948
949    while( true )
950      {
951      if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 )
952        break;
953
954      branch.add( current );
955
956      FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) );
957
958      if( element instanceof Extent || branch.contains( element ) )
959        break;
960
961      current = element;
962      }
963    }
964
965  /**
966   * Returns the number of edges found on the shortest distance between the lhs and rhs.
967   */
968  public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs )
969    {
970    return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).size();
971    }
972
973  private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement>
974    {
975    private final ElementGraph elementGraph;
976    private final PlatformInfo platformInfo;
977
978    public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo )
979      {
980      this.elementGraph = elementGraph;
981      this.platformInfo = platformInfo;
982      }
983
984    public String getVertexName( FlowElement object )
985      {
986      if( object instanceof Extent ) // is head/tail
987        {
988        String result = object.toString().replaceAll( "\"", "\'" );
989
990        if( object == Extent.tail )
991          return result;
992
993        result = result + "|hash: " + canonicalHash( elementGraph );
994
995        String versionString = Version.getRelease();
996
997        if( platformInfo != null )
998          versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo;
999
1000        return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}";
1001        }
1002
1003      String label;
1004
1005      Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator();
1006
1007      if( !( object instanceof Pipe ) || !iterator.hasNext() )
1008        {
1009        label = object.toString().replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
1010        }
1011      else
1012        {
1013        Scope scope = iterator.next();
1014
1015        label = ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
1016        }
1017
1018      label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id
1019
1020      label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( "<", "\\\\<" ).replaceAll( ">", "\\\\>" ) + "}";
1021
1022      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
1023        return label;
1024
1025      Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object );
1026
1027      if( !annotations.isEmpty() )
1028        label += "|{" + Util.join( annotations, "|" ) + "}";
1029
1030      return label;
1031      }
1032
1033    protected String getID( FlowElement object )
1034      {
1035      return FlowElements.id( object ).substring( 0, 5 );
1036      }
1037    }
1038
1039  private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope>
1040    {
1041    public String getEdgeName( Scope object )
1042      {
1043      return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
1044      }
1045    }
1046
1047  private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement>
1048    {
1049    static Map<String, String> defaultNode = new HashMap<String, String>()
1050      {
1051      {put( "shape", "Mrecord" );}
1052      };
1053
1054    public VertexAttributeProvider()
1055      {
1056      }
1057
1058    @Override
1059    public Map<String, String> getComponentAttributes( FlowElement object )
1060      {
1061      return defaultNode;
1062      }
1063    }
1064
1065  private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope>
1066    {
1067    static Map<String, String> attributes = new HashMap<String, String>()
1068      {
1069      {put( "style", "dotted" );}
1070
1071      {put( "arrowhead", "dot" );}
1072      };
1073
1074    @Override
1075    public Map<String, String> getComponentAttributes( Scope scope )
1076      {
1077      if( scope.isNonBlocking() )
1078        return null;
1079
1080      return attributes;
1081      }
1082    }
1083
1084  private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel>
1085    {
1086    @Override
1087    public String getVertexName( ProcessModel processModel )
1088      {
1089      return "" + processModel.getOrdinal();
1090      }
1091    }
1092
1093  private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel>
1094    {
1095    @Override
1096    public String getVertexName( ProcessModel processModel )
1097      {
1098      return "ordinal: " + processModel.getOrdinal() +
1099        "\\nid: " + processModel.getID() +
1100        "\\nhash: " + canonicalHash( processModel.getElementGraph() );
1101      }
1102    }
1103
1104  static void injectIdentityMap( AbstractGraph graph )
1105    {
1106    // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap
1107    // vertex not found errors will be thrown if this fails
1108    Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" );
1109
1110    if( specifics == null )
1111      {
1112      LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" );
1113      return;
1114      }
1115
1116    boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() );
1117
1118    if( !success )
1119      LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" );
1120    }
1121  }