001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.io.IOException; 024 import java.util.ArrayList; 025 import java.util.Collection; 026 import java.util.Collections; 027 import java.util.Date; 028 import java.util.HashMap; 029 import java.util.HashSet; 030 import java.util.Iterator; 031 import java.util.LinkedHashMap; 032 import java.util.LinkedList; 033 import java.util.List; 034 import java.util.Map; 035 import java.util.Properties; 036 import java.util.Set; 037 import java.util.concurrent.Callable; 038 import java.util.concurrent.Future; 039 import java.util.concurrent.TimeUnit; 040 import java.util.concurrent.locks.ReentrantLock; 041 042 import cascading.CascadingException; 043 import cascading.cascade.Cascade; 044 import cascading.flow.planner.BaseFlowStep; 045 import cascading.flow.planner.ElementGraph; 046 import cascading.flow.planner.FlowStepGraph; 047 import cascading.flow.planner.FlowStepJob; 048 import cascading.flow.planner.PlatformInfo; 049 import cascading.management.CascadingServices; 050 import cascading.management.UnitOfWorkExecutorStrategy; 051 import cascading.management.UnitOfWorkSpawnStrategy; 052 import cascading.management.state.ClientState; 053 import 
cascading.property.AppProps; 054 import cascading.property.PropertyUtil; 055 import cascading.stats.FlowStats; 056 import cascading.tap.Tap; 057 import cascading.tuple.Fields; 058 import cascading.tuple.TupleEntryCollector; 059 import cascading.tuple.TupleEntryIterator; 060 import cascading.util.ShutdownUtil; 061 import cascading.util.Update; 062 import cascading.util.Util; 063 import cascading.util.Version; 064 import org.jgrapht.traverse.TopologicalOrderIterator; 065 import org.slf4j.Logger; 066 import org.slf4j.LoggerFactory; 067 import riffle.process.DependencyIncoming; 068 import riffle.process.DependencyOutgoing; 069 import riffle.process.ProcessCleanup; 070 import riffle.process.ProcessComplete; 071 import riffle.process.ProcessPrepare; 072 import riffle.process.ProcessStart; 073 import riffle.process.ProcessStop; 074 075 import static org.jgrapht.Graphs.predecessorListOf; 076 077 @riffle.process.Process 078 public abstract class BaseFlow<Config> implements Flow<Config> 079 { 080 /** Field LOG */ 081 private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); 082 083 private PlatformInfo platformInfo; 084 085 /** Field id */ 086 private String id; 087 /** Field name */ 088 private String name; 089 /** Fields runID */ 090 private String runID; 091 /** Fields classpath */ 092 private List<String> classPath; // may remain null 093 /** Field tags */ 094 private String tags; 095 /** Field listeners */ 096 private List<SafeFlowListener> listeners; 097 /** Field skipStrategy */ 098 private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale(); 099 /** Field flowStats */ 100 protected FlowStats flowStats; // don't use a listener to set values 101 /** Field sources */ 102 protected Map<String, Tap> sources = Collections.EMPTY_MAP; 103 /** Field sinks */ 104 protected Map<String, Tap> sinks = Collections.EMPTY_MAP; 105 /** Field traps */ 106 private Map<String, Tap> traps = Collections.EMPTY_MAP; 107 /** Field checkpoints */ 108 private 
Map<String, Tap> checkpoints = Collections.EMPTY_MAP; 109 /** Field stopJobsOnExit */ 110 protected boolean stopJobsOnExit = true; 111 /** Field submitPriority */ 112 private int submitPriority = 5; 113 114 /** Field stepGraph */ 115 private FlowStepGraph<Config> flowStepGraph; 116 /** Field thread */ 117 protected transient Thread thread; 118 /** Field throwable */ 119 private Throwable throwable; 120 /** Field stop */ 121 protected boolean stop; 122 123 /** Field pipeGraph */ 124 private ElementGraph pipeGraph; // only used for documentation purposes 125 126 private transient CascadingServices cascadingServices; 127 128 private FlowStepStrategy<Config> flowStepStrategy = null; 129 /** Field steps */ 130 private transient List<FlowStep<Config>> steps; 131 /** Field jobsMap */ 132 private transient Map<String, FlowStepJob<Config>> jobsMap; 133 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 134 135 private transient ReentrantLock stopLock = new ReentrantLock( true ); 136 protected ShutdownUtil.Hook shutdownHook; 137 138 private HashMap<String, String> flowDescriptor; 139 140 /** 141 * Returns property stopJobsOnExit. 142 * 143 * @param properties of type Map 144 * @return a boolean 145 */ 146 static boolean getStopJobsOnExit( Map<Object, Object> properties ) 147 { 148 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) ); 149 } 150 151 /** Used for testing. 
*/ 152 protected BaseFlow() 153 { 154 this.name = "NA"; 155 this.flowStats = createPrepareFlowStats(); 156 } 157 158 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name ) 159 { 160 this( platformInfo, properties, defaultConfig, name, new LinkedHashMap<String, String>() ); 161 } 162 163 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor ) 164 { 165 this.platformInfo = platformInfo; 166 this.name = name; 167 168 if( flowDescriptor != null ) 169 this.flowDescriptor = new LinkedHashMap<String, String>( flowDescriptor ); 170 171 addSessionProperties( properties ); 172 initConfig( properties, defaultConfig ); 173 174 this.flowStats = createPrepareFlowStats(); // must be last 175 } 176 177 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef ) 178 { 179 this.platformInfo = platformInfo; 180 this.name = flowDef.getName(); 181 this.tags = flowDef.getTags(); 182 this.runID = flowDef.getRunID(); 183 this.classPath = flowDef.getClassPath(); 184 185 if( !flowDef.getFlowDescriptor().isEmpty() ) 186 this.flowDescriptor = new LinkedHashMap<String, String>( flowDef.getFlowDescriptor() ); 187 188 addSessionProperties( properties ); 189 initConfig( properties, defaultConfig ); 190 setSources( flowDef.getSourcesCopy() ); 191 setSinks( flowDef.getSinksCopy() ); 192 setTraps( flowDef.getTrapsCopy() ); 193 setCheckpoints( flowDef.getCheckpointsCopy() ); 194 initFromTaps(); 195 196 retrieveSourceFields(); 197 retrieveSinkFields(); 198 } 199 200 public PlatformInfo getPlatformInfo() 201 { 202 return platformInfo; 203 } 204 205 public void initialize( ElementGraph pipeGraph, FlowStepGraph<Config> flowStepGraph ) 206 { 207 this.pipeGraph = pipeGraph; 208 this.flowStepGraph = flowStepGraph; 209 210 initSteps(); 211 212 this.flowStats = createPrepareFlowStats(); // must be 
last 213 214 initializeNewJobsMap(); 215 } 216 217 public ElementGraph updateSchemes( ElementGraph pipeGraph ) 218 { 219 presentSourceFields( pipeGraph ); 220 221 presentSinkFields( pipeGraph ); 222 223 return new ElementGraph( pipeGraph ); 224 } 225 226 /** Force a Scheme to fetch any fields from a meta-data store */ 227 protected void retrieveSourceFields() 228 { 229 for( Tap tap : sources.values() ) 230 tap.retrieveSourceFields( getFlowProcess() ); 231 } 232 233 /** 234 * Present the current resolved fields for the Tap 235 * 236 * @param pipeGraph 237 */ 238 protected void presentSourceFields( ElementGraph pipeGraph ) 239 { 240 for( Tap tap : sources.values() ) 241 { 242 if( pipeGraph.containsVertex( tap ) ) 243 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 244 } 245 246 for( Tap tap : checkpoints.values() ) 247 { 248 if( pipeGraph.containsVertex( tap ) ) 249 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 250 } 251 } 252 253 /** Force a Scheme to fetch any fields from a meta-data store */ 254 protected void retrieveSinkFields() 255 { 256 for( Tap tap : sinks.values() ) 257 tap.retrieveSinkFields( getFlowProcess() ); 258 } 259 260 /** 261 * Present the current resolved fields for the Tap 262 * 263 * @param pipeGraph 264 */ 265 protected void presentSinkFields( ElementGraph pipeGraph ) 266 { 267 for( Tap tap : sinks.values() ) 268 { 269 if( pipeGraph.containsVertex( tap ) ) 270 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 271 } 272 273 for( Tap tap : checkpoints.values() ) 274 { 275 if( pipeGraph.containsVertex( tap ) ) 276 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 277 } 278 } 279 280 protected Fields getFieldsFor( ElementGraph pipeGraph, Tap tap ) 281 { 282 return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields(); 283 } 284 285 private void addSessionProperties( Map<Object, Object> properties ) 286 { 287 if( properties 
== null ) 288 return; 289 290 PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() ); 291 PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() ); 292 AppProps.setApplicationID( properties ); 293 PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) ); 294 PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) ); 295 } 296 297 private String makeAppName( Map<Object, Object> properties ) 298 { 299 if( properties == null ) 300 return null; 301 302 String name = AppProps.getApplicationName( properties ); 303 304 if( name != null ) 305 return name; 306 307 return Util.findName( AppProps.getApplicationJarPath( properties ) ); 308 } 309 310 private String makeAppVersion( Map<Object, Object> properties ) 311 { 312 if( properties == null ) 313 return null; 314 315 String name = AppProps.getApplicationVersion( properties ); 316 317 if( name != null ) 318 return name; 319 320 return Util.findVersion( AppProps.getApplicationJarPath( properties ) ); 321 } 322 323 private FlowStats createPrepareFlowStats() 324 { 325 FlowStats flowStats = new FlowStats( this, getClientState() ); 326 327 flowStats.prepare(); 328 flowStats.markPending(); 329 330 return flowStats; 331 } 332 333 public CascadingServices getCascadingServices() 334 { 335 if( cascadingServices == null ) 336 cascadingServices = new CascadingServices( getConfigAsProperties() ); 337 338 return cascadingServices; 339 } 340 341 private ClientState getClientState() 342 { 343 return getFlowSession().getCascadingServices().createClientState( getID() ); 344 } 345 346 protected void initSteps() 347 { 348 if( flowStepGraph == null ) 349 return; 350 351 for( Object flowStep : flowStepGraph.vertexSet() ) 352 ( (BaseFlowStep<Config>) flowStep ).setFlow( this ); 353 } 354 355 private void initFromTaps() 356 { 357 initFromTaps( sources ); 358 initFromTaps( sinks ); 359 initFromTaps( traps ); 360 } 361 362 private void initFromTaps( 
Map<String, Tap> taps ) 363 { 364 for( Tap tap : taps.values() ) 365 tap.flowConfInit( this ); 366 } 367 368 @Override 369 public String getName() 370 { 371 return name; 372 } 373 374 protected void setName( String name ) 375 { 376 this.name = name; 377 } 378 379 @Override 380 public String getID() 381 { 382 if( id == null ) 383 id = Util.createUniqueID(); 384 385 return id; 386 } 387 388 @Override 389 public String getTags() 390 { 391 return tags; 392 } 393 394 @Override 395 public int getSubmitPriority() 396 { 397 return submitPriority; 398 } 399 400 @Override 401 public void setSubmitPriority( int submitPriority ) 402 { 403 if( submitPriority < 1 || submitPriority > 10 ) 404 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 405 406 this.submitPriority = submitPriority; 407 } 408 409 ElementGraph getPipeGraph() 410 { 411 return pipeGraph; 412 } 413 414 FlowStepGraph getFlowStepGraph() 415 { 416 return flowStepGraph; 417 } 418 419 protected void setSources( Map<String, Tap> sources ) 420 { 421 addListeners( sources.values() ); 422 this.sources = sources; 423 } 424 425 protected void setSinks( Map<String, Tap> sinks ) 426 { 427 addListeners( sinks.values() ); 428 this.sinks = sinks; 429 } 430 431 protected void setTraps( Map<String, Tap> traps ) 432 { 433 addListeners( traps.values() ); 434 this.traps = traps; 435 } 436 437 protected void setCheckpoints( Map<String, Tap> checkpoints ) 438 { 439 addListeners( checkpoints.values() ); 440 this.checkpoints = checkpoints; 441 } 442 443 protected void setFlowStepGraph( FlowStepGraph flowStepGraph ) 444 { 445 this.flowStepGraph = flowStepGraph; 446 } 447 448 /** 449 * This method creates a new internal Config with the parentConfig as defaults using the properties to override 450 * the defaults. 
*
   * @param properties   of type Map
   * @param parentConfig of type Config
   */
  protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig );

  /**
   * Creates a new Config from the given defaults and copies every non-null property value into it.
   * <p/>
   * When the given properties are a {@link Properties} instance, the names reported by
   * {@link Properties#stringPropertyNames()} are included and resolved through
   * {@link Properties#getProperty(String)}.
   *
   * @param properties    of type Map, may be null
   * @param defaultConfig of type Config
   * @return Config
   */
  public Config createConfig( Map<Object, Object> properties, Config defaultConfig )
  {
    Config result = newConfig( defaultConfig );

    if( properties == null )
      return result;

    Properties asProperties = properties instanceof Properties ? (Properties) properties : null;
    Set<Object> allKeys = new HashSet<Object>( properties.keySet() );

    // keys will only be grabbed if both key/value are String, so keep orig keys
    if( asProperties != null )
      allKeys.addAll( asProperties.stringPropertyNames() );

    for( Object key : allKeys )
    {
      Object value = properties.get( key );

      if( value == null && asProperties != null && key instanceof String )
        value = asProperties.getProperty( (String) key );

      if( value != null ) // don't stuff null values
        setConfigProperty( result, key, value );
    }

    return result;
  }

  protected abstract void setConfigProperty( Config config, Object key, Object value );

  protected abstract Config newConfig( Config defaultConfig );

  /**
   * Reads the stopJobsOnExit setting from the given properties.
   *
   * @param properties of type Map
   */
  protected void initFromProperties( Map<Object, Object> properties )
  {
    stopJobsOnExit = getStopJobsOnExit( properties );
  }

  /**
   * Method getFlowSession returns a new FlowSession wrapping this flow's CascadingServices.
   *
   * @return FlowSession
   */
  public FlowSession getFlowSession()
  {
    return new FlowSession( getCascadingServices() );
  }

  @Override
  public FlowStats getFlowStats()
  {
    return flowStats;
  }

  /** Returns an unmodifiable view of the flow descriptor, or an empty map when none was set. */
  @Override
  public Map<String, String> getFlowDescriptor()
  {
    if( flowDescriptor != null )
      return Collections.unmodifiableMap( flowDescriptor );

    return Collections.emptyMap();
  }

  @Override
  public FlowStats getStats()
  {
    return getFlowStats();
  }

  /** Registers every element of the collection that is a FlowListener; other elements are ignored. */
  void addListeners( Collection listeners )
  {
    for( Object candidate : listeners )
    {
      if( !( candidate instanceof FlowListener ) )
        continue;

      addListener( (FlowListener) candidate );
    }
  }

  /** Returns the listener list, creating it on first call. */
  List<SafeFlowListener> getListeners()
  {
    if( listeners != null )
      return listeners;

    listeners = new LinkedList<SafeFlowListener>();

    return listeners;
  }

  @Override
  public boolean hasListeners()
  {
    if( listeners == null )
      return false;

    return !listeners.isEmpty();
  }

  @Override
  public void addListener( FlowListener flowListener )
  {
    SafeFlowListener wrapped = new SafeFlowListener( flowListener );

    getListeners().add( wrapped );
  }

  @Override
  public boolean removeListener( FlowListener flowListener )
  {
    SafeFlowListener wrapped = new SafeFlowListener( flowListener );

    return getListeners().remove( wrapped );
  }

  /** Returns true when at least one step has listeners; every step is consulted. */
  @Override
  public boolean hasStepListeners()
  {
    boolean anyStepHasListeners = false;

    for( FlowStep step : getFlowSteps() )
    {
      if( step.hasListeners() )
        anyStepHasListeners = true;
    }

    return anyStepHasListeners;
  }

  @Override
  public void addStepListener( FlowStepListener flowStepListener )
  {
    for( FlowStep step : getFlowSteps() )
      step.addListener( flowStepListener );
  }

  /** Removes the listener from every step; returns true only when every step reported a removal. */
  @Override
  public boolean removeStepListener( FlowStepListener flowStepListener )
  {
    boolean removedFromAll = true;

    for( FlowStep step : getFlowSteps() )
    {
      if( !step.removeListener( flowStepListener ) )
        removedFromAll = false;
    }

    return removedFromAll;
  }

  @Override
  public Map<String, Tap> getSources()
  {
    return Collections.unmodifiableMap( sources );
  }

  @Override
  public List<String> getSourceNames()
  {
    return new ArrayList<String>( sources.keySet() );
  }

  @Override
  public Tap getSource( String name )
  {
    return sources.get( name );
  }

  @Override
  @DependencyIncoming
  public Collection<Tap> getSourcesCollection()
  {
    return getSources().values();
  }

  @Override
  public Map<String, Tap> getSinks()
  {
    return Collections.unmodifiableMap( sinks );
  }

  @Override
  public List<String>
getSinkNames() 618 { 619 return new ArrayList<String>( sinks.keySet() ); 620 } 621 622 @Override 623 public Tap getSink( String name ) 624 { 625 return sinks.get( name ); 626 } 627 628 @Override 629 @DependencyOutgoing 630 public Collection<Tap> getSinksCollection() 631 { 632 return getSinks().values(); 633 } 634 635 @Override 636 public Tap getSink() 637 { 638 return sinks.values().iterator().next(); 639 } 640 641 @Override 642 public Map<String, Tap> getTraps() 643 { 644 return Collections.unmodifiableMap( traps ); 645 } 646 647 @Override 648 public List<String> getTrapNames() 649 { 650 return new ArrayList<String>( traps.keySet() ); 651 } 652 653 @Override 654 public Collection<Tap> getTrapsCollection() 655 { 656 return getTraps().values(); 657 } 658 659 @Override 660 public Map<String, Tap> getCheckpoints() 661 { 662 return Collections.unmodifiableMap( checkpoints ); 663 } 664 665 @Override 666 public List<String> getCheckpointNames() 667 { 668 return new ArrayList<String>( checkpoints.keySet() ); 669 } 670 671 @Override 672 public Collection<Tap> getCheckpointsCollection() 673 { 674 return getCheckpoints().values(); 675 } 676 677 @Override 678 public boolean isStopJobsOnExit() 679 { 680 return stopJobsOnExit; 681 } 682 683 @Override 684 public FlowSkipStrategy getFlowSkipStrategy() 685 { 686 return flowSkipStrategy; 687 } 688 689 @Override 690 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 691 { 692 if( flowSkipStrategy == null ) 693 throw new IllegalArgumentException( "flowSkipStrategy may not be null" ); 694 695 try 696 { 697 return this.flowSkipStrategy; 698 } 699 finally 700 { 701 this.flowSkipStrategy = flowSkipStrategy; 702 } 703 } 704 705 @Override 706 public boolean isSkipFlow() throws IOException 707 { 708 return flowSkipStrategy.skipFlow( this ); 709 } 710 711 @Override 712 public boolean areSinksStale() throws IOException 713 { 714 return areSourcesNewer( getSinkModified() ); 715 } 716 717 @Override 718 public 
boolean areSourcesNewer( long sinkModified ) throws IOException 719 { 720 Config confCopy = getConfigCopy(); 721 Iterator<Tap> values = sources.values().iterator(); 722 723 long sourceModified = 0; 724 725 try 726 { 727 sourceModified = Util.getSourceModified( confCopy, values, sinkModified ); 728 729 if( sinkModified < sourceModified ) 730 return true; 731 732 return false; 733 } 734 finally 735 { 736 if( LOG.isInfoEnabled() ) 737 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 738 } 739 } 740 741 @Override 742 public long getSinkModified() throws IOException 743 { 744 long sinkModified = Util.getSinkModified( getConfigCopy(), sinks.values() ); 745 746 if( LOG.isInfoEnabled() ) 747 { 748 if( sinkModified == -1L ) 749 logInfo( "at least one sink is marked for delete" ); 750 if( sinkModified == 0L ) 751 logInfo( "at least one sink does not exist" ); 752 else 753 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 754 } 755 756 return sinkModified; 757 } 758 759 @Override 760 public FlowStepStrategy getFlowStepStrategy() 761 { 762 return flowStepStrategy; 763 } 764 765 @Override 766 public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ) 767 { 768 this.flowStepStrategy = flowStepStrategy; 769 } 770 771 @Override 772 public List<FlowStep<Config>> getFlowSteps() 773 { 774 if( steps != null ) 775 return steps; 776 777 if( flowStepGraph == null ) 778 return Collections.EMPTY_LIST; 779 780 TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator(); 781 782 steps = new ArrayList<FlowStep<Config>>(); 783 784 while( topoIterator.hasNext() ) 785 steps.add( topoIterator.next() ); 786 787 return steps; 788 } 789 790 @Override 791 @ProcessPrepare 792 public void prepare() 793 { 794 try 795 { 796 deleteSinksIfNotUpdate(); 797 deleteTrapsIfNotUpdate(); 798 deleteCheckpointsIfNotUpdate(); 799 } 800 catch( IOException exception ) 801 { 802 
throw new FlowException( "unable to prepare flow", exception ); 803 } 804 } 805 806 @Override 807 @ProcessStart 808 public synchronized void start() 809 { 810 if( thread != null ) 811 return; 812 813 if( stop ) 814 return; 815 816 registerShutdownHook(); 817 818 internalStart(); 819 820 String threadName = ( "flow " + Util.toNull( getName() ) ).trim(); 821 822 thread = createFlowThread( threadName ); 823 824 thread.start(); 825 } 826 827 protected Thread createFlowThread( String threadName ) 828 { 829 return new Thread( new Runnable() 830 { 831 @Override 832 public void run() 833 { 834 BaseFlow.this.run(); 835 } 836 }, threadName ); 837 } 838 839 protected abstract void internalStart(); 840 841 @Override 842 @ProcessStop 843 public synchronized void stop() 844 { 845 stopLock.lock(); 846 847 try 848 { 849 if( stop ) 850 return; 851 852 stop = true; 853 854 fireOnStopping(); 855 856 if( !flowStats.isFinished() ) 857 flowStats.markStopped(); 858 859 internalStopAllJobs(); 860 861 handleExecutorShutdown(); 862 863 internalClean( true ); 864 } 865 finally 866 { 867 flowStats.cleanup(); 868 stopLock.unlock(); 869 } 870 } 871 872 protected abstract void internalClean( boolean stop ); 873 874 @Override 875 @ProcessComplete 876 public void complete() 877 { 878 start(); 879 880 try 881 { 882 try 883 { 884 synchronized( this ) // prevent NPE on quick stop() & complete() after start() 885 { 886 while( thread == null && !stop ) 887 Util.safeSleep( 10 ); 888 } 889 890 if( thread != null ) 891 thread.join(); 892 } 893 catch( InterruptedException exception ) 894 { 895 throw new FlowException( getName(), "thread interrupted", exception ); 896 } 897 898 // if in #stop and stopping, lets wait till its done in this thread 899 try 900 { 901 stopLock.lock(); 902 } 903 finally 904 { 905 stopLock.unlock(); 906 } 907 908 if( throwable instanceof FlowException ) 909 ( (FlowException) throwable ).setFlowName( getName() ); 910 911 if( throwable instanceof CascadingException ) 912 throw 
(CascadingException) throwable; 913 914 if( throwable instanceof OutOfMemoryError ) 915 throw (OutOfMemoryError) throwable; 916 917 if( throwable != null ) 918 throw new FlowException( getName(), "unhandled exception", throwable ); 919 920 if( hasListeners() ) 921 { 922 for( SafeFlowListener safeFlowListener : getListeners() ) 923 { 924 if( safeFlowListener.throwable != null ) 925 throw new FlowException( getName(), "unhandled listener exception", throwable ); 926 } 927 } 928 } 929 finally 930 { 931 thread = null; 932 throwable = null; 933 934 try 935 { 936 commitTraps(); 937 938 if( hasListeners() ) 939 { 940 for( SafeFlowListener safeFlowListener : getListeners() ) 941 safeFlowListener.throwable = null; 942 } 943 } 944 finally 945 { 946 flowStats.cleanup(); 947 } 948 } 949 } 950 951 private void commitTraps() 952 { 953 // commit all the traps, don't fail on an error 954 955 for( Tap tap : traps.values() ) 956 { 957 try 958 { 959 if( !tap.commitResource( getConfig() ) ) 960 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), null ); 961 } 962 catch( IOException exception ) 963 { 964 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception ); 965 } 966 } 967 } 968 969 @Override 970 @ProcessCleanup 971 public void cleanup() 972 { 973 // do nothing 974 } 975 976 @Override 977 public TupleEntryIterator openSource() throws IOException 978 { 979 return sources.values().iterator().next().openForRead( getFlowProcess() ); 980 } 981 982 @Override 983 public TupleEntryIterator openSource( String name ) throws IOException 984 { 985 if( !sources.containsKey( name ) ) 986 throw new IllegalArgumentException( "source does not exist: " + name ); 987 988 return sources.get( name ).openForRead( getFlowProcess() ); 989 } 990 991 @Override 992 public TupleEntryIterator openSink() throws IOException 993 { 994 return sinks.values().iterator().next().openForRead( getFlowProcess() ); 995 } 996 997 @Override 998 public TupleEntryIterator 
openSink( String name ) throws IOException 999 { 1000 if( !sinks.containsKey( name ) ) 1001 throw new IllegalArgumentException( "sink does not exist: " + name ); 1002 1003 return sinks.get( name ).openForRead( getFlowProcess() ); 1004 } 1005 1006 @Override 1007 public TupleEntryIterator openTrap() throws IOException 1008 { 1009 return traps.values().iterator().next().openForRead( getFlowProcess() ); 1010 } 1011 1012 @Override 1013 public TupleEntryIterator openTrap( String name ) throws IOException 1014 { 1015 if( !traps.containsKey( name ) ) 1016 throw new IllegalArgumentException( "trap does not exist: " + name ); 1017 1018 return traps.get( name ).openForRead( getFlowProcess() ); 1019 } 1020 1021 /** 1022 * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}. 1023 * <p/> 1024 * Use with caution. 1025 * 1026 * @throws IOException when 1027 * @see BaseFlow#deleteSinksIfNotUpdate() 1028 */ 1029 public void deleteSinks() throws IOException 1030 { 1031 for( Tap tap : sinks.values() ) 1032 deleteOrFail( tap ); 1033 } 1034 1035 private void deleteOrFail( Tap tap ) throws IOException 1036 { 1037 if( !tap.resourceExists( getConfig() ) ) 1038 return; 1039 1040 if( !tap.deleteResource( getConfig() ) ) 1041 throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) ); 1042 } 1043 1044 /** 1045 * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag. 1046 * <p/> 1047 * Typically used by a {@link Cascade} before executing the flow if the sinks are stale. 1048 * <p/> 1049 * Use with caution. 
1050 * 1051 * @throws IOException when 1052 */ 1053 public void deleteSinksIfNotUpdate() throws IOException 1054 { 1055 for( Tap tap : sinks.values() ) 1056 { 1057 if( !tap.isUpdate() ) 1058 deleteOrFail( tap ); 1059 } 1060 } 1061 1062 public void deleteSinksIfReplace() throws IOException 1063 { 1064 for( Tap tap : sinks.values() ) 1065 { 1066 if( tap.isReplace() ) 1067 deleteOrFail( tap ); 1068 } 1069 } 1070 1071 public void deleteTrapsIfNotUpdate() throws IOException 1072 { 1073 for( Tap tap : traps.values() ) 1074 { 1075 if( !tap.isUpdate() ) 1076 deleteOrFail( tap ); 1077 } 1078 } 1079 1080 public void deleteCheckpointsIfNotUpdate() throws IOException 1081 { 1082 for( Tap tap : checkpoints.values() ) 1083 { 1084 if( !tap.isUpdate() ) 1085 deleteOrFail( tap ); 1086 } 1087 } 1088 1089 public void deleteTrapsIfReplace() throws IOException 1090 { 1091 for( Tap tap : traps.values() ) 1092 { 1093 if( tap.isReplace() ) 1094 deleteOrFail( tap ); 1095 } 1096 } 1097 1098 public void deleteCheckpointsIfReplace() throws IOException 1099 { 1100 for( Tap tap : checkpoints.values() ) 1101 { 1102 if( tap.isReplace() ) 1103 deleteOrFail( tap ); 1104 } 1105 } 1106 1107 @Override 1108 public boolean resourceExists( Tap tap ) throws IOException 1109 { 1110 return tap.resourceExists( getConfig() ); 1111 } 1112 1113 @Override 1114 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 1115 { 1116 return tap.openForRead( getFlowProcess() ); 1117 } 1118 1119 @Override 1120 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 1121 { 1122 return tap.openForWrite( getFlowProcess() ); 1123 } 1124 1125 /** Method run implements the Runnable run method and should not be called by users. 
*/ 1126 private void run() 1127 { 1128 if( thread == null ) 1129 throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" ); 1130 1131 Version.printBanner(); 1132 Update.checkForUpdate( getPlatformInfo() ); 1133 1134 try 1135 { 1136 if( stop ) 1137 return; 1138 1139 flowStats.markStarted(); 1140 1141 fireOnStarting(); 1142 1143 if( LOG.isInfoEnabled() ) 1144 { 1145 logInfo( "starting" ); 1146 1147 for( Tap source : getSourcesCollection() ) 1148 logInfo( " source: " + source ); 1149 for( Tap sink : getSinksCollection() ) 1150 logInfo( " sink: " + sink ); 1151 } 1152 1153 // if jobs are run local, then only use one thread to force execution serially 1154 //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() ); 1155 int numThreads = getMaxNumParallelSteps(); 1156 1157 if( numThreads == 0 ) 1158 numThreads = jobsMap.size(); 1159 1160 if( numThreads == 0 ) 1161 throw new IllegalStateException( "no jobs rendered for flow: " + getName() ); 1162 1163 if( LOG.isInfoEnabled() ) 1164 { 1165 logInfo( " parallel execution is enabled: " + ( getMaxNumParallelSteps() != 1 ) ); 1166 logInfo( " starting jobs: " + jobsMap.size() ); 1167 logInfo( " allocating threads: " + numThreads ); 1168 } 1169 1170 List<Future<Throwable>> futures = spawnJobs( numThreads ); 1171 1172 for( Future<Throwable> future : futures ) 1173 { 1174 throwable = future.get(); 1175 1176 if( throwable != null ) 1177 { 1178 if( !stop ) 1179 internalStopAllJobs(); 1180 1181 handleExecutorShutdown(); 1182 break; 1183 } 1184 } 1185 } 1186 catch( Throwable throwable ) 1187 { 1188 this.throwable = throwable; 1189 } 1190 finally 1191 { 1192 handleThrowableAndMarkFailed(); 1193 1194 if( !stop && !flowStats.isFinished() ) 1195 flowStats.markSuccessful(); 1196 1197 internalClean( stop ); // cleaning temp taps may be determined by success/failure 1198 1199 try 1200 { 1201 fireOnCompleted(); 1202 } 1203 finally 1204 { 1205 flowStats.cleanup(); 1206 
internalShutdown(); 1207 deregisterShutdownHook(); 1208 } 1209 } 1210 } 1211 1212 protected abstract int getMaxNumParallelSteps(); 1213 1214 protected abstract void internalShutdown(); 1215 1216 private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException 1217 { 1218 if( stop ) 1219 return new ArrayList<Future<Throwable>>(); 1220 1221 List<Callable<Throwable>> list = new ArrayList<Callable<Throwable>>(); 1222 1223 for( FlowStepJob<Config> job : jobsMap.values() ) 1224 list.add( job ); 1225 1226 return spawnStrategy.start( this, numThreads, list ); 1227 } 1228 1229 private void handleThrowableAndMarkFailed() 1230 { 1231 if( throwable != null && !stop ) 1232 { 1233 flowStats.markFailed( throwable ); 1234 1235 fireOnThrowable(); 1236 } 1237 } 1238 1239 Map<String, FlowStepJob<Config>> getJobsMap() 1240 { 1241 return jobsMap; 1242 } 1243 1244 protected void initializeNewJobsMap() 1245 { 1246 // keep topo order 1247 jobsMap = new LinkedHashMap<String, FlowStepJob<Config>>(); 1248 TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator(); 1249 1250 while( topoIterator.hasNext() ) 1251 { 1252 BaseFlowStep<Config> step = (BaseFlowStep<Config>) topoIterator.next(); 1253 FlowStepJob<Config> flowStepJob = step.getFlowStepJob( getFlowProcess(), getConfig() ); 1254 1255 jobsMap.put( step.getName(), flowStepJob ); 1256 1257 List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>(); 1258 1259 for( Object flowStep : predecessorListOf( flowStepGraph, step ) ) 1260 predecessors.add( jobsMap.get( ( (FlowStep<Config>) flowStep ).getName() ) ); 1261 1262 flowStepJob.setPredecessors( predecessors ); 1263 1264 flowStats.addStepStats( flowStepJob.getStepStats() ); 1265 } 1266 } 1267 1268 protected void internalStopAllJobs() 1269 { 1270 logInfo( "stopping all jobs" ); 1271 1272 try 1273 { 1274 if( jobsMap == null ) 1275 return; 1276 1277 List<FlowStepJob<Config>> jobs = new 
ArrayList<FlowStepJob<Config>>( jobsMap.values() ); 1278 1279 Collections.reverse( jobs ); 1280 1281 for( FlowStepJob<Config> job : jobs ) 1282 job.stop(); 1283 } 1284 finally 1285 { 1286 logInfo( "stopped all jobs" ); 1287 } 1288 } 1289 1290 protected void handleExecutorShutdown() 1291 { 1292 if( spawnStrategy.isCompleted( this ) ) 1293 return; 1294 1295 logInfo( "shutting down job executor" ); 1296 1297 try 1298 { 1299 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 1300 } 1301 catch( InterruptedException exception ) 1302 { 1303 // ignore 1304 } 1305 1306 logInfo( "shutdown complete" ); 1307 } 1308 1309 protected void fireOnCompleted() 1310 { 1311 if( hasListeners() ) 1312 { 1313 if( LOG.isDebugEnabled() ) 1314 logDebug( "firing onCompleted event: " + getListeners().size() ); 1315 1316 for( FlowListener flowListener : getListeners() ) 1317 flowListener.onCompleted( this ); 1318 } 1319 } 1320 1321 protected void fireOnThrowable() 1322 { 1323 if( hasListeners() ) 1324 { 1325 if( LOG.isDebugEnabled() ) 1326 logDebug( "firing onThrowable event: " + getListeners().size() ); 1327 1328 boolean isHandled = false; 1329 1330 for( FlowListener flowListener : getListeners() ) 1331 isHandled = flowListener.onThrowable( this, throwable ) || isHandled; 1332 1333 if( isHandled ) 1334 throwable = null; 1335 } 1336 } 1337 1338 protected void fireOnStopping() 1339 { 1340 if( hasListeners() ) 1341 { 1342 if( LOG.isDebugEnabled() ) 1343 logDebug( "firing onStopping event: " + getListeners().size() ); 1344 1345 for( FlowListener flowListener : getListeners() ) 1346 flowListener.onStopping( this ); 1347 } 1348 } 1349 1350 protected void fireOnStarting() 1351 { 1352 if( hasListeners() ) 1353 { 1354 if( LOG.isDebugEnabled() ) 1355 logDebug( "firing onStarting event: " + getListeners().size() ); 1356 1357 for( FlowListener flowListener : getListeners() ) 1358 flowListener.onStarting( this ); 1359 } 1360 } 1361 1362 @Override 1363 public String toString() 1364 { 1365 StringBuffer 
buffer = new StringBuffer(); 1366 1367 if( getName() != null ) 1368 buffer.append( getName() ).append( ": " ); 1369 1370 for( FlowStep step : getFlowSteps() ) 1371 buffer.append( step ); 1372 1373 return buffer.toString(); 1374 } 1375 1376 protected void logInfo( String message ) 1377 { 1378 LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 1379 } 1380 1381 private void logDebug( String message ) 1382 { 1383 LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 1384 } 1385 1386 private void logWarn( String message, Throwable throwable ) 1387 { 1388 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 1389 } 1390 1391 private void logError( String message, Throwable throwable ) 1392 { 1393 LOG.error( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 1394 } 1395 1396 @Override 1397 public void writeDOT( String filename ) 1398 { 1399 if( pipeGraph == null ) 1400 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1401 1402 pipeGraph.writeDOT( filename ); 1403 } 1404 1405 @Override 1406 public void writeStepsDOT( String filename ) 1407 { 1408 if( flowStepGraph == null ) 1409 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1410 1411 flowStepGraph.writeDOT( filename ); 1412 } 1413 1414 /** 1415 * Used to return a simple wrapper for use as an edge in a graph where there can only be 1416 * one instance of every edge. 
1417 * 1418 * @return FlowHolder 1419 */ 1420 public FlowHolder getHolder() 1421 { 1422 return new FlowHolder( this ); 1423 } 1424 1425 public void setCascade( Cascade cascade ) 1426 { 1427 setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() ); 1428 flowStats.recordInfo(); 1429 } 1430 1431 @Override 1432 public String getCascadeID() 1433 { 1434 return getProperty( "cascading.cascade.id" ); 1435 } 1436 1437 @Override 1438 public String getRunID() 1439 { 1440 return runID; 1441 } 1442 1443 protected List<String> getClassPath() 1444 { 1445 return classPath; 1446 } 1447 1448 @Override 1449 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1450 { 1451 this.spawnStrategy = spawnStrategy; 1452 } 1453 1454 @Override 1455 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1456 { 1457 return spawnStrategy; 1458 } 1459 1460 protected void registerShutdownHook() 1461 { 1462 if( !isStopJobsOnExit() ) 1463 return; 1464 1465 shutdownHook = new ShutdownUtil.Hook() 1466 { 1467 @Override 1468 public Priority priority() 1469 { 1470 return Priority.WORK_CHILD; 1471 } 1472 1473 @Override 1474 public void execute() 1475 { 1476 logInfo( "shutdown hook calling stop on flow" ); 1477 1478 BaseFlow.this.stop(); 1479 } 1480 }; 1481 1482 ShutdownUtil.addHook( shutdownHook ); 1483 } 1484 1485 private void deregisterShutdownHook() 1486 { 1487 if( !isStopJobsOnExit() || stop ) 1488 return; 1489 1490 ShutdownUtil.removeHook( shutdownHook ); 1491 } 1492 1493 /** Class FlowHolder is a helper class for wrapping Flow instances. */ 1494 public static class FlowHolder 1495 { 1496 /** Field flow */ 1497 public Flow flow; 1498 1499 public FlowHolder() 1500 { 1501 } 1502 1503 public FlowHolder( Flow flow ) 1504 { 1505 this.flow = flow; 1506 } 1507 } 1508 1509 /** 1510 * Class SafeFlowListener safely calls a wrapped FlowListener. 
1511 * <p/> 1512 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1513 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1514 * which in turn is run in a new Thread. 1515 */ 1516 private class SafeFlowListener implements FlowListener 1517 { 1518 /** Field flowListener */ 1519 final FlowListener flowListener; 1520 /** Field throwable */ 1521 Throwable throwable; 1522 1523 private SafeFlowListener( FlowListener flowListener ) 1524 { 1525 this.flowListener = flowListener; 1526 } 1527 1528 public void onStarting( Flow flow ) 1529 { 1530 try 1531 { 1532 flowListener.onStarting( flow ); 1533 } 1534 catch( Throwable throwable ) 1535 { 1536 handleThrowable( throwable ); 1537 } 1538 } 1539 1540 public void onStopping( Flow flow ) 1541 { 1542 try 1543 { 1544 flowListener.onStopping( flow ); 1545 } 1546 catch( Throwable throwable ) 1547 { 1548 handleThrowable( throwable ); 1549 } 1550 } 1551 1552 public void onCompleted( Flow flow ) 1553 { 1554 try 1555 { 1556 flowListener.onCompleted( flow ); 1557 } 1558 catch( Throwable throwable ) 1559 { 1560 handleThrowable( throwable ); 1561 } 1562 } 1563 1564 public boolean onThrowable( Flow flow, Throwable flowThrowable ) 1565 { 1566 try 1567 { 1568 return flowListener.onThrowable( flow, flowThrowable ); 1569 } 1570 catch( Throwable throwable ) 1571 { 1572 handleThrowable( throwable ); 1573 } 1574 1575 return false; 1576 } 1577 1578 private void handleThrowable( Throwable throwable ) 1579 { 1580 this.throwable = throwable; 1581 1582 logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable ); 1583 1584 // stop this flow 1585 stop(); 1586 } 1587 1588 public boolean equals( Object object ) 1589 { 1590 if( object instanceof BaseFlow.SafeFlowListener ) 1591 return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener ); 1592 1593 return flowListener.equals( object ); 1594 } 1595 1596 
public int hashCode() 1597 { 1598 return flowListener.hashCode(); 1599 } 1600 } 1601 }