001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.Date; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.LinkedHashMap; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Map; 036import java.util.Properties; 037import java.util.Set; 038import java.util.concurrent.Callable; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.Future; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.locks.ReentrantLock; 043 044import cascading.CascadingException; 045import cascading.cascade.Cascade; 046import cascading.flow.planner.BaseFlowNode; 047import cascading.flow.planner.BaseFlowStep; 048import cascading.flow.planner.FlowStepJob; 049import cascading.flow.planner.PlannerInfo; 050import cascading.flow.planner.PlatformInfo; 051import cascading.flow.planner.graph.ElementGraphs; 052import cascading.flow.planner.graph.FlowElementGraph; 053import cascading.flow.planner.process.FlowStepGraph; 054import cascading.flow.planner.process.ProcessGraphs; 055import cascading.management.CascadingServices; 056import cascading.management.UnitOfWorkExecutorStrategy; 057import cascading.management.UnitOfWorkSpawnStrategy; 058import cascading.management.state.ClientState; 059import cascading.property.AppProps; 060import cascading.property.PropertyUtil; 061import cascading.stats.FlowStats; 062import cascading.tap.Tap; 063import cascading.tuple.Fields; 064import cascading.tuple.TupleEntryCollector; 065import cascading.tuple.TupleEntryIterator; 066import cascading.util.ProcessLogger; 067import cascading.util.ShutdownUtil; 068import cascading.util.Update; 069import cascading.util.Util; 070import cascading.util.Version; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073import riffle.process.DependencyIncoming; 074import riffle.process.DependencyOutgoing; 075import riffle.process.ProcessCleanup; 076import riffle.process.ProcessComplete; 077import riffle.process.ProcessPrepare; 078import riffle.process.ProcessStart; 079import riffle.process.ProcessStop; 080 081import static cascading.util.Util.formatDurationFromMillis; 082 083@riffle.process.Process 084public abstract class BaseFlow<Config> implements Flow<Config>, ProcessLogger 085 { 086 /** Field LOG */ 087 private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); // wrapped by ProcessLogger interface methods 088 private static final int LOG_FLOW_NAME_MAX = 25; 089 090 private PlannerInfo plannerInfo = PlannerInfo.NULL; 091 protected PlatformInfo platformInfo = PlatformInfo.NULL; 092 093 /** Field id */ 094 private String id; 095 /** Field name */ 096 private String name; 097 /** Fields runID */ 098 private String runID; 099 /** Fields classpath */ 100 private List<String> classPath; // may remain null 101 /** Field tags */ 102 private String tags; 103 /** Field listeners */ 104 private List<SafeFlowListener> listeners; 105 /** Field skipStrategy */ 106 private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale(); 107 /** Field flowStats */ 108 protected FlowStats flowStats; // don't use a listener to set values 109 /** Field sources */ 110 protected Map<String, Tap> sources = Collections.emptyMap(); 111 /** Field sinks */ 112 protected Map<String, Tap> sinks = Collections.emptyMap(); 113 /** Field traps */ 114 private Map<String, Tap> traps = Collections.emptyMap(); 115 /** Field checkpoints */ 116 private Map<String, Tap> checkpoints = Collections.emptyMap(); 117 /** Field stopJobsOnExit */ 118 protected boolean stopJobsOnExit = true; 119 /** Field submitPriority */ 120 private int submitPriority = 5; 121 122 /** Field stepGraph */ 123 protected FlowStepGraph flowStepGraph; 124 /** Field thread */ 125 protected transient Thread thread; 126 /** Field throwable */ 127 protected Throwable throwable; 128 /** Field stop */ 129 protected volatile boolean stop; 130 /** Field completed */ 131 protected volatile boolean completed = false; 132 133 /** Field flowCanonicalHash */ 134 protected String flowCanonicalHash; 135 /** Field flowElementGraph */ 136 protected FlowElementGraph flowElementGraph; // only used for documentation purposes 137 138 private transient CascadingServices cascadingServices; 139 140 private FlowStepStrategy<Config> flowStepStrategy = null; 141 /** Field steps */ 142 protected transient List<FlowStep<Config>> steps; 143 /** Field jobsMap */ 144 private transient Map<String, FlowStepJob<Config>> jobsMap; 145 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 146 147 private transient ReentrantLock stopLock = new ReentrantLock( true ); 148 protected ShutdownUtil.Hook shutdownHook; 149 150 protected HashMap<String, String> flowDescriptor; 151 152 /** 153 * Returns property stopJobsOnExit. 154 * 155 * @param properties of type Map 156 * @return a boolean 157 */ 158 static boolean getStopJobsOnExit( Map<Object, Object> properties ) 159 { 160 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) ); 161 } 162 163 /** Used for testing. */ 164 protected BaseFlow() 165 { 166 this.name = "NA"; 167 this.flowStats = createPrepareFlowStats(); 168 } 169 170 /** 171 * Does not initialize stats 172 * 173 * @param name 174 */ 175 protected BaseFlow( PlatformInfo platformInfo, String name ) 176 { 177 if( platformInfo != null ) 178 this.platformInfo = platformInfo; 179 180 this.name = name; 181 } 182 183 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor ) 184 { 185 if( platformInfo != null ) 186 this.platformInfo = platformInfo; 187 188 this.name = name; 189 190 if( flowDescriptor != null ) 191 this.flowDescriptor = new LinkedHashMap<>( flowDescriptor ); 192 193 addSessionProperties( properties ); 194 initConfig( properties, defaultConfig ); 195 } 196 197 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef ) 198 { 199 properties = PropertyUtil.asFlatMap( properties ); 200 201 if( platformInfo != null ) 202 this.platformInfo = platformInfo; 203 204 this.name = flowDef.getName(); 205 this.tags = flowDef.getTags(); 206 this.runID = flowDef.getRunID(); 207 this.classPath = flowDef.getClassPath(); 208 209 if( !flowDef.getFlowDescriptor().isEmpty() ) 210 this.flowDescriptor = new LinkedHashMap<>( flowDef.getFlowDescriptor() ); 211 212 addSessionProperties( properties ); 213 initConfig( properties, defaultConfig ); 214 setSources( flowDef.getSourcesCopy() ); 215 setSinks( flowDef.getSinksCopy() ); 216 setTraps( flowDef.getTrapsCopy() ); 217 setCheckpoints( flowDef.getCheckpointsCopy() ); 218 initFromTaps(); 219 220 retrieveSourceFields(); 221 retrieveSinkFields(); 222 } 223 224 public void setPlannerInfo( PlannerInfo plannerInfo ) 225 { 226 this.plannerInfo = plannerInfo; 227 } 228 229 @Override 230 public PlannerInfo getPlannerInfo() 231 { 232 return plannerInfo; 233 } 234 235 @Override 236 public PlatformInfo getPlatformInfo() 237 { 238 return platformInfo; 239 } 240 241 public void initialize( FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph ) 242 { 243 addPlannerProperties(); 244 this.flowElementGraph = flowElementGraph; 245 this.flowStepGraph = flowStepGraph; 246 247 initSteps(); 248 249 this.flowStats = createPrepareFlowStats(); 250 251 initializeNewJobsMap(); 252 253 initializeChildStats(); 254 } 255 256 public FlowElementGraph updateSchemes( FlowElementGraph pipeGraph ) 257 { 258 presentSourceFields( pipeGraph ); 259 260 presentSinkFields( pipeGraph ); 261 262 return new FlowElementGraph( pipeGraph ); 263 } 264 265 /** Force a Scheme to fetch any fields from a meta-data store */ 266 protected void retrieveSourceFields() 267 { 268 for( Tap tap : sources.values() ) 269 tap.retrieveSourceFields( getFlowProcess() ); 270 } 271 272 /** 273 * Present the current resolved fields for the Tap 274 * 275 * @param pipeGraph 276 */ 277 protected void presentSourceFields( FlowElementGraph pipeGraph ) 278 { 279 for( Tap tap : sources.values() ) 280 { 281 if( pipeGraph.containsVertex( tap ) ) 282 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 283 } 284 285 for( Tap tap : checkpoints.values() ) 286 { 287 if( pipeGraph.containsVertex( tap ) ) 288 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 289 } 290 } 291 292 /** Force a Scheme to fetch any fields from a meta-data store */ 293 protected void retrieveSinkFields() 294 { 295 for( Tap tap : sinks.values() ) 296 tap.retrieveSinkFields( getFlowProcess() ); 297 } 298 299 /** 300 * Present the current resolved fields for the Tap 301 * 302 * @param pipeGraph 303 */ 304 protected void presentSinkFields( FlowElementGraph pipeGraph ) 305 { 306 for( Tap tap : sinks.values() ) 307 { 308 if( pipeGraph.containsVertex( tap ) ) 309 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 310 } 311 312 for( Tap tap : checkpoints.values() ) 313 { 314 if( pipeGraph.containsVertex( tap ) ) 315 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 316 } 317 } 318 319 protected Fields getFieldsFor( FlowElementGraph pipeGraph, Tap tap ) 320 { 321 return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields(); 322 } 323 324 protected void addSessionProperties( Map<Object, Object> properties ) 325 { 326 if( properties == null ) 327 return; 328 329 PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() ); 330 PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() ); 331 AppProps.setApplicationID( properties ); 332 PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) ); 333 PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) ); 334 } 335 336 protected void addPlannerProperties() 337 { 338 setConfigProperty( getConfig(), "cascading.flow.planner", getPlannerInfo().name ); 339 setConfigProperty( getConfig(), "cascading.flow.platform", getPlannerInfo().platform ); 340 setConfigProperty( getConfig(), "cascading.flow.registry", getPlannerInfo().registry ); 341 } 342 343 private String makeAppName( Map<Object, Object> properties ) 344 { 345 if( properties == null ) 346 return null; 347 348 String name = AppProps.getApplicationName( properties ); 349 350 if( name != null ) 351 return name; 352 353 return Util.findName( AppProps.getApplicationJarPath( properties ) ); 354 } 355 356 private String makeAppVersion( Map<Object, Object> properties ) 357 { 358 if( properties == null ) 359 return null; 360 361 String name = AppProps.getApplicationVersion( properties ); 362 363 if( name != null ) 364 return name; 365 366 return Util.findVersion( AppProps.getApplicationJarPath( properties ) ); 367 } 368 369 protected FlowStats createPrepareFlowStats() 370 { 371 FlowStats flowStats = createFlowStats(); 372 373 flowStats.prepare(); 374 flowStats.markPending(); 375 376 return flowStats; 377 } 378 379 protected FlowStats createFlowStats() 380 { 381 return new FlowStats( this, getClientState() ); 382 } 383 384 public CascadingServices getCascadingServices() 385 { 386 if( cascadingServices == null ) 387 cascadingServices = new CascadingServices( getConfigAsProperties() ); 388 389 return cascadingServices; 390 } 391 392 protected ClientState getClientState() 393 { 394 return getFlowSession().getCascadingServices().createClientState( getID() ); 395 } 396 397 protected void initSteps() 398 { 399 if( flowStepGraph == null ) 400 return; 401 402 Set<FlowStep> flowSteps = flowStepGraph.vertexSet(); 403 404 for( FlowStep flowStep : flowSteps ) 405 { 406 ( (BaseFlowStep) flowStep ).setFlow( this ); 407 408 Set<FlowNode> flowNodes = flowStep.getFlowNodeGraph().vertexSet(); 409 410 for( FlowNode flowNode : flowNodes ) 411 ( (BaseFlowNode) flowNode ).setFlowStep( flowStep ); 412 } 413 } 414 415 private void initFromTaps() 416 { 417 initFromTaps( sources ); 418 initFromTaps( sinks ); 419 initFromTaps( traps ); 420 } 421 422 private void initFromTaps( Map<String, Tap> taps ) 423 { 424 for( Tap tap : taps.values() ) 425 tap.flowConfInit( this ); 426 } 427 428 @Override 429 public String getName() 430 { 431 return name; 432 } 433 434 protected void setName( String name ) 435 { 436 this.name = name; 437 } 438 439 @Override 440 public String getID() 441 { 442 if( id == null ) 443 id = Util.createUniqueID(); 444 445 return id; 446 } 447 448 @Override 449 public String getTags() 450 { 451 return tags; 452 } 453 454 @Override 455 public int getSubmitPriority() 456 { 457 return submitPriority; 458 } 459 460 @Override 461 public void setSubmitPriority( int submitPriority ) 462 { 463 if( submitPriority < 1 || submitPriority > 10 ) 464 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 465 466 this.submitPriority = submitPriority; 467 } 468 469 /** 470 * The hash value can be used to determine if two unique Flow instances performed the same work. 471 * <p/> 472 * The source and sink taps are not relevant to the hash. 473 * 474 * @return a String value 475 */ 476 public String getFlowCanonicalHash() 477 { 478 if( flowCanonicalHash != null || flowElementGraph == null ) 479 return flowCanonicalHash; 480 481 // synchronize on flowElementGraph to prevent duplicate hash creation - can be high overhead 482 // and to prevent deadlocks if the DocumentService calls back into the flow when transitioning 483 // into a running state 484 synchronized( flowElementGraph ) 485 { 486 flowCanonicalHash = createFlowCanonicalHash( flowElementGraph ); 487 } 488 489 return flowCanonicalHash; 490 } 491 492 // allow for sub-class overrides 493 protected String createFlowCanonicalHash( FlowElementGraph flowElementGraph ) 494 { 495 if( flowElementGraph == null ) 496 return null; 497 498 return ElementGraphs.canonicalHash( flowElementGraph ); 499 } 500 501 protected FlowElementGraph getFlowElementGraph() 502 { 503 return flowElementGraph; 504 } 505 506 protected void setFlowElementGraph( FlowElementGraph flowElementGraph ) 507 { 508 this.flowElementGraph = flowElementGraph; 509 } 510 511 protected FlowStepGraph getFlowStepGraph() 512 { 513 return flowStepGraph; 514 } 515 516 protected void setFlowStepGraph( FlowStepGraph flowStepGraph ) 517 { 518 this.flowStepGraph = flowStepGraph; 519 } 520 521 protected void setSources( Map<String, Tap> sources ) 522 { 523 if( sources == null ) 524 return; 525 526 addListeners( sources.values() ); 527 this.sources = sources; 528 } 529 530 protected void setSinks( Map<String, Tap> sinks ) 531 { 532 if( sinks == null ) 533 return; 534 535 addListeners( sinks.values() ); 536 this.sinks = sinks; 537 } 538 539 protected void setTraps( Map<String, Tap> traps ) 540 { 541 addListeners( traps.values() ); 542 this.traps = traps; 543 } 544 545 protected void setCheckpoints( Map<String, Tap> checkpoints ) 546 { 547 addListeners( checkpoints.values() ); 548 this.checkpoints = checkpoints; 549 } 550 551 /** 552 * This method creates a new internal Config with the parentConfig as defaults using the properties to override 553 * the defaults. 554 * 555 * @param properties of type Map 556 * @param parentConfig of type Config 557 */ 558 protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig ); 559 560 public Config createConfig( Map<Object, Object> properties, Config defaultConfig ) 561 { 562 Config config = newConfig( defaultConfig ); 563 564 if( properties == null ) 565 return config; 566 567 Set<Object> keys = new HashSet<>( properties.keySet() ); 568 569 // keys will only be grabbed if both key/value are String, so keep orig keys 570 if( properties instanceof Properties ) 571 keys.addAll( ( (Properties) properties ).stringPropertyNames() ); 572 573 for( Object key : keys ) 574 { 575 Object value = properties.get( key ); 576 577 if( value == null && properties instanceof Properties && key instanceof String ) 578 value = ( (Properties) properties ).getProperty( (String) key ); 579 580 if( value == null ) // don't stuff null values 581 continue; 582 583 setConfigProperty( config, key, value ); 584 } 585 586 return config; 587 } 588 589 protected abstract void setConfigProperty( Config config, Object key, Object value ); 590 591 protected abstract Config newConfig( Config defaultConfig ); 592 593 protected void initFromProperties( Map<Object, Object> properties ) 594 { 595 stopJobsOnExit = getStopJobsOnExit( properties ); 596 } 597 598 public FlowSession getFlowSession() 599 { 600 return new FlowSession( getCascadingServices() ); 601 } 602 603 @Override 604 public FlowStats getFlowStats() 605 { 606 return flowStats; 607 } 608 609 @Override 610 public Map<String, String> getFlowDescriptor() 611 { 612 if( flowDescriptor == null ) 613 return Collections.emptyMap(); 614 615 return Collections.unmodifiableMap( flowDescriptor ); 616 } 617 618 @Override 619 public FlowStats getStats() 620 { 621 return getFlowStats(); 622 } 623 624 void addListeners( Collection listeners ) 625 { 626 for( Object listener : listeners ) 627 { 628 if( listener instanceof FlowListener ) 629 addListener( (FlowListener) listener ); 630 } 631 } 632 633 protected void removeListeners( Collection listeners ) 634 { 635 if( listeners == null || listeners.isEmpty() ) 636 return; 637 638 for( Object listener : listeners ) 639 { 640 if( listener instanceof FlowListener ) 641 removeListener( (FlowListener) listener ); 642 } 643 } 644 645 List<SafeFlowListener> getListeners() 646 { 647 if( listeners == null ) 648 listeners = new LinkedList<SafeFlowListener>(); 649 650 return listeners; 651 } 652 653 @Override 654 public boolean hasListeners() 655 { 656 return listeners != null && !listeners.isEmpty(); 657 } 658 659 @Override 660 public void addListener( FlowListener flowListener ) 661 { 662 getListeners().add( new SafeFlowListener( flowListener ) ); 663 } 664 665 @Override 666 public boolean removeListener( FlowListener flowListener ) 667 { 668 return getListeners().remove( new SafeFlowListener( flowListener ) ); 669 } 670 671 @Override 672 public boolean hasStepListeners() 673 { 674 boolean hasStepListeners = false; 675 676 for( FlowStep step : getFlowSteps() ) 677 hasStepListeners |= step.hasListeners(); 678 679 return hasStepListeners; 680 } 681 682 @Override 683 public void addStepListener( FlowStepListener flowStepListener ) 684 { 685 for( FlowStep step : getFlowSteps() ) 686 step.addListener( flowStepListener ); 687 } 688 689 @Override 690 public boolean removeStepListener( FlowStepListener flowStepListener ) 691 { 692 boolean listenerRemoved = true; 693 694 for( FlowStep step : getFlowSteps() ) 695 listenerRemoved &= step.removeListener( flowStepListener ); 696 697 return listenerRemoved; 698 } 699 700 @Override 701 public Map<String, Tap> getSources() 702 { 703 return Collections.unmodifiableMap( sources ); 704 } 705 706 @Override 707 public List<String> getSourceNames() 708 { 709 return new ArrayList<String>( sources.keySet() ); 710 } 711 712 @Override 713 public Tap getSource( String name ) 714 { 715 return sources.get( name ); 716 } 717 718 @Override 719 @DependencyIncoming 720 public Collection<Tap> getSourcesCollection() 721 { 722 return getSources().values(); 723 } 724 725 @Override 726 public Map<String, Tap> getSinks() 727 { 728 return Collections.unmodifiableMap( sinks ); 729 } 730 731 @Override 732 public List<String> getSinkNames() 733 { 734 return new ArrayList<String>( sinks.keySet() ); 735 } 736 737 @Override 738 public Tap getSink( String name ) 739 { 740 return sinks.get( name ); 741 } 742 743 @Override 744 @DependencyOutgoing 745 public Collection<Tap> getSinksCollection() 746 { 747 return getSinks().values(); 748 } 749 750 @Override 751 public Tap getSink() 752 { 753 return sinks.values().iterator().next(); 754 } 755 756 @Override 757 public Map<String, Tap> getTraps() 758 { 759 return Collections.unmodifiableMap( traps ); 760 } 761 762 @Override 763 public List<String> getTrapNames() 764 { 765 return new ArrayList<String>( traps.keySet() ); 766 } 767 768 @Override 769 public Collection<Tap> getTrapsCollection() 770 { 771 return getTraps().values(); 772 } 773 774 @Override 775 public Map<String, Tap> getCheckpoints() 776 { 777 return Collections.unmodifiableMap( checkpoints ); 778 } 779 780 @Override 781 public List<String> getCheckpointNames() 782 { 783 return new ArrayList<String>( checkpoints.keySet() ); 784 } 785 786 @Override 787 public Collection<Tap> getCheckpointsCollection() 788 { 789 return getCheckpoints().values(); 790 } 791 792 @Override 793 public boolean isStopJobsOnExit() 794 { 795 return stopJobsOnExit; 796 } 797 798 @Override 799 public FlowSkipStrategy getFlowSkipStrategy() 800 { 801 return flowSkipStrategy; 802 } 803 804 @Override 805 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 806 { 807 if( flowSkipStrategy == null ) 808 throw new IllegalArgumentException( "flowSkipStrategy may not be null" ); 809 810 try 811 { 812 return this.flowSkipStrategy; 813 } 814 finally 815 { 816 this.flowSkipStrategy = flowSkipStrategy; 817 } 818 } 819 820 @Override 821 public boolean isSkipFlow() throws IOException 822 { 823 return flowSkipStrategy.skipFlow( this ); 824 } 825 826 @Override 827 public boolean areSinksStale() throws IOException 828 { 829 return areSourcesNewer( getSinkModified() ); 830 } 831 832 @Override 833 public boolean areSourcesNewer( long sinkModified ) throws IOException 834 { 835 Config config = getConfig(); 836 Iterator<Tap> values = sources.values().iterator(); 837 838 long sourceModified = 0; 839 840 try 841 { 842 sourceModified = Util.getSourceModified( config, values, sinkModified ); 843 844 if( sinkModified < sourceModified ) 845 return true; 846 847 return false; 848 } 849 finally 850 { 851 if( LOG.isInfoEnabled() ) 852 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 853 } 854 } 855 856 @Override 857 public long getSinkModified() throws IOException 858 { 859 long sinkModified = Util.getSinkModified( getConfig(), sinks.values() ); 860 861 if( LOG.isInfoEnabled() ) 862 { 863 if( sinkModified == -1L ) 864 logInfo( "at least one sink is marked for delete" ); 865 if( sinkModified == 0L ) 866 logInfo( "at least one sink does not exist" ); 867 else 868 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 869 } 870 871 return sinkModified; 872 } 873 874 @Override 875 public FlowStepStrategy getFlowStepStrategy() 876 { 877 return flowStepStrategy; 878 } 879 880 @Override 881 public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ) 882 { 883 this.flowStepStrategy = flowStepStrategy; 884 } 885 886 @Override 887 public List<FlowStep<Config>> getFlowSteps() 888 { 889 if( steps != null ) 890 return steps; 891 892 if( flowStepGraph == null ) 893 return Collections.emptyList(); 894 895 Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator(); 896 897 steps = new ArrayList<>(); 898 899 while( topoIterator.hasNext() ) 900 steps.add( topoIterator.next() ); 901 902 return steps; 903 } 904 905 @Override 906 @ProcessPrepare 907 public void prepare() 908 { 909 try 910 { 911 deleteSinksIfNotUpdate(); 912 deleteTrapsIfNotUpdate(); 913 deleteCheckpointsIfNotUpdate(); 914 } 915 catch( IOException exception ) 916 { 917 throw new FlowTapException( "unable to prepare flow", exception ); 918 } 919 } 920 921 @Override 922 @ProcessStart 923 public synchronized void start() 924 { 925 if( thread != null ) 926 return; 927 928 if( stop ) 929 return; 930 931 registerShutdownHook(); 932 933 internalStart(); 934 935 String threadName = ( "flow " + Util.toNull( getName() ) ).trim(); 936 937 thread = createFlowThread( threadName ); 938 939 thread.start(); 940 } 941 942 protected Thread createFlowThread( String threadName ) 943 { 944 return new Thread( new Runnable() 945 { 946 @Override 947 public void run() 948 { 949 BaseFlow.this.run(); 950 } 951 }, threadName ); 952 } 953 954 protected abstract void internalStart(); 955 956 @Override 957 @ProcessStop 958 public synchronized void stop() 959 { 960 stopLock.lock(); 961 962 try 963 { 964 if( stop ) 965 return; 966 967 stop = true; 968 969 fireOnStopping(); 970 971 if( !flowStats.isFinished() ) 972 flowStats.markStopped(); 973 974 internalStopAllJobs(); 975 976 handleExecutorShutdown(); 977 978 internalClean( true ); 979 } 980 finally 981 { 982 flowStats.cleanup(); 983 stopLock.unlock(); 984 } 985 } 986 987 protected abstract void internalClean( boolean stop ); 988 989 @Override 990 @ProcessComplete 991 public void complete() 992 { 993 if( !completed ) 994 start(); 995 996 try 997 { 998 try 999 { 1000 synchronized( this ) // prevent NPE on quick stop() & complete() after start() 1001 { 1002 while( !completed && thread == null && !stop ) 1003 Util.safeSleep( 10 ); 1004 } 1005 1006 if( thread != null ) 1007 thread.join(); 1008 } 1009 catch( InterruptedException exception ) 1010 { 1011 throw new FlowException( getName(), "thread interrupted", exception ); 1012 } 1013 1014 // if in #stop and stopping, lets wait till its done in this thread 1015 try 1016 { 1017 stopLock.lock(); 1018 } 1019 finally 1020 { 1021 stopLock.unlock(); 1022 } 1023 1024 // if multiple threads enter #complete, each will get a copy of the exception, if any 1025 if( throwable instanceof FlowException ) 1026 ( (FlowException) throwable ).setFlowName( getName() ); 1027 1028 if( throwable instanceof CascadingException ) 1029 throw (CascadingException) throwable; 1030 1031 if( throwable instanceof OutOfMemoryError ) 1032 throw (OutOfMemoryError) throwable; 1033 1034 if( throwable != null ) 1035 throw new FlowException( getName(), "unhandled exception", throwable ); 1036 1037 if( hasListeners() ) 1038 { 1039 for( SafeFlowListener safeFlowListener : getListeners() ) 1040 { 1041 if( safeFlowListener.throwable != null ) 1042 throw new FlowException( getName(), "unhandled listener exception", throwable ); 1043 } 1044 } 1045 } 1046 finally 1047 { 1048 completed = true; 1049 thread = null; 1050 throwable = null; 1051 } 1052 } 1053 1054 private void commitTraps() 1055 { 1056 // commit all the traps, don't fail on an error 1057 for( Tap tap : traps.values() ) 1058 { 1059 try 1060 { 1061 if( !tap.commitResource( getConfig() ) ) 1062 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ) ); 1063 } 1064 catch( IOException exception ) 1065 { 1066 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception ); 1067 } 1068 } 1069 } 1070 1071 @Override 1072 @ProcessCleanup 1073 public void cleanup() 1074 { 1075 // do nothing 1076 } 1077 1078 @Override 1079 public TupleEntryIterator openSource() throws IOException 1080 { 1081 return sources.values().iterator().next().openForRead( getFlowProcess() ); 1082 } 1083 1084 @Override 1085 public TupleEntryIterator openSource( String name ) throws IOException 1086 { 1087 if( !sources.containsKey( name ) ) 1088 throw new IllegalArgumentException( "source does not exist: " + name ); 1089 1090 return sources.get( name ).openForRead( getFlowProcess() ); 1091 } 1092 1093 @Override 1094 public TupleEntryIterator openSink() throws IOException 1095 { 1096 return sinks.values().iterator().next().openForRead( getFlowProcess() ); 1097 } 1098 1099 @Override 1100 public TupleEntryIterator openSink( String name ) throws IOException 1101 { 1102 if( !sinks.containsKey( name ) ) 1103 throw new IllegalArgumentException( "sink does not exist: " + name ); 1104 1105 return sinks.get( name ).openForRead( getFlowProcess() ); 1106 } 1107 1108 @Override 1109 public TupleEntryIterator openTrap() throws IOException 1110 { 1111 return traps.values().iterator().next().openForRead( getFlowProcess() ); 1112 } 1113 1114 @Override 1115 public TupleEntryIterator openTrap( String name ) throws IOException 1116 { 1117 if( !traps.containsKey( name ) ) 1118 throw new IllegalArgumentException( "trap does not exist: " + name ); 1119 1120 return traps.get( name ).openForRead( getFlowProcess() ); 1121 } 1122 1123 /** 1124 * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}. 1125 * <p/> 1126 * Use with caution. 1127 * 1128 * @throws IOException when 1129 * @see BaseFlow#deleteSinksIfNotUpdate() 1130 */ 1131 public void deleteSinks() throws IOException 1132 { 1133 for( Tap tap : sinks.values() ) 1134 deleteOrFail( tap ); 1135 } 1136 1137 private void deleteOrFail( Tap tap ) throws IOException 1138 { 1139 if( !tap.resourceExists( getConfig() ) ) 1140 return; 1141 1142 if( !tap.deleteResource( getConfig() ) ) 1143 throw new FlowTapException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) ); 1144 } 1145 1146 /** 1147 * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag. 1148 * <p/> 1149 * Typically used by a {@link Cascade} before executing the flow if the sinks are stale. 1150 * <p/> 1151 * Use with caution. 1152 * 1153 * @throws IOException when 1154 */ 1155 public void deleteSinksIfNotUpdate() throws IOException 1156 { 1157 for( Tap tap : sinks.values() ) 1158 { 1159 if( !tap.isUpdate() ) 1160 deleteOrFail( tap ); 1161 } 1162 } 1163 1164 /** 1165 * Method deleteSinksIfReplace deletes all sinks that are configured with the {@link cascading.tap.SinkMode#REPLACE} flag. 1166 * 1167 * @throws IOException 1168 */ 1169 public void deleteSinksIfReplace() throws IOException 1170 { 1171 // verify all sinks before incrementally deleting for a replace 1172 for( Tap tap : sinks.values() ) 1173 { 1174 if( tap.isKeep() && tap.resourceExists( getConfig() ) ) 1175 throw new FlowTapException( "resource exists and sink mode is KEEP, cannot overwrite: " + tap.getFullIdentifier( getFlowProcess() ) ); 1176 } 1177 1178 for( Tap tap : sinks.values() ) 1179 { 1180 if( tap.isReplace() ) 1181 deleteOrFail( tap ); 1182 } 1183 } 1184 1185 public void deleteTrapsIfNotUpdate() throws IOException 1186 { 1187 for( Tap tap : traps.values() ) 1188 { 1189 if( !tap.isUpdate() ) 1190 deleteOrFail( tap ); 1191 } 1192 } 1193 1194 public void deleteCheckpointsIfNotUpdate() throws IOException 1195 { 1196 for( Tap tap : checkpoints.values() ) 1197 { 1198 if( !tap.isUpdate() ) 1199 deleteOrFail( tap ); 1200 } 1201 } 1202 1203 public void deleteTrapsIfReplace() throws IOException 1204 { 1205 for( Tap tap : traps.values() ) 1206 { 1207 if( tap.isReplace() ) 1208 deleteOrFail( tap ); 1209 } 1210 } 1211 1212 public void deleteCheckpointsIfReplace() throws IOException 1213 { 1214 for( Tap tap : checkpoints.values() ) 1215 { 1216 if( tap.isReplace() ) 1217 deleteOrFail( tap ); 1218 } 1219 } 1220 1221 @Override 1222 public boolean resourceExists( Tap tap ) throws IOException 1223 { 1224 return tap.resourceExists( getConfig() ); 1225 } 1226 1227 @Override 1228 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 1229 { 1230 return tap.openForRead( getFlowProcess() ); 1231 } 1232 1233 @Override 1234 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 1235 { 1236 return tap.openForWrite( getFlowProcess() ); 1237 } 1238 1239 /** Method run implements the Runnable run method and should not be called by users. */ 1240 private void run() 1241 { 1242 if( thread == null ) 1243 throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" ); 1244 1245 Version.printBanner(); 1246 Update.checkForUpdate( getPlatformInfo() ); 1247 1248 try 1249 { 1250 if( stop ) 1251 return; 1252 1253 flowStats.markStarted(); 1254 1255 fireOnStarting(); 1256 1257 if( isInfoEnabled() ) 1258 { 1259 logInfo( "starting" ); 1260 1261 for( Tap source : getSourcesCollection() ) 1262 logInfo( " source: " + source ); 1263 for( Tap sink : getSinksCollection() ) 1264 logInfo( " sink: " + sink ); 1265 } 1266 1267 spawnSteps(); 1268 } 1269 catch( Throwable throwable ) 1270 { 1271 this.throwable = throwable; 1272 } 1273 finally 1274 { 1275 handleThrowableAndMarkFailed(); 1276 1277 if( !stop && !flowStats.isFinished() ) 1278 flowStats.markSuccessful(); 1279 1280 internalClean( stop ); // cleaning temp taps may be determined by success/failure 1281 1282 commitTraps(); 1283 1284 try 1285 { 1286 fireOnCompleted(); 1287 } 1288 finally 1289 { 1290 if( LOG.isInfoEnabled() ) 1291 { 1292 long totalSliceCPUSeconds = getTotalSliceCPUMilliSeconds(); 1293 1294 if( totalSliceCPUSeconds == -1 ) 1295 logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) ); 1296 else 1297 logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) + ", using cpu time: " + formatDurationFromMillis( totalSliceCPUSeconds ) ); 1298 } 1299 1300 flowStats.cleanup(); 1301 internalShutdown(); 1302 deregisterShutdownHook(); 1303 } 1304 } 1305 } 1306 1307 /** 1308 * Reentrant method to launch all step jobs, returns false if any of the current steps failed. 1309 * 1310 * @return 1311 * @throws InterruptedException 1312 * @throws ExecutionException 1313 */ 1314 protected boolean spawnSteps() throws InterruptedException, ExecutionException 1315 { 1316 // if jobs are run local, then only use one thread to force execution serially 1317 //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() ); 1318 int numThreads = getMaxNumParallelSteps(); 1319 int eligibleJobsSize = getEligibleJobsSize(); // only jobs that have not previously been started 1320 1321 if( numThreads == 0 ) 1322 numThreads = eligibleJobsSize; 1323 1324 if( numThreads == 0 ) 1325 throw new IllegalStateException( "no jobs rendered for flow: " + getName() ); 1326 1327 if( LOG.isInfoEnabled() ) 1328 { 1329 logInfo( " parallel execution of steps is enabled: " + ( getMaxNumParallelSteps() != 1 ) ); 1330 logInfo( " executing total steps: " + eligibleJobsSize ); 1331 logInfo( " allocating management threads: " + numThreads ); 1332 } 1333 1334 List<Future<Throwable>> futures = spawnJobs( numThreads ); 1335 1336 for( Future<Throwable> future : futures ) 1337 { 1338 throwable = future.get(); 1339 1340 if( throwable != null ) 1341 { 1342 if( !stop ) 1343 internalStopAllJobs(); 1344 1345 handleExecutorShutdown(); 1346 return false; 1347 } 1348 } 1349 1350 return true; 1351 } 1352 1353 protected long getTotalSliceCPUMilliSeconds() 1354 { 1355 return -1; 1356 } 1357 1358 protected abstract int getMaxNumParallelSteps(); 1359 1360 protected abstract void internalShutdown(); 1361 1362 private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException 1363 { 1364 if( spawnStrategy == null ) 1365 { 1366 logError( "no spawnStrategy set" ); 1367 return new ArrayList<>(); 1368 } 1369 1370 if( stop ) 1371 return new ArrayList<>(); 1372 1373 List<Callable<Throwable>> list = getJobMapCallables(); 1374 1375 return spawnStrategy.start( this, numThreads, list ); 1376 } 1377 1378 private void handleThrowableAndMarkFailed() 1379 { 1380 if( throwable != null && !stop ) 1381 { 1382 flowStats.markFailed( throwable ); 1383 1384 fireOnThrowable(); 1385 } 1386 } 1387 1388 Map<String, FlowStepJob<Config>> getJobsMap() 1389 { 1390 return jobsMap; 1391 } 1392 1393 protected boolean isJobsMapInitialized() 1394 { 1395 return jobsMap != null; 1396 } 1397 1398 protected int getEligibleJobsSize() 1399 { 1400 return getJobMapCallables().size(); 1401 } 1402 1403 protected List<Callable<Throwable>> getJobMapCallables() 1404 { 1405 List<Callable<Throwable>> list = new ArrayList<>(); 1406 1407 for( FlowStepJob<Config> job : jobsMap.values() ) 1408 { 1409 if( job.isCallableStarted() ) 1410 continue; 1411 1412 list.add( job ); 1413 } 1414 1415 return list; 1416 } 1417 1418 protected void initializeNewJobsMap() 1419 { 1420 jobsMap = updateJobsMap( flowStepGraph, new LinkedHashMap<String, FlowStepJob<Config>>() ); // keep topo order 1421 } 1422 1423 protected void updateJobsMap() 1424 { 1425 jobsMap = updateJobsMap( flowStepGraph, new LinkedHashMap<>( jobsMap ) ); 1426 } 1427 1428 Map<String, FlowStepJob<Config>> updateJobsMap( FlowStepGraph flowStepGraph, Map<String, FlowStepJob<Config>> jobsMap ) 1429 { 1430 Iterator<FlowStep> iterator = flowStepGraph.getTopologicalIterator(); 1431 1432 while( iterator.hasNext() ) 1433 { 1434 BaseFlowStep<Config> step = (BaseFlowStep) iterator.next(); 1435 FlowStepJob<Config> flowStepJob = jobsMap.get( step.getID() ); 1436 1437 if( flowStepJob == null ) 1438 { 1439 flowStepJob = step.getCreateFlowStepJob( getFlowProcess(), getConfig() ); 1440 1441 jobsMap.put( step.getID(), flowStepJob ); 1442 } 1443 1444 List<FlowStepJob<Config>> predecessors = new ArrayList<>(); 1445 1446 for( Object flowStep : ProcessGraphs.predecessorListOf( flowStepGraph, step ) ) 1447 predecessors.add( jobsMap.get( ( (FlowStep) flowStep ).getID() ) ); 1448 1449 flowStepJob.setPredecessors( predecessors ); 1450 } 1451 1452 return jobsMap; 1453 } 1454 1455 protected void initializeChildStats() 1456 { 1457 for( FlowStepJob<Config> flowStepJob : jobsMap.values() ) 1458 flowStats.addStepStats( flowStepJob.getStepStats() ); 1459 } 1460 1461 protected void internalStopAllJobs() 1462 { 1463 logInfo( "stopping all jobs" ); 1464 1465 try 1466 { 1467 if( jobsMap == null ) 1468 return; 1469 1470 List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() ); 1471 1472 Collections.reverse( jobs ); 1473 1474 for( FlowStepJob<Config> job : jobs ) 1475 job.stop(); 1476 } 1477 finally 1478 { 1479 logInfo( "stopped all jobs" ); 1480 } 1481 } 1482 1483 protected void handleExecutorShutdown() 1484 { 1485 if( spawnStrategy == null ) 1486 return; 1487 1488 if( spawnStrategy.isCompleted( this ) ) 1489 return; 1490 1491 logDebug( "shutting down job executor" ); 1492 1493 try 1494 { 1495 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 1496 } 1497 catch( InterruptedException exception ) 1498 { 1499 // ignore 1500 } 1501 1502 logDebug( "shutdown of job executor complete" ); 1503 } 1504 1505 protected void fireOnCompleted() 1506 { 1507 if( hasListeners() ) 1508 { 1509 if( isDebugEnabled() ) 1510 logDebug( "firing onCompleted event: " + getListeners().size() ); 1511 1512 for( FlowListener flowListener : getListeners() ) 1513 flowListener.onCompleted( this ); 1514 } 1515 } 1516 1517 protected void fireOnThrowable( Throwable throwable ) 1518 { 1519 this.throwable = throwable; 1520 fireOnThrowable(); 1521 } 1522 1523 protected void fireOnThrowable() 1524 { 1525 if( hasListeners() ) 1526 { 1527 if( isDebugEnabled() ) 1528 logDebug( "firing onThrowable event: " + getListeners().size() ); 1529 1530 boolean isHandled = false; 1531 1532 for( FlowListener flowListener : getListeners() ) 1533 isHandled = flowListener.onThrowable( this, throwable ) || isHandled; 1534 1535 if( isHandled ) 1536 throwable = null; 1537 } 1538 } 1539 1540 protected void fireOnStopping() 1541 { 1542 if( hasListeners() ) 1543 { 1544 if( isDebugEnabled() ) 1545 logDebug( "firing onStopping event: " + getListeners().size() ); 1546 1547 for( FlowListener flowListener : getListeners() ) 1548 flowListener.onStopping( this ); 1549 } 1550 } 1551 1552 protected void fireOnStarting() 1553 { 1554 if( hasListeners() ) 1555 { 1556 if( isDebugEnabled() ) 1557 logDebug( "firing onStarting event: " + getListeners().size() ); 1558 1559 for( FlowListener flowListener : getListeners() ) 1560 flowListener.onStarting( this ); 1561 } 1562 } 1563 1564 @Override 1565 public String toString() 1566 { 1567 StringBuffer buffer = new StringBuffer(); 1568 1569 if( getName() != null ) 1570 buffer.append( getName() ).append( ": " ); 1571 1572 for( FlowStep step : getFlowSteps() ) 1573 buffer.append( step ); 1574 1575 return buffer.toString(); 1576 } 1577 1578 @Override 1579 public final boolean isInfoEnabled() 1580 { 1581 return LOG.isInfoEnabled(); 1582 } 1583 1584 @Override 1585 public final boolean isDebugEnabled() 1586 { 1587 return LOG.isDebugEnabled(); 1588 } 1589 1590 @Override 1591 public void logInfo( String message, Object... arguments ) 1592 { 1593 LOG.info( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1594 } 1595 1596 @Override 1597 public void logDebug( String message, Object... arguments ) 1598 { 1599 LOG.debug( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1600 } 1601 1602 @Override 1603 public void logWarn( String message ) 1604 { 1605 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message ); 1606 } 1607 1608 @Override 1609 public void logWarn( String message, Throwable throwable ) 1610 { 1611 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable ); 1612 } 1613 1614 @Override 1615 public void logWarn( String message, Object... arguments ) 1616 { 1617 LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1618 } 1619 1620 @Override 1621 public void logError( String message, Object... arguments ) 1622 { 1623 LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments ); 1624 } 1625 1626 @Override 1627 public void logError( String message, Throwable throwable ) 1628 { 1629 LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable ); 1630 } 1631 1632 @Override 1633 public void writeDOT( String filename ) 1634 { 1635 if( flowElementGraph == null ) 1636 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1637 1638 flowElementGraph.writeDOT( filename ); 1639 } 1640 1641 @Override 1642 public void writeStepsDOT( String filename ) 1643 { 1644 if( flowStepGraph == null ) 1645 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1646 1647 flowStepGraph.writeDOT( filename ); 1648 } 1649 1650 /** 1651 * Used to return a simple wrapper for use as an edge in a graph where there can only be 1652 * one instance of every edge. 1653 * 1654 * @return FlowHolder 1655 */ 1656 public FlowHolder getHolder() 1657 { 1658 return new FlowHolder( this ); 1659 } 1660 1661 public void setCascade( Cascade cascade ) 1662 { 1663 setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() ); 1664 flowStats.recordInfo(); 1665 } 1666 1667 @Override 1668 public String getCascadeID() 1669 { 1670 return getProperty( "cascading.cascade.id" ); 1671 } 1672 1673 @Override 1674 public String getRunID() 1675 { 1676 return runID; 1677 } 1678 1679 public List<String> getClassPath() 1680 { 1681 return classPath; 1682 } 1683 1684 @Override 1685 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1686 { 1687 this.spawnStrategy = spawnStrategy; 1688 } 1689 1690 @Override 1691 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1692 { 1693 return spawnStrategy; 1694 } 1695 1696 protected void registerShutdownHook() 1697 { 1698 if( !isStopJobsOnExit() ) 1699 return; 1700 1701 shutdownHook = new ShutdownUtil.Hook() 1702 { 1703 @Override 1704 public Priority priority() 1705 { 1706 return Priority.WORK_CHILD; 1707 } 1708 1709 @Override 1710 public void execute() 1711 { 1712 logInfo( "shutdown hook calling stop on flow" ); 1713 1714 BaseFlow.this.stop(); 1715 } 1716 }; 1717 1718 ShutdownUtil.addHook( shutdownHook ); 1719 } 1720 1721 private void deregisterShutdownHook() 1722 { 1723 if( !isStopJobsOnExit() || stop ) 1724 return; 1725 1726 ShutdownUtil.removeHook( shutdownHook ); 1727 } 1728 1729 /** Class FlowHolder is a helper class for wrapping Flow instances. */ 1730 public static class FlowHolder 1731 { 1732 /** Field flow */ 1733 public Flow flow; 1734 1735 public FlowHolder() 1736 { 1737 } 1738 1739 public FlowHolder( Flow flow ) 1740 { 1741 this.flow = flow; 1742 } 1743 } 1744 1745 /** 1746 * Class SafeFlowListener safely calls a wrapped FlowListener. 1747 * <p/> 1748 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1749 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1750 * which in turn is run in a new Thread. 1751 */ 1752 private class SafeFlowListener implements FlowListener 1753 { 1754 /** Field flowListener */ 1755 final FlowListener flowListener; 1756 /** Field throwable */ 1757 Throwable throwable; 1758 1759 private SafeFlowListener( FlowListener flowListener ) 1760 { 1761 this.flowListener = flowListener; 1762 } 1763 1764 public void onStarting( Flow flow ) 1765 { 1766 try 1767 { 1768 flowListener.onStarting( flow ); 1769 } 1770 catch( Throwable throwable ) 1771 { 1772 handleThrowable( throwable ); 1773 } 1774 } 1775 1776 public void onStopping( Flow flow ) 1777 { 1778 try 1779 { 1780 flowListener.onStopping( flow ); 1781 } 1782 catch( Throwable throwable ) 1783 { 1784 handleThrowable( throwable ); 1785 } 1786 } 1787 1788 public void onCompleted( Flow flow ) 1789 { 1790 try 1791 { 1792 flowListener.onCompleted( flow ); 1793 } 1794 catch( Throwable throwable ) 1795 { 1796 handleThrowable( throwable ); 1797 } 1798 } 1799 1800 public boolean onThrowable( Flow flow, Throwable flowThrowable ) 1801 { 1802 try 1803 { 1804 return flowListener.onThrowable( flow, flowThrowable ); 1805 } 1806 catch( Throwable throwable ) 1807 { 1808 handleThrowable( throwable ); 1809 } 1810 1811 return false; 1812 } 1813 1814 private void handleThrowable( Throwable throwable ) 1815 { 1816 this.throwable = throwable; 1817 1818 logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable ); 1819 1820 // stop this flow 1821 stop(); 1822 } 1823 1824 public boolean equals( Object object ) 1825 { 1826 if( object instanceof BaseFlow.SafeFlowListener ) 1827 return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener ); 1828 1829 return flowListener.equals( object ); 1830 } 1831 1832 public int hashCode() 1833 { 1834 return flowListener.hashCode(); 1835 } 1836 } 1837 }