001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.graph;
023
024import java.io.File;
025import java.io.FileWriter;
026import java.io.IOException;
027import java.io.Writer;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.IdentityHashMap;
035import java.util.Iterator;
036import java.util.LinkedHashSet;
037import java.util.LinkedList;
038import java.util.List;
039import java.util.ListIterator;
040import java.util.Map;
041import java.util.Set;
042
043import cascading.flow.FlowElement;
044import cascading.flow.FlowElements;
045import cascading.flow.planner.PlatformInfo;
046import cascading.flow.planner.Scope;
047import cascading.flow.planner.iso.expression.ElementCapture;
048import cascading.flow.planner.iso.expression.ExpressionGraph;
049import cascading.flow.planner.iso.expression.FlowElementExpression;
050import cascading.flow.planner.iso.expression.TypeExpression;
051import cascading.flow.planner.iso.finder.SearchOrder;
052import cascading.flow.planner.iso.subgraph.SubGraphIterator;
053import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator;
054import cascading.flow.planner.process.FlowStepGraph;
055import cascading.flow.planner.process.ProcessGraph;
056import cascading.flow.planner.process.ProcessModel;
057import cascading.pipe.Group;
058import cascading.pipe.Operator;
059import cascading.pipe.Pipe;
060import cascading.pipe.Splice;
061import cascading.tap.Tap;
062import cascading.util.DOTProcessGraphWriter;
063import cascading.util.EnumMultiMap;
064import cascading.util.Murmur3;
065import cascading.util.Pair;
066import cascading.util.Util;
067import cascading.util.Version;
068import cascading.util.jgrapht.ComponentAttributeProvider;
069import cascading.util.jgrapht.EdgeNameProvider;
070import cascading.util.jgrapht.IntegerNameProvider;
071import cascading.util.jgrapht.VertexNameProvider;
072import org.jgrapht.DirectedGraph;
073import org.jgrapht.Graph;
074import org.jgrapht.GraphPath;
075import org.jgrapht.Graphs;
076import org.jgrapht.alg.shortestpath.DijkstraShortestPath;
077import org.jgrapht.alg.shortestpath.FloydWarshallShortestPaths;
078import org.jgrapht.alg.shortestpath.KShortestPaths;
079import org.jgrapht.graph.AbstractGraph;
080import org.jgrapht.graph.DirectedMultigraph;
081import org.jgrapht.graph.EdgeReversedGraph;
082import org.jgrapht.graph.SimpleDirectedGraph;
083import org.jgrapht.graph.specifics.DirectedEdgeContainer;
084import org.jgrapht.graph.specifics.DirectedSpecifics;
085import org.jgrapht.traverse.TopologicalOrderIterator;
086import org.jgrapht.util.TypeUtil;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import static cascading.util.Util.*;
091
092/**
093 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}.
094 */
095public class ElementGraphs
096  {
097  private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class );
098
099  // not for instantiation
100  private ElementGraphs()
101    {
102    }
103
104  public static FlowElementGraph asFlowElementGraph( PlatformInfo platformInfo, FlowStepGraph flowStepGraph )
105    {
106    ElementMultiGraph elementDirectedGraph = asElementMultiGraph( flowStepGraph.getElementGraphs() );
107
108    Map<String, Tap> sources = asSourceMap( elementDirectedGraph, findSources( elementDirectedGraph ) );
109    Map<String, Tap> sinks = asSinkMap( elementDirectedGraph, findSinks( elementDirectedGraph ) );
110
111    return new FlowElementGraph( platformInfo, elementDirectedGraph, sources, sinks, null, null );
112    }
113
114  public static ElementDirectedGraph asElementDirectedGraph( Collection<ElementGraph> elementGraphs )
115    {
116    ElementDirectedGraph elementDirectedGraph = new ElementDirectedGraph();
117
118    for( ElementGraph elementGraph : elementGraphs )
119      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );
120
121    return elementDirectedGraph;
122    }
123
124  public static ElementMultiGraph asElementMultiGraph( Collection<ElementGraph> elementGraphs )
125    {
126    ElementMultiGraph elementDirectedGraph = new ElementMultiGraph();
127
128    for( ElementGraph elementGraph : elementGraphs )
129      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );
130
131    return elementDirectedGraph;
132    }
133
134  public static boolean isEmpty( ElementGraph elementGraph )
135    {
136    int size = elementGraph.vertexSet().size();
137
138    if( size == 0 )
139      return true;
140
141    return size == 2 && elementGraph.containsVertex( Extent.head ) && elementGraph.containsVertex( Extent.tail );
142    }
143
144  private static class IdentityDirectedGraph<V, E> extends SimpleDirectedGraph<V, E>
145    {
146    public IdentityDirectedGraph( Class<? extends E> edgeClass )
147      {
148      super( edgeClass );
149      }
150
151    @Override
152    protected DirectedSpecifics createSpecifics( boolean directed )
153      {
154      return new DirectedSpecifics( this, new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
155      }
156    }
157
158  private static class IdentityMultiGraphGraph<V, E> extends DirectedMultigraph<V, E>
159    {
160    public IdentityMultiGraphGraph( Class<? extends E> edgeClass )
161      {
162      super( edgeClass );
163      }
164
165    @Override
166    protected DirectedSpecifics createSpecifics( boolean directed )
167      {
168      return new DirectedSpecifics( this, new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
169      }
170    }
171
172  public static Graph<FlowElement, Scope> directed( ElementGraph elementGraph )
173    {
174    if( elementGraph == null )
175      return null;
176
177    if( elementGraph instanceof DecoratedElementGraph )
178      return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() );
179
180    return ( (BaseElementGraph) elementGraph ).graph;
181    }
182
183  public static EnumMultiMap<FlowElement> annotations( ElementGraph elementGraph )
184    {
185    if( elementGraph == null )
186      return null;
187
188    if( elementGraph instanceof DecoratedElementGraph )
189      return annotations( ( (DecoratedElementGraph) elementGraph ).getDecorated() );
190
191    if( elementGraph instanceof AnnotatedGraph )
192      return ( (AnnotatedGraph) elementGraph ).getAnnotations();
193
194    return null;
195    }
196
197  public static boolean isMultiGraph( ElementGraph elementGraph )
198    {
199    return directed( elementGraph ) instanceof DirectedMultigraph;
200    }
201
202  public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph )
203    {
204    return hashCodeIgnoreAnnotations( directed( elementGraph ) );
205    }
206
207  public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph )
208    {
209    int hash = graph.vertexSet().hashCode();
210
211    for( E e : graph.edgeSet() )
212      {
213      int part = e.hashCode();
214
215      int source = graph.getEdgeSource( e ).hashCode();
216      int target = graph.getEdgeTarget( e ).hashCode();
217
218      int pairing = pair( source, target );
219
220      part = ( 27 * part ) + pairing;
221
222      long weight = (long) graph.getEdgeWeight( e );
223      part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) );
224
225      hash += part;
226      }
227
228    return hash;
229    }
230
231  public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs )
232    {
233    return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) );
234    }
235
236  public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs )
237    {
238    if( lhs == rhs )
239      return true;
240
241    TypeUtil<Graph<V, E>> typeDecl = null;
242    Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl );
243    Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl );
244
245    if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) )
246      return false;
247
248    if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() )
249      return false;
250
251    for( E e : lhsGraph.edgeSet() )
252      {
253      V source = lhsGraph.getEdgeSource( e );
254      V target = lhsGraph.getEdgeTarget( e );
255
256      if( !rhsGraph.containsEdge( e ) )
257        return false;
258
259      if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) )
260        return false;
261
262      if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 )
263        return false;
264      }
265
266    return true;
267    }
268
269  public static boolean equals( ElementGraph lhs, ElementGraph rhs )
270    {
271    if( !equalsIgnoreAnnotations( lhs, rhs ) )
272      return false;
273
274    if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) )
275      return true;
276
277    if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) )
278      return false;
279
280    AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs;
281    AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs;
282
283    if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() )
284      return false;
285
286    if( !lhsAnnotated.hasAnnotations() )
287      return true;
288
289    return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() );
290    }
291
292  public static String canonicalHash( ElementGraph graph )
293    {
294    return canonicalHash( directed( graph ) );
295    }
296
297  public static String canonicalHash( Graph<FlowElement, Scope> graph )
298    {
299    int hash = Murmur3.SEED;
300
301    int edges = 0;
302    boolean hasExtents = false;
303
304    for( Scope e : graph.edgeSet() )
305      {
306      FlowElement edgeSource = graph.getEdgeSource( e );
307      FlowElement edgeTarget = graph.getEdgeTarget( e );
308
309      // simpler to ignore extents here
310      if( edgeSource instanceof Extent || edgeTarget instanceof Extent )
311        {
312        hasExtents = true;
313        continue;
314        }
315
316      int source = hash( edgeSource );
317      int target = hash( edgeTarget );
318      int pairing = pair( source, target );
319
320      hash += pairing; // don't make edge traversal order significant
321      edges++;
322      }
323
324    int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 );
325
326    hash = Murmur3.fmix( hash, vertexes * edges );
327
328    return Util.getHex( Util.intToByteArray( hash ) );
329    }
330
331  private static int hash( FlowElement flowElement )
332    {
333    int lhs = flowElement.getClass().getName().hashCode();
334    int rhs = 0;
335
336    if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null )
337      rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode();
338    else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null )
339      rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode();
340    else if( flowElement instanceof Splice )
341      rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins();
342
343    return pair( lhs, rhs );
344    }
345
346  protected static int pair( int lhs, int rhs )
347    {
348    if( rhs == 0 )
349      return lhs;
350
351    // see http://en.wikipedia.org/wiki/Pairing_function
352    return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs;
353    }
354
355  public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph )
356    {
357    if( graph == null )
358      return Collections.emptyIterator();
359
360    return new TopologicalOrderIterator<>( directed( graph ) );
361    }
362
363  public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph )
364    {
365    return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) );
366    }
367
368  public static Collection<FlowElement> getAllElementsBetweenExclusive( ElementGraph graph, FlowElement from, FlowElement to )
369    {
370    Collection<FlowElement> results = getAllElementsBetweenInclusive( graph, from, to );
371
372    results.remove( from );
373    results.remove( to );
374
375    return results;
376    }
377
378  public static Collection<FlowElement> getAllElementsBetweenInclusive( ElementGraph graph, FlowElement from, FlowElement to )
379    {
380    Set<FlowElement> results = Util.createIdentitySet();
381
382    List<GraphPath<FlowElement, Scope>> pathsBetween = getAllShortestPathsBetween( graph, from, to );
383
384    for( GraphPath<FlowElement, Scope> graphPath : pathsBetween )
385      results.addAll( graphPath.getVertexList() );
386
387    return results;
388    }
389
390  public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to )
391    {
392    return getAllShortestPathsBetween( directed( graph ), from, to );
393    }
394
395  public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( Graph<V, E> graph, V from, V to )
396    {
397    List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, Integer.MAX_VALUE ).getPaths( from, to );
398
399    if( paths == null )
400      return new ArrayList<>();
401
402    return paths;
403    }
404
405  public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes )
406    {
407    elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded
408
409    Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes );
410    Set<FlowElement> vertices = pair.getLhs();
411    Set<Scope> excludeEdges = pair.getRhs();
412
413    Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() );
414    scopes.removeAll( excludeEdges );
415
416    return new ElementSubGraph( elementGraph, vertices, scopes );
417    }
418
419  /**
420   * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent}
421   * head or tail instances.
422   * <p>
423   * If the given ElementGraph does not contain head or tail, it will be returned unchanged.
424   *
425   * @param elementGraph
426   * @return
427   */
428  public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph )
429    {
430    if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) )
431      return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail );
432
433    return elementGraph;
434    }
435
436  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted )
437    {
438    return findClosureViaFloydWarshall( full, contracted, null );
439    }
440
441  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( Graph<V, E> full, Graph<V, E> contracted, Set<V> excludes )
442    {
443    Set<V> vertices = createIdentitySet( contracted.vertexSet() );
444    LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() );
445
446    allVertices.removeAll( vertices );
447
448    Set<E> excludeEdges = new HashSet<>();
449
450    // prevent distinguished elements from being included inside the sub-graph
451    if( excludes != null )
452      {
453      for( V v : excludes )
454        {
455        if( !full.containsVertex( v ) )
456          continue;
457
458        excludeEdges.addAll( full.incomingEdgesOf( v ) );
459        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
460        }
461      }
462
463    for( V v : contracted.vertexSet() )
464      {
465      if( contracted.inDegreeOf( v ) == 0 )
466        excludeEdges.addAll( full.incomingEdgesOf( v ) );
467
468      if( contracted.outDegreeOf( v ) == 0 )
469        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
470      }
471
472    // exclude edged between nodes in full that are not in the contracted graph
473    for( V outer : contracted.vertexSet() )
474      {
475      for( V inner : contracted.vertexSet() )
476        {
477        if( outer == inner )
478          continue;
479
480        if( contracted.getAllEdges( inner, outer ).isEmpty() )
481          excludeEdges.addAll( full.getAllEdges( inner, outer ) );
482        }
483      }
484
485    Graph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges );
486
487    FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected );
488
489    for( E edge : contracted.edgeSet() )
490      {
491      V edgeSource = contracted.getEdgeSource( edge );
492      V edgeTarget = contracted.getEdgeTarget( edge );
493
494      ListIterator<V> iterator = allVertices.listIterator();
495      while( iterator.hasNext() )
496        {
497        V vertex = iterator.next();
498
499        if( !isBetween( paths, edgeSource, edgeTarget, vertex ) )
500          continue;
501
502        vertices.add( vertex );
503        iterator.remove();
504        }
505      }
506
507    return new Pair<>( vertices, excludeEdges );
508    }
509
510  private static <V, E> Graph<V, E> disconnectExtentsAndExclude( Graph<V, E> full, Set<E> withoutEdges )
511    {
512    IdentityMultiGraphGraph<V, E> copy = (IdentityMultiGraphGraph<V, E>) new IdentityMultiGraphGraph<>( Object.class );
513
514    Graphs.addAllVertices( copy, full.vertexSet() );
515
516    copy.removeVertex( (V) Extent.head );
517    copy.removeVertex( (V) Extent.tail );
518
519    Set<E> edges = full.edgeSet();
520
521    if( !withoutEdges.isEmpty() )
522      {
523      edges = new HashSet<>( edges );
524      edges.removeAll( withoutEdges );
525      }
526
527    Graphs.addAllEdges( copy, full, edges );
528
529    return copy;
530    }
531
532  private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex )
533    {
534    return paths.getFirstHop( edgeSource, vertex ) != null && paths.getFirstHop( vertex, edgeTarget ) != null;
535    }
536
537  public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement )
538    {
539    LOG.debug( "removing element, contracting edge for: {}", flowElement );
540
541    Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement );
542
543    boolean contractIncoming = true;
544
545    if( !contractIncoming )
546      {
547      if( incomingScopes.size() != 1 )
548        throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );
549      }
550
551    boolean isJoin = flowElement instanceof Splice && ( (Splice) flowElement ).isJoin();
552
553    for( Scope incoming : incomingScopes )
554      {
555      Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement );
556
557      // source -> incoming -> flowElement -> outgoing -> target
558      FlowElement source = elementGraph.getEdgeSource( incoming );
559
560      for( Scope outgoing : outgoingScopes )
561        {
562        FlowElement target = elementGraph.getEdgeTarget( outgoing );
563
564        boolean isNonBlocking = outgoing.isNonBlocking();
565
566        if( isJoin )
567          isNonBlocking = isNonBlocking && incoming.isNonBlocking();
568
569        Scope scope = new Scope( outgoing );
570
571        // unsure if necessary since we track blocking independently
572        // when removing a pipe, pull ordinal up to tap
573        // when removing a Splice retain ordinal
574        if( flowElement instanceof Splice )
575          scope.setOrdinal( incoming.getOrdinal() );
576        else
577          scope.setOrdinal( outgoing.getOrdinal() );
578
579        scope.setNonBlocking( isNonBlocking );
580        scope.addPriorNames( incoming, outgoing ); // not copied
581
582        boolean success = elementGraph.addEdge( source, target, scope );
583
584        if( !success )
585          throw new IllegalStateException( "during graph contraction, unable to add new edge between: " + source + " and " + target );
586        }
587      }
588
589    elementGraph.removeVertex( flowElement );
590    }
591
592  public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo )
593    {
594    try
595      {
596      File parentFile = new File( filename ).getParentFile();
597
598      if( parentFile != null && !parentFile.exists() )
599        parentFile.mkdirs();
600
601      Writer writer = new FileWriter( filename );
602
603      Util.writeDOT( writer, ElementGraphs.directed( graph ),
604        new IntegerNameProvider<FlowElement>(),
605        new FlowElementVertexNameProvider( graph, platformInfo ),
606        new ScopeEdgeNameProvider(),
607        new VertexAttributeProvider(), new EdgeAttributeProvider() );
608
609      writer.close();
610      return true;
611      }
612    catch( NullPointerException | IOException exception )
613      {
614      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
615      }
616
617    return false;
618    }
619
620  public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph )
621    {
622    try
623      {
624      File parentFile = new File( filename ).getParentFile();
625
626      if( parentFile != null && !parentFile.exists() )
627        parentFile.mkdirs();
628
629      Writer writer = new FileWriter( filename );
630
631      DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter(
632        new IntegerNameProvider<>(),
633        new FlowElementVertexNameProvider( graph, null ),
634        new ScopeEdgeNameProvider(),
635        new VertexAttributeProvider(), new EdgeAttributeProvider(),
636        new ProcessGraphNameProvider(), new ProcessGraphLabelProvider()
637      );
638
639      graphWriter.writeGraph( writer, graph, processGraph );
640
641      writer.close();
642      return true;
643      }
644    catch( IOException exception )
645      {
646      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
647      }
648
649    return false;
650    }
651
652  public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement )
653    {
654    Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) );
655
656    elementGraph.addVertex( flowElement );
657
658    String name = previousElement.toString();
659
660    if( previousElement instanceof Pipe )
661      name = ( (Pipe) previousElement ).getName();
662
663    elementGraph.addEdge( previousElement, flowElement, new Scope( name ) );
664
665    for( Scope scope : outgoing )
666      {
667      FlowElement target = elementGraph.getEdgeTarget( scope );
668      Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope
669
670      if( foundScope != scope )
671        throw new IllegalStateException( "did not remove proper scope" );
672
673      elementGraph.addEdge( flowElement, target, scope ); // add scope back
674      }
675    }
676
677  public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement )
678    {
679    Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) );
680
681    graph.addVertex( flowElement );
682
683    String name = nextElement.toString();
684
685    if( nextElement instanceof Pipe )
686      name = ( (Pipe) nextElement ).getName();
687
688    graph.addEdge( flowElement, nextElement, new Scope( name ) );
689
690    for( Scope scope : incoming )
691      {
692      FlowElement target = graph.getEdgeSource( scope );
693      Scope foundScope = graph.removeEdge( target, nextElement ); // remove scope
694
695      if( foundScope != scope )
696        throw new IllegalStateException( "did not remove proper scope" );
697
698      graph.addEdge( target, flowElement, scope ); // add scope back
699      }
700    }
701
702  public static void insertFlowElementBetweenEdge( ElementGraph elementGraph, Scope previousEdge, FlowElement newElement )
703    {
704    FlowElement previousElement = elementGraph.getEdgeSource( previousEdge );
705    FlowElement nextElement = elementGraph.getEdgeTarget( previousEdge );
706
707    elementGraph.addVertex( newElement );
708
709    // add edge between previous and new
710    elementGraph.addEdge( previousElement, newElement, new Scope( previousEdge ) );
711
712    // add edge between new and next
713    Scope scope = new Scope( previousEdge );
714
715    scope.setOrdinal( previousEdge.getOrdinal() );
716
717    if( nextElement instanceof Splice && ( (Splice) nextElement ).isJoin() && previousEdge.getOrdinal() != 0 )
718      scope.setNonBlocking( false );
719
720    elementGraph.addEdge( newElement, nextElement, scope );
721
722    // remove previous edge
723    elementGraph.removeEdge( previousElement, nextElement );
724    }
725
726  public static <F extends FlowElement> Map<String, F> asSourceMap( ElementGraph elementGraph, Set<F> elements )
727    {
728    Map<String, F> map = new HashMap<>();
729
730    for( F element : elements )
731      {
732      for( Scope scope : elementGraph.outgoingEdgesOf( element ) )
733        map.put( scope.getName(), element );
734      }
735
736    return map;
737    }
738
739  public static Set<Tap> findSources( ElementGraph elementGraph )
740    {
741    return findSources( elementGraph, Tap.class );
742    }
743
744  public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type )
745    {
746    if( elementGraph == null )
747      return Collections.emptySet();
748
749    if( elementGraph.containsVertex( Extent.head ) )
750      return narrowIdentitySet( type, elementGraph.successorListOf( Extent.head ) );
751
752    SubGraphIterator iterator = new ExpressionSubGraphIterator(
753      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ),
754      elementGraph
755    );
756
757    return narrowIdentitySet( type, getAllVertices( iterator ) );
758    }
759
760  public static <F extends FlowElement> Map<String, F> asSinkMap( ElementGraph elementGraph, Set<F> elements )
761    {
762    Map<String, F> map = new HashMap<>();
763
764    for( F element : elements )
765      {
766      for( Scope scope : elementGraph.incomingEdgesOf( element ) )
767        map.put( scope.getName(), element );
768      }
769
770    return map;
771    }
772
773  public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type )
774    {
775    if( elementGraph == null )
776      return Collections.emptySet();
777
778    if( elementGraph.containsVertex( Extent.tail ) )
779      return narrowIdentitySet( type, elementGraph.predecessorListOf( Extent.tail ) );
780
781    SubGraphIterator iterator = new ExpressionSubGraphIterator(
782      new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ),
783      elementGraph
784    );
785
786    return narrowIdentitySet( type, getAllVertices( iterator ) );
787    }
788
789  public static Set<Tap> findSinks( ElementGraph elementGraph )
790    {
791    return findSinks( elementGraph, Tap.class );
792    }
793
794  public static Set<Group> findAllGroups( ElementGraph elementGraph )
795    {
796    SubGraphIterator iterator = new ExpressionSubGraphIterator(
797      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ),
798      elementGraph
799    );
800
801    return narrowIdentitySet( Group.class, getAllVertices( iterator ) );
802    }
803
804  private static Set<FlowElement> getAllVertices( SubGraphIterator iterator )
805    {
806    Set<FlowElement> vertices = createIdentitySet();
807
808    while( iterator.hasNext() )
809      vertices.addAll( iterator.next().vertexSet() );
810
811    return vertices;
812    }
813
814  public static Set<Scope> getAllMultiEdgesBetween( Collection<Scope> edgeList, ElementGraph current )
815    {
816    Set<Scope> allEdgesBetween = new HashSet<>();
817
818    for( Scope scope : edgeList )
819      {
820      FlowElement edgeSource = current.getEdgeSource( scope );
821      FlowElement edgeTarget = current.getEdgeTarget( scope );
822
823      allEdgesBetween.addAll( current.getAllEdges( edgeSource, edgeTarget ) );
824      }
825
826    return allEdgesBetween;
827    }
828
829  public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith )
830    {
831    Set<Scope> incoming = new HashSet<Scope>( elementGraph.incomingEdgesOf( replace ) );
832    Set<Scope> outgoing = new HashSet<Scope>( elementGraph.outgoingEdgesOf( replace ) );
833
834    if( !elementGraph.containsVertex( replaceWith ) )
835      elementGraph.addVertex( replaceWith );
836
837    for( Scope scope : incoming )
838      {
839      FlowElement source = elementGraph.getEdgeSource( scope );
840      elementGraph.removeEdge( source, replace ); // remove scope
841
842      // drop edge between, if any
843      if( source != replaceWith )
844        elementGraph.addEdge( source, replaceWith, scope ); // add scope back
845      }
846
847    for( Scope scope : outgoing )
848      {
849      FlowElement target = elementGraph.getEdgeTarget( scope );
850      elementGraph.removeEdge( replace, target ); // remove scope
851
852      // drop edge between, if any
853      if( target != replaceWith )
854        elementGraph.addEdge( replaceWith, target, scope ); // add scope back
855      }
856
857    elementGraph.removeVertex( replace );
858    }
859
860  public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name )
861    {
862    Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph );
863
864    return find( name, iterator );
865    }
866
867  public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name )
868    {
869    Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph );
870
871    return find( name, iterator );
872    }
873
874  private static Pipe find( String name, Iterator<FlowElement> iterator )
875    {
876    while( iterator.hasNext() )
877      {
878      FlowElement flowElement = iterator.next();
879
880      if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) )
881        return (Pipe) flowElement;
882      }
883
884    return null;
885    }
886
887  public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement )
888    {
889    Set<FlowElement> branch = new LinkedHashSet<>();
890
891    walkUp( branch, elementGraph, flowElement );
892
893    walkDown( branch, elementGraph, flowElement );
894
895    if( branch.isEmpty() )
896      return false;
897
898    for( FlowElement element : branch )
899      elementGraph.removeVertex( element );
900
901    return true;
902    }
903
904  public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive )
905    {
906    Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) );
907
908    walkDown( branch, elementGraph, first );
909
910    if( !inclusive )
911      {
912      branch.remove( first );
913      branch.remove( second );
914      }
915
916    if( branch.isEmpty() )
917      return false;
918
919    for( FlowElement element : branch )
920      elementGraph.removeVertex( element );
921
922    return true;
923    }
924
925  private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
926    {
927    FlowElement current;
928    current = flowElement;
929
930    while( true )
931      {
932      if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) )
933        break;
934
935      branch.add( current );
936
937      FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) );
938
939      if( element instanceof Extent || branch.contains( element ) )
940        break;
941
942      current = element;
943      }
944    }
945
946  private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
947    {
948    FlowElement current = flowElement;
949
950    while( true )
951      {
952      if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 )
953        break;
954
955      branch.add( current );
956
957      FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) );
958
959      if( element instanceof Extent || branch.contains( element ) )
960        break;
961
962      current = element;
963      }
964    }
965
966  /**
967   * Returns the number of edges found on the shortest distance between the lhs and rhs.
968   */
969  public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs )
970    {
971    return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).getLength();
972    }
973
974  private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement>
975    {
976    private final ElementGraph elementGraph;
977    private final PlatformInfo platformInfo;
978
979    public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo )
980      {
981      this.elementGraph = elementGraph;
982      this.platformInfo = platformInfo;
983      }
984
985    public String getVertexName( FlowElement object )
986      {
987      if( object instanceof Extent ) // is head/tail
988        {
989        String result = object.toString().replaceAll( "\"", "\'" );
990
991        if( object == Extent.tail )
992          return result;
993
994        result = result + "|hash: " + canonicalHash( elementGraph );
995
996        String versionString = Version.getRelease();
997
998        if( platformInfo != null )
999          versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo;
1000
1001        return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}";
1002        }
1003
1004      String label;
1005
1006      Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator();
1007
1008      if( !( object instanceof Pipe ) || !iterator.hasNext() )
1009        {
1010        label = object.toString().replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
1011        }
1012      else
1013        {
1014        Scope scope = iterator.next();
1015
1016        label = ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
1017        }
1018
1019      label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id
1020
1021      label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( "<", "\\\\<" ).replaceAll( ">", "\\\\>" ) + "}";
1022
1023      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
1024        return label;
1025
1026      Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object );
1027
1028      if( !annotations.isEmpty() )
1029        label += "|{" + Util.join( annotations, "|" ) + "}";
1030
1031      return label;
1032      }
1033
1034    protected String getID( FlowElement object )
1035      {
1036      return FlowElements.id( object ).substring( 0, 5 );
1037      }
1038    }
1039
1040  private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope>
1041    {
1042    public String getEdgeName( Scope object )
1043      {
1044      return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
1045      }
1046    }
1047
1048  private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement>
1049    {
1050    static Map<String, String> defaultNode = new HashMap<String, String>()
1051      {
1052      {put( "shape", "Mrecord" );}
1053      };
1054
1055    public VertexAttributeProvider()
1056      {
1057      }
1058
1059    @Override
1060    public Map<String, String> getComponentAttributes( FlowElement object )
1061      {
1062      return defaultNode;
1063      }
1064    }
1065
1066  private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope>
1067    {
1068    static Map<String, String> attributes = new HashMap<String, String>()
1069      {
1070      {put( "style", "dotted" );}
1071
1072      {put( "arrowhead", "dot" );}
1073      };
1074
1075    @Override
1076    public Map<String, String> getComponentAttributes( Scope scope )
1077      {
1078      if( scope.isNonBlocking() )
1079        return null;
1080
1081      return attributes;
1082      }
1083    }
1084
1085  private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel>
1086    {
1087    @Override
1088    public String getVertexName( ProcessModel processModel )
1089      {
1090      return "" + processModel.getOrdinal();
1091      }
1092    }
1093
1094  private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel>
1095    {
1096    @Override
1097    public String getVertexName( ProcessModel processModel )
1098      {
1099      return "ordinal: " + processModel.getOrdinal() +
1100        "\\nid: " + processModel.getID() +
1101        "\\nhash: " + canonicalHash( processModel.getElementGraph() );
1102      }
1103    }
1104
1105  static void injectIdentityMap( AbstractGraph graph )
1106    {
1107    // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap
1108    // vertex not found errors will be thrown if this fails
1109    Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" );
1110
1111    if( specifics == null )
1112      {
1113      LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" );
1114      return;
1115      }
1116
1117    boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() );
1118
1119    if( !success )
1120      LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" );
1121    }
1122  }