/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.operation.PlannedOperation;
import cascading.operation.PlannerLevel;
import cascading.pipe.Checkpoint;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.Group;
import cascading.pipe.Operator;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.pipe.SubAssembly;
import cascading.property.AppProps;
import cascading.tap.Tap;
import cascading.util.Util;
import cascading.util.Version;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.KShortestPaths;
import org.jgrapht.ext.EdgeNameProvider;
import org.jgrapht.ext.IntegerNameProvider;
import org.jgrapht.ext.VertexNameProvider;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.DepthFirstIterator;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class ElementGraph represents the executable FlowElement graph.
 * <p/>
 * Vertices are {@link FlowElement} instances (pipes and taps) and edges are {@link Scope} instances.
 * The shared {@link #head} and {@link #tail} {@link Extent} vertices bracket the graph so that all
 * paths through the assembly can be enumerated between them.
 */
public class ElementGraph extends SimpleDirectedGraph<FlowElement, Scope>
{
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( ElementGraph.class );

  /** Field head */
  public static final Extent head = new Extent( "head" );
  /** Field tail */
  public static final Extent tail = new Extent( "tail" );
  /** Field resolved */
  private boolean resolved;

  /** Field platformInfo */
  private PlatformInfo platformInfo;
  /** Field sources */
  private Map<String, Tap> sources;
  /** Field sinks */
  private Map<String, Tap> sinks;
  /** Field traps */
  private Map<String, Tap> traps;
  /** Field checkpoints */
  private Map<String, Tap> checkpoints;
  /** Field requireUniqueCheckpoints */
  private boolean requireUniqueCheckpoints;
  /** Field plannerLevels */
  private PlannerLevel[] plannerLevels;

  /** Creates an empty graph; none of the tap maps or planner settings are initialized. */
  ElementGraph()
  {
    super( Scope.class );
  }

  /**
   * Copy constructor. The tap maps and planner settings are shared by reference with the given
   * graph, while the vertices and edges are added to this new graph instance.
   *
   * @param elementGraph of type ElementGraph
   */
  public ElementGraph( ElementGraph elementGraph )
  {
    this();
    this.platformInfo = elementGraph.platformInfo;
    this.sources = elementGraph.sources;
    this.sinks = elementGraph.sinks;
    this.traps = elementGraph.traps;
    this.checkpoints = elementGraph.checkpoints;
    this.plannerLevels = elementGraph.plannerLevels;
    this.requireUniqueCheckpoints = elementGraph.requireUniqueCheckpoints;

    Graphs.addAllVertices( this, elementGraph.vertexSet() );
    Graphs.addAllEdges( this, elementGraph, elementGraph.edgeSet() );
  }

  /**
   * Constructor ElementGraph creates a new ElementGraph instance, assembles the graph from the
   * given tail pipes and then verifies it.
   *
   * @param platformInfo             of type PlatformInfo
   * @param pipes                    of type Pipe[]
   * @param sources                  of type Map<String, Tap>
   * @param sinks                    of type Map<String, Tap>
   * @param traps                    of type Map<String, Tap>
   * @param checkpoints              of type Map<String, Tap>
   * @param requireUniqueCheckpoints of type boolean, when true duplicate Checkpoint names are rejected
   * @param plannerLevels            of type PlannerLevel...
   */
  public ElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints, PlannerLevel... plannerLevels )
  {
    super( Scope.class );
    this.platformInfo = platformInfo;
    this.sources = sources;
    this.sinks = sinks;
    this.traps = traps;
    this.checkpoints = checkpoints;
    this.requireUniqueCheckpoints = requireUniqueCheckpoints;
    this.plannerLevels = plannerLevels;

    assembleGraph( pipes, sources, sinks );

    verifyGraph();
  }

  public Map<String, Tap> getSourceMap()
  {
    return sources;
  }

  public Map<String, Tap> getSinkMap()
  {
    return sinks;
  }

  public Map<String, Tap> getTrapMap()
  {
    return traps;
  }

  public Map<String, Tap> getCheckpointsMap()
  {
    return checkpoints;
  }

  public Collection<Tap> getSources()
  {
    return sources.values();
  }

  public Collection<Tap> getSinks()
  {
    return sinks.values();
  }

  public Collection<Tap> getTraps()
  {
    return traps.values();
  }

  /**
   * Builds the graph by walking back from each given pipe, then attaches the head and tail extents.
   * <p/>
   * makeGraph removes entries from the maps it is handed, so it works on copies; addExtents
   * receives the original, unmodified maps.
   */
  private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
  {
    HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources );
    HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks );

    for( Pipe pipe : pipes )
      makeGraph( pipe, sourcesCopy, sinksCopy );

    addExtents( sources, sinks );
  }

  /**
   * Method verifyGraph walks the graph in topological order and fails if any element other than
   * an Extent is missing incoming or outgoing edges, or, when requireUniqueCheckpoints is set,
   * if two Checkpoint pipes share a name.
   *
   * @throws ElementGraphException if the graph cannot be traversed or is improperly connected
   */
  private void verifyGraph()
  {
    if( vertexSet().isEmpty() )
      return;

    Set<String> checkpointNames = new HashSet<String>();

    // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected
    TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();

    // tracks the last successfully visited element so traversal failures can be reported against it
    FlowElement flowElement = null;

    while( iterator.hasNext() )
    {
      try
      {
        flowElement = iterator.next();
      }
      catch( IllegalArgumentException exception )
      {
        if( flowElement == null )
          throw new ElementGraphException( "unable to traverse to the first element" );

        throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement );
      }

      if( requireUniqueCheckpoints && flowElement instanceof Checkpoint )
      {
        String name = ( (Checkpoint) flowElement ).getName();

        if( checkpointNames.contains( name ) )
          throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name );

        checkpointNames.add( name );
      }

      // fully connected elements are fine
      if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 )
        continue;

      // head and tail are expected to be missing one side
      if( flowElement instanceof Extent )
        continue;

      if( flowElement instanceof Pipe )
      {
        if( incomingEdgesOf( flowElement ).size() == 0 )
          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming heads" );
        else
          throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" );
      }

      if( flowElement instanceof Tap )
        throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement );
      else
        throw new ElementGraphException( flowElement, "unknown element type: " + flowElement );
    }
  }

  /**
   * Method copyElementGraph returns a partial copy of the current ElementGraph. Only Vertices and
   * Edges are copied, along with a new map holding the current trap entries.
   *
   * @return ElementGraph
   */
  public ElementGraph copyElementGraph()
  {
    ElementGraph copy = new ElementGraph();
    Graphs.addGraph( copy, this );

    copy.traps = new HashMap<String, Tap>( this.traps );

    return copy;
  }

  /**
   * created to support the ability to generate all paths between the head and tail of the process.
   *
   * @param sources of type Map<String, Tap>, each source Tap is connected from {@link #head}
   * @param sinks   of type Map<String, Tap>, each sink Tap is connected to {@link #tail}
   * @throws ElementGraphException if a sink Tap is not in the graph, or is already connected to the tail
   */
  private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks )
  {
    addVertex( head );

    for( Map.Entry<String, Tap> source : sources.entrySet() )
    {
      Scope scope = addEdge( head, source.getValue() );

      // edge may already exist, if so, above returns null
      if( scope != null )
        scope.setName( source.getKey() );
    }

    addVertex( tail );

    for( Map.Entry<String, Tap> sink : sinks.entrySet() )
    {
      Scope scope;

      try
      {
        scope = addEdge( sink.getValue(), tail );
      }
      catch( IllegalArgumentException exception )
      {
        throw new ElementGraphException( "missing pipe for sink tap: [" + sink.getKey() + "]" );
      }

      // a null scope means the sink -> tail edge already exists, i.e. the same Tap is bound to more than one sink name
      if( scope == null )
        throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" );

      scope.setName( sink.getKey() );
    }
  }

  /**
   * Recursively adds the given pipe, its matching sink and source taps, and all of its
   * predecessors to the graph.
   * <p/>
   * Performs one rule check, verifies group does not join duplicate tap resources.
   * <p/>
   * Scopes are always named after the source side of the source -> target relationship
   * <p/>
   * Entries are removed from the given sources and sinks maps as they are bound to pipes.
   */
  private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks )
  {
    if( LOG.isDebugEnabled() )
      LOG.debug( "adding pipe: " + current );

    // SubAssembly instances are only containers, descend into their tails instead of adding them
    if( current instanceof SubAssembly )
    {
      for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) )
        makeGraph( pipe, sources, sinks );

      return;
    }

    if( containsVertex( current ) )
      return;

    addVertex( current );

    Tap sink = sinks.remove( current.getName() );

    if( sink != null )
    {
      if( LOG.isDebugEnabled() )
        LOG.debug( "adding sink: " + sink );

      addVertex( sink );

      if( LOG.isDebugEnabled() )
        LOG.debug( "adding edge: " + current + " -> " + sink );

      addEdge( current, sink ).setName( current.getName() ); // name scope after sink
    }

    Pipe[] previousPipes = SubAssembly.unwind( current.getPrevious() );

    // PipeAssemblies should always have a previous
    if( previousPipes.length == 0 )
    {
      Tap source = sources.remove( current.getName() );

      if( source != null )
      {
        if( LOG.isDebugEnabled() )
          LOG.debug( "adding source: " + source );

        addVertex( source );

        if( LOG.isDebugEnabled() )
          LOG.debug( "adding edge: " + source + " -> " + current );

        addEdge( source, current ).setName( current.getName() ); // name scope after source
      }
    }

    for( Pipe previous : previousPipes )
    {
      makeGraph( previous, sources, sinks );

      if( LOG.isDebugEnabled() )
        LOG.debug( "adding edge: " + previous + " -> " + current );

      if( getEdge( previous, current ) != null )
        throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous );

      addEdge( previous, current ).setName( previous.getName() ); // name scope after previous pipe
    }
  }

  /**
   * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object.
   *
   * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object.
   */
  public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator()
  {
    return new TopologicalOrderIterator<FlowElement, Scope>( this );
  }

  /**
   * Method getAllShortestPathsFrom returns all paths from the given element to the {@link #tail}.
   *
   * @param flowElement of type FlowElement
   * @return List<GraphPath<FlowElement, Scope>>
   */
  public List<GraphPath<FlowElement, Scope>> getAllShortestPathsFrom( FlowElement flowElement )
  {
    return ElementGraphs.getAllShortestPathsBetween( this, flowElement, tail );
  }

  /**
   * Method getAllShortestPathsTo returns all paths from the {@link #head} to the given element.
   *
   * @param flowElement of type FlowElement
   * @return List<GraphPath<FlowElement, Scope>>
   */
  public List<GraphPath<FlowElement, Scope>> getAllShortestPathsTo( FlowElement flowElement )
  {
    return ElementGraphs.getAllShortestPathsBetween( this, head, flowElement );
  }

  /**
   * Method getAllShortestPathsBetweenExtents returns the allShortestPathsBetweenExtents of this ElementGraph object.
   *
   * @return the allShortestPathsBetweenExtents (type List<GraphPath<FlowElement, Scope>>) of this ElementGraph object,
   *         never null
   */
  public List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetweenExtents()
  {
    List<GraphPath<FlowElement, Scope>> paths = new KShortestPaths<FlowElement, Scope>( this, head, Integer.MAX_VALUE ).getPaths( tail );

    if( paths == null )
      return new ArrayList<GraphPath<FlowElement, Scope>>();

    return paths;
  }

  /**
   * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object, starting at the {@link #head}.
   *
   * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object.
   */
  public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator()
  {
    return new DepthFirstIterator<FlowElement, Scope>( this, head );
  }

  /** Returns a copy of this graph with the trap taps attached to the pipes they are named after. */
  private SimpleDirectedGraph<FlowElement, Scope> copyWithTraps()
  {
    ElementGraph copy = this.copyElementGraph();

    copy.addTraps();

    return copy;
  }

  /** Adds an edge from each Pipe to the trap Tap registered under that pipe's name, if any. */
  private void addTraps()
  {
    DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();

    while( iterator.hasNext() )
    {
      FlowElement element = iterator.next();

      if( !( element instanceof Pipe ) )
        continue;

      Pipe pipe = (Pipe) element;
      Tap trap = traps.get( pipe.getName() );

      if( trap == null )
        continue;

      addVertex( trap );

      if( LOG.isDebugEnabled() )
        LOG.debug( "adding trap edge: " + pipe + " -> " + trap );

      if( getEdge( pipe, trap ) != null )
        continue;

      addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe
    }
  }

  /**
   * Method writeDOT writes this element graph, with its traps attached, to a DOT file for easy
   * visualization and debugging.
   *
   * @param filename of type String
   */
  public void writeDOT( String filename )
  {
    printElementGraph( filename, this.copyWithTraps() );
  }

  /**
   * Writes the given graph in DOT format to the named file, creating parent directories if needed.
   * <p/>
   * An IOException is logged rather than propagated; writing the DOT file is best-effort.
   *
   * @param filename of type String
   * @param graph    of type SimpleDirectedGraph<FlowElement, Scope>
   */
  protected void printElementGraph( String filename, final SimpleDirectedGraph<FlowElement, Scope> graph )
  {
    try
    {
      File parentFile = new File( filename ).getParentFile();

      if( parentFile != null && !parentFile.exists() )
        parentFile.mkdirs();

      Writer writer = new FileWriter( filename );

      try
      {
        Util.writeDOT( writer, graph, new IntegerNameProvider<FlowElement>(), new VertexNameProvider<FlowElement>()
        {
          public String getVertexName( FlowElement object )
          {
            // vertices without incoming edges are labelled with release, platform and application id
            if( graph.incomingEdgesOf( object ).isEmpty() )
            {
              String result = object.toString().replaceAll( "\"", "\'" );
              String versionString = Version.getRelease();

              if( platformInfo != null )
                versionString = ( versionString == null ? "" : versionString + "\\n" ) + platformInfo;

              result = versionString == null ? result : result + "\\n" + versionString;

              return result + "\\napp id: " + AppProps.getApplicationID( null );
            }

            if( object instanceof Tap || object instanceof Extent )
              return object.toString().replaceAll( "\"", "\'" );

            Scope scope = graph.outgoingEdgesOf( object ).iterator().next();

            return ( (Pipe) object ).print( scope ).replaceAll( "\"", "\'" );
          }
        }, new EdgeNameProvider<Scope>()
        {
          public String getEdgeName( Scope object )
          {
            return object.toString().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
          }
        }
        );
      }
      finally
      {
        // always release the file handle, even when rendering the graph fails
        writer.close();
      }
    }
    catch( IOException exception )
    {
      // pass the exception as the throwable argument so the stack trace is retained
      LOG.error( "failed printing graph to: " + filename + ", with exception: " + exception, exception );
    }
  }

  /**
   * Method removeUnnecessaryPipes repeatedly performs a depth first traversal and removes
   * instances of exactly {@link cascading.pipe.Pipe} or {@link cascading.pipe.Checkpoint}, any
   * {@link cascading.pipe.SubAssembly}, and any Operator excluded by the current planner levels.
   *
   * @throws ElementGraphException if no Pipe elements remain in the graph afterwards
   */
  public void removeUnnecessaryPipes()
  {
    // each pass removes at most one element and must restart, as the removal invalidates the iterator
    boolean done = false;

    while( !done )
      done = internalRemoveUnnecessaryPipes();

    int numPipes = 0;
    for( FlowElement flowElement : vertexSet() )
    {
      if( flowElement instanceof Pipe )
        numPipes++;
    }

    if( numPipes == 0 )
      throw new ElementGraphException( "resulting graph has no pipe elements after removing empty Pipe, assertions, and SubAssembly containers" );
  }

  /**
   * Removes the first removable element found in a depth first traversal.
   *
   * @return false if an element was removed and another pass is required, true if nothing was left to remove
   */
  private boolean internalRemoveUnnecessaryPipes()
  {
    DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator();

    while( iterator.hasNext() )
    {
      FlowElement flowElement = iterator.next();

      if( flowElement.getClass() == Pipe.class || flowElement.getClass() == Checkpoint.class ||
        flowElement instanceof SubAssembly || testPlannerLevel( flowElement ) )
      {
        // Pipe class is guaranteed to have one input
        removeElement( flowElement );

        return false;
      }
    }

    return true;
  }

  /**
   * Removes the given element, connecting its single predecessor directly to each of its successors.
   *
   * @throws IllegalStateException if the element does not have exactly one incoming edge
   */
  private void removeElement( FlowElement flowElement )
  {
    if( LOG.isDebugEnabled() )
      LOG.debug( "removing: " + flowElement );

    Set<Scope> incomingScopes = incomingEdgesOf( flowElement );

    if( incomingScopes.size() != 1 )
      throw new IllegalStateException( "flow element:" + flowElement + ", has multiple input paths: " + incomingScopes.size() );

    Scope incoming = incomingScopes.iterator().next();
    Set<Scope> outgoingScopes = outgoingEdgesOf( flowElement );

    // source -> incoming -> flowElement -> outgoing -> target
    FlowElement source = getEdgeSource( incoming );

    for( Scope outgoing : outgoingScopes )
    {
      FlowElement target = getEdgeTarget( outgoing );

      addEdge( source, target, new Scope( outgoing ) );
    }

    removeVertex( flowElement );
  }

  /**
   * Tests whether the given element is an Operator that should be removed under the current planner levels.
   *
   * @return true if the operator's planner level is stricter than the first configured level its operation supports
   * @throws IllegalStateException if the operation supports none of the configured planner levels
   */
  private boolean testPlannerLevel( FlowElement flowElement )
  {
    if( !( flowElement instanceof Operator ) )
      return false;

    Operator operator = (Operator) flowElement;

    if( !operator.hasPlannerLevel() )
      return false;

    for( PlannerLevel plannerLevel : plannerLevels )
    {
      if( ( (PlannedOperation) operator.getOperation() ).supportsPlannerLevel( plannerLevel ) )
        return operator.getPlannerLevel().isStricterThan( plannerLevel );
    }

    throw new IllegalStateException( "encountered unsupported planner level: " + operator.getPlannerLevel().getClass().getName() );
  }

  /**
   * Method resolveFields performs a topological traversal and resolves the tuple fields between each Pipe instance.
   *
   * @throws IllegalStateException if this graph has already been resolved
   */
  public void resolveFields()
  {
    if( resolved )
      throw new IllegalStateException( "element graph already resolved" );

    TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator();

    while( iterator.hasNext() )
      resolveFields( iterator.next() );

    resolved = true;
  }

  /**
   * Asks the given element for its outgoing scope, based on its incoming scopes, and copies the
   * resulting fields onto every outgoing edge. Extent instances are skipped.
   *
   * @throws IllegalStateException if the element has no successors
   */
  private void resolveFields( FlowElement source )
  {
    if( source instanceof Extent )
      return;

    Set<Scope> incomingScopes = incomingEdgesOf( source );
    Set<Scope> outgoingScopes = outgoingEdgesOf( source );

    List<FlowElement> flowElements = Graphs.successorListOf( this, source );

    if( flowElements.size() == 0 )
      throw new IllegalStateException( "unable to find next elements in pipeline from: " + source.toString() );

    Scope outgoingScope = source.outgoingScopeFor( incomingScopes );

    if( LOG.isDebugEnabled() && outgoingScope != null )
    {
      LOG.debug( "for modifier: " + source );
      if( outgoingScope.getArgumentsSelector() != null )
        LOG.debug( "setting outgoing arguments: " + outgoingScope.getArgumentsSelector() );
      if( outgoingScope.getOperationDeclaredFields() != null )
        LOG.debug( "setting outgoing declared: " + outgoingScope.getOperationDeclaredFields() );
      if( outgoingScope.getKeySelectors() != null )
        LOG.debug( "setting outgoing group: " + outgoingScope.getKeySelectors() );
      if( outgoingScope.getOutValuesSelector() != null )
        LOG.debug( "setting outgoing values: " + outgoingScope.getOutValuesSelector() );
    }

    for( Scope scope : outgoingScopes )
      scope.copyFields( outgoingScope );
  }

  /**
   * Finds all groups that merge/join streams. returned in topological order.
   *
   * @return a List of Group instances
   */
  public List<Group> findAllMergeJoinGroups()
  {
    return findAllOfType( 2, 1, Group.class, new LinkedList<Group>() );
  }

  /**
   * Finds all splices that merge/join streams. returned in topological order.
   *
   * @return a List of Splice instances
   */
  public List<Splice> findAllMergeJoinSplices()
  {
    return findAllOfType( 2, 1, Splice.class, new LinkedList<Splice>() );
  }

  /**
   * Finds all CoGroup instances having at least two incoming edges. returned in topological order.
   *
   * @return a List of CoGroup instances
   */
  public List<CoGroup> findAllCoGroups()
  {
    return findAllOfType( 2, 1, CoGroup.class, new LinkedList<CoGroup>() );
  }

  /**
   * Method findAllGroups returns all connected Group instances in topological order.
   *
   * @return List<Group>
   */
  public List<Group> findAllGroups()
  {
    return findAllOfType( 1, 1, Group.class, new LinkedList<Group>() );
  }

  /**
   * Method findAllEveries returns all connected Every instances in topological order.
   *
   * @return List<Every>
   */
  public List<Every> findAllEveries()
  {
    return findAllOfType( 1, 1, Every.class, new LinkedList<Every>() );
  }

  /**
   * Method findAllTaps returns all connected Tap instances in topological order.
   *
   * @return List<Tap>
   */
  public List<Tap> findAllTaps()
  {
    return findAllOfType( 1, 1, Tap.class, new LinkedList<Tap>() );
  }

  /**
   * Method findAllEachSplits returns all Each instances having at least two outgoing edges, in topological order.
   *
   * @return List<Each>
   */
  public List<Each> findAllEachSplits()
  {
    return findAllOfType( 1, 2, Each.class, new LinkedList<Each>() );
  }

  /**
   * Method findAllPipeSplits returns all Pipe instances having at least two outgoing edges, in topological order.
   *
   * @return List<Pipe>
   */
  public List<Pipe> findAllPipeSplits()
  {
    return findAllOfType( 1, 2, Pipe.class, new LinkedList<Pipe>() );
  }

  /**
   * Method findAllOfType appends, in topological order, every element that is an instance of the
   * given type and meets the minimum in and out degrees.
   *
   * @param minInDegree  of type int
   * @param minOutDegree of type int
   * @param type         of type Class<P>
   * @param results      of type List<P>, the list matches are appended to
   * @return List<P> the given results list
   */
  public <P> List<P> findAllOfType( int minInDegree, int minOutDegree, Class<P> type, List<P> results )
  {
    TopologicalOrderIterator<FlowElement, Scope> topoIterator = getTopologicalIterator();

    while( topoIterator.hasNext() )
    {
      FlowElement flowElement = topoIterator.next();

      if( type.isInstance( flowElement ) && inDegreeOf( flowElement ) >= minInDegree && outDegreeOf( flowElement ) >= minOutDegree )
        results.add( type.cast( flowElement ) ); // checked cast, guarded by isInstance above
    }

    return results;
  }

  /**
   * Inserts the given element directly after previousElement, moving all of previousElement's
   * outgoing edges onto the inserted element.
   *
   * @param previousElement of type FlowElement
   * @param flowElement     of type FlowElement
   */
  public void insertFlowElementAfter( FlowElement previousElement, FlowElement flowElement )
  {
    // copy the edge set, it is modified below
    Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( previousElement ) );

    addVertex( flowElement );

    String name = previousElement.toString();

    if( previousElement instanceof Pipe )
      name = ( (Pipe) previousElement ).getName();

    addEdge( previousElement, flowElement, new Scope( name ) );

    for( Scope scope : outgoing )
    {
      FlowElement target = getEdgeTarget( scope );
      removeEdge( previousElement, target ); // remove scope
      addEdge( flowElement, target, scope ); // add scope back
    }
  }

  /**
   * Method makeTapGraph returns a directed graph of all taps in the current element graph.
   *
   * @return SimpleDirectedGraph<Tap, Integer>
   */
  public SimpleDirectedGraph<Tap, Integer> makeTapGraph()
  {
    SimpleDirectedGraph<Tap, Integer> tapGraph = new SimpleDirectedGraph<Tap, Integer>( Integer.class );
    List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetweenExtents();
    int count = 0; // unique edge id

    if( LOG.isDebugEnabled() )
      LOG.debug( "found num paths: " + paths.size() );

    for( GraphPath<FlowElement, Scope> element : paths )
    {
      List<Scope> path = element.getEdgeList();
      Tap lastTap = null;

      for( Scope scope : path )
      {
        FlowElement target = getEdgeTarget( scope );

        if( target instanceof Extent )
          continue;

        if( !( target instanceof Tap ) )
          continue;

        tapGraph.addVertex( (Tap) target );

        if( lastTap != null )
        {
          if( LOG.isDebugEnabled() )
            LOG.debug( "adding tap edge: " + lastTap + " -> " + target );

          if( tapGraph.getEdge( lastTap, (Tap) target ) == null && !tapGraph.addEdge( lastTap, (Tap) target, count++ ) )
            throw new ElementGraphException( "could not add graph edge: " + lastTap + " -> " + target );
        }

        lastTap = (Tap) target;
      }
    }

    return tapGraph;
  }

  /**
   * Returns the largest number of paths found between the given element and any one merge/join Group.
   *
   * @param flowElement of type FlowElement
   * @return int, zero if there are no merge/join groups or no paths
   */
  public int getMaxNumPathsBetweenElementAndGroupingMergeJoin( FlowElement flowElement )
  {
    List<Group> groups = findAllMergeJoinGroups();

    int maxPaths = 0;

    if( groups == null )
      return 0;

    for( Group group : groups )
    {
      if( flowElement != group )
      {
        List<GraphPath<FlowElement, Scope>> paths = ElementGraphs.getAllShortestPathsBetween( this, flowElement, group );

        if( paths != null )
          maxPaths = Math.max( maxPaths, paths.size() );
      }
    }

    return maxPaths;
  }

  /**
   * Returns the immediate successors of the given element.
   *
   * @param element of type FlowElement
   * @return List<FlowElement>
   */
  public List<FlowElement> getAllSuccessors( FlowElement element )
  {
    return Graphs.successorListOf( this, element );
  }

  /**
   * Replaces the given element with the replacement, moving all incoming and outgoing edges onto
   * the replacement. Any edge directly between the two elements is dropped.
   *
   * @param element     of type FlowElement
   * @param replacement of type FlowElement
   */
  public void replaceElementWith( FlowElement element, FlowElement replacement )
  {
    // copy the edge sets, they are modified below
    Set<Scope> incoming = new HashSet<Scope>( incomingEdgesOf( element ) );
    Set<Scope> outgoing = new HashSet<Scope>( outgoingEdgesOf( element ) );

    if( !containsVertex( replacement ) )
      addVertex( replacement );

    for( Scope scope : incoming )
    {
      FlowElement source = getEdgeSource( scope );
      removeEdge( source, element ); // remove scope

      // drop edge between, if any
      if( source != replacement )
        addEdge( source, replacement, scope ); // add scope back
    }

    for( Scope scope : outgoing )
    {
      FlowElement target = getEdgeTarget( scope );
      removeEdge( element, target ); // remove scope

      // drop edge between, if any
      if( target != replacement )
        addEdge( replacement, target, scope ); // add scope back
    }

    removeVertex( element );
  }

  /**
   * Returns the nearest descendants of the given element that are instances of the given type.
   * The search does not continue past a matching element.
   *
   * @param flowElement of type FlowElement
   * @param type        of type Class<A>
   * @return Set<A>
   */
  public <A extends FlowElement> Set<A> getAllChildrenOfType( FlowElement flowElement, Class<A> type )
  {
    Set<A> allChildren = new HashSet<A>();

    getAllChildrenOfType( allChildren, flowElement, type );

    return allChildren;
  }

  private <A extends FlowElement> void getAllChildrenOfType( Set<A> allSuccessors, FlowElement flowElement, Class<A> type )
  {
    List<FlowElement> successors = getAllSuccessors( flowElement );

    for( FlowElement successor : successors )
    {
      if( type.isInstance( successor ) )
        allSuccessors.add( type.cast( successor ) ); // checked cast, guarded by isInstance above
      else
        getAllChildrenOfType( allSuccessors, successor, type );
    }
  }

  /**
   * Returns the nearest descendants of the given element whose class is not exactly the given
   * type. Elements of exactly the given type are skipped and searched through.
   *
   * @param flowElement of type FlowElement
   * @param type        of type Class<? extends FlowElement>
   * @return Set<FlowElement>
   */
  public Set<FlowElement> getAllChildrenNotExactlyType( FlowElement flowElement, Class<? extends FlowElement> type )
  {
    Set<FlowElement> allChildren = new HashSet<FlowElement>();

    getAllChildrenNotExactlyType( allChildren, flowElement, type );

    return allChildren;
  }

  private void getAllChildrenNotExactlyType( Set<FlowElement> allSuccessors, FlowElement flowElement, Class<? extends FlowElement> type )
  {
    List<FlowElement> successors = getAllSuccessors( flowElement );

    for( FlowElement successor : successors )
    {
      if( type != successor.getClass() )
        allSuccessors.add( successor );
      else
        getAllChildrenNotExactlyType( allSuccessors, successor, type );
    }
  }

  /** Simple class that acts in as the root of the graph */
  public static class Extent extends Pipe
  {

    /** @see cascading.pipe.Pipe#Pipe(String) */
    public Extent( String name )
    {
      super( name );
    }

    @Override
    public Scope outgoingScopeFor( Set<Scope> scopes )
    {
      return new Scope();
    }

    @Override
    public String toString()
    {
      return "[" + getName() + "]";
    }

    /** Two Extent instances are equal if they are of the same class and have the same name. */
    @Override
    public boolean equals( Object object )
    {
      if( object == null )
        return false;

      if( this == object )
        return true;

      if( object.getClass() != this.getClass() )
        return false;

      return this.getName().equals( ( (Pipe) object ).getName() );
    }

    /** Hashes on the name only, so that equal Extent instances always share a hash code. */
    @Override
    public int hashCode()
    {
      return getName().hashCode();
    }
  }
}