001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.cascade; 023 024import java.io.FileWriter; 025import java.io.IOException; 026import java.io.Writer; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.HashSet; 031import java.util.LinkedHashMap; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.concurrent.Callable; 037import java.util.concurrent.CountDownLatch; 038import java.util.concurrent.Future; 039import java.util.concurrent.TimeUnit; 040 041import cascading.CascadingException; 042import cascading.cascade.planner.FlowGraph; 043import cascading.cascade.planner.IdentifierGraph; 044import cascading.cascade.planner.TapGraph; 045import cascading.flow.BaseFlow; 046import cascading.flow.Flow; 047import cascading.flow.FlowException; 048import cascading.flow.FlowSkipStrategy; 049import cascading.flow.Flows; 050import cascading.management.CascadingServices; 051import cascading.management.UnitOfWorkExecutorStrategy; 052import cascading.management.UnitOfWorkSpawnStrategy; 053import cascading.management.state.ClientState; 054import cascading.stats.CascadeStats; 055import cascading.tap.Tap; 056import cascading.util.ProcessLogger; 057import cascading.util.ShutdownUtil; 058import cascading.util.Util; 059import cascading.util.Version; 060import cascading.util.jgrapht.EdgeNameProvider; 061import cascading.util.jgrapht.IntegerNameProvider; 062import cascading.util.jgrapht.VertexNameProvider; 063import org.jgrapht.Graphs; 064import org.jgrapht.graph.SimpleDirectedGraph; 065import org.jgrapht.traverse.TopologicalOrderIterator; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import static cascading.property.PropertyUtil.getProperty; 070 071public class BaseCascade implements ProcessLogger, Cascade 072 { 073 /** Field LOG */ 074 private static final Logger LOG = LoggerFactory.getLogger( Cascade.class ); 075 076 /** Field id */ 077 private String id; 078 /** Field name */ 079 private final String name; 080 /** Field tags */ 081 private String tags; 082 /** Field properties */ 083 private final Map<Object, Object> properties; 084 /** Fields listeners */ 085 private List<SafeCascadeListener> listeners; 086 /** Field jobGraph */ 087 private final FlowGraph flowGraph; 088 /** Field tapGraph */ 089 private final IdentifierGraph identifierGraph; 090 /** Field cascadeStats */ 091 private final CascadeStats cascadeStats; 092 /** Field cascadingServices */ 093 private CascadingServices cascadingServices; 094 /** Field thread */ 095 private Thread thread; 096 /** Field throwable */ 097 private Throwable throwable; 098 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 099 /** Field shutdownHook */ 100 private ShutdownUtil.Hook shutdownHook; 101 /** Field jobsMap */ 102 private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<>(); 103 /** Field stop */ 104 private boolean stop; 105 /** Field flowSkipStrategy */ 106 private FlowSkipStrategy flowSkipStrategy = null; 107 /** Field maxConcurrentFlows */ 108 private int maxConcurrentFlows = 0; 109 110 /** Field tapGraph * */ 111 private transient TapGraph tapGraph; 112 113 static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows ) 114 { 115 if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default 116 return maxConcurrentFlows; 117 118 return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) ); 119 } 120 121 /** for testing */ 122 protected BaseCascade() 123 { 124 this.name = null; 125 this.tags = null; 126 this.properties = null; 127 this.flowGraph = null; 128 this.identifierGraph = null; 129 this.cascadeStats = null; 130 } 131 132 BaseCascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph ) 133 { 134 this.name = cascadeDef.getName(); 135 this.tags = cascadeDef.getTags(); 136 this.properties = properties; 137 this.flowGraph = flowGraph; 138 this.identifierGraph = identifierGraph; 139 this.cascadeStats = createPrepareCascadeStats(); 140 setIDOnFlow(); 141 this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows(); 142 143 addListeners( getAllTaps() ); 144 } 145 146 private CascadeStats createPrepareCascadeStats() 147 { 148 CascadeStats cascadeStats = new CascadeStats( this, getClientState() ); 149 150 cascadeStats.prepare(); 151 cascadeStats.markPending(); 152 153 return cascadeStats; 154 } 155 156 /** 157 * Method getName returns the name of this Cascade object. 158 * 159 * @return the name (type String) of this Cascade object. 160 */ 161 @Override 162 public String getName() 163 { 164 return name; 165 } 166 167 /** 168 * Method getID returns the ID of this Cascade object. 169 * <p> 170 * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade 171 * instances created with identical parameters will not return the same ID. 172 * 173 * @return the ID (type String) of this Cascade object. 174 */ 175 @Override 176 public String getID() 177 { 178 if( id == null ) 179 id = Util.createUniqueID(); 180 181 return id; 182 } 183 184 /** 185 * Method getTags returns the tags associated with this Cascade object. 186 * 187 * @return the tags (type String) of this Cascade object. 188 */ 189 @Override 190 public String getTags() 191 { 192 return tags; 193 } 194 195 void addListeners( Collection listeners ) 196 { 197 for( Object listener : listeners ) 198 { 199 if( listener instanceof CascadeListener ) 200 addListener( (CascadeListener) listener ); 201 } 202 } 203 204 List<SafeCascadeListener> getListeners() 205 { 206 if( listeners == null ) 207 listeners = new LinkedList<SafeCascadeListener>(); 208 209 return listeners; 210 } 211 212 @Override 213 public boolean hasListeners() 214 { 215 return listeners != null && !listeners.isEmpty(); 216 } 217 218 @Override 219 public void addListener( CascadeListener cascadeListener ) 220 { 221 getListeners().add( new SafeCascadeListener( cascadeListener ) ); 222 } 223 224 @Override 225 public boolean removeListener( CascadeListener flowListener ) 226 { 227 return getListeners().remove( new SafeCascadeListener( flowListener ) ); 228 } 229 230 private void fireOnCompleted() 231 { 232 if( hasListeners() ) 233 { 234 if( isDebugEnabled() ) 235 logDebug( "firing onCompleted event: " + getListeners().size() ); 236 237 for( CascadeListener cascadeListener : getListeners() ) 238 cascadeListener.onCompleted( this ); 239 } 240 } 241 242 private void fireOnThrowable() 243 { 244 if( hasListeners() ) 245 { 246 if( isDebugEnabled() ) 247 logDebug( "firing onThrowable event: " + getListeners().size() ); 248 249 boolean isHandled = false; 250 251 for( CascadeListener cascadeListener : getListeners() ) 252 isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled; 253 254 if( isHandled ) 255 throwable = null; 256 } 257 } 258 259 protected void fireOnStopping() 260 { 261 if( hasListeners() ) 262 { 263 if( isDebugEnabled() ) 264 logDebug( "firing onStopping event: " + getListeners().size() ); 265 266 for( CascadeListener cascadeListener : getListeners() ) 267 cascadeListener.onStopping( this ); 268 } 269 } 270 271 protected void fireOnStarting() 272 { 273 if( hasListeners() ) 274 { 275 if( isDebugEnabled() ) 276 logDebug( "firing onStarting event: " + getListeners().size() ); 277 278 for( CascadeListener cascadeListener : getListeners() ) 279 cascadeListener.onStarting( this ); 280 } 281 } 282 283 private CascadingServices getCascadingServices() 284 { 285 if( cascadingServices == null ) 286 cascadingServices = new CascadingServices( properties ); 287 288 return cascadingServices; 289 } 290 291 private ClientState getClientState() 292 { 293 return getCascadingServices().createClientState( getID() ); 294 } 295 296 @Override 297 public CascadeStats getCascadeStats() 298 { 299 return cascadeStats; 300 } 301 302 @Override 303 public CascadeStats getStats() 304 { 305 return getCascadeStats(); 306 } 307 308 private void setIDOnFlow() 309 { 310 for( Flow<?> flow : getFlows() ) 311 ( (BaseFlow<?>) flow ).setCascade( this ); 312 } 313 314 protected FlowGraph getFlowGraph() 315 { 316 return flowGraph; 317 } 318 319 protected IdentifierGraph getIdentifierGraph() 320 { 321 return identifierGraph; 322 } 323 324 @Override 325 public List<Flow> getFlows() 326 { 327 List<Flow> flows = new LinkedList<Flow>(); 328 TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator(); 329 330 while( topoIterator.hasNext() ) 331 flows.add( topoIterator.next() ); 332 333 return flows; 334 } 335 336 @Override 337 public List<Flow> findFlows( String regex ) 338 { 339 List<Flow> flows = new ArrayList<Flow>(); 340 341 for( Flow flow : getFlows() ) 342 { 343 if( flow.getName().matches( regex ) ) 344 flows.add( flow ); 345 } 346 347 return flows; 348 } 349 350 @Override 351 public Collection<Flow> getHeadFlows() 352 { 353 Set<Flow> flows = new HashSet<Flow>(); 354 355 for( Flow flow : flowGraph.vertexSet() ) 356 { 357 if( flowGraph.inDegreeOf( flow ) == 0 ) 358 flows.add( flow ); 359 } 360 361 return flows; 362 } 363 364 @Override 365 public Collection<Flow> getTailFlows() 366 { 367 Set<Flow> flows = new HashSet<Flow>(); 368 369 for( Flow flow : flowGraph.vertexSet() ) 370 { 371 if( flowGraph.outDegreeOf( flow ) == 0 ) 372 flows.add( flow ); 373 } 374 375 return flows; 376 } 377 378 @Override 379 public Collection<Flow> getIntermediateFlows() 380 { 381 Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() ); 382 383 flows.removeAll( getHeadFlows() ); 384 flows.removeAll( getTailFlows() ); 385 386 return flows; 387 } 388 389 protected TapGraph getTapGraph() 390 { 391 if( tapGraph == null ) 392 tapGraph = new TapGraph( flowGraph.vertexSet() ); 393 394 return tapGraph; 395 } 396 397 @Override 398 public Collection<Tap> getSourceTaps() 399 { 400 TapGraph tapGraph = getTapGraph(); 401 Set<Tap> taps = new HashSet<Tap>(); 402 403 for( Tap tap : tapGraph.vertexSet() ) 404 { 405 if( tapGraph.inDegreeOf( tap ) == 0 ) 406 taps.add( tap ); 407 } 408 409 return taps; 410 } 411 412 @Override 413 public Collection<Tap> getSinkTaps() 414 { 415 TapGraph tapGraph = getTapGraph(); 416 Set<Tap> taps = new HashSet<Tap>(); 417 418 for( Tap tap : tapGraph.vertexSet() ) 419 { 420 if( tapGraph.outDegreeOf( tap ) == 0 ) 421 taps.add( tap ); 422 } 423 424 return taps; 425 } 426 427 @Override 428 public Collection<Tap> getCheckpointsTaps() 429 { 430 Set<Tap> taps = new HashSet<Tap>(); 431 432 for( Flow flow : getFlows() ) 433 taps.addAll( flow.getCheckpointsCollection() ); 434 435 return taps; 436 } 437 438 @Override 439 public Collection<Tap> getIntermediateTaps() 440 { 441 TapGraph tapGraph = getTapGraph(); 442 Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() ); 443 444 taps.removeAll( getSourceTaps() ); 445 taps.removeAll( getSinkTaps() ); 446 447 return taps; 448 } 449 450 @Override 451 public Collection<Tap> getAllTaps() 452 { 453 return new HashSet<Tap>( getTapGraph().vertexSet() ); 454 } 455 456 @Override 457 public Collection<Flow> getSuccessorFlows( Flow flow ) 458 { 459 return Graphs.successorListOf( flowGraph, flow ); 460 } 461 462 @Override 463 public Collection<Flow> getPredecessorFlows( Flow flow ) 464 { 465 return Graphs.predecessorListOf( flowGraph, flow ); 466 } 467 468 @Override 469 public Collection<Flow> findFlowsSourcingFrom( String identifier ) 470 { 471 try 472 { 473 return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) ); 474 } 475 catch( Exception exception ) 476 { 477 return Collections.emptySet(); 478 } 479 } 480 481 @Override 482 public Collection<Flow> findFlowsSinkingTo( String identifier ) 483 { 484 try 485 { 486 return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) ); 487 } 488 catch( Exception exception ) 489 { 490 return Collections.emptySet(); 491 } 492 } 493 494 private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders ) 495 { 496 Set<Flow> flows = new HashSet<Flow>(); 497 498 for( BaseFlow.FlowHolder flowHolder : flowHolders ) 499 flows.add( flowHolder.flow ); 500 501 return flows; 502 } 503 504 @Override 505 public FlowSkipStrategy getFlowSkipStrategy() 506 { 507 return flowSkipStrategy; 508 } 509 510 @Override 511 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 512 { 513 try 514 { 515 return this.flowSkipStrategy; 516 } 517 finally 518 { 519 this.flowSkipStrategy = flowSkipStrategy; 520 } 521 } 522 523 @Override 524 public void prepare() 525 { 526 } 527 528 @Override 529 public void start() 530 { 531 if( thread != null ) 532 return; 533 534 thread = new Thread( new Runnable() 535 { 536 @Override 537 public void run() 538 { 539 BaseCascade.this.run(); 540 } 541 }, ( "cascade " + Util.toNull( getName() ) ).trim() ); 542 543 thread.start(); 544 } 545 546 @Override 547 public void complete() 548 { 549 start(); 550 551 try 552 { 553 try 554 { 555 thread.join(); 556 } 557 catch( InterruptedException exception ) 558 { 559 throw new FlowException( "thread interrupted", exception ); 560 } 561 562 if( throwable instanceof CascadingException ) 563 throw (CascadingException) throwable; 564 565 if( throwable != null ) 566 throw new CascadeException( "unhandled exception", throwable ); 567 } 568 finally 569 { 570 thread = null; 571 throwable = null; 572 shutdownHook = null; 573 cascadeStats.cleanup(); 574 } 575 } 576 577 @Override 578 public synchronized void stop() 579 { 580 if( stop ) 581 return; 582 583 stop = true; 584 585 fireOnStopping(); 586 587 if( !cascadeStats.isFinished() ) 588 cascadeStats.markStopped(); 589 590 internalStopAllFlows(); 591 handleExecutorShutdown(); 592 593 cascadeStats.cleanup(); 594 } 595 596 @Override 597 public void cleanup() 598 { 599 } 600 601 /** Method run implements the Runnable run method. */ 602 private void run() 603 { 604 Version.printBanner(); 605 606 if( LOG.isInfoEnabled() ) 607 logInfo( "starting" ); 608 609 registerShutdownHook(); 610 611 try 612 { 613 synchronized( this ) // prevent race on stopping 614 { 615 if( stop ) 616 return; 617 618 // mark started, not submitted 619 cascadeStats.markStartedThenRunning(); 620 } 621 622 fireOnStarting(); 623 624 initializeNewJobsMap(); 625 626 int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows ); 627 628 if( numThreads == 0 ) 629 numThreads = jobsMap.size(); 630 631 int numLocalFlows = numLocalFlows(); 632 633 boolean runFlowsLocal = numLocalFlows > 1; 634 635 if( runFlowsLocal ) 636 numThreads = 1; 637 638 if( isInfoEnabled() ) 639 { 640 logInfo( " parallel execution of flows is enabled: " + ( numThreads != 1 ) ); 641 logInfo( " executing total flows: " + jobsMap.size() ); 642 logInfo( " allocating management threads: " + numThreads ); 643 } 644 645 List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() ); 646 647 for( Future<Throwable> future : futures ) 648 { 649 throwable = future.get(); 650 651 if( throwable != null ) 652 { 653 if( !stop ) 654 { 655 if( !cascadeStats.isFinished() ) 656 cascadeStats.markFailed( throwable ); 657 internalStopAllFlows(); 658 fireOnThrowable(); 659 } 660 661 handleExecutorShutdown(); 662 break; 663 } 664 } 665 } 666 catch( Throwable throwable ) 667 { 668 this.throwable = throwable; 669 } 670 finally 671 { 672 if( !cascadeStats.isFinished() ) 673 cascadeStats.markSuccessful(); 674 675 try 676 { 677 fireOnCompleted(); 678 } 679 finally 680 { 681 deregisterShutdownHook(); 682 } 683 } 684 } 685 686 private void registerShutdownHook() 687 { 688 if( !isStopJobsOnExit() ) 689 return; 690 691 shutdownHook = new ShutdownUtil.Hook() 692 { 693 @Override 694 public Priority priority() 695 { 696 return Priority.WORK_PARENT; 697 } 698 699 @Override 700 public void execute() 701 { 702 logInfo( "shutdown hook calling stop on cascade" ); 703 704 BaseCascade.this.stop(); 705 } 706 }; 707 708 ShutdownUtil.addHook( shutdownHook ); 709 } 710 711 private void deregisterShutdownHook() 712 { 713 if( !isStopJobsOnExit() || stop ) 714 return; 715 716 ShutdownUtil.removeHook( shutdownHook ); 717 } 718 719 private boolean isStopJobsOnExit() 720 { 721 if( getFlows().isEmpty() ) 722 return false; // don't bother registering hook 723 724 return getFlows().get( 0 ).isStopJobsOnExit(); 725 } 726 727 /** 728 * If the number of flows that are local is greater than one, force the Cascade to run without parallelization. 729 * 730 * @return of type int 731 */ 732 private int numLocalFlows() 733 { 734 int countLocalJobs = 0; 735 736 for( Flow flow : getFlows() ) 737 { 738 if( flow.stepsAreLocal() ) 739 countLocalJobs++; 740 } 741 742 return countLocalJobs; 743 } 744 745 private void initializeNewJobsMap() 746 { 747 synchronized( jobsMap ) 748 { 749 // keep topo order 750 TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator(); 751 752 while( topoIterator.hasNext() ) 753 { 754 Flow flow = topoIterator.next(); 755 756 cascadeStats.addFlowStats( flow.getFlowStats() ); 757 758 CascadeJob job = new CascadeJob( flow ); 759 760 jobsMap.put( flow.getName(), job ); 761 762 List<CascadeJob> predecessors = new ArrayList<CascadeJob>(); 763 764 for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) ) 765 predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) ); 766 767 job.init( predecessors ); 768 } 769 } 770 } 771 772 private void handleExecutorShutdown() 773 { 774 if( spawnStrategy.isCompleted( this ) ) 775 return; 776 777 logInfo( "shutting down flow executor" ); 778 779 try 780 { 781 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 782 } 783 catch( InterruptedException exception ) 784 { 785 // ignore 786 } 787 788 logInfo( "shutdown complete" ); 789 } 790 791 private void internalStopAllFlows() 792 { 793 logInfo( "stopping all flows" ); 794 795 synchronized( jobsMap ) 796 { 797 List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() ); 798 799 Collections.reverse( jobs ); 800 801 for( Callable<Throwable> callable : jobs ) 802 ( (CascadeJob) callable ).stop(); 803 } 804 805 logInfo( "stopped all flows" ); 806 } 807 808 @Override 809 public void writeDOT( String filename ) 810 { 811 printElementGraph( filename, identifierGraph ); 812 } 813 814 protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph ) 815 { 816 try 817 { 818 Writer writer = new FileWriter( filename ); 819 820 Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>() 821 { 822 public String getVertexName( String object ) 823 { 824 return object.toString().replaceAll( "\"", "\'" ); 825 } 826 }, new EdgeNameProvider<BaseFlow.FlowHolder>() 827 { 828 public String getEdgeName( BaseFlow.FlowHolder object ) 829 { 830 return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz 831 } 832 } 833 ); 834 835 writer.close(); 836 } 837 catch( IOException exception ) 838 { 839 logError( "failed printing graph to: {}, with exception: {}", filename, exception ); 840 } 841 } 842 843 @Override 844 public String toString() 845 { 846 return getName(); 847 } 848 849 @Override 850 public boolean isInfoEnabled() 851 { 852 return LOG.isInfoEnabled(); 853 } 854 855 @Override 856 public boolean isDebugEnabled() 857 { 858 return LOG.isDebugEnabled(); 859 } 860 861 @Override 862 public void logInfo( String message, Object... arguments ) 863 { 864 LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 865 } 866 867 @Override 868 public void logDebug( String message, Object... arguments ) 869 { 870 LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 871 } 872 873 @Override 874 public void logWarn( String message ) 875 { 876 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 877 } 878 879 @Override 880 public void logWarn( String message, Throwable throwable ) 881 { 882 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 883 } 884 885 @Override 886 public void logWarn( String message, Object... arguments ) 887 { 888 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 889 } 890 891 @Override 892 public void logError( String message, Object... arguments ) 893 { 894 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments ); 895 } 896 897 @Override 898 public void logError( String message, Throwable throwable ) 899 { 900 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 901 } 902 903 /** Class CascadeJob manages Flow execution in the current Cascade instance. */ 904 protected class CascadeJob implements Callable<Throwable> 905 { 906 /** Field flow */ 907 final Flow flow; 908 /** Field predecessors */ 909 private List<CascadeJob> predecessors; 910 /** Field latch */ 911 private final CountDownLatch latch = new CountDownLatch( 1 ); 912 /** Field stop */ 913 private boolean stop = false; 914 /** Field failed */ 915 private boolean failed = false; 916 917 public CascadeJob( Flow flow ) 918 { 919 this.flow = flow; 920 } 921 922 public String getName() 923 { 924 return flow.getName(); 925 } 926 927 public Throwable call() 928 { 929 try 930 { 931 for( CascadeJob predecessor : predecessors ) 932 { 933 if( !predecessor.isSuccessful() ) 934 return null; 935 } 936 937 if( stop || cascadeStats.isFinished() ) 938 return null; 939 940 try 941 { 942 if( LOG.isInfoEnabled() ) 943 logInfo( "starting flow: " + flow.getName() ); 944 945 if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) ) 946 { 947 if( LOG.isInfoEnabled() ) 948 logInfo( "skipping flow: " + flow.getName() ); 949 950 flow.getFlowStats().markSkipped(); 951 Flows.fireOnCompleted( flow ); 952 953 return null; 954 } 955 956 flow.prepare(); // do not delete append/update mode taps 957 flow.complete(); 958 959 if( LOG.isInfoEnabled() ) 960 logInfo( "completed flow: " + flow.getName() ); 961 } 962 catch( Throwable exception ) 963 { 964 failed = true; 965 logWarn( "flow failed: " + flow.getName(), exception ); 966 967 CascadeException cascadeException = new CascadeException( "flow failed: " + flow.getName(), exception ); 968 969 if( !cascadeStats.isFinished() ) 970 cascadeStats.markFailed( cascadeException ); 971 972 return cascadeException; 973 } 974 finally 975 { 976 flow.cleanup(); 977 } 978 } 979 catch( Throwable throwable ) 980 { 981 failed = true; 982 return throwable; 983 } 984 finally 985 { 986 latch.countDown(); 987 } 988 989 return null; 990 } 991 992 public void init( List<CascadeJob> predecessors ) 993 { 994 this.predecessors = predecessors; 995 } 996 997 public void stop() 998 { 999 if( LOG.isInfoEnabled() ) 1000 logInfo( "stopping flow: " + flow.getName() ); 1001 1002 stop = true; 1003 1004 if( flow != null ) 1005 flow.stop(); 1006 } 1007 1008 public boolean isSuccessful() 1009 { 1010 try 1011 { 1012 latch.await(); 1013 1014 return flow != null && !failed && !stop; 1015 } 1016 catch( InterruptedException exception ) 1017 { 1018 logWarn( "latch interrupted", exception ); 1019 } 1020 1021 return false; 1022 } 1023 } 1024 1025 @Override 1026 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1027 { 1028 return spawnStrategy; 1029 } 1030 1031 @Override 1032 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1033 { 1034 this.spawnStrategy = spawnStrategy; 1035 } 1036 1037 /** 1038 * Class SafeCascadeListener safely calls a wrapped CascadeListener. 1039 * <p> 1040 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1041 * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method 1042 * which in turn is run in a new Thread. 1043 */ 1044 private class SafeCascadeListener implements CascadeListener 1045 { 1046 /** Field flowListener */ 1047 final CascadeListener cascadeListener; 1048 /** Field throwable */ 1049 Throwable throwable; 1050 1051 private SafeCascadeListener( CascadeListener cascadeListener ) 1052 { 1053 this.cascadeListener = cascadeListener; 1054 } 1055 1056 public void onStarting( Cascade cascade ) 1057 { 1058 try 1059 { 1060 cascadeListener.onStarting( cascade ); 1061 } 1062 catch( Throwable throwable ) 1063 { 1064 handleThrowable( throwable ); 1065 } 1066 } 1067 1068 public void onStopping( Cascade cascade ) 1069 { 1070 try 1071 { 1072 cascadeListener.onStopping( cascade ); 1073 } 1074 catch( Throwable throwable ) 1075 { 1076 handleThrowable( throwable ); 1077 } 1078 } 1079 1080 public void onCompleted( Cascade cascade ) 1081 { 1082 try 1083 { 1084 cascadeListener.onCompleted( cascade ); 1085 } 1086 catch( Throwable throwable ) 1087 { 1088 handleThrowable( throwable ); 1089 } 1090 } 1091 1092 public boolean onThrowable( Cascade cascade, Throwable flowThrowable ) 1093 { 1094 try 1095 { 1096 return cascadeListener.onThrowable( cascade, flowThrowable ); 1097 } 1098 catch( Throwable throwable ) 1099 { 1100 handleThrowable( throwable ); 1101 } 1102 1103 return false; 1104 } 1105 1106 private void handleThrowable( Throwable throwable ) 1107 { 1108 this.throwable = throwable; 1109 1110 logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable ); 1111 1112 // stop this flow 1113 stop(); 1114 } 1115 1116 public boolean equals( Object object ) 1117 { 1118 if( object instanceof SafeCascadeListener ) 1119 return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener ); 1120 1121 return cascadeListener.equals( object ); 1122 } 1123 1124 public int hashCode() 1125 { 1126 return cascadeListener.hashCode(); 1127 } 1128 } 1129 }