001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.graph; 022 023import java.io.File; 024import java.io.FileWriter; 025import java.io.IOException; 026import java.io.Writer; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.IdentityHashMap; 033import java.util.Iterator; 034import java.util.LinkedHashSet; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.ListIterator; 038import java.util.Map; 039import java.util.Set; 040 041import cascading.flow.FlowElement; 042import cascading.flow.FlowElements; 043import cascading.flow.planner.PlatformInfo; 044import cascading.flow.planner.Scope; 045import cascading.flow.planner.iso.expression.ElementCapture; 046import cascading.flow.planner.iso.expression.ExpressionGraph; 047import cascading.flow.planner.iso.expression.FlowElementExpression; 048import cascading.flow.planner.iso.expression.TypeExpression; 049import cascading.flow.planner.iso.finder.SearchOrder; 050import cascading.flow.planner.iso.subgraph.SubGraphIterator; 051import cascading.flow.planner.iso.subgraph.iterator.ExpressionSubGraphIterator; 052import cascading.flow.planner.process.ProcessGraph; 053import cascading.flow.planner.process.ProcessModel; 054import cascading.pipe.Group; 055import cascading.pipe.Operator; 056import cascading.pipe.Pipe; 057import cascading.pipe.Splice; 058import cascading.tap.Tap; 059import cascading.util.DOTProcessGraphWriter; 060import cascading.util.Murmur3; 061import cascading.util.Pair; 062import cascading.util.Util; 063import cascading.util.Version; 064import cascading.util.jgrapht.ComponentAttributeProvider; 065import cascading.util.jgrapht.EdgeNameProvider; 066import cascading.util.jgrapht.IntegerNameProvider; 067import cascading.util.jgrapht.VertexNameProvider; 068import org.jgrapht.DirectedGraph; 069import org.jgrapht.Graph; 070import org.jgrapht.GraphPath; 071import org.jgrapht.Graphs; 072import org.jgrapht.alg.DijkstraShortestPath; 073import org.jgrapht.alg.FloydWarshallShortestPaths; 074import org.jgrapht.alg.KShortestPaths; 075import org.jgrapht.graph.AbstractGraph; 076import org.jgrapht.graph.EdgeReversedGraph; 077import org.jgrapht.graph.SimpleDirectedGraph; 078import org.jgrapht.traverse.TopologicalOrderIterator; 079import org.jgrapht.util.TypeUtil; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083import static cascading.util.Util.*; 084import static java.lang.Double.POSITIVE_INFINITY; 085 086/** 087 * Class ElementGraphs maintains a collection of operations that can be performed on an {@link ElementGraph}. 088 */ 089public class ElementGraphs 090 { 091 private static final Logger LOG = LoggerFactory.getLogger( ElementGraphs.class ); 092 093 // not for instantiation 094 private ElementGraphs() 095 { 096 } 097 098 private static class IdentityDirectedGraph<V, E> extends SimpleDirectedGraph<V, E> 099 { 100 public IdentityDirectedGraph( Class<? extends E> edgeClass ) 101 { 102 super( edgeClass ); 103 } 104 105 @Override 106 protected DirectedSpecifics createDirectedSpecifics() 107 { 108 return new DirectedSpecifics( new IdentityHashMap<V, DirectedEdgeContainer<V, E>>() ); 109 } 110 } 111 112 public static DirectedGraph<FlowElement, Scope> directed( ElementGraph elementGraph ) 113 { 114 if( elementGraph == null ) 115 return null; 116 117 if( elementGraph instanceof DecoratedElementGraph ) 118 return directed( ( (DecoratedElementGraph) elementGraph ).getDecorated() ); 119 120 return ( (BaseElementGraph) elementGraph ).graph; 121 } 122 123 public static int hashCodeIgnoreAnnotations( ElementGraph elementGraph ) 124 { 125 return hashCodeIgnoreAnnotations( directed( elementGraph ) ); 126 } 127 128 public static <V, E> int hashCodeIgnoreAnnotations( Graph<V, E> graph ) 129 { 130 int hash = graph.vertexSet().hashCode(); 131 132 for( E e : graph.edgeSet() ) 133 { 134 int part = e.hashCode(); 135 136 int source = graph.getEdgeSource( e ).hashCode(); 137 int target = graph.getEdgeTarget( e ).hashCode(); 138 139 int pairing = pair( source, target ); 140 141 part = ( 27 * part ) + pairing; 142 143 long weight = (long) graph.getEdgeWeight( e ); 144 part = ( 27 * part ) + (int) ( weight ^ ( weight >>> 32 ) ); 145 146 hash += part; 147 } 148 149 return hash; 150 } 151 152 public static boolean equalsIgnoreAnnotations( ElementGraph lhs, ElementGraph rhs ) 153 { 154 return equalsIgnoreAnnotations( directed( lhs ), directed( rhs ) ); 155 } 156 157 public static <V, E> boolean equalsIgnoreAnnotations( Graph<V, E> lhs, Graph<V, E> rhs ) 158 { 159 if( lhs == rhs ) 160 return true; 161 162 TypeUtil<Graph<V, E>> typeDecl = null; 163 Graph<V, E> lhsGraph = TypeUtil.uncheckedCast( lhs, typeDecl ); 164 Graph<V, E> rhsGraph = TypeUtil.uncheckedCast( rhs, typeDecl ); 165 166 if( !lhsGraph.vertexSet().equals( rhsGraph.vertexSet() ) ) 167 return false; 168 169 if( lhsGraph.edgeSet().size() != rhsGraph.edgeSet().size() ) 170 return false; 171 172 for( E e : lhsGraph.edgeSet() ) 173 { 174 V source = lhsGraph.getEdgeSource( e ); 175 V target = lhsGraph.getEdgeTarget( e ); 176 177 if( !rhsGraph.containsEdge( e ) ) 178 return false; 179 180 if( !rhsGraph.getEdgeSource( e ).equals( source ) || !rhsGraph.getEdgeTarget( e ).equals( target ) ) 181 return false; 182 183 if( Math.abs( lhsGraph.getEdgeWeight( e ) - rhsGraph.getEdgeWeight( e ) ) > 10e-7 ) 184 return false; 185 } 186 187 return true; 188 } 189 190 public static boolean equals( ElementGraph lhs, ElementGraph rhs ) 191 { 192 if( !equalsIgnoreAnnotations( lhs, rhs ) ) 193 return false; 194 195 if( !( lhs instanceof AnnotatedGraph ) && !( rhs instanceof AnnotatedGraph ) ) 196 return true; 197 198 if( !( lhs instanceof AnnotatedGraph ) || !( rhs instanceof AnnotatedGraph ) ) 199 return false; 200 201 AnnotatedGraph lhsAnnotated = (AnnotatedGraph) lhs; 202 AnnotatedGraph rhsAnnotated = (AnnotatedGraph) rhs; 203 204 if( lhsAnnotated.hasAnnotations() != rhsAnnotated.hasAnnotations() ) 205 return false; 206 207 if( !lhsAnnotated.hasAnnotations() ) 208 return true; 209 210 return lhsAnnotated.getAnnotations().equals( rhsAnnotated.getAnnotations() ); 211 } 212 213 public static String canonicalHash( ElementGraph graph ) 214 { 215 return canonicalHash( directed( graph ) ); 216 } 217 218 public static String canonicalHash( Graph<FlowElement, Scope> graph ) 219 { 220 int hash = Murmur3.SEED; 221 222 int edges = 0; 223 boolean hasExtents = false; 224 225 for( Scope e : graph.edgeSet() ) 226 { 227 FlowElement edgeSource = graph.getEdgeSource( e ); 228 FlowElement edgeTarget = graph.getEdgeTarget( e ); 229 230 // simpler to ignore extents here 231 if( edgeSource instanceof Extent || edgeTarget instanceof Extent ) 232 { 233 hasExtents = true; 234 continue; 235 } 236 237 int source = hash( edgeSource ); 238 int target = hash( edgeTarget ); 239 int pairing = pair( source, target ); 240 241 hash += pairing; // don't make edge traversal order significant 242 edges++; 243 } 244 245 int vertexes = graph.vertexSet().size() - ( hasExtents ? 2 : 0 ); 246 247 hash = Murmur3.fmix( hash, vertexes * edges ); 248 249 return Util.getHex( Util.intToByteArray( hash ) ); 250 } 251 252 private static int hash( FlowElement flowElement ) 253 { 254 int lhs = flowElement.getClass().getName().hashCode(); 255 int rhs = 0; 256 257 if( flowElement instanceof Operator && ( (Operator) flowElement ).getOperation() != null ) 258 rhs = ( (Operator) flowElement ).getOperation().getClass().getName().hashCode(); 259 else if( flowElement instanceof Tap && ( (Tap) flowElement ).getScheme() != null ) 260 rhs = ( (Tap) flowElement ).getScheme().getClass().getName().hashCode(); 261 else if( flowElement instanceof Splice ) 262 rhs = ( (Splice) flowElement ).getJoiner().getClass().getName().hashCode() + 31 * ( (Splice) flowElement ).getNumSelfJoins(); 263 264 return pair( lhs, rhs ); 265 } 266 267 protected static int pair( int lhs, int rhs ) 268 { 269 if( rhs == 0 ) 270 return lhs; 271 272 // see http://en.wikipedia.org/wiki/Pairing_function 273 return ( ( lhs + rhs ) * ( lhs + rhs + 1 ) / 2 ) + rhs; 274 } 275 276 public static Iterator<FlowElement> getTopologicalIterator( ElementGraph graph ) 277 { 278 if( graph == null ) 279 return Collections.emptyIterator(); 280 281 return new TopologicalOrderIterator<>( directed( graph ) ); 282 } 283 284 public static Iterator<FlowElement> getReverseTopologicalIterator( ElementGraph graph ) 285 { 286 return new TopologicalOrderIterator<>( new EdgeReversedGraph<>( directed( graph ) ) ); 287 } 288 289 public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( ElementGraph graph, FlowElement from, FlowElement to ) 290 { 291 return getAllShortestPathsBetween( directed( graph ), from, to ); 292 } 293 294 public static <V, E> List<GraphPath<V, E>> getAllShortestPathsBetween( DirectedGraph<V, E> graph, V from, V to ) 295 { 296 List<GraphPath<V, E>> paths = new KShortestPaths<>( graph, from, Integer.MAX_VALUE ).getPaths( to ); 297 298 if( paths == null ) 299 return new ArrayList<>(); 300 301 return paths; 302 } 303 304 public static ElementSubGraph asSubGraph( ElementGraph elementGraph, ElementGraph contractedGraph, Set<FlowElement> excludes ) 305 { 306 elementGraph = asExtentMaskedSubGraph( elementGraph ); // returns same instance if not bounded 307 308 Pair<Set<FlowElement>, Set<Scope>> pair = findClosureViaFloydWarshall( directed( elementGraph ), directed( contractedGraph ), excludes ); 309 Set<FlowElement> vertices = pair.getLhs(); 310 Set<Scope> excludeEdges = pair.getRhs(); 311 312 Set<Scope> scopes = new HashSet<>( elementGraph.edgeSet() ); 313 scopes.removeAll( excludeEdges ); 314 315 return new ElementSubGraph( elementGraph, vertices, scopes ); 316 } 317 318 /** 319 * Returns a new ElementGraph (a MaskedSubGraph) of the given ElementGraph that will not contain the {@link Extent} 320 * head or tail instances. 321 * <p/> 322 * If the given ElementGraph does not contain head or tail, it will be returned unchanged. 323 * 324 * @param elementGraph 325 * @return 326 */ 327 public static ElementGraph asExtentMaskedSubGraph( ElementGraph elementGraph ) 328 { 329 if( elementGraph.containsVertex( Extent.head ) || elementGraph.containsVertex( Extent.tail ) ) 330 return new ElementMaskSubGraph( elementGraph, Extent.head, Extent.tail ); 331 332 return elementGraph; 333 } 334 335 public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted ) 336 { 337 return findClosureViaFloydWarshall( full, contracted, null ); 338 } 339 340 public static <V, E> Pair<Set<V>, Set<E>> findClosureViaFloydWarshall( DirectedGraph<V, E> full, DirectedGraph<V, E> contracted, Set<V> excludes ) 341 { 342 Set<V> vertices = createIdentitySet( contracted.vertexSet() ); 343 LinkedList<V> allVertices = new LinkedList<>( full.vertexSet() ); 344 345 allVertices.removeAll( vertices ); 346 347 Set<E> excludeEdges = new HashSet<>(); 348 349 // prevent distinguished elements from being included inside the sub-graph 350 if( excludes != null ) 351 { 352 for( V v : excludes ) 353 { 354 if( !full.containsVertex( v ) ) 355 continue; 356 357 excludeEdges.addAll( full.incomingEdgesOf( v ) ); 358 excludeEdges.addAll( full.outgoingEdgesOf( v ) ); 359 } 360 } 361 362 for( V v : contracted.vertexSet() ) 363 { 364 if( contracted.inDegreeOf( v ) == 0 ) 365 excludeEdges.addAll( full.incomingEdgesOf( v ) ); 366 } 367 368 for( V v : contracted.vertexSet() ) 369 { 370 if( contracted.outDegreeOf( v ) == 0 ) 371 excludeEdges.addAll( full.outgoingEdgesOf( v ) ); 372 } 373 374 DirectedGraph<V, E> disconnected = disconnectExtentsAndExclude( full, excludeEdges ); 375 376 FloydWarshallShortestPaths<V, E> paths = new FloydWarshallShortestPaths<>( disconnected ); 377 378 for( E edge : contracted.edgeSet() ) 379 { 380 V edgeSource = contracted.getEdgeSource( edge ); 381 V edgeTarget = contracted.getEdgeTarget( edge ); 382 383 ListIterator<V> iterator = allVertices.listIterator(); 384 while( iterator.hasNext() ) 385 { 386 V vertex = iterator.next(); 387 388 if( !isBetween( paths, edgeSource, edgeTarget, vertex ) ) 389 continue; 390 391 vertices.add( vertex ); 392 iterator.remove(); 393 } 394 } 395 396 return new Pair<>( vertices, excludeEdges ); 397 } 398 399 private static <V, E> DirectedGraph<V, E> disconnectExtentsAndExclude( DirectedGraph<V, E> full, Set<E> withoutEdges ) 400 { 401 IdentityDirectedGraph<V, E> copy = (IdentityDirectedGraph<V, E>) new IdentityDirectedGraph<>( Object.class ); 402 403 Graphs.addAllVertices( copy, full.vertexSet() ); 404 405 copy.removeVertex( (V) Extent.head ); 406 copy.removeVertex( (V) Extent.tail ); 407 408 Set<E> edges = full.edgeSet(); 409 410 if( !withoutEdges.isEmpty() ) 411 { 412 edges = new HashSet<>( edges ); 413 edges.removeAll( withoutEdges ); 414 } 415 416 Graphs.addAllEdges( copy, full, edges ); 417 418 return copy; 419 } 420 421 private static <V, E> boolean isBetween( FloydWarshallShortestPaths<V, E> paths, V edgeSource, V edgeTarget, V vertex ) 422 { 423 return paths.shortestDistance( edgeSource, vertex ) != POSITIVE_INFINITY && paths.shortestDistance( vertex, edgeTarget ) != POSITIVE_INFINITY; 424 } 425 426 public static void removeAndContract( ElementGraph elementGraph, FlowElement flowElement ) 427 { 428 LOG.debug( "removing element, contracting edge for: {}", flowElement ); 429 430 Set<Scope> incomingScopes = elementGraph.incomingEdgesOf( flowElement ); 431 432 boolean contractIncoming = true; 433 434 if( !contractIncoming ) 435 { 436 if( incomingScopes.size() != 1 ) 437 throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() ); 438 } 439 440 boolean isJoin = flowElement instanceof Splice && ( (Splice) flowElement ).isJoin(); 441 442 for( Scope incoming : incomingScopes ) 443 { 444 Set<Scope> outgoingScopes = elementGraph.outgoingEdgesOf( flowElement ); 445 446 // source -> incoming -> flowElement -> outgoing -> target 447 FlowElement source = elementGraph.getEdgeSource( incoming ); 448 449 for( Scope outgoing : outgoingScopes ) 450 { 451 FlowElement target = elementGraph.getEdgeTarget( outgoing ); 452 453 boolean isNonBlocking = outgoing.isNonBlocking(); 454 455 if( isJoin ) 456 isNonBlocking = isNonBlocking && incoming.isNonBlocking(); 457 458 Scope scope = new Scope( outgoing ); 459 460 // unsure if necessary since we track blocking independently 461 // when removing a pipe, pull ordinal up to tap 462 // when removing a Splice retain ordinal 463 if( flowElement instanceof Splice ) 464 scope.setOrdinal( incoming.getOrdinal() ); 465 else 466 scope.setOrdinal( outgoing.getOrdinal() ); 467 468 scope.setNonBlocking( isNonBlocking ); 469 scope.addPriorNames( incoming, outgoing ); // not copied 470 elementGraph.addEdge( source, target, scope ); 471 } 472 } 473 474 elementGraph.removeVertex( flowElement ); 475 } 476 477 public static boolean printElementGraph( String filename, final ElementGraph graph, final PlatformInfo platformInfo ) 478 { 479 try 480 { 481 File parentFile = new File( filename ).getParentFile(); 482 483 if( parentFile != null && !parentFile.exists() ) 484 parentFile.mkdirs(); 485 486 Writer writer = new FileWriter( filename ); 487 488 Util.writeDOT( writer, ElementGraphs.directed( graph ), 489 new IntegerNameProvider<FlowElement>(), 490 new FlowElementVertexNameProvider( graph, platformInfo ), 491 new ScopeEdgeNameProvider(), 492 new VertexAttributeProvider(), new EdgeAttributeProvider() ); 493 494 writer.close(); 495 return true; 496 } 497 catch( NullPointerException | IOException exception ) 498 { 499 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 500 } 501 502 return false; 503 } 504 505 public static boolean printProcessGraph( String filename, final ElementGraph graph, final ProcessGraph<? extends ProcessModel> processGraph ) 506 { 507 try 508 { 509 File parentFile = new File( filename ).getParentFile(); 510 511 if( parentFile != null && !parentFile.exists() ) 512 parentFile.mkdirs(); 513 514 Writer writer = new FileWriter( filename ); 515 516 DOTProcessGraphWriter graphWriter = new DOTProcessGraphWriter( 517 new IntegerNameProvider<Pair<ElementGraph, FlowElement>>(), 518 new FlowElementVertexNameProvider( graph, null ), 519 new ScopeEdgeNameProvider(), 520 new VertexAttributeProvider(), new EdgeAttributeProvider(), 521 new ProcessGraphNameProvider(), new ProcessGraphLabelProvider() 522 ); 523 524 graphWriter.writeGraph( writer, graph, processGraph ); 525 526 writer.close(); 527 return true; 528 } 529 catch( IOException exception ) 530 { 531 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 532 } 533 534 return false; 535 } 536 537 public static void insertFlowElementAfter( ElementGraph elementGraph, FlowElement previousElement, FlowElement flowElement ) 538 { 539 Set<Scope> outgoing = new HashSet<>( elementGraph.outgoingEdgesOf( previousElement ) ); 540 541 elementGraph.addVertex( flowElement ); 542 543 String name = previousElement.toString(); 544 545 if( previousElement instanceof Pipe ) 546 name = ( (Pipe) previousElement ).getName(); 547 548 elementGraph.addEdge( previousElement, flowElement, new Scope( name ) ); 549 550 for( Scope scope : outgoing ) 551 { 552 FlowElement target = elementGraph.getEdgeTarget( scope ); 553 Scope foundScope = elementGraph.removeEdge( previousElement, target ); // remove scope 554 555 if( foundScope != scope ) 556 throw new IllegalStateException( "did not remove proper scope" ); 557 558 elementGraph.addEdge( flowElement, target, scope ); // add scope back 559 } 560 } 561 562 public static void insertFlowElementBefore( ElementGraph graph, FlowElement nextElement, FlowElement flowElement ) 563 { 564 Set<Scope> incoming = new HashSet<>( graph.incomingEdgesOf( nextElement ) ); 565 566 graph.addVertex( flowElement ); 567 568 String name = nextElement.toString(); 569 570 if( nextElement instanceof Pipe ) 571 name = ( (Pipe) nextElement ).getName(); 572 573 graph.addEdge( flowElement, nextElement, new Scope( name ) ); 574 575 for( Scope scope : incoming ) 576 { 577 FlowElement target = graph.getEdgeSource( scope ); 578 Scope foundScope = graph.removeEdge( target, nextElement ); // remove scope 579 580 if( foundScope != scope ) 581 throw new IllegalStateException( "did not remove proper scope" ); 582 583 graph.addEdge( target, flowElement, scope ); // add scope back 584 } 585 } 586 587 public static void insertFlowElementBetweenEdge( ElementGraph elementGraph, Scope previousEdge, FlowElement newElement ) 588 { 589 FlowElement previousElement = elementGraph.getEdgeSource( previousEdge ); 590 FlowElement nextElement = elementGraph.getEdgeTarget( previousEdge ); 591 592 elementGraph.addVertex( newElement ); 593 594 // add edge between previous and new 595 elementGraph.addEdge( previousElement, newElement, new Scope( previousEdge ) ); 596 597 // add edge between new and next 598 Scope scope = new Scope( previousEdge ); 599 600 scope.setOrdinal( previousEdge.getOrdinal() ); 601 602 if( nextElement instanceof Splice && ( (Splice) nextElement ).isJoin() && previousEdge.getOrdinal() != 0 ) 603 scope.setNonBlocking( false ); 604 605 elementGraph.addEdge( newElement, nextElement, scope ); 606 607 // remove previous edge 608 elementGraph.removeEdge( previousElement, nextElement ); 609 } 610 611 public static Set<Tap> findSources( ElementGraph elementGraph ) 612 { 613 return findSources( elementGraph, Tap.class ); 614 } 615 616 public static <F extends FlowElement> Set<F> findSources( ElementGraph elementGraph, Class<F> type ) 617 { 618 if( elementGraph == null ) 619 return Collections.emptySet(); 620 621 if( elementGraph.containsVertex( Extent.head ) ) 622 return narrowIdentitySet( type, elementGraph.successorListOf( Extent.head ) ); 623 624 SubGraphIterator iterator = new ExpressionSubGraphIterator( 625 new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Head ) ), 626 elementGraph 627 ); 628 629 return narrowIdentitySet( type, getAllVertices( iterator ) ); 630 } 631 632 public static <F extends FlowElement> Set<F> findSinks( ElementGraph elementGraph, Class<F> type ) 633 { 634 if( elementGraph == null ) 635 return Collections.emptySet(); 636 637 if( elementGraph.containsVertex( Extent.tail ) ) 638 return narrowIdentitySet( type, elementGraph.predecessorListOf( Extent.tail ) ); 639 640 SubGraphIterator iterator = new ExpressionSubGraphIterator( 641 new ExpressionGraph( SearchOrder.ReverseTopological, new FlowElementExpression( ElementCapture.Primary, type, TypeExpression.Topo.Tail ) ), 642 elementGraph 643 ); 644 645 return narrowIdentitySet( type, getAllVertices( iterator ) ); 646 } 647 648 public static Set<Tap> findSinks( ElementGraph elementGraph ) 649 { 650 return findSinks( elementGraph, Tap.class ); 651 } 652 653 public static Set<Group> findAllGroups( ElementGraph elementGraph ) 654 { 655 SubGraphIterator iterator = new ExpressionSubGraphIterator( 656 new ExpressionGraph( SearchOrder.Topological, new FlowElementExpression( ElementCapture.Primary, Group.class ) ), 657 elementGraph 658 ); 659 660 return narrowIdentitySet( Group.class, getAllVertices( iterator ) ); 661 } 662 663 private static Set<FlowElement> getAllVertices( SubGraphIterator iterator ) 664 { 665 Set<FlowElement> vertices = createIdentitySet(); 666 667 while( iterator.hasNext() ) 668 vertices.addAll( iterator.next().vertexSet() ); 669 670 return vertices; 671 } 672 673 public static void replaceElementWith( ElementGraph elementGraph, FlowElement replace, FlowElement replaceWith ) 674 { 675 Set<Scope> incoming = new HashSet<Scope>( elementGraph.incomingEdgesOf( replace ) ); 676 Set<Scope> outgoing = new HashSet<Scope>( elementGraph.outgoingEdgesOf( replace ) ); 677 678 if( !elementGraph.containsVertex( replaceWith ) ) 679 elementGraph.addVertex( replaceWith ); 680 681 for( Scope scope : incoming ) 682 { 683 FlowElement source = elementGraph.getEdgeSource( scope ); 684 elementGraph.removeEdge( source, replace ); // remove scope 685 686 // drop edge between, if any 687 if( source != replaceWith ) 688 elementGraph.addEdge( source, replaceWith, scope ); // add scope back 689 } 690 691 for( Scope scope : outgoing ) 692 { 693 FlowElement target = elementGraph.getEdgeTarget( scope ); 694 elementGraph.removeEdge( replace, target ); // remove scope 695 696 // drop edge between, if any 697 if( target != replaceWith ) 698 elementGraph.addEdge( replaceWith, target, scope ); // add scope back 699 } 700 701 elementGraph.removeVertex( replace ); 702 } 703 704 public static Pipe findFirstPipeNamed( ElementGraph elementGraph, String name ) 705 { 706 Iterator<FlowElement> iterator = getTopologicalIterator( elementGraph ); 707 708 return find( name, iterator ); 709 } 710 711 public static Pipe findLastPipeNamed( ElementGraph elementGraph, String name ) 712 { 713 Iterator<FlowElement> iterator = getReverseTopologicalIterator( elementGraph ); 714 715 return find( name, iterator ); 716 } 717 718 private static Pipe find( String name, Iterator<FlowElement> iterator ) 719 { 720 while( iterator.hasNext() ) 721 { 722 FlowElement flowElement = iterator.next(); 723 724 if( flowElement instanceof Pipe && ( (Pipe) flowElement ).getName().equals( name ) ) 725 return (Pipe) flowElement; 726 } 727 728 return null; 729 } 730 731 public static boolean removeBranchContaining( ElementGraph elementGraph, FlowElement flowElement ) 732 { 733 Set<FlowElement> branch = new LinkedHashSet<>(); 734 735 walkUp( branch, elementGraph, flowElement ); 736 737 walkDown( branch, elementGraph, flowElement ); 738 739 if( branch.isEmpty() ) 740 return false; 741 742 for( FlowElement element : branch ) 743 elementGraph.removeVertex( element ); 744 745 return true; 746 } 747 748 public static boolean removeBranchBetween( ElementGraph elementGraph, FlowElement first, FlowElement second, boolean inclusive ) 749 { 750 Set<FlowElement> branch = new LinkedHashSet<>( Arrays.asList( first, second ) ); 751 752 walkDown( branch, elementGraph, first ); 753 754 if( !inclusive ) 755 { 756 branch.remove( first ); 757 branch.remove( second ); 758 } 759 760 if( branch.isEmpty() ) 761 return false; 762 763 for( FlowElement element : branch ) 764 elementGraph.removeVertex( element ); 765 766 return true; 767 } 768 769 private static void walkDown( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement ) 770 { 771 FlowElement current; 772 current = flowElement; 773 774 while( true ) 775 { 776 if( !branch.contains( current ) && ( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) ) 777 break; 778 779 branch.add( current ); 780 781 FlowElement element = elementGraph.getEdgeTarget( getFirst( elementGraph.outgoingEdgesOf( current ) ) ); 782 783 if( element instanceof Extent || branch.contains( element ) ) 784 break; 785 786 current = element; 787 } 788 } 789 790 private static void walkUp( Set<FlowElement> branch, ElementGraph elementGraph, FlowElement flowElement ) 791 { 792 FlowElement current = flowElement; 793 794 while( true ) 795 { 796 if( elementGraph.inDegreeOf( current ) != 1 || elementGraph.outDegreeOf( current ) != 1 ) 797 break; 798 799 branch.add( current ); 800 801 FlowElement element = elementGraph.getEdgeSource( getFirst( elementGraph.incomingEdgesOf( current ) ) ); 802 803 if( element instanceof Extent || branch.contains( element ) ) 804 break; 805 806 current = element; 807 } 808 } 809 810 /** 811 * Returns the number of edges found on the shortest distance between the lhs and rhs. 812 */ 813 public static int shortestDistance( ElementGraph graph, FlowElement lhs, FlowElement rhs ) 814 { 815 return DijkstraShortestPath.findPathBetween( directed( graph ), lhs, rhs ).size(); 816 } 817 818 private static class FlowElementVertexNameProvider implements VertexNameProvider<FlowElement> 819 { 820 private final ElementGraph elementGraph; 821 private final PlatformInfo platformInfo; 822 823 public FlowElementVertexNameProvider( ElementGraph elementGraph, PlatformInfo platformInfo ) 824 { 825 this.elementGraph = elementGraph; 826 this.platformInfo = platformInfo; 827 } 828 829 public String getVertexName( FlowElement object ) 830 { 831 if( object instanceof Extent ) // is head/tail 832 { 833 String result = object.toString().replaceAll( "\"", "\'" ); 834 835 if( object == Extent.tail ) 836 return result; 837 838 result = result + "|hash: " + canonicalHash( elementGraph ); 839 840 String versionString = Version.getRelease(); 841 842 if( platformInfo != null ) 843 versionString = ( versionString == null ? "" : versionString + "|" ) + platformInfo; 844 845 return "{" + ( versionString == null ? result : result + "|" + versionString ) + "}"; 846 } 847 848 String label; 849 850 Iterator<Scope> iterator = elementGraph.outgoingEdgesOf( object ).iterator(); 851 852 if( !( object instanceof Pipe ) || !iterator.hasNext() ) 853 { 854 label = object.toString().replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" ); 855 } 856 else 857 { 858 Scope scope = iterator.next(); 859 860 label = ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" ).replaceAll( "(\\)|\\])(\\[)", "$1|$2" ).replaceAll( "(^[^(\\[]+)(\\(|\\[)", "$1|$2" ); 861 } 862 863 label = label.replaceFirst( "([^|]+)\\|(.*)", "$1 : " + getID( object ) + "|$2" ); // insert id 864 865 label = "{" + label.replaceAll( "\\{", "\\\\{" ).replaceAll( "\\}", "\\\\}" ).replaceAll( ">", "\\\\>" ) + "}"; 866 867 if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() ) 868 return label; 869 870 Set<Enum> annotations = ( (AnnotatedGraph) elementGraph ).getAnnotations().getKeysFor( object ); 871 872 if( !annotations.isEmpty() ) 873 label += "|{" + Util.join( annotations, "|" ) + "}"; 874 875 return label; 876 } 877 878 protected String getID( FlowElement object ) 879 { 880 return FlowElements.id( object ).substring( 0, 5 ); 881 } 882 } 883 884 private static class ScopeEdgeNameProvider implements EdgeNameProvider<Scope> 885 { 886 public String getEdgeName( Scope object ) 887 { 888 return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz 889 } 890 } 891 892 private static class VertexAttributeProvider implements ComponentAttributeProvider<FlowElement> 893 { 894 static Map<String, String> defaultNode = new HashMap<String, String>() 895 { 896 {put( "shape", "Mrecord" );} 897 }; 898 899 public VertexAttributeProvider() 900 { 901 } 902 903 @Override 904 public Map<String, String> getComponentAttributes( FlowElement object ) 905 { 906 return defaultNode; 907 } 908 } 909 910 private static class EdgeAttributeProvider implements ComponentAttributeProvider<Scope> 911 { 912 static Map<String, String> attributes = new HashMap<String, String>() 913 { 914 {put( "style", "dotted" );} 915 916 {put( "arrowhead", "dot" );} 917 }; 918 919 @Override 920 public Map<String, String> getComponentAttributes( Scope scope ) 921 { 922 if( scope.isNonBlocking() ) 923 return null; 924 925 return attributes; 926 } 927 } 928 929 private static class ProcessGraphNameProvider implements VertexNameProvider<ProcessModel> 930 { 931 @Override 932 public String getVertexName( ProcessModel processModel ) 933 { 934 return "" + processModel.getOrdinal(); 935 } 936 } 937 938 private static class ProcessGraphLabelProvider implements VertexNameProvider<ProcessModel> 939 { 940 @Override 941 public String getVertexName( ProcessModel processModel ) 942 { 943 return "ordinal: " + processModel.getOrdinal() + 944 "\\nid: " + processModel.getID() + 945 "\\nhash: " + canonicalHash( processModel.getElementGraph() ); 946 } 947 } 948 949 static void injectIdentityMap( AbstractGraph graph ) 950 { 951 // this overcomes jgrapht 0.9.0 using a LinkedHashMap vs an IdentityHashMap 952 // vertex not found errors will be thrown if this fails 953 Object specifics = Util.returnInstanceFieldIfExistsSafe( graph, "specifics" ); 954 955 if( specifics == null ) 956 { 957 LOG.warn( "unable to get jgrapht Specifics for identity map injection, may be using an incompatible jgrapht version" ); 958 return; 959 } 960 961 boolean success = Util.setInstanceFieldIfExistsSafe( specifics, "vertexMapDirected", new IdentityHashMap<>() ); 962 963 if( !success ) 964 LOG.warn( "unable to set IdentityHashMap on jgrapht Specifics, may be using an incompatible jgrapht version" ); 965 } 966 }