/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.graph;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.flow.planner.iso.expression.ElementCapture;
import cascading.flow.planner.iso.expression.ExpressionGraph;
import cascading.flow.planner.iso.expression.FlowElementExpression;
import cascading.flow.planner.iso.expression.TypeExpression;
import cascading.flow.planner.iso.finder.SearchOrder;
import cascading.flow.planner.iso.subgraph.SubGraphIterator;
import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.flow.planner.process.ProcessGraph;
import cascading.flow.planner.process.ProcessModel;
import cascading.pipe.Group;
import cascading.pipe.Operator;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import cascading.util.DOTProcessGraphWriter;
import cascading.util.EnumMultiMap;
import cascading.util.Murmur3;
import cascading.util.Pair;
import cascading.util.Util;
import cascading.util.Version;
import cascading.util.jgrapht.ComponentAttributeProvider;
import cascading.util.jgrapht.EdgeNameProvider;
import cascading.util.jgrapht.IntegerNameProvider;
import cascading.util.jgrapht.VertexNameProvider;
import org.jgrapht.DirectedGraph;
import org.jgrapht.Graph;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.DijkstraShortestPath;
import org.jgrapht.alg.FloydWarshallShortestPaths;
import org.jgrapht.alg.KShortestPaths;
import org.jgrapht.graph.AbstractGraph;
import org.jgrapht.graph.DirectedMultigraph;
import org.jgrapht.graph.EdgeReversedGraph;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.jgrapht.util.TypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.*;
import static java.lang.Double.POSITIVE_INFINITY;

/**
 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}.
 */
public class ElementGraphs
  {
  private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class );

  // not for instantiation
  private ElementGraphs()
    {
    }

  /**
   * Merges the element graphs of the given step graph into a single multi-graph and wraps it, together
   * with the sources and sinks found in the merged graph, in a new {@link FlowElementGraph}.
   *
   * @param platformInfo  passed through to the new FlowElementGraph
   * @param flowStepGraph the step graph whose element graphs are merged
   * @return a new FlowElementGraph
   */
  public static FlowElementGraph asFlowElementGraph( PlatformInfo platformInfo, FlowStepGraph flowStepGraph )
    {
    ElementMultiGraph elementDirectedGraph = asElementMultiGraph( flowStepGraph.getElementGraphs() );

    Map<String, Tap> sources = asSourceMap( elementDirectedGraph, findSources( elementDirectedGraph ) );
    Map<String, Tap> sinks = asSinkMap( elementDirectedGraph, findSinks( elementDirectedGraph ) );

    return new FlowElementGraph( platformInfo, elementDirectedGraph, sources, sinks, null, null );
    }

  /**
   * Copies every given graph, with any {@link Extent} head/tail masked out, into one new {@link ElementDirectedGraph}.
   *
   * @param elementGraphs the graphs to merge
   * @return a new ElementDirectedGraph
   */
  public static ElementDirectedGraph asElementDirectedGraph( Collection<ElementGraph> elementGraphs )
    {
    ElementDirectedGraph elementDirectedGraph = new ElementDirectedGraph();

    for( ElementGraph elementGraph : elementGraphs )
      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );

    return elementDirectedGraph;
    }

  /**
   * Copies every given graph, with any {@link Extent} head/tail masked out, into one new {@link ElementMultiGraph}.
   *
   * @param elementGraphs the graphs to merge
   * @return a new ElementMultiGraph
   */
  public static ElementMultiGraph asElementMultiGraph( Collection<ElementGraph> elementGraphs )
    {
    ElementMultiGraph elementDirectedGraph = new ElementMultiGraph();

    for( ElementGraph elementGraph : elementGraphs )
      elementDirectedGraph.copyFrom( asExtentMaskedSubGraph( elementGraph ) );

    return elementDirectedGraph;
    }

  /**
   * Returns true if the graph has no vertices, or holds only the {@link Extent} head and tail.
   *
   * @param elementGraph the graph to test
   * @return true if no vertices other than the extents are present
   */
  public static boolean isEmpty( ElementGraph elementGraph )
    {
    int size = elementGraph.vertexSet().size();

    if( size == 0 )
      return true;

    return size == 2 && elementGraph.containsVertex( Extent.head ) && elementGraph.containsVertex( Extent.tail );
    }

  /** A simple directed graph whose vertex map is an {@link IdentityHashMap}. */
  private static class IdentityDirectedGraph<V, E> extends SimpleDirectedGraph<V, E>
    {
    public IdentityDirectedGraph( Class<? extends E> edgeClass )
      {
      super( edgeClass );
      }

    @Override
    protected DirectedSpecifics createDirectedSpecifics()
      {
      return new DirectedSpecifics( new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
      }
    }

  /** A directed multi-graph whose vertex map is an {@link IdentityHashMap}. */
  private static class IdentityMultiGraphGraph<V, E> extends DirectedMultigraph<V, E>
    {
    public IdentityMultiGraphGraph( Class<? extends E> edgeClass )
      {
      super( edgeClass );
      }

    @Override
    protected DirectedSpecifics createDirectedSpecifics()
      {
      return new DirectedSpecifics( new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() );
      }
    }

  /**
   * Unwraps any {@link DecoratedElementGraph} layers and returns the underlying jgrapht graph.
   *
   * @param elementGraph the graph to unwrap, may be null
   * @return the backing directed graph, or null if the given graph is null
   */
  public static DirectedGraph<FlowElement, Scope> directed( ElementGraph elementGraph )
    {
    if( elementGraph == null )
      return null;

    if( elementGraph instanceof DecoratedElementGraph )
      return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() );

    return ( (BaseElementGraph) elementGraph ).graph;
    }

  /**
   * Unwraps any {@link DecoratedElementGraph} layers and returns the annotations of the underlying graph.
   *
   * @param elementGraph the graph to inspect, may be null
   * @return the annotations, or null if the graph is null or is not an {@link AnnotatedGraph}
   */
  public static EnumMultiMap<FlowElement> annotations( ElementGraph elementGraph )
    {
    if( elementGraph == null )
      return null;

    if( elementGraph instanceof DecoratedElementGraph )
      return annotations( ( (DecoratedElementGraph) elementGraph ).getDecorated() );

    if( elementGraph instanceof AnnotatedGraph )
      return ( (AnnotatedGraph) elementGraph ).getAnnotations();

    return null;
    }

  /**
   * Returns true if the graph backing the given ElementGraph is a {@link DirectedMultigraph}.
   */
  public static boolean isMultiGraph( ElementGraph elementGraph )
    {
    return directed( elementGraph ) instanceof DirectedMultigraph;
    }

  /**
   * Returns a hash code computed from the backing graph's vertices, edges, and edge weights only.
   */
  public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph )
    {
    return hashCodeIgnoreAnnotations( directed( elementGraph ) );
    }

  /**
   * Returns a hash code computed from the vertex set plus, for every edge, the edge, its end points, and its weight.
   * Edge contributions are summed, so edge iteration order does not affect the result.
   */
  public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph )
    {
    int hash = graph.vertexSet().hashCode();

    for( E e : graph.edgeSet() )
      {
      int part = e.hashCode();

      int source = graph.getEdgeSource( e ).hashCode();
      int target = graph.getEdgeTarget( e ).hashCode();

      int pairing = pair( source, target );

      part = ( 27 * part ) + pairing;

      long weight = (long) graph.getEdgeWeight( e );
      part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) );

      hash += part;
      }

    return hash;
    }

  /**
   * Compares the graphs backing the two given ElementGraphs, ignoring any annotations.
   */
  public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs )
    {
    return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) );
    }

  /**
   * Returns true if both graphs have equal vertex sets and the same edges, each with equal end points and
   * a weight differing by no more than 10e-7.
   *
   * @param lhs first graph, may be null
   * @param rhs second graph, may be null
   * @return true if both are the same instance (or both null) or are structurally equal, false if only one is null
   */
  public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs )
    {
    if( lhs == rhs )
      return true;

    // directed() yields null for a null ElementGraph, a single null side can never be equal
    if( lhs == null || rhs == null )
      return false;

    TypeUtil<Graph<V, E>> typeDecl = null;
    Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl );
    Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl );

    if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) )
      return false;

    if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() )
      return false;

    for( E e : lhsGraph.edgeSet() )
      {
      V source = lhsGraph.getEdgeSource( e );
      V target = lhsGraph.getEdgeTarget( e );

      if( !rhsGraph.containsEdge( e ) )
        return false;

      if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) )
        return false;

      if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 )
        return false;
      }

    return true;
    }

  /**
   * Returns true if both graphs are structurally equal and, when they are {@link AnnotatedGraph} instances,
   * carry equal annotations.
   */
  public static boolean equals( ElementGraph lhs, ElementGraph rhs )
    {
    if( !equalsIgnoreAnnotations( lhs, rhs ) )
      return false;

    if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) )
      return true;

    if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) )
      return false;

    AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs;
    AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs;

    if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() )
      return false;

    if( !lhsAnnotated.hasAnnotations() )
      return true;

    return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() );
    }

  /**
   * Returns the canonical hash of the graph backing the given ElementGraph.
   *
   * @see #canonicalHash(Graph)
   */
  public static String canonicalHash( ElementGraph graph )
    {
    return canonicalHash( directed( graph ) );
    }

  /**
   * Returns a hex encoded hash derived from the class names of the elements joined by each edge, plus the
   * vertex and edge counts. Edges touching an {@link Extent} are skipped.
   *
   * @param graph the graph to hash
   * @return the hash as a hex string
   */
  public static String canonicalHash( Graph<FlowElement, Scope> graph )
    {
    int hash = Murmur3.SEED;

    int edges = 0;
    boolean hasExtents = false;

    for( Scope e : graph.edgeSet() )
      {
      FlowElement edgeSource = graph.getEdgeSource( e );
      FlowElement edgeTarget = graph.getEdgeTarget( e );

      // simpler to ignore extents here
      if( edgeSource instanceof Extent || edgeTarget instanceof Extent )
        {
        hasExtents = true;
        continue;
        }

      int source = hash( edgeSource );
      int target = hash( edgeTarget );
      int pairing = pair( source, target );

      hash += pairing; // don't make edge traversal order significant
      edges++;
      }

    int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 );

    hash = Murmur3.fmix( hash, vertexes * edges );

    return Util.getHex( Util.intToByteArray( hash ) );
    }

  /**
   * Hashes an element from its class name paired with the class name of its operation, scheme, or joiner,
   * when one is present.
   */
  private static int hash( FlowElement flowElement )
    {
    int lhs = flowElement.getClass().getName().hashCode();
    int rhs = 0;

    if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null )
      rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode();
    else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null )
      rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode();
    else if( flowElement instanceof Splice && ( (Splice) flowElement ).getJoiner() != null ) // guard like the branches above
      rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins();

    return pair( lhs, rhs );
    }

  /**
   * Combines two ints into one using a pairing function, returns lhs unchanged when rhs is zero.
   */
  protected static int pair( int lhs, int rhs )
    {
    if( rhs == 0 )
      return lhs;

    // see http://en.wikipedia.org/wiki/Pairing_function
    return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs;
    }

  /**
   * Returns an iterator over the graph's elements in topological order.
   *
   * @param graph the graph to iterate, may be null
   * @return the iterator, empty if the graph is null
   */
  public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph )
    {
    if( graph == null )
      return Collections.emptyIterator();

    return new TopologicalOrderIterator<>( directed( graph ) );
    }

  /**
   * Returns an iterator over the graph's elements in topological order of the edge reversed graph.
   *
   * @param graph the graph to iterate, may be null
   * @return the iterator, empty if the graph is null
   */
  public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph )
    {
    // consistent with getTopologicalIterator
    if( graph == null )
      return Collections.emptyIterator();

    return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) );
    }

  /**
   * Returns all elements found on any path between from and to, without from and to themselves.
   */
  public static Collection<FlowElement> getAllElementsBetweenExclusive( ElementGraph graph, FlowElement from, FlowElement to )
    {
    Collection<FlowElement> results = getAllElementsBetweenInclusive( graph, from, to );

    results.remove( from );
    results.remove( to );

    return results;
    }

  /**
   * Returns an identity set of all elements found on any path returned by
   * {@link #getAllShortestPathsBetween(ElementGraph, FlowElement, FlowElement)}.
   */
  public static Collection<FlowElement> getAllElementsBetweenInclusive( ElementGraph graph, FlowElement from, FlowElement to )
    {
    Set<FlowElement> results = Util.createIdentitySet();

    List<GraphPath<FlowElement, Scope>> pathsBetween = getAllShortestPathsBetween( graph, from, to );

    for( GraphPath<FlowElement, Scope> graphPath : pathsBetween )
      results.addAll( Graphs.getPathVertexList( graphPath ) );

    return results;
    }

  /**
   * Returns the paths between from and to in the graph backing the given ElementGraph.
   */
  public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to )
    {
    return getAllShortestPathsBetween( directed( graph ), from, to );
    }

  /**
   * Returns the paths between from and to as found by {@link KShortestPaths} with an unbounded path count.
   *
   * @return the paths found, never null, an empty list if none
   */
  public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( DirectedGraph<V, E> graph, V from, V to )
    {
    List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, from, Integer.MAX_VALUE ).getPaths( to );

    if( paths == null )
      return new ArrayList<>();

    return paths;
    }

  /**
   * Returns a sub-graph of elementGraph holding the closure found by
   * {@link #findClosureViaFloydWarshall(DirectedGraph, DirectedGraph, Set)} for the given contracted graph.
   *
   * @param elementGraph    the full graph
   * @param contractedGraph the contracted graph whose vertices anchor the sub-graph
   * @param excludes        elements whose edges must not be part of the sub-graph, may be null
   * @return a new ElementSubGraph
   */
  public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes )
    {
    elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded

    Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes );
    Set<FlowElement> vertices = pair.getLhs();
    Set<Scope> excludeEdges = pair.getRhs();

    Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() );
    scopes.removeAll( excludeEdges );

    return new ElementSubGraph( elementGraph, vertices, scopes );
    }

  /**
   * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent}
   * head or tail instances.
   * <p/>
   * If the given ElementGraph does not contain head or tail, it will be returned unchanged.
   *
   * @param elementGraph the graph to mask
   * @return a masked view if head or tail is present, otherwise the given instance
   */
  public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph )
    {
    if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) )
      return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail );

    return elementGraph;
    }

  /**
   * Same as {@link #findClosureViaFloydWarshall(DirectedGraph, DirectedGraph, Set)} with no excluded vertices.
   */
  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted )
    {
    return findClosureViaFloydWarshall( full, contracted, null );
    }

  /**
   * Finds the vertices of the full graph that lie between the end points of any edge in the contracted graph.
   *
   * @param full       the full graph
   * @param contracted the contracted graph
   * @param excludes   vertices whose incoming and outgoing edges are excluded, may be null
   * @return a pair of the resulting vertices (lhs, an identity set) and the excluded edges (rhs)
   */
  public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted, Set<V> excludes )
    {
    Set<V> vertices = createIdentitySet( contracted.vertexSet() );
    LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() );

    allVertices.removeAll( vertices );

    Set<E> excludeEdges = new HashSet<>();

    // prevent distinguished elements from being included inside the sub-graph
    if( excludes != null )
      {
      for( V v : excludes )
        {
        if( !full.containsVertex( v ) )
          continue;

        excludeEdges.addAll( full.incomingEdgesOf( v ) );
        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
        }
      }

    // heads and tails of the contracted graph bound the search in the full graph
    for( V v : contracted.vertexSet() )
      {
      if( contracted.inDegreeOf( v ) == 0 )
        excludeEdges.addAll( full.incomingEdgesOf( v ) );

      if( contracted.outDegreeOf( v ) == 0 )
        excludeEdges.addAll( full.outgoingEdgesOf( v ) );
      }

    // exclude edges between nodes in full that are not in the contracted graph
    for( V outer : contracted.vertexSet() )
      {
      for( V inner : contracted.vertexSet() )
        {
        if( outer == inner )
          continue;

        if( contracted.getAllEdges( inner, outer ).isEmpty() )
          excludeEdges.addAll( full.getAllEdges( inner, outer ) );
        }
      }

    DirectedGraph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges );

    FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected );

    for( E edge : contracted.edgeSet() )
      {
      V edgeSource = contracted.getEdgeSource( edge );
      V edgeTarget = contracted.getEdgeTarget( edge );

      ListIterator<V> iterator = allVertices.listIterator();

      while( iterator.hasNext() )
        {
        V vertex = iterator.next();

        if( !isBetween( paths, edgeSource, edgeTarget, vertex ) )
          continue;

        vertices.add( vertex );
        iterator.remove(); // once claimed, no need to test the vertex against remaining edges
        }
      }

    return new Pair<>( vertices, excludeEdges );
    }

  /**
   * Copies the full graph into a new identity based multi-graph, without the {@link Extent} head/tail
   * vertices and without the given edges.
   */
  private static <V, E> DirectedGraph<V, E> disconnectExtentsAndExclude( DirectedGraph<V, E> full, Set<E> withoutEdges )
    {
    IdentityMultiGraphGraph<V, E> copy = (IdentityMultiGraphGraph<V, E>) new IdentityMultiGraphGraph<>( Object.class );

    Graphs.addAllVertices( copy, full.vertexSet() );

    copy.removeVertex( (V) Extent.head );
    copy.removeVertex( (V) Extent.tail );

    Set<E> edges = full.edgeSet();

    if( !withoutEdges.isEmpty() )
      {
      edges = new HashSet<>( edges );
      edges.removeAll( withoutEdges );
      }

    Graphs.addAllEdges( copy, full, edges );

    return copy;
    }

  /**
   * Returns true if vertex is reachable from edgeSource and edgeTarget is reachable from vertex.
   */
  private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex )
    {
    return paths.shortestDistance( edgeSource, vertex ) != POSITIVE_INFINITY && paths.shortestDistance( vertex, edgeTarget ) != POSITIVE_INFINITY;
    }

  /**
   * Removes the given element from the graph, connecting each of its predecessors directly to each of its
   * successors with a new {@link Scope} copied from the outgoing edge.
   *
   * @param elementGraph the graph to modify
   * @param flowElement  the element to remove
   * @throws IllegalStateException if a replacement edge cannot be added
   */
  public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement )
    {
    LOG.debug( "removing element, contracting edge for: {}", flowElement );

    Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement );
    Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement );

    boolean isSplice = flowElement instanceof Splice;
    boolean isJoin = isSplice && ( (Splice) flowElement ).isJoin();

    for( Scope incoming : incomingScopes )
      {
      // source -> incoming -> flowElement -> outgoing -> target
      FlowElement source = elementGraph.getEdgeSource( incoming );

      for( Scope outgoing : outgoingScopes )
        {
        FlowElement target = elementGraph.getEdgeTarget( outgoing );

        boolean isNonBlocking = outgoing.isNonBlocking();

        if( isJoin )
          isNonBlocking = isNonBlocking && incoming.isNonBlocking();

        Scope scope = new Scope( outgoing );

        // unsure if necessary since we track blocking independently
        // when removing a pipe, pull ordinal up to tap
        // when removing a Splice retain ordinal
        if( isSplice )
          scope.setOrdinal( incoming.getOrdinal() );
        else
          scope.setOrdinal( outgoing.getOrdinal() );

        scope.setNonBlocking( isNonBlocking );
        scope.addPriorNames( incoming, outgoing ); // not copied

        boolean success = elementGraph.addEdge( source, target, scope );

        if( !success )
          throw new IllegalStateException( "during graph contraction, unable to add new edge between: " + source + " and " + target );
        }
      }

    elementGraph.removeVertex( flowElement );
    }

  /**
   * Writes the given graph as a DOT file, creating parent directories as needed.
   *
   * @param filename     the file to write
   * @param graph        the graph to print
   * @param platformInfo optional platform info added to the head label, may be null
   * @return true on success, false if writing failed (the failure is logged)
   */
  public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo )
    {
    try
      {
      File parentFile = new File( filename ).getParentFile();

      if( parentFile != null && !parentFile.exists() )
        parentFile.mkdirs();

      // try-with-resources so the writer is released even if writing fails
      try( Writer writer = new FileWriter( filename ) )
        {
        Util.writeDOT( writer, ElementGraphs.directed( graph ),
          new IntegerNameProvider<FlowElement>(),
          new FlowElementVertexNameProvider( graph, platformInfo ),
          new ScopeEdgeNameProvider(),
          new VertexAttributeProvider(), new EdgeAttributeProvider() );
        }

      return true;
      }
    catch( NullPointerException | IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }

    return false;
    }

  /**
   * Writes the given graph, grouped by the given process graph, as a DOT file, creating parent directories as needed.
   *
   * @param filename     the file to write
   * @param graph        the element graph to print
   * @param processGraph the process graph handed to the {@link DOTProcessGraphWriter}
   * @return true on success, false if writing failed with an IOException (the failure is logged)
   */
  public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph )
    {
    try
      {
      File parentFile = new File( filename ).getParentFile();

      if( parentFile != null && !parentFile.exists() )
        parentFile.mkdirs();

      // try-with-resources so the writer is released even if writing fails
      try( Writer writer = new FileWriter( filename ) )
        {
        DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter(
          new IntegerNameProvider<Pair<ElementGraph, FlowElement>>(),
          new FlowElementVertexNameProvider( graph, null ),
          new ScopeEdgeNameProvider(),
          new VertexAttributeProvider(), new EdgeAttributeProvider(),
          new ProcessGraphNameProvider(), new ProcessGraphLabelProvider()
        );

        graphWriter.writeGraph( writer, graph, processGraph );
        }

      return true;
      }
    catch( IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }

    return false;
    }

  /**
   * Inserts flowElement directly after previousElement, moving all outgoing edges of previousElement to flowElement.
   *
   * @throws IllegalStateException if removing an edge by its end points returns a different Scope than expected
   */
  public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement )
    {
    // copy, the edges are modified below
    Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) );

    elementGraph.addVertex( flowElement );

    String name = previousElement.toString();

    if( previousElement instanceof Pipe )
      name = ( (Pipe) previousElement ).getName();

    elementGraph.addEdge( previousElement, flowElement, new Scope( name ) );

    for( Scope scope : outgoing )
      {
      FlowElement target = elementGraph.getEdgeTarget( scope );
      Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope

      if( foundScope != scope )
        throw new IllegalStateException( "did not remove proper scope" );

      elementGraph.addEdge( flowElement, target, scope ); // add scope back
      }
    }

  /**
   * Inserts flowElement directly before nextElement, moving all incoming edges of nextElement to flowElement.
   *
   * @throws IllegalStateException if removing an edge by its end points returns a different Scope than expected
   */
  public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement )
    {
    // copy, the edges are modified below
    Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) );

    graph.addVertex( flowElement );

    String name = nextElement.toString();

    if( nextElement instanceof Pipe )
      name = ( (Pipe) nextElement ).getName();

    graph.addEdge( flowElement, nextElement, new Scope( name ) );

    for( Scope scope : incoming )
      {
      FlowElement source = graph.getEdgeSource( scope );
      Scope foundScope = graph.removeEdge( source, nextElement ); // remove scope

      if( foundScope != scope )
        throw new IllegalStateException( "did not remove proper scope" );

      graph.addEdge( source, flowElement, scope ); // add scope back
      }
    }

  /**
   * Replaces the given edge with newElement, connecting it to the previous end points of the edge with
   * copies of the original Scope.
   */
  public static void insertFlowElementBetweenEdge( ElementGraph elementGraph, Scope previousEdge, FlowElement newElement )
    {
    FlowElement previousElement = elementGraph.getEdgeSource( previousEdge );
    FlowElement nextElement = elementGraph.getEdgeTarget( previousEdge );

    elementGraph.addVertex( newElement );

    // add edge between previous and new
    elementGraph.addEdge( previousElement, newElement, new Scope( previousEdge ) );

    // add edge between new and next
    Scope scope = new Scope( previousEdge );

    scope.setOrdinal( previousEdge.getOrdinal() );

    if( nextElement instanceof Splice && ( (Splice) nextElement ).isJoin() && previousEdge.getOrdinal() != 0 )
      scope.setNonBlocking( false );

    elementGraph.addEdge( newElement, nextElement, scope );

    // remove previous edge
    elementGraph.removeEdge( previousElement, nextElement );
    }

  /**
   * Maps the name of every outgoing Scope of the given elements to its element.
   */
  public static <F extends FlowElement> Map<String, F> asSourceMap( ElementGraph elementGraph, Set<F> elements )
    {
    Map<String, F> map = new HashMap<>();

    for( F element : elements )
      {
      for( Scope scope : elementGraph.outgoingEdgesOf( element ) )
        map.put( scope.getName(), element );
      }

    return map;
    }

  /**
   * Returns all {@link Tap} sources of the given graph.
   */
  public static Set<Tap> findSources( ElementGraph elementGraph )
    {
    return findSources( elementGraph, Tap.class );
    }

  /**
   * Returns all elements of the given type at the head of the graph. If the graph holds {@link Extent#head},
   * its successors are used, otherwise the graph is searched.
   *
   * @return an identity set narrowed to the given type, an empty set if the graph is null
   */
  public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type )
    {
    if( elementGraph == null )
      return Collections.emptySet();

    if( elementGraph.containsVertex( Extent.head ) )
      return narrowIdentitySet( type, elementGraph.successorListOf( Extent.head ) );

    SubGraphIterator iterator = new ExpressionSubGraphIterator(
      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ),
      elementGraph
    );

    return narrowIdentitySet( type, getAllVertices( iterator ) );
    }

  /**
   * Maps the name of every incoming Scope of the given elements to its element.
   */
  public static <F extends FlowElement> Map<String, F> asSinkMap( ElementGraph elementGraph, Set<F> elements )
    {
    Map<String, F> map = new HashMap<>();

    for( F element : elements )
      {
      for( Scope scope : elementGraph.incomingEdgesOf( element ) )
        map.put( scope.getName(), element );
      }

    return map;
    }

  /**
   * Returns all elements of the given type at the tail of the graph. If the graph holds {@link Extent#tail},
   * its predecessors are used, otherwise the graph is searched.
   *
   * @return an identity set narrowed to the given type, an empty set if the graph is null
   */
  public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type )
    {
    if( elementGraph == null )
      return Collections.emptySet();

    if( elementGraph.containsVertex( Extent.tail ) )
      return narrowIdentitySet( type, elementGraph.predecessorListOf( Extent.tail ) );

    SubGraphIterator iterator = new ExpressionSubGraphIterator(
      new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ),
      elementGraph
    );

    return narrowIdentitySet( type, getAllVertices( iterator ) );
    }

  /**
   * Returns all {@link Tap} sinks of the given graph.
   */
  public static Set<Tap> findSinks( ElementGraph elementGraph )
    {
    return findSinks( elementGraph, Tap.class );
    }

  /**
   * Returns all {@link Group} elements found in the given graph.
   */
  public static Set<Group> findAllGroups( ElementGraph elementGraph )
    {
    SubGraphIterator iterator = new ExpressionSubGraphIterator(
      new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ),
      elementGraph
    );

    return narrowIdentitySet( Group.class, getAllVertices( iterator ) );
    }

  /**
   * Collects the vertices of every sub-graph returned by the iterator into one identity set.
   */
  private static Set<FlowElement> getAllVertices( SubGraphIterator iterator )
    {
    Set<FlowElement> vertices = createIdentitySet();

    while( iterator.hasNext() )
      vertices.addAll( iterator.next().vertexSet() );

    return vertices;
    }

  /**
   * For every given edge, returns all edges in the current graph that share its source and target.
   */
  public static Set<Scope> getAllMultiEdgesBetween( Collection<Scope> edgeList, ElementGraph current )
    {
    Set<Scope> allEdgesBetween = new HashSet<>();

    for( Scope scope : edgeList )
      {
      FlowElement edgeSource = current.getEdgeSource( scope );
      FlowElement edgeTarget = current.getEdgeTarget( scope );

      allEdgesBetween.addAll( current.getAllEdges( edgeSource, edgeTarget ) );
      }

    return allEdgesBetween;
    }

  /**
   * Replaces one element with another, moving all incoming and outgoing edges over. Edges that would
   * connect replaceWith to itself are dropped.
   *
   * @param elementGraph the graph to modify
   * @param replace      the element to remove
   * @param replaceWith  the element taking its place, added to the graph if not already present
   */
  public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith )
    {
    // copies, the edges are modified below
    Set<Scope> incoming = new HashSet<>( elementGraph.incomingEdgesOf( replace ) );
    Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( replace ) );

    if( !elementGraph.containsVertex( replaceWith ) )
      elementGraph.addVertex( replaceWith );

    for( Scope scope : incoming )
      {
      FlowElement source = elementGraph.getEdgeSource( scope );
      elementGraph.removeEdge( source, replace ); // remove scope

      // drop edge between, if any
      if( source != replaceWith )
        elementGraph.addEdge( source, replaceWith, scope ); // add scope back
      }

    for( Scope scope : outgoing )
      {
      FlowElement target = elementGraph.getEdgeTarget( scope );
      elementGraph.removeEdge( replace, target ); // remove scope

      // drop edge between, if any
      if( target != replaceWith )
        elementGraph.addEdge( replaceWith, target, scope ); // add scope back
      }

    elementGraph.removeVertex( replace );
    }

  /**
   * Returns the first {@link Pipe} with the given name in topological order, or null if none is found.
   */
  public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name )
    {
    Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph );

    return find( name, iterator );
    }

  /**
   * Returns the first {@link Pipe} with the given name in reverse topological order, or null if none is found.
   */
  public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name )
    {
    Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph );

    return find( name, iterator );
    }

  /**
   * Returns the first Pipe produced by the iterator whose name equals the given name, or null.
   */
  private static Pipe find( String name, Iterator<FlowElement> iterator )
    {
    while( iterator.hasNext() )
      {
      FlowElement flowElement = iterator.next();

      if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) )
        return (Pipe) flowElement;
      }

    return null;
    }

  /**
   * Removes the linear run of elements (each with exactly one incoming and one outgoing edge) that
   * contains the given element.
   *
   * @return true if any element was removed
   */
  public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement )
    {
    Set<FlowElement> branch = new LinkedHashSet<>();

    walkUp( branch, elementGraph, flowElement );

    walkDown( branch, elementGraph, flowElement );

    if( branch.isEmpty() )
      return false;

    for( FlowElement element : branch )
      elementGraph.removeVertex( element );

    return true;
    }

  /**
   * Removes the elements collected by walking down from first, optionally keeping first and second themselves.
   *
   * @param inclusive if false, first and second are not removed
   * @return true if any element was removed
   */
  public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive )
    {
    Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) );

    walkDown( branch, elementGraph, first );

    if( !inclusive )
      {
      branch.remove( first );
      branch.remove( second );
      }

    if( branch.isEmpty() )
      return false;

    for( FlowElement element : branch )
      elementGraph.removeVertex( element );

    return true;
    }

  /**
   * Follows the first outgoing edge of each element, adding elements to branch, until an element not
   * already in branch has an in or out degree other than one, or an Extent or known element is reached.
   */
  private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
    {
    FlowElement current = flowElement;

    while( true )
      {
      // elements seeded into branch by the caller are accepted regardless of degree
      if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) )
        break;

      branch.add( current );

      FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) );

      if( element instanceof Extent || branch.contains( element ) )
        break;

      current = element;
      }
    }

  /**
   * Follows the first incoming edge of each element, adding elements to branch, until an element has an
   * in or out degree other than one, or an Extent or known element is reached.
   */
  private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement )
    {
    FlowElement current = flowElement;

    while( true )
      {
      if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 )
        break;

      branch.add( current );

      FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) );

      if( element instanceof Extent || branch.contains( element ) )
        break;

      current = element;
      }
    }

  /**
   * Returns the number of edges found on the shortest distance between the lhs and rhs.
   * <p/>
   * NOTE(review): if findPathBetween returns null when no path exists, this method throws a
   * NullPointerException -- confirm against the jgrapht version in use before relying on it for
   * unconnected elements.
   */
  public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs )
    {
    return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).size();
    }

  /** Creates the DOT record label for each element of the graph being printed. */
  private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement>
    {
    private final ElementGraph elementGraph;
    private final PlatformInfo platformInfo;

    public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo )
      {
      this.elementGraph = elementGraph;
      this.platformInfo = platformInfo;
      }

    @Override
    public String getVertexName( FlowElement object )
      {
      if( object instanceof Extent ) // is head/tail
        {
        String result = object.toString().replaceAll( "\"", "\'" );

        if( object == Extent.tail )
          return result;

        result = result + "|hash: " + canonicalHash( elementGraph );

        String versionString = Version.getRelease();

        if( platformInfo != null )
          versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo;

        return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}";
        }

      String label;

      Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator();

      // a Pipe with an outgoing edge is printed in the context of that edge
      if( !( object instanceof Pipe ) || !iterator.hasNext() )
        label = asRecordFields( object.toString() );
      else
        label = asRecordFields( ( (Pipe) object ).print( iterator.next() ) );

      label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id

      label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( "<", "\\\\<" ).replaceAll( ">", "\\\\>" ) + "}";

      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
        return label;

      Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object );

      if( !annotations.isEmpty() )
        label += "|{" + Util.join( annotations, "|" ) + "}";

      return label;
      }

    /**
     * Replaces double quotes with single quotes and inserts a '|' field separator before bracketed sections.
     */
    private String asRecordFields( String text )
      {
      return text.replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" );
      }

    /** Returns the first five characters of the element id. */
    protected String getID( FlowElement object )
      {
      return FlowElements.id( object ).substring( 0, 5 );
      }
    }

  /** Creates the DOT label for each edge of the graph being printed. */
  private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope>
    {
    public String getEdgeName( Scope object )
      {
      return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
      }
    }

  /** Gives every vertex the same "Mrecord" shape attribute. */
  private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement>
    {
    static final Map<String, String> defaultNode = new HashMap<>();

    static
      {
      defaultNode.put( "shape", "Mrecord" );
      }

    public VertexAttributeProvider()
      {
      }

    @Override
    public Map<String, String> getComponentAttributes( FlowElement object )
      {
      return defaultNode;
      }
    }

  /** Gives blocking edges a dotted style, non-blocking edges get no attributes. */
  private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope>
    {
    static final Map<String, String> attributes = new HashMap<>();

    static
      {
      attributes.put( "style", "dotted" );
      attributes.put( "arrowhead", "dot" );
      }

    @Override
    public Map<String, String> getComponentAttributes( Scope scope )
      {
      if( scope.isNonBlocking() )
        return null;

      return attributes;
      }
    }

  /** Names each process by its ordinal. */
  private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel>
    {
    @Override
    public String getVertexName( ProcessModel processModel )
      {
      return "" + processModel.getOrdinal();
      }
    }

  /** Labels each process with its ordinal, id, and the canonical hash of its element graph. */
  private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel>
    {
    @Override
    public String getVertexName( ProcessModel processModel )
      {
      return "ordinal: " + processModel.getOrdinal() +
        "\\nid: " + processModel.getID() +
        "\\nhash: " + canonicalHash( processModel.getElementGraph() );
      }
    }

  /**
   * Replaces the vertex map of the given graph with an {@link IdentityHashMap} via reflection.
   * Logs a warning and leaves the graph untouched if the expected fields cannot be found or set.
   */
  static void injectIdentityMap( AbstractGraph<?, ?> graph )
    {
    // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap
    // vertex not found errors will be thrown if this fails
    Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" );

    if( specifics == null )
      {
      LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" );
      return;
      }

    boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() );

    if( !success )
      LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" );
    }
  }