001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.cascade; 022 023import java.io.FileWriter; 024import java.io.IOException; 025import java.io.Writer; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.HashSet; 030import java.util.LinkedHashMap; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.concurrent.Callable; 036import java.util.concurrent.CountDownLatch; 037import java.util.concurrent.Future; 038import java.util.concurrent.TimeUnit; 039 040import cascading.CascadingException; 041import cascading.cascade.planner.FlowGraph; 042import cascading.cascade.planner.IdentifierGraph; 043import cascading.cascade.planner.TapGraph; 044import cascading.flow.BaseFlow; 045import cascading.flow.Flow; 046import cascading.flow.FlowException; 047import cascading.flow.FlowSkipStrategy; 048import cascading.flow.Flows; 049import cascading.management.CascadingServices; 050import cascading.management.UnitOfWorkExecutorStrategy; 051import cascading.management.UnitOfWorkSpawnStrategy; 052import cascading.management.state.ClientState; 053import cascading.stats.CascadeStats; 054import cascading.tap.Tap; 055import cascading.util.ProcessLogger; 056import cascading.util.ShutdownUtil; 057import cascading.util.Util; 058import cascading.util.Version; 059import cascading.util.jgrapht.EdgeNameProvider; 060import cascading.util.jgrapht.IntegerNameProvider; 061import cascading.util.jgrapht.VertexNameProvider; 062import org.jgrapht.Graphs; 063import org.jgrapht.graph.SimpleDirectedGraph; 064import org.jgrapht.traverse.TopologicalOrderIterator; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import static cascading.property.PropertyUtil.getProperty; 069 070public class BaseCascade implements ProcessLogger, Cascade 071 { 072 /** Field LOG */ 073 private static final Logger LOG = LoggerFactory.getLogger( Cascade.class ); 074 075 /** Field id */ 076 private String id; 077 /** Field name */ 078 private final String name; 079 /** Field tags */ 080 private String tags; 081 /** Field properties */ 082 private final Map<Object, Object> properties; 083 /** Fields listeners */ 084 private List<SafeCascadeListener> listeners; 085 /** Field jobGraph */ 086 private final FlowGraph flowGraph; 087 /** Field tapGraph */ 088 private final IdentifierGraph identifierGraph; 089 /** Field cascadeStats */ 090 private final CascadeStats cascadeStats; 091 /** Field cascadingServices */ 092 private CascadingServices cascadingServices; 093 /** Field thread */ 094 private Thread thread; 095 /** Field throwable */ 096 private Throwable throwable; 097 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 098 /** Field shutdownHook */ 099 private ShutdownUtil.Hook shutdownHook; 100 /** Field jobsMap */ 101 private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<>(); 102 /** Field stop */ 103 private boolean stop; 104 /** Field flowSkipStrategy */ 105 private FlowSkipStrategy flowSkipStrategy = null; 106 /** Field maxConcurrentFlows */ 107 private int maxConcurrentFlows = 0; 108 109 /** Field tapGraph * */ 110 private transient TapGraph tapGraph; 111 112 static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows ) 113 { 114 if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default 115 return maxConcurrentFlows; 116 117 return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) ); 118 } 119 120 /** for testing */ 121 protected BaseCascade() 122 { 123 this.name = null; 124 this.tags = null; 125 this.properties = null; 126 this.flowGraph = null; 127 this.identifierGraph = null; 128 this.cascadeStats = null; 129 } 130 131 BaseCascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph ) 132 { 133 this.name = cascadeDef.getName(); 134 this.tags = cascadeDef.getTags(); 135 this.properties = properties; 136 this.flowGraph = flowGraph; 137 this.identifierGraph = identifierGraph; 138 this.cascadeStats = createPrepareCascadeStats(); 139 setIDOnFlow(); 140 this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows(); 141 142 addListeners( getAllTaps() ); 143 } 144 145 private CascadeStats createPrepareCascadeStats() 146 { 147 CascadeStats cascadeStats = new CascadeStats( this, getClientState() ); 148 149 cascadeStats.prepare(); 150 cascadeStats.markPending(); 151 152 return cascadeStats; 153 } 154 155 /** 156 * Method getName returns the name of this Cascade object. 157 * 158 * @return the name (type String) of this Cascade object. 159 */ 160 @Override 161 public String getName() 162 { 163 return name; 164 } 165 166 /** 167 * Method getID returns the ID of this Cascade object. 168 * <p/> 169 * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade 170 * instances created with identical parameters will not return the same ID. 171 * 172 * @return the ID (type String) of this Cascade object. 173 */ 174 @Override 175 public String getID() 176 { 177 if( id == null ) 178 id = Util.createUniqueID(); 179 180 return id; 181 } 182 183 /** 184 * Method getTags returns the tags associated with this Cascade object. 185 * 186 * @return the tags (type String) of this Cascade object. 187 */ 188 @Override 189 public String getTags() 190 { 191 return tags; 192 } 193 194 void addListeners( Collection listeners ) 195 { 196 for( Object listener : listeners ) 197 { 198 if( listener instanceof CascadeListener ) 199 addListener( (CascadeListener) listener ); 200 } 201 } 202 203 List<SafeCascadeListener> getListeners() 204 { 205 if( listeners == null ) 206 listeners = new LinkedList<SafeCascadeListener>(); 207 208 return listeners; 209 } 210 211 @Override 212 public boolean hasListeners() 213 { 214 return listeners != null && !listeners.isEmpty(); 215 } 216 217 @Override 218 public void addListener( CascadeListener cascadeListener ) 219 { 220 getListeners().add( new SafeCascadeListener( cascadeListener ) ); 221 } 222 223 @Override 224 public boolean removeListener( CascadeListener flowListener ) 225 { 226 return getListeners().remove( new SafeCascadeListener( flowListener ) ); 227 } 228 229 private void fireOnCompleted() 230 { 231 if( hasListeners() ) 232 { 233 if( isDebugEnabled() ) 234 logDebug( "firing onCompleted event: " + getListeners().size() ); 235 236 for( CascadeListener cascadeListener : getListeners() ) 237 cascadeListener.onCompleted( this ); 238 } 239 } 240 241 private void fireOnThrowable() 242 { 243 if( hasListeners() ) 244 { 245 if( isDebugEnabled() ) 246 logDebug( "firing onThrowable event: " + getListeners().size() ); 247 248 boolean isHandled = false; 249 250 for( CascadeListener cascadeListener : getListeners() ) 251 isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled; 252 253 if( isHandled ) 254 throwable = null; 255 } 256 } 257 258 protected void fireOnStopping() 259 { 260 if( hasListeners() ) 261 { 262 if( isDebugEnabled() ) 263 logDebug( "firing onStopping event: " + getListeners().size() ); 264 265 for( CascadeListener cascadeListener : getListeners() ) 266 cascadeListener.onStopping( this ); 267 } 268 } 269 270 protected void fireOnStarting() 271 { 272 if( hasListeners() ) 273 { 274 if( isDebugEnabled() ) 275 logDebug( "firing onStarting event: " + getListeners().size() ); 276 277 for( CascadeListener cascadeListener : getListeners() ) 278 cascadeListener.onStarting( this ); 279 } 280 } 281 282 private CascadingServices getCascadingServices() 283 { 284 if( cascadingServices == null ) 285 cascadingServices = new CascadingServices( properties ); 286 287 return cascadingServices; 288 } 289 290 private ClientState getClientState() 291 { 292 return getCascadingServices().createClientState( getID() ); 293 } 294 295 @Override 296 public CascadeStats getCascadeStats() 297 { 298 return cascadeStats; 299 } 300 301 @Override 302 public CascadeStats getStats() 303 { 304 return getCascadeStats(); 305 } 306 307 private void setIDOnFlow() 308 { 309 for( Flow<?> flow : getFlows() ) 310 ( (BaseFlow<?>) flow ).setCascade( this ); 311 } 312 313 protected FlowGraph getFlowGraph() 314 { 315 return flowGraph; 316 } 317 318 protected IdentifierGraph getIdentifierGraph() 319 { 320 return identifierGraph; 321 } 322 323 @Override 324 public List<Flow> getFlows() 325 { 326 List<Flow> flows = new LinkedList<Flow>(); 327 TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator(); 328 329 while( topoIterator.hasNext() ) 330 flows.add( topoIterator.next() ); 331 332 return flows; 333 } 334 335 @Override 336 public List<Flow> findFlows( String regex ) 337 { 338 List<Flow> flows = new ArrayList<Flow>(); 339 340 for( Flow flow : getFlows() ) 341 { 342 if( flow.getName().matches( regex ) ) 343 flows.add( flow ); 344 } 345 346 return flows; 347 } 348 349 @Override 350 public Collection<Flow> getHeadFlows() 351 { 352 Set<Flow> flows = new HashSet<Flow>(); 353 354 for( Flow flow : flowGraph.vertexSet() ) 355 { 356 if( flowGraph.inDegreeOf( flow ) == 0 ) 357 flows.add( flow ); 358 } 359 360 return flows; 361 } 362 363 @Override 364 public Collection<Flow> getTailFlows() 365 { 366 Set<Flow> flows = new HashSet<Flow>(); 367 368 for( Flow flow : flowGraph.vertexSet() ) 369 { 370 if( flowGraph.outDegreeOf( flow ) == 0 ) 371 flows.add( flow ); 372 } 373 374 return flows; 375 } 376 377 @Override 378 public Collection<Flow> getIntermediateFlows() 379 { 380 Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() ); 381 382 flows.removeAll( getHeadFlows() ); 383 flows.removeAll( getTailFlows() ); 384 385 return flows; 386 } 387 388 protected TapGraph getTapGraph() 389 { 390 if( tapGraph == null ) 391 tapGraph = new TapGraph( flowGraph.vertexSet() ); 392 393 return tapGraph; 394 } 395 396 @Override 397 public Collection<Tap> getSourceTaps() 398 { 399 TapGraph tapGraph = getTapGraph(); 400 Set<Tap> taps = new HashSet<Tap>(); 401 402 for( Tap tap : tapGraph.vertexSet() ) 403 { 404 if( tapGraph.inDegreeOf( tap ) == 0 ) 405 taps.add( tap ); 406 } 407 408 return taps; 409 } 410 411 @Override 412 public Collection<Tap> getSinkTaps() 413 { 414 TapGraph tapGraph = getTapGraph(); 415 Set<Tap> taps = new HashSet<Tap>(); 416 417 for( Tap tap : tapGraph.vertexSet() ) 418 { 419 if( tapGraph.outDegreeOf( tap ) == 0 ) 420 taps.add( tap ); 421 } 422 423 return taps; 424 } 425 426 @Override 427 public Collection<Tap> getCheckpointsTaps() 428 { 429 Set<Tap> taps = new HashSet<Tap>(); 430 431 for( Flow flow : getFlows() ) 432 taps.addAll( flow.getCheckpointsCollection() ); 433 434 return taps; 435 } 436 437 @Override 438 public Collection<Tap> getIntermediateTaps() 439 { 440 TapGraph tapGraph = getTapGraph(); 441 Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() ); 442 443 taps.removeAll( getSourceTaps() ); 444 taps.removeAll( getSinkTaps() ); 445 446 return taps; 447 } 448 449 @Override 450 public Collection<Tap> getAllTaps() 451 { 452 return new HashSet<Tap>( getTapGraph().vertexSet() ); 453 } 454 455 @Override 456 public Collection<Flow> getSuccessorFlows( Flow flow ) 457 { 458 return Graphs.successorListOf( flowGraph, flow ); 459 } 460 461 @Override 462 public Collection<Flow> getPredecessorFlows( Flow flow ) 463 { 464 return Graphs.predecessorListOf( flowGraph, flow ); 465 } 466 467 @Override 468 public Collection<Flow> findFlowsSourcingFrom( String identifier ) 469 { 470 try 471 { 472 return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) ); 473 } 474 catch( Exception exception ) 475 { 476 return Collections.emptySet(); 477 } 478 } 479 480 @Override 481 public Collection<Flow> findFlowsSinkingTo( String identifier ) 482 { 483 try 484 { 485 return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) ); 486 } 487 catch( Exception exception ) 488 { 489 return Collections.emptySet(); 490 } 491 } 492 493 private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders ) 494 { 495 Set<Flow> flows = new HashSet<Flow>(); 496 497 for( BaseFlow.FlowHolder flowHolder : flowHolders ) 498 flows.add( flowHolder.flow ); 499 500 return flows; 501 } 502 503 @Override 504 public FlowSkipStrategy getFlowSkipStrategy() 505 { 506 return flowSkipStrategy; 507 } 508 509 @Override 510 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 511 { 512 try 513 { 514 return this.flowSkipStrategy; 515 } 516 finally 517 { 518 this.flowSkipStrategy = flowSkipStrategy; 519 } 520 } 521 522 @Override 523 public void prepare() 524 { 525 } 526 527 @Override 528 public void start() 529 { 530 if( thread != null ) 531 return; 532 533 thread = new Thread( new Runnable() 534 { 535 @Override 536 public void run() 537 { 538 BaseCascade.this.run(); 539 } 540 }, ( "cascade " + Util.toNull( getName() ) ).trim() ); 541 542 thread.start(); 543 } 544 545 @Override 546 public void complete() 547 { 548 start(); 549 550 try 551 { 552 try 553 { 554 thread.join(); 555 } 556 catch( InterruptedException exception ) 557 { 558 throw new FlowException( "thread interrupted", exception ); 559 } 560 561 if( throwable instanceof CascadingException ) 562 throw (CascadingException) throwable; 563 564 if( throwable != null ) 565 throw new CascadeException( "unhandled exception", throwable ); 566 } 567 finally 568 { 569 thread = null; 570 throwable = null; 571 shutdownHook = null; 572 cascadeStats.cleanup(); 573 } 574 } 575 576 @Override 577 public synchronized void stop() 578 { 579 if( stop ) 580 return; 581 582 stop = true; 583 584 fireOnStopping(); 585 586 if( !cascadeStats.isFinished() ) 587 cascadeStats.markStopped(); 588 589 internalStopAllFlows(); 590 handleExecutorShutdown(); 591 592 cascadeStats.cleanup(); 593 } 594 595 @Override 596 public void cleanup() 597 { 598 } 599 600 /** Method run implements the Runnable run method. */ 601 private void run() 602 { 603 Version.printBanner(); 604 605 if( LOG.isInfoEnabled() ) 606 logInfo( "starting" ); 607 608 registerShutdownHook(); 609 610 try 611 { 612 if( stop ) 613 return; 614 615 // mark started, not submitted 616 cascadeStats.markStartedThenRunning(); 617 618 fireOnStarting(); 619 620 initializeNewJobsMap(); 621 622 int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows ); 623 624 if( numThreads == 0 ) 625 numThreads = jobsMap.size(); 626 627 int numLocalFlows = numLocalFlows(); 628 629 boolean runFlowsLocal = numLocalFlows > 1; 630 631 if( runFlowsLocal ) 632 numThreads = 1; 633 634 if( isInfoEnabled() ) 635 { 636 logInfo( " parallel execution of flows is enabled: " + ( numThreads != 1 ) ); 637 logInfo( " executing total flows: " + jobsMap.size() ); 638 logInfo( " allocating management threads: " + numThreads ); 639 } 640 641 List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() ); 642 643 for( Future<Throwable> future : futures ) 644 { 645 throwable = future.get(); 646 647 if( throwable != null ) 648 { 649 if( !stop ) 650 { 651 if( !cascadeStats.isFinished() ) 652 cascadeStats.markFailed( throwable ); 653 internalStopAllFlows(); 654 fireOnThrowable(); 655 } 656 657 handleExecutorShutdown(); 658 break; 659 } 660 } 661 } 662 catch( Throwable throwable ) 663 { 664 this.throwable = throwable; 665 } 666 finally 667 { 668 if( !cascadeStats.isFinished() ) 669 cascadeStats.markSuccessful(); 670 671 try 672 { 673 fireOnCompleted(); 674 } 675 finally 676 { 677 deregisterShutdownHook(); 678 } 679 } 680 } 681 682 private void registerShutdownHook() 683 { 684 if( !isStopJobsOnExit() ) 685 return; 686 687 shutdownHook = new ShutdownUtil.Hook() 688 { 689 @Override 690 public Priority priority() 691 { 692 return Priority.WORK_PARENT; 693 } 694 695 @Override 696 public void execute() 697 { 698 logInfo( "shutdown hook calling stop on cascade" ); 699 700 BaseCascade.this.stop(); 701 } 702 }; 703 704 ShutdownUtil.addHook( shutdownHook ); 705 } 706 707 private void deregisterShutdownHook() 708 { 709 if( !isStopJobsOnExit() || stop ) 710 return; 711 712 ShutdownUtil.removeHook( shutdownHook ); 713 } 714 715 private boolean isStopJobsOnExit() 716 { 717 if( getFlows().isEmpty() ) 718 return false; // don't bother registering hook 719 720 return getFlows().get( 0 ).isStopJobsOnExit(); 721 } 722 723 /** 724 * If the number of flows that are local is greater than one, force the Cascade to run without parallelization. 725 * 726 * @return of type int 727 */ 728 private int numLocalFlows() 729 { 730 int countLocalJobs = 0; 731 732 for( Flow flow : getFlows() ) 733 { 734 if( flow.stepsAreLocal() ) 735 countLocalJobs++; 736 } 737 738 return countLocalJobs; 739 } 740 741 private void initializeNewJobsMap() 742 { 743 synchronized( jobsMap ) 744 { 745 // keep topo order 746 TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator(); 747 748 while( topoIterator.hasNext() ) 749 { 750 Flow flow = topoIterator.next(); 751 752 cascadeStats.addFlowStats( flow.getFlowStats() ); 753 754 CascadeJob job = new CascadeJob( flow ); 755 756 jobsMap.put( flow.getName(), job ); 757 758 List<CascadeJob> predecessors = new ArrayList<CascadeJob>(); 759 760 for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) ) 761 predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) ); 762 763 job.init( predecessors ); 764 } 765 } 766 } 767 768 private void handleExecutorShutdown() 769 { 770 if( spawnStrategy.isCompleted( this ) ) 771 return; 772 773 logInfo( "shutting down flow executor" ); 774 775 try 776 { 777 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 778 } 779 catch( InterruptedException exception ) 780 { 781 // ignore 782 } 783 784 logInfo( "shutdown complete" ); 785 } 786 787 private void internalStopAllFlows() 788 { 789 logInfo( "stopping all flows" ); 790 791 synchronized( jobsMap ) 792 { 793 List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() ); 794 795 Collections.reverse( jobs ); 796 797 for( Callable<Throwable> callable : jobs ) 798 ( (CascadeJob) callable ).stop(); 799 } 800 801 logInfo( "stopped all flows" ); 802 } 803 804 @Override 805 public void writeDOT( String filename ) 806 { 807 printElementGraph( filename, identifierGraph ); 808 } 809 810 protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph ) 811 { 812 try 813 { 814 Writer writer = new FileWriter( filename ); 815 816 Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>() 817 { 818 public String getVertexName( String object ) 819 { 820 return object.toString().replaceAll( "\"", "\'" ); 821 } 822 }, new EdgeNameProvider<BaseFlow.FlowHolder>() 823 { 824 public String getEdgeName( BaseFlow.FlowHolder object ) 825 { 826 return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz 827 } 828 } 829 ); 830 831 writer.close(); 832 } 833 catch( IOException exception ) 834 { 835 logError( "failed printing graph to: {}, with exception: {}", filename, exception ); 836 } 837 } 838 839 @Override 840 public String toString() 841 { 842 return getName(); 843 } 844 845 @Override 846 public boolean isInfoEnabled() 847 { 848 return LOG.isInfoEnabled(); 849 } 850 851 @Override 852 public boolean isDebugEnabled() 853 { 854 return LOG.isDebugEnabled(); 855 } 856 857 @Override 858 public void logInfo( String message, Object... arguments ) 859 { 860 LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 861 } 862 863 @Override 864 public void logDebug( String message, Object... arguments ) 865 { 866 LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 867 } 868 869 @Override 870 public void logWarn( String message ) 871 { 872 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 873 } 874 875 @Override 876 public void logWarn( String message, Throwable throwable ) 877 { 878 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 879 } 880 881 @Override 882 public void logWarn( String message, Object... arguments ) 883 { 884 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 885 } 886 887 @Override 888 public void logError( String message, Object... arguments ) 889 { 890 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 891 } 892 893 @Override 894 public void logError( String message, Throwable throwable ) 895 { 896 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 897 } 898 899 /** Class CascadeJob manages Flow execution in the current Cascade instance. */ 900 protected class CascadeJob implements Callable<Throwable> 901 { 902 /** Field flow */ 903 final Flow flow; 904 /** Field predecessors */ 905 private List<CascadeJob> predecessors; 906 /** Field latch */ 907 private final CountDownLatch latch = new CountDownLatch( 1 ); 908 /** Field stop */ 909 private boolean stop = false; 910 /** Field failed */ 911 private boolean failed = false; 912 913 public CascadeJob( Flow flow ) 914 { 915 this.flow = flow; 916 } 917 918 public String getName() 919 { 920 return flow.getName(); 921 } 922 923 public Throwable call() 924 { 925 try 926 { 927 for( CascadeJob predecessor : predecessors ) 928 { 929 if( !predecessor.isSuccessful() ) 930 return null; 931 } 932 933 if( stop || cascadeStats.isFinished() ) 934 return null; 935 936 try 937 { 938 if( LOG.isInfoEnabled() ) 939 logInfo( "starting flow: " + flow.getName() ); 940 941 if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) ) 942 { 943 if( LOG.isInfoEnabled() ) 944 logInfo( "skipping flow: " + flow.getName() ); 945 946 flow.getFlowStats().markSkipped(); 947 Flows.fireOnCompleted( flow ); 948 949 return null; 950 } 951 952 flow.prepare(); // do not delete append/update mode taps 953 flow.complete(); 954 955 if( LOG.isInfoEnabled() ) 956 logInfo( "completed flow: " + flow.getName() ); 957 } 958 catch( Throwable exception ) 959 { 960 failed = true; 961 logWarn( "flow failed: " + flow.getName(), exception ); 962 963 CascadeException cascadeException = new CascadeException( "flow failed: " + flow.getName(), exception ); 964 965 if( !cascadeStats.isFinished() ) 966 cascadeStats.markFailed( cascadeException ); 967 968 return cascadeException; 969 } 970 finally 971 { 972 flow.cleanup(); 973 } 974 } 975 catch( Throwable throwable ) 976 { 977 failed = true; 978 return throwable; 979 } 980 finally 981 { 982 latch.countDown(); 983 } 984 985 return null; 986 } 987 988 public void init( List<CascadeJob> predecessors ) 989 { 990 this.predecessors = predecessors; 991 } 992 993 public void stop() 994 { 995 if( LOG.isInfoEnabled() ) 996 logInfo( "stopping flow: " + flow.getName() ); 997 998 stop = true; 999 1000 if( flow != null ) 1001 flow.stop(); 1002 } 1003 1004 public boolean isSuccessful() 1005 { 1006 try 1007 { 1008 latch.await(); 1009 1010 return flow != null && !failed && !stop; 1011 } 1012 catch( InterruptedException exception ) 1013 { 1014 logWarn( "latch interrupted", exception ); 1015 } 1016 1017 return false; 1018 } 1019 } 1020 1021 @Override 1022 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1023 { 1024 return spawnStrategy; 1025 } 1026 1027 @Override 1028 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1029 { 1030 this.spawnStrategy = spawnStrategy; 1031 } 1032 1033 /** 1034 * Class SafeCascadeListener safely calls a wrapped CascadeListener. 1035 * <p/> 1036 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1037 * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method 1038 * which in turn is run in a new Thread. 1039 */ 1040 private class SafeCascadeListener implements CascadeListener 1041 { 1042 /** Field flowListener */ 1043 final CascadeListener cascadeListener; 1044 /** Field throwable */ 1045 Throwable throwable; 1046 1047 private SafeCascadeListener( CascadeListener cascadeListener ) 1048 { 1049 this.cascadeListener = cascadeListener; 1050 } 1051 1052 public void onStarting( Cascade cascade ) 1053 { 1054 try 1055 { 1056 cascadeListener.onStarting( cascade ); 1057 } 1058 catch( Throwable throwable ) 1059 { 1060 handleThrowable( throwable ); 1061 } 1062 } 1063 1064 public void onStopping( Cascade cascade ) 1065 { 1066 try 1067 { 1068 cascadeListener.onStopping( cascade ); 1069 } 1070 catch( Throwable throwable ) 1071 { 1072 handleThrowable( throwable ); 1073 } 1074 } 1075 1076 public void onCompleted( Cascade cascade ) 1077 { 1078 try 1079 { 1080 cascadeListener.onCompleted( cascade ); 1081 } 1082 catch( Throwable throwable ) 1083 { 1084 handleThrowable( throwable ); 1085 } 1086 } 1087 1088 public boolean onThrowable( Cascade cascade, Throwable flowThrowable ) 1089 { 1090 try 1091 { 1092 return cascadeListener.onThrowable( cascade, flowThrowable ); 1093 } 1094 catch( Throwable throwable ) 1095 { 1096 handleThrowable( throwable ); 1097 } 1098 1099 return false; 1100 } 1101 1102 private void handleThrowable( Throwable throwable ) 1103 { 1104 this.throwable = throwable; 1105 1106 logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable ); 1107 1108 // stop this flow 1109 stop(); 1110 } 1111 1112 public boolean equals( Object object ) 1113 { 1114 if( object instanceof SafeCascadeListener ) 1115 return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener ); 1116 1117 return cascadeListener.equals( object ); 1118 } 1119 1120 public int hashCode() 1121 { 1122 return cascadeListener.hashCode(); 1123 } 1124 } 1125 }