001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.cascade; 022 023 import java.io.FileWriter; 024 import java.io.IOException; 025 import java.io.Writer; 026 import java.util.ArrayList; 027 import java.util.Collection; 028 import java.util.Collections; 029 import java.util.HashSet; 030 import java.util.LinkedHashMap; 031 import java.util.LinkedList; 032 import java.util.List; 033 import java.util.Map; 034 import java.util.Set; 035 import java.util.concurrent.Callable; 036 import java.util.concurrent.CountDownLatch; 037 import java.util.concurrent.Future; 038 import java.util.concurrent.TimeUnit; 039 040 import cascading.CascadingException; 041 import cascading.cascade.planner.FlowGraph; 042 import cascading.cascade.planner.IdentifierGraph; 043 import cascading.cascade.planner.TapGraph; 044 import cascading.flow.BaseFlow; 045 import cascading.flow.Flow; 046 import cascading.flow.FlowException; 047 import cascading.flow.FlowSkipStrategy; 048 import cascading.management.CascadingServices; 049 import cascading.management.UnitOfWork; 050 import cascading.management.UnitOfWorkExecutorStrategy; 051 import cascading.management.UnitOfWorkSpawnStrategy; 052 import cascading.management.state.ClientState; 053 import cascading.stats.CascadeStats; 054 import cascading.tap.Tap; 055 import cascading.util.ShutdownUtil; 056 import cascading.util.Util; 057 import cascading.util.Version; 058 import org.jgrapht.Graphs; 059 import org.jgrapht.ext.EdgeNameProvider; 060 import org.jgrapht.ext.IntegerNameProvider; 061 import org.jgrapht.ext.VertexNameProvider; 062 import org.jgrapht.graph.SimpleDirectedGraph; 063 import org.jgrapht.traverse.TopologicalOrderIterator; 064 import org.slf4j.Logger; 065 import org.slf4j.LoggerFactory; 066 067 import static cascading.property.PropertyUtil.getProperty; 068 069 /** 070 * A Cascade is an assembly of {@link cascading.flow.Flow} instances that share or depend on equivalent {@link Tap} instances and are executed as 071 * a single group. The most common case is where one Flow instance depends on a Tap created by a second Flow instance. This 072 * dependency chain can continue as practical. 073 * <p/> 074 * Note Flow instances that have no shared dependencies will be executed in parallel. 075 * <p/> 076 * Additionally, a Cascade allows for incremental builds of complex data processing processes. If a given source {@link Tap} is newer than 077 * a subsequent sink {@link Tap} in the assembly, the connecting {@link cascading.flow.Flow}(s) will be executed 078 * when the Cascade executed. If all the targets (sinks) are up to date, the Cascade exits immediately and does nothing. 079 * <p/> 080 * The concept of 'stale' is pluggable, see the {@link cascading.flow.FlowSkipStrategy} class. 081 * <p/> 082 * When a Cascade starts up, if first verifies which Flow instances have stale sinks, if the sinks are not stale, the 083 * method {@link cascading.flow.BaseFlow#deleteSinksIfNotUpdate()} is called. Before appends/updates were supported (logically) 084 * the Cascade deleted all the sinks in a Flow. 085 * <p/> 086 * The new consequence of this is if the Cascade fails, but does complete a Flow that appended or updated data, re-running 087 * the Cascade (and the successful append/update Flow) will re-update data to the source. Some systems may be idempotent and 088 * may not have any side-effects. So plan accordingly. 089 * <p/> 090 * Use the {@link CascadeListener} to receive any events on the life-cycle of the Cascade as it executes. Any 091 * {@link Tap} instances owned by managed Flows also implementing CascadeListener will automatically be added to the 092 * set of listeners. 093 * 094 * @see CascadeListener 095 * @see cascading.flow.Flow 096 * @see cascading.flow.FlowSkipStrategy 097 */ 098 public class Cascade implements UnitOfWork<CascadeStats> 099 { 100 /** Field LOG */ 101 private static final Logger LOG = LoggerFactory.getLogger( Cascade.class ); 102 103 /** Field id */ 104 private String id; 105 /** Field name */ 106 private final String name; 107 /** Field tags */ 108 private String tags; 109 /** Field properties */ 110 private final Map<Object, Object> properties; 111 /** Fields listeners */ 112 private List<SafeCascadeListener> listeners; 113 /** Field jobGraph */ 114 private final FlowGraph flowGraph; 115 /** Field tapGraph */ 116 private final IdentifierGraph identifierGraph; 117 /** Field cascadeStats */ 118 private final CascadeStats cascadeStats; 119 /** Field cascadingServices */ 120 private CascadingServices cascadingServices; 121 /** Field thread */ 122 private Thread thread; 123 /** Field throwable */ 124 private Throwable throwable; 125 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 126 /** Field shutdownHook */ 127 private ShutdownUtil.Hook shutdownHook; 128 /** Field jobsMap */ 129 private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<String, Callable<Throwable>>(); 130 /** Field stop */ 131 private boolean stop; 132 /** Field flowSkipStrategy */ 133 private FlowSkipStrategy flowSkipStrategy = null; 134 /** Field maxConcurrentFlows */ 135 private int maxConcurrentFlows = 0; 136 137 /** Field tapGraph * */ 138 private transient TapGraph tapGraph; 139 140 static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows ) 141 { 142 if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default 143 return maxConcurrentFlows; 144 145 return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) ); 146 } 147 148 /** for testing */ 149 protected Cascade() 150 { 151 this.name = null; 152 this.tags = null; 153 this.properties = null; 154 this.flowGraph = null; 155 this.identifierGraph = null; 156 this.cascadeStats = null; 157 } 158 159 Cascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph ) 160 { 161 this.name = cascadeDef.getName(); 162 this.tags = cascadeDef.getTags(); 163 this.properties = properties; 164 this.flowGraph = flowGraph; 165 this.identifierGraph = identifierGraph; 166 this.cascadeStats = createPrepareCascadeStats(); 167 setIDOnFlow(); 168 this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows(); 169 170 addListeners( getAllTaps() ); 171 } 172 173 private CascadeStats createPrepareCascadeStats() 174 { 175 CascadeStats cascadeStats = new CascadeStats( this, getClientState() ); 176 177 cascadeStats.prepare(); 178 cascadeStats.markPending(); 179 180 return cascadeStats; 181 } 182 183 /** 184 * Method getName returns the name of this Cascade object. 185 * 186 * @return the name (type String) of this Cascade object. 187 */ 188 @Override 189 public String getName() 190 { 191 return name; 192 } 193 194 /** 195 * Method getID returns the ID of this Cascade object. 196 * <p/> 197 * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade 198 * instances created with identical parameters will not return the same ID. 199 * 200 * @return the ID (type String) of this Cascade object. 201 */ 202 @Override 203 public String getID() 204 { 205 if( id == null ) 206 id = Util.createUniqueID(); 207 208 return id; 209 } 210 211 /** 212 * Method getTags returns the tags associated with this Cascade object. 213 * 214 * @return the tags (type String) of this Cascade object. 215 */ 216 @Override 217 public String getTags() 218 { 219 return tags; 220 } 221 222 void addListeners( Collection listeners ) 223 { 224 for( Object listener : listeners ) 225 { 226 if( listener instanceof CascadeListener ) 227 addListener( (CascadeListener) listener ); 228 } 229 } 230 231 List<SafeCascadeListener> getListeners() 232 { 233 if( listeners == null ) 234 listeners = new LinkedList<SafeCascadeListener>(); 235 236 return listeners; 237 } 238 239 public boolean hasListeners() 240 { 241 return listeners != null && !listeners.isEmpty(); 242 } 243 244 public void addListener( CascadeListener flowListener ) 245 { 246 getListeners().add( new SafeCascadeListener( flowListener ) ); 247 } 248 249 public boolean removeListener( CascadeListener flowListener ) 250 { 251 return getListeners().remove( new SafeCascadeListener( flowListener ) ); 252 } 253 254 private void fireOnCompleted() 255 { 256 if( hasListeners() ) 257 { 258 if( LOG.isDebugEnabled() ) 259 logDebug( "firing onCompleted event: " + getListeners().size() ); 260 261 for( CascadeListener cascadeListener : getListeners() ) 262 cascadeListener.onCompleted( this ); 263 } 264 } 265 266 private void fireOnThrowable() 267 { 268 if( hasListeners() ) 269 { 270 if( LOG.isDebugEnabled() ) 271 logDebug( "firing onThrowable event: " + getListeners().size() ); 272 273 boolean isHandled = false; 274 275 for( CascadeListener cascadeListener : getListeners() ) 276 isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled; 277 278 if( isHandled ) 279 throwable = null; 280 } 281 } 282 283 protected void fireOnStopping() 284 { 285 if( hasListeners() ) 286 { 287 if( LOG.isDebugEnabled() ) 288 logDebug( "firing onStopping event: " + getListeners().size() ); 289 290 for( CascadeListener cascadeListener : getListeners() ) 291 cascadeListener.onStopping( this ); 292 } 293 } 294 295 protected void fireOnStarting() 296 { 297 if( hasListeners() ) 298 { 299 if( LOG.isDebugEnabled() ) 300 logDebug( "firing onStarting event: " + getListeners().size() ); 301 302 for( CascadeListener cascadeListener : getListeners() ) 303 cascadeListener.onStarting( this ); 304 } 305 } 306 307 private CascadingServices getCascadingServices() 308 { 309 if( cascadingServices == null ) 310 cascadingServices = new CascadingServices( properties ); 311 312 return cascadingServices; 313 } 314 315 private ClientState getClientState() 316 { 317 return getCascadingServices().createClientState( getID() ); 318 } 319 320 /** 321 * Method getCascadeStats returns the cascadeStats of this Cascade object. 322 * 323 * @return the cascadeStats (type CascadeStats) of this Cascade object. 324 */ 325 public CascadeStats getCascadeStats() 326 { 327 return cascadeStats; 328 } 329 330 @Override 331 public CascadeStats getStats() 332 { 333 return getCascadeStats(); 334 } 335 336 private void setIDOnFlow() 337 { 338 for( Flow<?> flow : getFlows() ) 339 ( (BaseFlow<?>) flow ).setCascade( this ); 340 } 341 342 protected FlowGraph getFlowGraph() 343 { 344 return flowGraph; 345 } 346 347 protected IdentifierGraph getIdentifierGraph() 348 { 349 return identifierGraph; 350 } 351 352 /** 353 * Method getFlows returns the flows managed by this Cascade object. The returned {@link cascading.flow.Flow} instances 354 * will be in topological order. 355 * 356 * @return the flows (type Collection<Flow>) of this Cascade object. 357 */ 358 public List<Flow> getFlows() 359 { 360 List<Flow> flows = new LinkedList<Flow>(); 361 TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator(); 362 363 while( topoIterator.hasNext() ) 364 flows.add( topoIterator.next() ); 365 366 return flows; 367 } 368 369 /** 370 * Method findFlows returns a List of flows whose names match the given regex pattern. 371 * 372 * @param regex of type String 373 * @return List<Flow> 374 */ 375 public List<Flow> findFlows( String regex ) 376 { 377 List<Flow> flows = new ArrayList<Flow>(); 378 379 for( Flow flow : getFlows() ) 380 { 381 if( flow.getName().matches( regex ) ) 382 flows.add( flow ); 383 } 384 385 return flows; 386 } 387 388 /** 389 * Method getHeadFlows returns all Flow instances that are at the "head" of the flow graph. 390 * <p/> 391 * That is, they are the first to execute and have no Tap source dependencies with Flow instances in the this Cascade 392 * instance. 393 * 394 * @return Collection<Flow> 395 */ 396 public Collection<Flow> getHeadFlows() 397 { 398 Set<Flow> flows = new HashSet<Flow>(); 399 400 for( Flow flow : flowGraph.vertexSet() ) 401 { 402 if( flowGraph.inDegreeOf( flow ) == 0 ) 403 flows.add( flow ); 404 } 405 406 return flows; 407 } 408 409 /** 410 * Method getTailFlows returns all Flow instances that are at the "tail" of the flow graph. 411 * <p/> 412 * That is, they are the last to execute and have no Tap sink dependencies with Flow instances in the this Cascade 413 * instance. 414 * 415 * @return Collection<Flow> 416 */ 417 public Collection<Flow> getTailFlows() 418 { 419 Set<Flow> flows = new HashSet<Flow>(); 420 421 for( Flow flow : flowGraph.vertexSet() ) 422 { 423 if( flowGraph.outDegreeOf( flow ) == 0 ) 424 flows.add( flow ); 425 } 426 427 return flows; 428 } 429 430 /** 431 * Method getIntermediateFlows returns all Flow instances that are neither at the "tail" or "tail" of the flow graph. 432 * 433 * @return Collection<Flow> 434 */ 435 public Collection<Flow> getIntermediateFlows() 436 { 437 Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() ); 438 439 flows.removeAll( getHeadFlows() ); 440 flows.removeAll( getTailFlows() ); 441 442 return flows; 443 } 444 445 protected TapGraph getTapGraph() 446 { 447 if( tapGraph == null ) 448 tapGraph = new TapGraph( flowGraph.vertexSet() ); 449 450 return tapGraph; 451 } 452 453 /** 454 * Method getSourceTaps returns all source Tap instances in this Cascade instance. 455 * <p/> 456 * That is, none of returned Tap instances are the sinks of other Flow instances in this Cascade. 457 * <p/> 458 * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance. 459 * 460 * @return Collection<Tap> 461 */ 462 public Collection<Tap> getSourceTaps() 463 { 464 TapGraph tapGraph = getTapGraph(); 465 Set<Tap> taps = new HashSet<Tap>(); 466 467 for( Tap tap : tapGraph.vertexSet() ) 468 { 469 if( tapGraph.inDegreeOf( tap ) == 0 ) 470 taps.add( tap ); 471 } 472 473 return taps; 474 } 475 476 /** 477 * Method getSinkTaps returns all sink Tap instances in this Cascade instance. 478 * <p/> 479 * That is, none of returned Tap instances are the sources of other Flow instances in this Cascade. 480 * <p/> 481 * All {@link cascading.tap.CompositeTap} instances are unwound if addressed directly by a managed Flow instance. 482 * <p/> 483 * This method will return checkpoint Taps managed by Flow instances if not used as a source by other Flow instances. 484 * 485 * @return Collection<Tap> 486 */ 487 public Collection<Tap> getSinkTaps() 488 { 489 TapGraph tapGraph = getTapGraph(); 490 Set<Tap> taps = new HashSet<Tap>(); 491 492 for( Tap tap : tapGraph.vertexSet() ) 493 { 494 if( tapGraph.outDegreeOf( tap ) == 0 ) 495 taps.add( tap ); 496 } 497 498 return taps; 499 } 500 501 /** 502 * Method getCheckpointTaps returns all checkpoint Tap instances from all the Flow instances in this Cascade instance. 503 * 504 * @return Collection<Tap> 505 */ 506 public Collection<Tap> getCheckpointsTaps() 507 { 508 Set<Tap> taps = new HashSet<Tap>(); 509 510 for( Flow flow : getFlows() ) 511 taps.addAll( flow.getCheckpointsCollection() ); 512 513 return taps; 514 } 515 516 /** 517 * Method getIntermediateTaps returns all Tap instances that are neither at the source or sink of the flow graph. 518 * <p/> 519 * This method does consider checkpoint Taps managed by Flow instances in this Cascade instance. 520 * 521 * @return Collection<Flow> 522 */ 523 public Collection<Tap> getIntermediateTaps() 524 { 525 TapGraph tapGraph = getTapGraph(); 526 Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() ); 527 528 taps.removeAll( getSourceTaps() ); 529 taps.removeAll( getSinkTaps() ); 530 531 return taps; 532 } 533 534 /** 535 * Method getAllTaps returns all source, sink, and checkpoint Tap instances associated with the managed 536 * Flow instances in this Cascade instance. 537 * 538 * @return Collection<Tap> 539 */ 540 public Collection<Tap> getAllTaps() 541 { 542 return new HashSet<Tap>( getTapGraph().vertexSet() ); 543 } 544 545 /** 546 * Method getSuccessorFlows returns a Collection of all the Flow instances that will be 547 * executed after the given Flow instance. 548 * 549 * @param flow of type Flow 550 * @return Collection<Flow> 551 */ 552 public Collection<Flow> getSuccessorFlows( Flow flow ) 553 { 554 return Graphs.successorListOf( flowGraph, flow ); 555 } 556 557 /** 558 * Method getPredecessorFlows returns a Collection of all the Flow instances that will be 559 * executed before the given Flow instance. 560 * 561 * @param flow of type Flow 562 * @return Collection<Flow> 563 */ 564 public Collection<Flow> getPredecessorFlows( Flow flow ) 565 { 566 return Graphs.predecessorListOf( flowGraph, flow ); 567 } 568 569 /** 570 * Method findFlowsSourcingFrom returns all Flow instances that reads from a source with the given identifier. 571 * 572 * @param identifier of type String 573 * @return Collection<Flow> 574 */ 575 public Collection<Flow> findFlowsSourcingFrom( String identifier ) 576 { 577 try 578 { 579 return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) ); 580 } 581 catch( Exception exception ) 582 { 583 return Collections.emptySet(); 584 } 585 } 586 587 /** 588 * Method findFlowsSinkingTo returns all Flow instances that writes to a sink with the given identifier. 589 * 590 * @param identifier of type String 591 * @return Collection<Flow> 592 */ 593 public Collection<Flow> findFlowsSinkingTo( String identifier ) 594 { 595 try 596 { 597 return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) ); 598 } 599 catch( Exception exception ) 600 { 601 return Collections.emptySet(); 602 } 603 } 604 605 private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders ) 606 { 607 Set<Flow> flows = new HashSet<Flow>(); 608 609 for( BaseFlow.FlowHolder flowHolder : flowHolders ) 610 flows.add( flowHolder.flow ); 611 612 return flows; 613 } 614 615 /** 616 * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow. 617 * 618 * @return FlowSkipStrategy 619 */ 620 public FlowSkipStrategy getFlowSkipStrategy() 621 { 622 return flowSkipStrategy; 623 } 624 625 /** 626 * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy, if any, is returned. 627 * If a strategy is given, it will be used as the strategy for all {@link cascading.flow.BaseFlow} instances managed by this Cascade instance. 628 * To revert back to consulting the strategies associated with each Flow instance, re-set this value to {@code null}, its 629 * default value. 630 * <p/> 631 * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link cascading.flow.FlowSkipIfSinkNotStale} 632 * and is inherited from the Flow instance in question. An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}. 633 * <p/> 634 * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} 635 * 636 * @param flowSkipStrategy of type FlowSkipStrategy 637 * @return FlowSkipStrategy 638 */ 639 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 640 { 641 try 642 { 643 return this.flowSkipStrategy; 644 } 645 finally 646 { 647 this.flowSkipStrategy = flowSkipStrategy; 648 } 649 } 650 651 @Override 652 public void prepare() 653 { 654 } 655 656 /** 657 * Method start begins the current Cascade process. It returns immediately. See method {@link #complete()} to block 658 * until the Cascade completes. 659 */ 660 public void start() 661 { 662 if( thread != null ) 663 return; 664 665 thread = new Thread( new Runnable() 666 { 667 @Override 668 public void run() 669 { 670 Cascade.this.run(); 671 } 672 }, ( "cascade " + Util.toNull( getName() ) ).trim() ); 673 674 thread.start(); 675 } 676 677 /** 678 * Method complete begins the current Cascade process if method {@link #start()} was not previously called. This method 679 * blocks until the process completes. 680 * 681 * @throws RuntimeException wrapping any exception thrown internally. 682 */ 683 public void complete() 684 { 685 start(); 686 687 try 688 { 689 try 690 { 691 thread.join(); 692 } 693 catch( InterruptedException exception ) 694 { 695 throw new FlowException( "thread interrupted", exception ); 696 } 697 698 if( throwable instanceof CascadingException ) 699 throw (CascadingException) throwable; 700 701 if( throwable != null ) 702 throw new CascadeException( "unhandled exception", throwable ); 703 } 704 finally 705 { 706 thread = null; 707 throwable = null; 708 shutdownHook = null; 709 cascadeStats.cleanup(); 710 } 711 } 712 713 public synchronized void stop() 714 { 715 if( stop ) 716 return; 717 718 stop = true; 719 720 fireOnStopping(); 721 722 if( !cascadeStats.isFinished() ) 723 cascadeStats.markStopped(); 724 725 internalStopAllFlows(); 726 handleExecutorShutdown(); 727 728 cascadeStats.cleanup(); 729 } 730 731 @Override 732 public void cleanup() 733 { 734 } 735 736 /** Method run implements the Runnable run method. */ 737 private void run() 738 { 739 Version.printBanner(); 740 741 if( LOG.isInfoEnabled() ) 742 logInfo( "starting" ); 743 744 registerShutdownHook(); 745 746 try 747 { 748 if( stop ) 749 return; 750 751 // mark started, not submitted 752 cascadeStats.markStartedThenRunning(); 753 754 fireOnStarting(); 755 756 initializeNewJobsMap(); 757 758 int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows ); 759 760 if( numThreads == 0 ) 761 numThreads = jobsMap.size(); 762 763 int numLocalFlows = numLocalFlows(); 764 765 boolean runFlowsLocal = numLocalFlows > 1; 766 767 if( runFlowsLocal ) 768 numThreads = 1; 769 770 if( LOG.isInfoEnabled() ) 771 { 772 logInfo( " parallel execution is enabled: " + !runFlowsLocal ); 773 logInfo( " starting flows: " + jobsMap.size() ); 774 logInfo( " allocating threads: " + numThreads ); 775 } 776 777 List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() ); 778 779 for( Future<Throwable> future : futures ) 780 { 781 throwable = future.get(); 782 783 if( throwable != null ) 784 { 785 if( !stop ) 786 { 787 cascadeStats.markFailed( throwable ); 788 internalStopAllFlows(); 789 fireOnThrowable(); 790 } 791 792 handleExecutorShutdown(); 793 break; 794 } 795 } 796 } 797 catch( Throwable throwable ) 798 { 799 this.throwable = throwable; 800 } 801 finally 802 { 803 if( !cascadeStats.isFinished() ) 804 cascadeStats.markSuccessful(); 805 806 try 807 { 808 fireOnCompleted(); 809 } 810 finally 811 { 812 deregisterShutdownHook(); 813 } 814 } 815 } 816 817 private void registerShutdownHook() 818 { 819 if( !isStopJobsOnExit() ) 820 return; 821 822 shutdownHook = new ShutdownUtil.Hook() 823 { 824 @Override 825 public Priority priority() 826 { 827 return Priority.WORK_PARENT; 828 } 829 830 @Override 831 public void execute() 832 { 833 logInfo( "shutdown hook calling stop on cascade" ); 834 835 Cascade.this.stop(); 836 } 837 }; 838 839 ShutdownUtil.addHook( shutdownHook ); 840 } 841 842 private void deregisterShutdownHook() 843 { 844 if( !isStopJobsOnExit() || stop ) 845 return; 846 847 ShutdownUtil.removeHook( shutdownHook ); 848 } 849 850 private boolean isStopJobsOnExit() 851 { 852 if( getFlows().isEmpty() ) 853 return false; // don't bother registering hook 854 855 return getFlows().get( 0 ).isStopJobsOnExit(); 856 } 857 858 /** 859 * If the number of flows that are local is greater than one, force the Cascade to run without parallelization. 860 * 861 * @return of type int 862 */ 863 private int numLocalFlows() 864 { 865 int countLocalJobs = 0; 866 867 for( Flow flow : getFlows() ) 868 { 869 if( flow.stepsAreLocal() ) 870 countLocalJobs++; 871 } 872 873 return countLocalJobs; 874 } 875 876 private void initializeNewJobsMap() 877 { 878 synchronized( jobsMap ) 879 { 880 // keep topo order 881 TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator(); 882 883 while( topoIterator.hasNext() ) 884 { 885 Flow flow = topoIterator.next(); 886 887 cascadeStats.addFlowStats( flow.getFlowStats() ); 888 889 CascadeJob job = new CascadeJob( flow ); 890 891 jobsMap.put( flow.getName(), job ); 892 893 List<CascadeJob> predecessors = new ArrayList<CascadeJob>(); 894 895 for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) ) 896 predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) ); 897 898 job.init( predecessors ); 899 } 900 } 901 } 902 903 private void handleExecutorShutdown() 904 { 905 if( spawnStrategy.isCompleted( this ) ) 906 return; 907 908 logInfo( "shutting down flow executor" ); 909 910 try 911 { 912 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 913 } 914 catch( InterruptedException exception ) 915 { 916 // ignore 917 } 918 919 logInfo( "shutdown complete" ); 920 } 921 922 private void internalStopAllFlows() 923 { 924 logInfo( "stopping all flows" ); 925 926 synchronized( jobsMap ) 927 { 928 List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() ); 929 930 Collections.reverse( jobs ); 931 932 for( Callable<Throwable> callable : jobs ) 933 ( (CascadeJob) callable ).stop(); 934 } 935 936 logInfo( "stopped all flows" ); 937 } 938 939 /** 940 * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging. 941 * 942 * @param filename of type String 943 */ 944 public void writeDOT( String filename ) 945 { 946 printElementGraph( filename, identifierGraph ); 947 } 948 949 protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph ) 950 { 951 try 952 { 953 Writer writer = new FileWriter( filename ); 954 955 Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>() 956 { 957 public String getVertexName( String object ) 958 { 959 return object.toString().replaceAll( "\"", "\'" ); 960 } 961 }, new EdgeNameProvider<BaseFlow.FlowHolder>() 962 { 963 public String getEdgeName( BaseFlow.FlowHolder object ) 964 { 965 return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz 966 } 967 } 968 ); 969 970 writer.close(); 971 } 972 catch( IOException exception ) 973 { 974 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 975 } 976 } 977 978 @Override 979 public String toString() 980 { 981 return getName(); 982 } 983 984 private void logDebug( String message ) 985 { 986 LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 987 } 988 989 private void logInfo( String message ) 990 { 991 LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 992 } 993 994 private void logWarn( String message ) 995 { 996 logWarn( message, null ); 997 } 998 999 private void logWarn( String message, Throwable throwable ) 1000 { 1001 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 1002 } 1003 1004 /** Class CascadeJob manages Flow execution in the current Cascade instance. */ 1005 protected class CascadeJob implements Callable<Throwable> 1006 { 1007 /** Field flow */ 1008 final Flow flow; 1009 /** Field predecessors */ 1010 private List<CascadeJob> predecessors; 1011 /** Field latch */ 1012 private final CountDownLatch latch = new CountDownLatch( 1 ); 1013 /** Field stop */ 1014 private boolean stop = false; 1015 /** Field failed */ 1016 private boolean failed = false; 1017 1018 public CascadeJob( Flow flow ) 1019 { 1020 this.flow = flow; 1021 } 1022 1023 public String getName() 1024 { 1025 return flow.getName(); 1026 } 1027 1028 public Throwable call() 1029 { 1030 try 1031 { 1032 for( CascadeJob predecessor : predecessors ) 1033 { 1034 if( !predecessor.isSuccessful() ) 1035 return null; 1036 } 1037 1038 if( stop ) 1039 return null; 1040 1041 try 1042 { 1043 if( LOG.isInfoEnabled() ) 1044 logInfo( "starting flow: " + flow.getName() ); 1045 1046 if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) ) 1047 { 1048 if( LOG.isInfoEnabled() ) 1049 logInfo( "skipping flow: " + flow.getName() ); 1050 1051 flow.getFlowStats().markSkipped(); 1052 1053 return null; 1054 } 1055 1056 flow.prepare(); // do not delete append/update mode taps 1057 flow.complete(); 1058 1059 if( LOG.isInfoEnabled() ) 1060 logInfo( "completed flow: " + flow.getName() ); 1061 } 1062 catch( Throwable exception ) 1063 { 1064 logWarn( "flow failed: " + flow.getName(), exception ); 1065 failed = true; 1066 return new CascadeException( "flow failed: " + flow.getName(), exception ); 1067 } 1068 finally 1069 { 1070 flow.cleanup(); 1071 } 1072 } 1073 catch( Throwable throwable ) 1074 { 1075 failed = true; 1076 return throwable; 1077 } 1078 finally 1079 { 1080 latch.countDown(); 1081 } 1082 1083 return null; 1084 } 1085 1086 public void init( List<CascadeJob> predecessors ) 1087 { 1088 this.predecessors = predecessors; 1089 } 1090 1091 public void stop() 1092 { 1093 if( LOG.isInfoEnabled() ) 1094 logInfo( "stopping flow: " + flow.getName() ); 1095 1096 stop = true; 1097 1098 if( flow != null ) 1099 flow.stop(); 1100 } 1101 1102 public boolean isSuccessful() 1103 { 1104 try 1105 { 1106 latch.await(); 1107 1108 return flow != null && !failed; 1109 } 1110 catch( InterruptedException exception ) 1111 { 1112 logWarn( "latch interrupted", exception ); 1113 } 1114 1115 return false; 1116 } 1117 } 1118 1119 @Override 1120 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1121 { 1122 return spawnStrategy; 1123 } 1124 1125 @Override 1126 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1127 { 1128 this.spawnStrategy = spawnStrategy; 1129 } 1130 1131 /** 1132 * Class SafeCascadeListener safely calls a wrapped CascadeListener. 1133 * <p/> 1134 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1135 * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method 1136 * which in turn is run in a new Thread. 1137 */ 1138 private class SafeCascadeListener implements CascadeListener 1139 { 1140 /** Field flowListener */ 1141 final CascadeListener cascadeListener; 1142 /** Field throwable */ 1143 Throwable throwable; 1144 1145 private SafeCascadeListener( CascadeListener cascadeListener ) 1146 { 1147 this.cascadeListener = cascadeListener; 1148 } 1149 1150 public void onStarting( Cascade cascade ) 1151 { 1152 try 1153 { 1154 cascadeListener.onStarting( cascade ); 1155 } 1156 catch( Throwable throwable ) 1157 { 1158 handleThrowable( throwable ); 1159 } 1160 } 1161 1162 public void onStopping( Cascade cascade ) 1163 { 1164 try 1165 { 1166 cascadeListener.onStopping( cascade ); 1167 } 1168 catch( Throwable throwable ) 1169 { 1170 handleThrowable( throwable ); 1171 } 1172 } 1173 1174 public void onCompleted( Cascade cascade ) 1175 { 1176 try 1177 { 1178 cascadeListener.onCompleted( cascade ); 1179 } 1180 catch( Throwable throwable ) 1181 { 1182 handleThrowable( throwable ); 1183 } 1184 } 1185 1186 public boolean onThrowable( Cascade cascade, Throwable flowThrowable ) 1187 { 1188 try 1189 { 1190 return cascadeListener.onThrowable( cascade, flowThrowable ); 1191 } 1192 catch( Throwable throwable ) 1193 { 1194 handleThrowable( throwable ); 1195 } 1196 1197 return false; 1198 } 1199 1200 private void handleThrowable( Throwable throwable ) 1201 { 1202 this.throwable = throwable; 1203 1204 logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable ); 1205 1206 // stop this flow 1207 stop(); 1208 } 1209 1210 public boolean equals( Object object ) 1211 { 1212 if( object instanceof SafeCascadeListener ) 1213 return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener ); 1214 1215 return cascadeListener.equals( object ); 1216 } 1217 1218 public int hashCode() 1219 { 1220 return cascadeListener.hashCode(); 1221 } 1222 } 1223 }