/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.graph;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.flow.planner.iso.expression.ElementCapture;
import cascading.flow.planner.iso.expression.ExpressionGraph;
import cascading.flow.planner.iso.expression.FlowElementExpression;
import cascading.flow.planner.iso.expression.TypeExpression;
import cascading.flow.planner.iso.finder.SearchOrder;
import cascading.flow.planner.iso.subgraph.SubGraphIterator;
import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.flow.planner.process.ProcessGraph;
import cascading.flow.planner.process.ProcessModel;
import cascading.pipe.Group;
import cascading.pipe.Operator;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import cascading.util.DOTProcessGraphWriter;
import cascading.util.EnumMultiMap;
import cascading.util.Murmur3;
import cascading.util.Pair;
import cascading.util.Util;
import cascading.util.Version;
import cascading.util.jgrapht.ComponentAttributeProvider;
import cascading.util.jgrapht.EdgeNameProvider;
import cascading.util.jgrapht.IntegerNameProvider;
import cascading.util.jgrapht.VertexNameProvider;
import org.jgrapht.DirectedGraph;
import org.jgrapht.Graph;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.shortestpath.DijkstraShortestPath;
import org.jgrapht.alg.shortestpath.FloydWarshallShortestPaths;
import org.jgrapht.alg.shortestpath.KShortestPaths;
import org.jgrapht.graph.AbstractGraph;
import org.jgrapht.graph.DirectedMultigraph;
import org.jgrapht.graph.EdgeReversedGraph;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.graph.specifics.DirectedEdgeContainer;
import org.jgrapht.graph.specifics.DirectedSpecifics;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.jgrapht.util.TypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.*;

/**
 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}.
 */
public class ElementGraphs
  {
  private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class );

  // not for instantiation
  private ElementGraphs()
    {
    }

  /**
   * Merges the element graphs of the given FlowStepGraph into a single multi-graph and wraps it in a new
   * {@link FlowElementGraph}, using the Tap sources and sinks found in the merged graph.
   *
   * @param platformInfo  passed through to the FlowElementGraph constructor
   * @param flowStepGraph provides the element graphs to merge
   * @return a new FlowElementGraph
   */
  public static FlowElementGraph asFlowElementGraph( PlatformInfo platformInfo, FlowStepGraph flowStepGraph )
    {
    ElementMultiGraph elementDirectedGraph = asElementMultiGraph( flowStepGraph.getElementGraphs() );

    Map<String, Tap> sources = asSourceMap( elementDirectedGraph, findSources( elementDirectedGraph ) );
    Map<String, Tap> sinks = asSinkMap( elementDirectedGraph, findSinks( elementDirectedGraph ) );

    return new FlowElementGraph( platformInfo, elementDirectedGraph, sources, sinks, null, null );
    }

  /**
   * Copies every given graph, with its {@link Extent} head and tail masked out, into a new
   * {@link ElementDirectedGraph}.
   *
   * @param elementGraphs the graphs to copy from
   * @return a new ElementDirectedGraph
   */
  public static ElementDirectedGraph asElementDirectedGraph( Collection<ElementGraph> elementGraphs )
    {
    ElementDirectedGraph elementDirectedGraph = new ElementDirectedGraph();

    for( ElementGraph elementGraph : elementGraphs )
      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );

    return elementDirectedGraph;
    }

  /**
   * Copies every given graph, with its {@link Extent} head and tail masked out, into a new
   * {@link ElementMultiGraph}.
   *
   * @param elementGraphs the graphs to copy from
   * @return a new ElementMultiGraph
   */
  public static ElementMultiGraph asElementMultiGraph( Collection<ElementGraph> elementGraphs )
    {
    ElementMultiGraph elementDirectedGraph = new ElementMultiGraph();

    for( ElementGraph elementGraph : elementGraphs )
      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );

    return elementDirectedGraph;
    }

  /**
   * Returns true if the given graph has no vertices, or if its only two vertices are
   * {@link Extent#head} and {@link Extent#tail}.
   *
   * @param elementGraph the graph to test
   * @return true if the graph holds no elements other than the extents
   */
  public static boolean isEmpty( ElementGraph elementGraph )
    {
    int size = elementGraph.vertexSet().size();

    if( size == 0 )
      return true;

    return size == 2 && elementGraph.containsVertex( Extent.head ) && elementGraph.containsVertex( Extent.tail );
    }

  /** A SimpleDirectedGraph whose specifics are created over an {@link IdentityHashMap}. */
  private static class IdentityDirectedGraph<V, E> extends SimpleDirectedGraph<V, E>
    {
    public IdentityDirectedGraph( Class<? extends E> edgeClass )
      {
      super( edgeClass );
      }

    @Override
    protected DirectedSpecifics createSpecifics( boolean directed )
      {
      return new DirectedSpecifics( this, new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
      }
    }

  /** A DirectedMultigraph whose specifics are created over an {@link IdentityHashMap}. */
  private static class IdentityMultiGraphGraph<V, E> extends DirectedMultigraph<V, E>
    {
    public IdentityMultiGraphGraph( Class<? extends E> edgeClass )
      {
      super( edgeClass );
      }

    @Override
    protected DirectedSpecifics createSpecifics( boolean directed )
      {
      return new DirectedSpecifics( this, new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
      }
    }

  /**
   * Returns the underlying jgrapht graph of the given ElementGraph, unwrapping any
   * {@link DecoratedElementGraph} instances first.
   * <p>
   * Any graph that is not a DecoratedElementGraph is cast to {@link BaseElementGraph}.
   *
   * @param elementGraph the graph to unwrap, may be null
   * @return the underlying graph, or null if the given graph is null
   */
  public static Graph<FlowElement, Scope> directed( ElementGraph elementGraph )
    {
    if( elementGraph == null )
      return null;

    if( elementGraph instanceof DecoratedElementGraph )
      return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() );

    return ( (BaseElementGraph) elementGraph ).graph;
    }

  /**
   * Returns the annotations of the given ElementGraph, unwrapping any {@link DecoratedElementGraph}
   * instances first.
   *
   * @param elementGraph the graph to inspect, may be null
   * @return the annotations, or null if the graph is null or is not an {@link AnnotatedGraph}
   */
  public static EnumMultiMap<FlowElement> annotations( ElementGraph elementGraph )
    {
    if( elementGraph == null )
      return null;

    if( elementGraph instanceof DecoratedElementGraph )
      return annotations( ( (DecoratedElementGraph) elementGraph ).getDecorated() );

    if( elementGraph instanceof AnnotatedGraph )
      return ( (AnnotatedGraph) elementGraph ).getAnnotations();

    return null;
    }

  /**
   * Returns true if the underlying graph of the given ElementGraph is a {@link DirectedMultigraph}.
   */
  public static boolean isMultiGraph( ElementGraph elementGraph )
    {
    return directed( elementGraph ) instanceof DirectedMultigraph;
    }

  /**
   * Returns a hash code computed from the underlying graph of the given ElementGraph only.
   *
   * @see #hashCodeIgnoreAnnotations(Graph)
   */
  public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph )
    {
    return hashCodeIgnoreAnnotations( directed( elementGraph ) );
    }

  /**
   * Returns a hash code built from the vertex set hash plus, for every edge, a value combining the edge hash,
   * the paired hashes of its source and target, and its weight.
   * <p>
   * Edge contributions are summed, so edge iteration order does not affect the result.
   *
   * @param graph the graph to hash
   * @return the hash code
   */
  public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph )
    {
    int hash = graph.vertexSet().hashCode();

    for( E e : graph.edgeSet() )
      {
      int part = e.hashCode();

      int source = graph.getEdgeSource( e ).hashCode();
      int target = graph.getEdgeTarget( e ).hashCode();

      int pairing = pair( source, target );

      part = ( 27 * part ) + pairing;

      long weight = (long) graph.getEdgeWeight( e );
      part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) );

      hash += part;
      }

    return hash;
    }

  /**
   * Compares the underlying graphs of the given ElementGraphs.
   *
   * @see #equalsIgnoreAnnotations(Graph, Graph)
   */
  public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs )
    {
    return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) );
    }

  /**
   * Returns true if both graphs are the same instance, or if they have equal vertex sets, the same number of
   * edges, and every edge of lhs is contained in rhs with an equal source, an equal target, and a weight
   * differing by no more than 10e-7.
   *
   * @param lhs the left hand graph
   * @param rhs the right hand graph
   * @return true if the graphs are considered equal
   */
  public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs )
    {
    if( lhs == rhs )
      return true;

    TypeUtil<Graph<V, E>> typeDecl = null;
    Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl );
    Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl );

    if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) )
      return false;

    if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() )
      return false;

    for( E e : lhsGraph.edgeSet() )
      {
      V source = lhsGraph.getEdgeSource( e );
      V target = lhsGraph.getEdgeTarget( e );

      if( !rhsGraph.containsEdge( e ) )
        return false;

      if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) )
        return false;

      if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 )
        return false;
      }

    return true;
    }

  /**
   * Returns true if the given graphs are equal per {@link #equalsIgnoreAnnotations(ElementGraph, ElementGraph)}
   * and their annotations agree.
   * <p>
   * Annotations agree when neither graph is an {@link AnnotatedGraph}, when both are and neither has
   * annotations, or when both are and their annotations are equal.
   *
   * @param lhs the left hand graph
   * @param rhs the right hand graph
   * @return true if the graphs and their annotations are equal
   */
  public static boolean equals( ElementGraph lhs, ElementGraph rhs )
    {
    if( !equalsIgnoreAnnotations( lhs, rhs ) )
      return false;

    if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) )
      return true;

    if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) )
      return false;

    AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs;
    AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs;

    if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() )
      return false;

    if( !lhsAnnotated.hasAnnotations() )
      return true;

    return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() );
    }

  /**
   * Returns the canonical hash of the underlying graph of the given ElementGraph.
   *
   * @see #canonicalHash(Graph)
   */
  public static String canonicalHash( ElementGraph graph )
    {
    return canonicalHash( directed( graph ) );
    }

  /**
   * Returns a hex string hash of the given graph derived from the class names of its elements rather than
   * from element identity.
   * <p>
   * Edges touching an {@link Extent} are skipped. The per-edge values are summed, then mixed with the product
   * of the vertex count (less two if any extent edge was seen) and the counted edges.
   *
   * @param graph the graph to hash
   * @return the hash as a hex string
   */
  public static String canonicalHash( Graph<FlowElement, Scope> graph )
    {
    int hash = Murmur3.SEED;

    int edges = 0;
    boolean hasExtents = false;

    for( Scope e : graph.edgeSet() )
      {
      FlowElement edgeSource = graph.getEdgeSource( e );
      FlowElement edgeTarget = graph.getEdgeTarget( e );

      // simpler to ignore extents here
      if( edgeSource instanceof Extent || edgeTarget instanceof Extent )
        {
        hasExtents = true;
        continue;
        }

      int source = hash( edgeSource );
      int target = hash( edgeTarget );
      int pairing = pair( source, target );

      hash += pairing; // don't make edge traversal order significant
      edges++;
      }

    int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 );

    hash = Murmur3.fmix( hash, vertexes * edges );

    return Util.getHex( Util.intToByteArray( hash ) );
    }

  /**
   * Hashes a FlowElement by its class name, paired with the class name of its Operation (for an Operator),
   * its Scheme (for a Tap), or its Joiner plus self-join count (for a Splice), when present.
   */
  private static int hash( FlowElement flowElement )
    {
    int lhs = flowElement.getClass().getName().hashCode();
    int rhs = 0;

    if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null )
      rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode();
    else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null )
      rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode();
    else if( flowElement instanceof Splice )
      rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins();

    return pair( lhs, rhs );
    }

  /**
   * Combines two ints into one using a pairing function, returning lhs unchanged when rhs is zero.
   * <p>
   * The arithmetic is plain int arithmetic and may wrap; the result is only used as a hash component.
   */
  protected static int pair( int lhs, int rhs )
    {
    if( rhs == 0 )
      return lhs;

    // see http://en.wikipedia.org/wiki/Pairing_function
    return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs;
    }

  /**
   * Returns an iterator over the elements of the given graph in topological order.
   *
   * @param graph the graph to iterate, may be null
   * @return a topological iterator, or an empty iterator if the graph is null
   */
  public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph )
    {
    if( graph == null )
      return Collections.emptyIterator();

    return new TopologicalOrderIterator<>( directed( graph ) );
    }

  /**
   * Returns an iterator over the elements of the given graph in topological order of the edge-reversed graph.
   * <p>
   * Unlike {@link #getTopologicalIterator(ElementGraph)}, a null graph is not special-cased here.
   *
   * @param graph the graph to iterate
   * @return a reverse topological iterator
   */
  public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph )
    {
    return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) );
    }

  /**
   * Returns all elements found on any path between from and to, with from and to themselves removed.
   */
  public static Collection<FlowElement> getAllElementsBetweenExclusive( ElementGraph graph, FlowElement from, FlowElement to )
    {
    Collection<FlowElement> results = getAllElementsBetweenInclusive( graph, from, to );

    results.remove( from );
    results.remove( to );

    return results;
    }

  /**
   * Returns an identity set of all elements found on any path returned by
   * {@link #getAllShortestPathsBetween(ElementGraph, FlowElement, FlowElement)}.
   */
  public static Collection<FlowElement> getAllElementsBetweenInclusive( ElementGraph graph, FlowElement from, FlowElement to )
    {
    Set<FlowElement> results = Util.createIdentitySet();

    List<GraphPath<FlowElement, Scope>> pathsBetween = getAllShortestPathsBetween( graph, from, to );

    for( GraphPath<FlowElement, Scope> graphPath : pathsBetween )
      results.addAll( graphPath.getVertexList() );

    return results;
    }

  /**
   * Returns the paths between from and to in the underlying graph of the given ElementGraph.
   *
   * @see #getAllShortestPathsBetween(Graph, Object, Object)
   */
  public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to )
    {
    return getAllShortestPathsBetween( directed( graph ), from, to );
    }

  /**
   * Returns the paths found by {@link KShortestPaths}, with k set to {@link Integer#MAX_VALUE}, between
   * from and to.
   *
   * @param graph the graph to search
   * @param from  the start vertex
   * @param to    the end vertex
   * @return the paths found, never null
   */
  public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( Graph<V, E> graph, V from, V to )
    {
    List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, Integer.MAX_VALUE ).getPaths( from, to );

    if( paths == null )
      return new ArrayList<>();

    return paths;
    }

  /**
   * Returns a new {@link ElementSubGraph} of the given elementGraph, holding the vertices found by
   * {@link #findClosureViaFloydWarshall(Graph, Graph, Set)} and all edges except those it excluded.
   *
   * @param elementGraph    the full graph
   * @param contractedGraph the contracted graph to expand within the full graph
   * @param excludes        elements whose edges are excluded from the sub-graph, may be null
   * @return a new ElementSubGraph
   */
  public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes )
    {
    elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded

    Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes );
    Set<FlowElement> vertices = pair.getLhs();
    Set<Scope> excludeEdges = pair.getRhs();

    Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() );
    scopes.removeAll( excludeEdges );

    return new ElementSubGraph( elementGraph, vertices, scopes );
    }

  /**
   * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent}
   * head or tail instances.
   * <p>
   * If the given ElementGraph does not contain head or tail, it will be returned unchanged.
   *
   * @param elementGraph the graph to mask
   * @return a masked view of the graph, or the given instance if it contains neither extent
   */
  public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph )
    {
    if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) )
      return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail );

    return elementGraph;
    }

  /**
   * Same as {@link #findClosureViaFloydWarshall(Graph, Graph, Set)} with no excluded vertices.
   */
  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted )
    {
    return findClosureViaFloydWarshall( full, contracted, null );
    }

  /**
   * Expands the vertices of the contracted graph with the vertices of the full graph that lie between the
   * source and target of any contracted edge.
   * <p>
   * Before searching, edges are excluded from the full graph when they touch an excluded vertex, enter a
   * contracted vertex having no incoming contracted edges, leave a contracted vertex having no outgoing
   * contracted edges, or connect two contracted vertices that the contracted graph does not connect.
   *
   * @param full       the full graph
   * @param contracted the contracted graph
   * @param excludes   vertices whose edges in the full graph are excluded, may be null
   * @return a Pair of the resulting vertex set (lhs) and the excluded edges (rhs)
   */
  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( Graph<V, E> full, Graph<V, E> contracted, Set<V> excludes )
    {
    Set<V> vertices = createIdentitySet( contracted.vertexSet() );
    LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() );

    allVertices.removeAll( vertices );

    Set<E> excludeEdges = new HashSet<>();

    // prevent distinguished elements from being included inside the sub-graph
    if( excludes != null )
      {
      for( V v : excludes )
        {
        if( !full.containsVertex( v ) )
          continue;

        excludeEdges.addAll( full.incomingEdgesOf( v ) );
        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
        }
      }

    for( V v : contracted.vertexSet() )
      {
      if( contracted.inDegreeOf( v ) == 0 )
        excludeEdges.addAll( full.incomingEdgesOf( v ) );

      if( contracted.outDegreeOf( v ) == 0 )
        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
      }

    // exclude edges between nodes in full that are not connected in the contracted graph
    for( V outer : contracted.vertexSet() )
      {
      for( V inner : contracted.vertexSet() )
        {
        if( outer == inner )
          continue;

        if( contracted.getAllEdges( inner, outer ).isEmpty() )
          excludeEdges.addAll( full.getAllEdges( inner, outer ) );
        }
      }

    Graph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges );

    FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected );

    for( E edge : contracted.edgeSet() )
      {
      V edgeSource = contracted.getEdgeSource( edge );
      V edgeTarget = contracted.getEdgeTarget( edge );

      ListIterator<V> iterator = allVertices.listIterator();
      while( iterator.hasNext() )
        {
        V vertex = iterator.next();

        if( !isBetween( paths, edgeSource, edgeTarget, vertex ) )
          continue;

        vertices.add( vertex );
        iterator.remove(); // already captured, no need to test against remaining edges
        }
      }

    return new Pair<>( vertices, excludeEdges );
    }

  /**
   * Returns an identity based multi-graph copy of the given graph without the {@link Extent} head and tail
   * vertices and without the given edges.
   */
  private static <V, E> Graph<V, E> disconnectExtentsAndExclude( Graph<V, E> full, Set<E> withoutEdges )
    {
    IdentityMultiGraphGraph<V, E> copy = (IdentityMultiGraphGraph<V, E>) new IdentityMultiGraphGraph<>( Object.class );

    Graphs.addAllVertices( copy, full.vertexSet() );

    copy.removeVertex( (V) Extent.head );
    copy.removeVertex( (V) Extent.tail );

    Set<E> edges = full.edgeSet();

    if( !withoutEdges.isEmpty() )
      {
      // copy first so the edge set of the full graph is not modified
      edges = new HashSet<>( edges );
      edges.removeAll( withoutEdges );
      }

    Graphs.addAllEdges( copy, full, edges );

    return copy;
    }

  /**
   * Returns true if a first hop exists both from edgeSource to vertex and from vertex to edgeTarget.
   */
  private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex )
    {
    return paths.getFirstHop( edgeSource, vertex ) != null && paths.getFirstHop( vertex, edgeTarget ) != null;
    }

  /**
   * Removes the given element from the graph, first adding a new edge from every predecessor to every
   * successor so the graph remains connected.
   * <p>
   * Each new edge is a copy of the outgoing Scope. Its ordinal is taken from the incoming Scope when the
   * removed element is a Splice, otherwise from the outgoing Scope. If the removed element is a join, the
   * new edge is non-blocking only if both the incoming and outgoing edges are.
   *
   * @param elementGraph the graph to modify
   * @param flowElement  the element to remove
   * @throws IllegalStateException if a new edge could not be added
   */
  public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement )
    {
    LOG.debug( "removing element, contracting edge for: {}", flowElement );

    Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement );

    // NOTE(review): hard-coded to true, so the single input path check below never runs
    boolean contractIncoming = true;

    if( !contractIncoming )
      {
      if( incomingScopes.size() != 1 )
        throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );
      }

    boolean isJoin = flowElement instanceof Splice && ( (Splice) flowElement ).isJoin();

    for( Scope incoming : incomingScopes )
      {
      Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement );

      // source -> incoming -> flowElement -> outgoing -> target
      FlowElement source = elementGraph.getEdgeSource( incoming );

      for( Scope outgoing : outgoingScopes )
        {
        FlowElement target = elementGraph.getEdgeTarget( outgoing );

        boolean isNonBlocking = outgoing.isNonBlocking();

        if( isJoin )
          isNonBlocking = isNonBlocking && incoming.isNonBlocking();

        Scope scope = new Scope( outgoing );

        // unsure if necessary since we track blocking independently
        // when removing a pipe, pull ordinal up to tap
        // when removing a Splice retain ordinal
        if( flowElement instanceof Splice )
          scope.setOrdinal( incoming.getOrdinal() );
        else
          scope.setOrdinal( outgoing.getOrdinal() );

        scope.setNonBlocking( isNonBlocking );
        scope.addPriorNames( incoming, outgoing ); // not copied

        boolean success = elementGraph.addEdge( source, target, scope );

        if( !success )
          throw new IllegalStateException( "during graph contraction, unable to add new edge between: " + source + " and " + target );
        }
      }

    elementGraph.removeVertex( flowElement );
    }

  /**
   * Writes the given graph to the named file in DOT format, creating parent directories if they are missing.
   * <p>
   * This is a best-effort operation; failures are logged and reported through the return value.
   *
   * @param filename     the file to write to
   * @param graph        the graph to print
   * @param platformInfo optional platform info included in the head vertex label, may be null
   * @return true if the file was written, false if a NullPointerException or IOException was caught
   */
  public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo )
    {
    try
      {
      File parentFile = new File( filename ).getParentFile();

      if( parentFile != null && !parentFile.exists() )
        parentFile.mkdirs();

      // try-with-resources guarantees the writer is closed even if writing fails
      try( Writer writer = new FileWriter( filename ) )
        {
        Util.writeDOT( writer, ElementGraphs.directed( graph ),
          new IntegerNameProvider<FlowElement>(),
          new FlowElementVertexNameProvider( graph, platformInfo ),
          new ScopeEdgeNameProvider(),
          new VertexAttributeProvider(), new EdgeAttributeProvider() );
        }

      return true;
      }
    catch( NullPointerException | IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }

    return false;
    }

  /**
   * Writes the given element graph and process graph to the named file in DOT format, creating parent
   * directories if they are missing.
   * <p>
   * This is a best-effort operation; failures are logged and reported through the return value.
   *
   * @param filename     the file to write to
   * @param graph        the element graph to print
   * @param processGraph the process graph to print
   * @return true if the file was written, false if an IOException was caught
   */
  public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph )
    {
    try
      {
      File parentFile = new File( filename ).getParentFile();

      if( parentFile != null && !parentFile.exists() )
        parentFile.mkdirs();

      // try-with-resources guarantees the writer is closed even if writing fails
      try( Writer writer = new FileWriter( filename ) )
        {
        DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter(
          new IntegerNameProvider<>(),
          new FlowElementVertexNameProvider( graph, null ),
          new ScopeEdgeNameProvider(),
          new VertexAttributeProvider(), new EdgeAttributeProvider(),
          new ProcessGraphNameProvider(), new ProcessGraphLabelProvider()
        );

        graphWriter.writeGraph( writer, graph, processGraph );
        }

      return true;
      }
    catch( IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }

    return false;
    }

  /**
   * Inserts flowElement directly after previousElement, moving every outgoing edge of previousElement onto
   * the new element.
   * <p>
   * The new edge between the two is named after previousElement, using the Pipe name if it is a Pipe.
   *
   * @throws IllegalStateException if the edge removed between previousElement and a target is not the
   *                               expected Scope instance
   */
  public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement )
    {
    // copy, the edge set is modified below
    Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) );

    elementGraph.addVertex( flowElement );

    String name = previousElement.toString();

    if( previousElement instanceof Pipe )
      name = ( (Pipe) previousElement ).getName();

    elementGraph.addEdge( previousElement, flowElement, new Scope( name ) );

    for( Scope scope : outgoing )
      {
      FlowElement target = elementGraph.getEdgeTarget( scope );
      Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope

      if( foundScope != scope )
        throw new IllegalStateException( "did not remove proper scope" );

      elementGraph.addEdge( flowElement, target, scope ); // add scope back
      }
    }

  /**
   * Inserts flowElement directly before nextElement, moving every incoming edge of nextElement onto the
   * new element.
   * <p>
   * The new edge between the two is named after nextElement, using the Pipe name if it is a Pipe.
   *
   * @throws IllegalStateException if the edge removed between a source and nextElement is not the expected
   *                               Scope instance
   */
  public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement )
    {
    // copy, the edge set is modified below
    Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) );

    graph.addVertex( flowElement );

    String name = nextElement.toString();

    if( nextElement instanceof Pipe )
      name = ( (Pipe) nextElement ).getName();

    graph.addEdge( flowElement, nextElement, new Scope( name ) );

    for( Scope scope : incoming )
      {
      FlowElement target = graph.getEdgeSource( scope );
      Scope foundScope = graph.removeEdge( target, nextElement ); // remove scope

      if( foundScope != scope )
        throw new IllegalStateException( "did not remove proper scope" );

      graph.addEdge( target, flowElement, scope ); // add scope back
      }
    }

  /**
   * Splits the given edge by inserting newElement between its source and target.
   * <p>
   * Both replacement edges are copies of previousEdge. The edge into the next element is made blocking if
   * the next element is a join and the ordinal of previousEdge is not zero.
   * <p>
   * NOTE(review): the previous edge is removed by its end points, not by instance. If parallel edges exist
   * between the two elements, confirm the intended edge is the one removed.
   */
  public static void insertFlowElementBetweenEdge( ElementGraph elementGraph, Scope previousEdge, FlowElement newElement )
    {
    FlowElement previousElement = elementGraph.getEdgeSource( previousEdge );
    FlowElement nextElement = elementGraph.getEdgeTarget( previousEdge );

    elementGraph.addVertex( newElement );

    // add edge between previous and new
    elementGraph.addEdge( previousElement, newElement, new Scope( previousEdge ) );

    // add edge between new and next
    Scope scope = new Scope( previousEdge );

    scope.setOrdinal( previousEdge.getOrdinal() );

    if( nextElement instanceof Splice && ( (Splice) nextElement ).isJoin() && previousEdge.getOrdinal() != 0 )
      scope.setNonBlocking( false );

    elementGraph.addEdge( newElement, nextElement, scope );

    // remove previous edge
    elementGraph.removeEdge( previousElement, nextElement );
    }

  /**
   * Returns a map of the given elements keyed by the name of each of their outgoing edges.
   * <p>
   * If more than one edge shares a name, the last one visited wins.
   */
  public static <F extends FlowElement> Map<String, F> asSourceMap( ElementGraph elementGraph, Set<F> elements )
    {
    Map<String, F> map = new HashMap<>();

    for( F element : elements )
      {
      for( Scope scope : elementGraph.outgoingEdgesOf( element ) )
        map.put( scope.getName(), element );
      }

    return map;
    }

  /**
   * Returns the source {@link Tap} instances of the given graph.
   *
   * @see #findSources(ElementGraph, Class)
   */
  public static Set<Tap> findSources( ElementGraph elementGraph )
    {
    return findSources( elementGraph, Tap.class );
    }

  /**
   * Returns the source elements of the given type.
   * <p>
   * If the graph contains {@link Extent#head}, the successors of head are used. Otherwise the graph is
   * searched for elements of the given type matching {@link TypeExpression.Topo#Head}.
   *
   * @param elementGraph the graph to search, may be null
   * @param type         the element type to narrow the result to
   * @return the sources found, or an empty set if the graph is null
   */
  public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type )
    {
    if( elementGraph == null )
      return Collections.emptySet();

    if( elementGraph.containsVertex( Extent.head ) )
      return narrowIdentitySet( type, elementGraph.successorListOf( Extent.head ) );

    SubGraphIterator iterator = new ExpressionSubGraphIterator(
      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ),
      elementGraph
    );

    return narrowIdentitySet( type, getAllVertices( iterator ) );
    }

  /**
   * Returns a map of the given elements keyed by the name of each of their incoming edges.
   * <p>
   * If more than one edge shares a name, the last one visited wins.
   */
  public static <F extends FlowElement> Map<String, F> asSinkMap( ElementGraph elementGraph, Set<F> elements )
    {
    Map<String, F> map = new HashMap<>();

    for( F element : elements )
      {
      for( Scope scope : elementGraph.incomingEdgesOf( element ) )
        map.put( scope.getName(), element );
      }

    return map;
    }

  /**
   * Returns the sink elements of the given type.
   * <p>
   * If the graph contains {@link Extent#tail}, the predecessors of tail are used. Otherwise the graph is
   * searched for elements of the given type matching {@link TypeExpression.Topo#Tail}.
   *
   * @param elementGraph the graph to search, may be null
   * @param type         the element type to narrow the result to
   * @return the sinks found, or an empty set if the graph is null
   */
  public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type )
    {
    if( elementGraph == null )
      return Collections.emptySet();

    if( elementGraph.containsVertex( Extent.tail ) )
      return narrowIdentitySet( type, elementGraph.predecessorListOf( Extent.tail ) );

    SubGraphIterator iterator = new ExpressionSubGraphIterator(
      new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ),
      elementGraph
    );

    return narrowIdentitySet( type, getAllVertices( iterator ) );
    }

  /**
   * Returns the sink {@link Tap} instances of the given graph.
   *
   * @see #findSinks(ElementGraph, Class)
   */
  public static Set<Tap> findSinks( ElementGraph elementGraph )
    {
    return findSinks( elementGraph, Tap.class );
    }

  /**
   * Returns all {@link Group} elements found in the given graph.
   */
  public static Set<Group> findAllGroups( ElementGraph elementGraph )
    {
    SubGraphIterator iterator = new ExpressionSubGraphIterator(
      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ),
      elementGraph
    );

    return narrowIdentitySet( Group.class, getAllVertices( iterator ) );
    }

  /**
   * Drains the given iterator, collecting the vertices of every sub-graph into one identity set.
   */
  private static Set<FlowElement> getAllVertices( SubGraphIterator iterator )
    {
    Set<FlowElement> vertices = createIdentitySet();

    while( iterator.hasNext() )
      vertices.addAll( iterator.next().vertexSet() );

    return vertices;
    }

  /**
   * Returns every edge in the current graph that shares its source and target with any edge in the given
   * list.
   */
  public static Set<Scope> getAllMultiEdgesBetween( Collection<Scope> edgeList, ElementGraph current )
    {
    Set<Scope> allEdgesBetween = new HashSet<>();

    for( Scope scope : edgeList )
      {
      FlowElement edgeSource = current.getEdgeSource( scope );
      FlowElement edgeTarget = current.getEdgeTarget( scope );

      allEdgesBetween.addAll( current.getAllEdges( edgeSource, edgeTarget ) );
      }

    return allEdgesBetween;
    }

  /**
   * Replaces an element in the graph, moving its incoming and outgoing edges onto replaceWith, then removing
   * the replaced element.
   * <p>
   * replaceWith is added to the graph if missing. Any edge directly between the two elements is dropped.
   */
  public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith )
    {
    // copies, the edge sets are modified below
    Set<Scope> incoming = new HashSet<Scope>( elementGraph.incomingEdgesOf( replace ) );
    Set<Scope> outgoing = new HashSet<Scope>( elementGraph.outgoingEdgesOf( replace ) );

    if( !elementGraph.containsVertex( replaceWith ) )
      elementGraph.addVertex( replaceWith );

    for( Scope scope : incoming )
      {
      FlowElement source = elementGraph.getEdgeSource( scope );
      elementGraph.removeEdge( source, replace ); // remove scope

      // drop edge between, if any
      if( source != replaceWith )
        elementGraph.addEdge( source, replaceWith, scope ); // add scope back
      }

    for( Scope scope : outgoing )
      {
      FlowElement target = elementGraph.getEdgeTarget( scope );
      elementGraph.removeEdge( replace, target ); // remove scope

      // drop edge between, if any
      if( target != replaceWith )
        elementGraph.addEdge( replaceWith, target, scope ); // add scope back
      }

    elementGraph.removeVertex( replace );
    }

  /**
   * Returns the first Pipe with the given name in topological order, or null if none is found.
   */
  public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name )
    {
    Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph );

    return find( name, iterator );
    }

  /**
   * Returns the first Pipe with the given name in reverse topological order, or null if none is found.
   */
  public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name )
    {
    Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph );

    return find( name, iterator );
    }

  /**
   * Returns the first Pipe from the iterator whose name equals the given name, or null if there is none.
   */
  private static Pipe find( String name, Iterator<FlowElement> iterator )
    {
    while( iterator.hasNext() )
      {
      FlowElement flowElement = iterator.next();

      if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) )
        return (Pipe) flowElement;
      }

    return null;
    }

  /**
   * Removes the given element along with the connected elements above and below it that have exactly one
   * incoming and one outgoing edge.
   *
   * @return true if any element was removed
   */
  public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement )
    {
    Set<FlowElement> branch = new LinkedHashSet<>();

    walkUp( branch, elementGraph, flowElement );

    walkDown( branch, elementGraph, flowElement );

    if( branch.isEmpty() )
      return false;

    for( FlowElement element : branch )
      elementGraph.removeVertex( element );

    return true;
    }

  /**
   * Removes the elements collected by walking down from first, with first and second seeded into the
   * collected branch.
   *
   * @param inclusive if false, first and second are not removed
   * @return true if any element was removed
   */
  public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive )
    {
    Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) );

    walkDown( branch, elementGraph, first );

    if( !inclusive )
      {
      branch.remove( first );
      branch.remove( second );
      }

    if( branch.isEmpty() )
      return false;

    for( FlowElement element : branch )
      elementGraph.removeVertex( element );

    return true;
    }

  /**
   * Follows the first outgoing edge from the given element, adding each visited element to the branch.
   * <p>
   * Stops at an element not already in the branch that does not have exactly one incoming and one outgoing
   * edge, or when the next element is an Extent or is already in the branch.
   */
  private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
    {
    FlowElement current;
    current = flowElement;

    while( true )
      {
      // elements already seeded into the branch are accepted regardless of their degree
      if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) )
        break;

      branch.add( current );

      FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) );

      if( element instanceof Extent || branch.contains( element ) )
        break;

      current = element;
      }
    }

  /**
   * Follows the first incoming edge from the given element, adding each visited element to the branch.
   * <p>
   * Stops at an element that does not have exactly one incoming and one outgoing edge, or when the previous
   * element is an Extent or is already in the branch.
   */
  private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
    {
    FlowElement current = flowElement;

    while( true )
      {
      if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 )
        break;

      branch.add( current );

      FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) );

      if( element instanceof Extent || branch.contains( element ) )
        break;

      current = element;
      }
    }

  /**
   * Returns the number of edges found on the shortest distance between the lhs and rhs.
   * <p>
   * NOTE(review): the result of findPathBetween is dereferenced without a null check. If it returns null
   * when no path exists, this method throws a NullPointerException -- confirm against the jgrapht version
   * in use.
   */
  public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs )
    {
    return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).getLength();
    }

  /** Provides the record style vertex labels used when printing a graph in DOT format. */
  private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement>
    {
    private final ElementGraph elementGraph;
    private final PlatformInfo platformInfo;

    public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo )
      {
      this.elementGraph = elementGraph;
      this.platformInfo = platformInfo;
      }

    /**
     * Returns the label for the given element.
     * <p>
     * The tail extent is labeled by its string value alone. Any other extent also carries the canonical
     * hash of the graph, plus the release version and platform info when available. All other elements are
     * labeled with their escaped string value, a short id, and any annotations.
     */
    @Override
    public String getVertexName( FlowElement object )
      {
      if( object instanceof Extent ) // is head/tail
        {
        String result = object.toString().replaceAll( "\"", "\'" );

        if( object == Extent.tail )
          return result;

        result = result + "|hash: " + canonicalHash( elementGraph );

        String versionString = Version.getRelease();

        if( platformInfo != null )
          versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo;

        return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}";
        }

      String label;

      Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator();

      if( !( object instanceof Pipe ) || !iterator.hasNext() )
        {
        label = object.toString().replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
        }
      else
        {
        Scope scope = iterator.next();

        label = ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
        }

      label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id

      // escape braces and angle brackets in the label text
      label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( "<", "\\\\<" ).replaceAll( ">", "\\\\>" ) + "}";

      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
        return label;

      Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object );

      if( !annotations.isEmpty() )
        label += "|{" + Util.join( annotations, "|" ) + "}";

      return label;
      }

    /** Returns the first five characters of the element id. */
    protected String getID( FlowElement object )
      {
      return FlowElements.id( object ).substring( 0, 5 );
      }
    }

  /** Provides edge labels from the Scope string value, with quotes and newlines escaped. */
  private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope>
    {
    public String getEdgeName( Scope object )
      {
      return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
      }
    }

  /** Gives every vertex the "Mrecord" shape attribute. */
  private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement>
    {
    static Map<String, String> defaultNode = new HashMap<String, String>()
      {
      {put( "shape", "Mrecord" );}
      };

    public VertexAttributeProvider()
      {
      }

    @Override
    public Map<String, String> getComponentAttributes( FlowElement object )
      {
      return defaultNode;
      }
    }

  /** Gives blocking edges a dotted style and dot arrowhead; non-blocking edges get no attributes. */
  private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope>
    {
    static Map<String, String> attributes = new HashMap<String, String>()
      {
      {put( "style", "dotted" );}

      {put( "arrowhead", "dot" );}
      };

    @Override
    public Map<String, String> getComponentAttributes( Scope scope )
      {
      if( scope.isNonBlocking() )
        return null;

      return attributes;
      }
    }

  /** Names a process by its ordinal. */
  private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel>
    {
    @Override
    public String getVertexName( ProcessModel processModel )
      {
      return "" + processModel.getOrdinal();
      }
    }

  /** Labels a process with its ordinal, id, and the canonical hash of its element graph. */
  private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel>
    {
    @Override
    public String getVertexName( ProcessModel processModel )
      {
      return "ordinal: " + processModel.getOrdinal() +
        "\\nid: " + processModel.getID() +
        "\\nhash: " + canonicalHash( processModel.getElementGraph() );
      }
    }

  /**
   * Replaces the vertex map inside the jgrapht specifics of the given graph with an {@link IdentityHashMap},
   * using reflection.
   * <p>
   * A warning is logged if either the specifics or the vertex map field cannot be accessed.
   *
   * @param graph the graph to modify
   */
  static void injectIdentityMap( AbstractGraph graph )
    {
    // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap
    // vertex not found errors will be thrown if this fails
    Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" );

    if( specifics == null )
      {
      LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" );
      return;
      }

    boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() );

    if( !success )
      LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" );
    }
  }