001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.io.IOException; 024 import java.util.ArrayList; 025 import java.util.Collection; 026 import java.util.Collections; 027 import java.util.Date; 028 import java.util.HashMap; 029 import java.util.HashSet; 030 import java.util.Iterator; 031 import java.util.LinkedHashMap; 032 import java.util.LinkedList; 033 import java.util.List; 034 import java.util.Map; 035 import java.util.Properties; 036 import java.util.Set; 037 import java.util.concurrent.Callable; 038 import java.util.concurrent.Future; 039 import java.util.concurrent.TimeUnit; 040 import java.util.concurrent.locks.ReentrantLock; 041 042 import cascading.CascadingException; 043 import cascading.cascade.Cascade; 044 import cascading.flow.planner.BaseFlowStep; 045 import cascading.flow.planner.ElementGraph; 046 import cascading.flow.planner.FlowStepGraph; 047 import cascading.flow.planner.FlowStepJob; 048 import cascading.flow.planner.PlatformInfo; 049 import cascading.management.CascadingServices; 050 import cascading.management.UnitOfWorkExecutorStrategy; 051 import cascading.management.UnitOfWorkSpawnStrategy; 052 import cascading.management.state.ClientState; 053 import cascading.property.AppProps; 054 import cascading.property.PropertyUtil; 055 import cascading.stats.FlowStats; 056 import cascading.tap.Tap; 057 import cascading.tuple.Fields; 058 import cascading.tuple.TupleEntryCollector; 059 import cascading.tuple.TupleEntryIterator; 060 import cascading.util.ShutdownUtil; 061 import cascading.util.Update; 062 import cascading.util.Util; 063 import cascading.util.Version; 064 import org.jgrapht.traverse.TopologicalOrderIterator; 065 import org.slf4j.Logger; 066 import org.slf4j.LoggerFactory; 067 import riffle.process.DependencyIncoming; 068 import riffle.process.DependencyOutgoing; 069 import riffle.process.ProcessCleanup; 070 import riffle.process.ProcessComplete; 071 import riffle.process.ProcessPrepare; 072 import riffle.process.ProcessStart; 073 import riffle.process.ProcessStop; 074 075 import static org.jgrapht.Graphs.predecessorListOf; 076 077 @riffle.process.Process 078 public abstract class BaseFlow<Config> implements Flow<Config> 079 { 080 /** Field LOG */ 081 private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); 082 083 private PlatformInfo platformInfo; 084 085 /** Field id */ 086 private String id; 087 /** Field name */ 088 private String name; 089 /** Fields runID */ 090 private String runID; 091 /** Fields classpath */ 092 private List<String> classPath; // may remain null 093 /** Field tags */ 094 private String tags; 095 /** Field listeners */ 096 private List<SafeFlowListener> listeners; 097 /** Field skipStrategy */ 098 private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale(); 099 /** Field flowStats */ 100 protected FlowStats flowStats; // don't use a listener to set values 101 /** Field sources */ 102 protected Map<String, Tap> sources = Collections.EMPTY_MAP; 103 /** Field sinks */ 104 protected Map<String, Tap> sinks = Collections.EMPTY_MAP; 105 /** Field traps */ 106 private Map<String, Tap> traps = Collections.EMPTY_MAP; 107 /** Field checkpoints */ 108 private Map<String, Tap> checkpoints = Collections.EMPTY_MAP; 109 /** Field stopJobsOnExit */ 110 protected boolean stopJobsOnExit = true; 111 /** Field submitPriority */ 112 private int submitPriority = 5; 113 114 /** Field stepGraph */ 115 private FlowStepGraph<Config> flowStepGraph; 116 /** Field thread */ 117 protected transient Thread thread; 118 /** Field throwable */ 119 private Throwable throwable; 120 /** Field stop */ 121 protected boolean stop; 122 123 /** Field pipeGraph */ 124 private ElementGraph pipeGraph; // only used for documentation purposes 125 126 private transient CascadingServices cascadingServices; 127 128 private FlowStepStrategy<Config> flowStepStrategy = null; 129 /** Field steps */ 130 private transient List<FlowStep<Config>> steps; 131 /** Field jobsMap */ 132 private transient Map<String, FlowStepJob<Config>> jobsMap; 133 private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy(); 134 135 private transient ReentrantLock stopLock = new ReentrantLock( true ); 136 protected ShutdownUtil.Hook shutdownHook; 137 138 private HashMap<String, String> flowDescriptor; 139 140 /** 141 * Returns property stopJobsOnExit. 142 * 143 * @param properties of type Map 144 * @return a boolean 145 */ 146 static boolean getStopJobsOnExit( Map<Object, Object> properties ) 147 { 148 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) ); 149 } 150 151 /** Used for testing. */ 152 protected BaseFlow() 153 { 154 this.name = "NA"; 155 this.flowStats = createPrepareFlowStats(); 156 } 157 158 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name ) 159 { 160 this( platformInfo, properties, defaultConfig, name, new LinkedHashMap<String, String>() ); 161 } 162 163 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor ) 164 { 165 this.platformInfo = platformInfo; 166 this.name = name; 167 168 if( flowDescriptor != null ) 169 this.flowDescriptor = new LinkedHashMap<String, String>( flowDescriptor ); 170 171 addSessionProperties( properties ); 172 initConfig( properties, defaultConfig ); 173 174 this.flowStats = createPrepareFlowStats(); // must be last 175 } 176 177 protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef ) 178 { 179 this.platformInfo = platformInfo; 180 this.name = flowDef.getName(); 181 this.tags = flowDef.getTags(); 182 this.runID = flowDef.getRunID(); 183 this.classPath = flowDef.getClassPath(); 184 185 if( !flowDef.getFlowDescriptor().isEmpty() ) 186 this.flowDescriptor = new LinkedHashMap<String, String>( flowDef.getFlowDescriptor() ); 187 188 addSessionProperties( properties ); 189 initConfig( properties, defaultConfig ); 190 setSources( flowDef.getSourcesCopy() ); 191 setSinks( flowDef.getSinksCopy() ); 192 setTraps( flowDef.getTrapsCopy() ); 193 setCheckpoints( flowDef.getCheckpointsCopy() ); 194 initFromTaps(); 195 196 retrieveSourceFields(); 197 retrieveSinkFields(); 198 } 199 200 public PlatformInfo getPlatformInfo() 201 { 202 return platformInfo; 203 } 204 205 public void initialize( ElementGraph pipeGraph, FlowStepGraph<Config> flowStepGraph ) 206 { 207 this.pipeGraph = pipeGraph; 208 this.flowStepGraph = flowStepGraph; 209 210 initSteps(); 211 212 this.flowStats = createPrepareFlowStats(); // must be last 213 214 initializeNewJobsMap(); 215 } 216 217 public ElementGraph updateSchemes( ElementGraph pipeGraph ) 218 { 219 presentSourceFields( pipeGraph ); 220 221 presentSinkFields( pipeGraph ); 222 223 return new ElementGraph( pipeGraph ); 224 } 225 226 /** Force a Scheme to fetch any fields from a meta-data store */ 227 protected void retrieveSourceFields() 228 { 229 for( Tap tap : sources.values() ) 230 tap.retrieveSourceFields( getFlowProcess() ); 231 } 232 233 /** 234 * Present the current resolved fields for the Tap 235 * 236 * @param pipeGraph 237 */ 238 protected void presentSourceFields( ElementGraph pipeGraph ) 239 { 240 for( Tap tap : sources.values() ) 241 { 242 if( pipeGraph.containsVertex( tap ) ) 243 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 244 } 245 246 for( Tap tap : checkpoints.values() ) 247 { 248 if( pipeGraph.containsVertex( tap ) ) 249 tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 250 } 251 } 252 253 /** Force a Scheme to fetch any fields from a meta-data store */ 254 protected void retrieveSinkFields() 255 { 256 for( Tap tap : sinks.values() ) 257 tap.retrieveSinkFields( getFlowProcess() ); 258 } 259 260 /** 261 * Present the current resolved fields for the Tap 262 * 263 * @param pipeGraph 264 */ 265 protected void presentSinkFields( ElementGraph pipeGraph ) 266 { 267 for( Tap tap : sinks.values() ) 268 { 269 if( pipeGraph.containsVertex( tap ) ) 270 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 271 } 272 273 for( Tap tap : checkpoints.values() ) 274 { 275 if( pipeGraph.containsVertex( tap ) ) 276 tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) ); 277 } 278 } 279 280 protected Fields getFieldsFor( ElementGraph pipeGraph, Tap tap ) 281 { 282 return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields(); 283 } 284 285 private void addSessionProperties( Map<Object, Object> properties ) 286 { 287 if( properties == null ) 288 return; 289 290 PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() ); 291 PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() ); 292 AppProps.setApplicationID( properties ); 293 PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) ); 294 PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) ); 295 } 296 297 private String makeAppName( Map<Object, Object> properties ) 298 { 299 if( properties == null ) 300 return null; 301 302 String name = AppProps.getApplicationName( properties ); 303 304 if( name != null ) 305 return name; 306 307 return Util.findName( AppProps.getApplicationJarPath( properties ) ); 308 } 309 310 private String makeAppVersion( Map<Object, Object> properties ) 311 { 312 if( properties == null ) 313 return null; 314 315 String name = AppProps.getApplicationVersion( properties ); 316 317 if( name != null ) 318 return name; 319 320 return Util.findVersion( AppProps.getApplicationJarPath( properties ) ); 321 } 322 323 private FlowStats createPrepareFlowStats() 324 { 325 FlowStats flowStats = new FlowStats( this, getClientState() ); 326 327 flowStats.prepare(); 328 flowStats.markPending(); 329 330 return flowStats; 331 } 332 333 public CascadingServices getCascadingServices() 334 { 335 if( cascadingServices == null ) 336 cascadingServices = new CascadingServices( getConfigAsProperties() ); 337 338 return cascadingServices; 339 } 340 341 private ClientState getClientState() 342 { 343 return getFlowSession().getCascadingServices().createClientState( getID() ); 344 } 345 346 protected void initSteps() 347 { 348 if( flowStepGraph == null ) 349 return; 350 351 for( Object flowStep : flowStepGraph.vertexSet() ) 352 ( (BaseFlowStep<Config>) flowStep ).setFlow( this ); 353 } 354 355 private void initFromTaps() 356 { 357 initFromTaps( sources ); 358 initFromTaps( sinks ); 359 initFromTaps( traps ); 360 } 361 362 private void initFromTaps( Map<String, Tap> taps ) 363 { 364 for( Tap tap : taps.values() ) 365 tap.flowConfInit( this ); 366 } 367 368 @Override 369 public String getName() 370 { 371 return name; 372 } 373 374 protected void setName( String name ) 375 { 376 this.name = name; 377 } 378 379 @Override 380 public String getID() 381 { 382 if( id == null ) 383 id = Util.createUniqueID(); 384 385 return id; 386 } 387 388 @Override 389 public String getTags() 390 { 391 return tags; 392 } 393 394 @Override 395 public int getSubmitPriority() 396 { 397 return submitPriority; 398 } 399 400 @Override 401 public void setSubmitPriority( int submitPriority ) 402 { 403 if( submitPriority < 1 || submitPriority > 10 ) 404 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 405 406 this.submitPriority = submitPriority; 407 } 408 409 ElementGraph getPipeGraph() 410 { 411 return pipeGraph; 412 } 413 414 FlowStepGraph getFlowStepGraph() 415 { 416 return flowStepGraph; 417 } 418 419 protected void setSources( Map<String, Tap> sources ) 420 { 421 addListeners( sources.values() ); 422 this.sources = sources; 423 } 424 425 protected void setSinks( Map<String, Tap> sinks ) 426 { 427 addListeners( sinks.values() ); 428 this.sinks = sinks; 429 } 430 431 protected void setTraps( Map<String, Tap> traps ) 432 { 433 addListeners( traps.values() ); 434 this.traps = traps; 435 } 436 437 protected void setCheckpoints( Map<String, Tap> checkpoints ) 438 { 439 addListeners( checkpoints.values() ); 440 this.checkpoints = checkpoints; 441 } 442 443 protected void setFlowStepGraph( FlowStepGraph flowStepGraph ) 444 { 445 this.flowStepGraph = flowStepGraph; 446 } 447 448 /** 449 * This method creates a new internal Config with the parentConfig as defaults using the properties to override 450 * the defaults. 451 * 452 * @param properties of type Map 453 * @param parentConfig of type Config 454 */ 455 protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig ); 456 457 public Config createConfig( Map<Object, Object> properties, Config defaultConfig ) 458 { 459 Config config = newConfig( defaultConfig ); 460 461 if( properties == null ) 462 return config; 463 464 Set<Object> keys = new HashSet<Object>( properties.keySet() ); 465 466 // keys will only be grabbed if both key/value are String, so keep orig keys 467 if( properties instanceof Properties ) 468 keys.addAll( ( (Properties) properties ).stringPropertyNames() ); 469 470 for( Object key : keys ) 471 { 472 Object value = properties.get( key ); 473 474 if( value == null && properties instanceof Properties && key instanceof String ) 475 value = ( (Properties) properties ).getProperty( (String) key ); 476 477 if( value == null ) // don't stuff null values 478 continue; 479 480 setConfigProperty( config, key, value ); 481 } 482 483 return config; 484 } 485 486 protected abstract void setConfigProperty( Config config, Object key, Object value ); 487 488 protected abstract Config newConfig( Config defaultConfig ); 489 490 protected void initFromProperties( Map<Object, Object> properties ) 491 { 492 stopJobsOnExit = getStopJobsOnExit( properties ); 493 } 494 495 public FlowSession getFlowSession() 496 { 497 return new FlowSession( getCascadingServices() ); 498 } 499 500 @Override 501 public FlowStats getFlowStats() 502 { 503 return flowStats; 504 } 505 506 @Override 507 public Map<String, String> getFlowDescriptor() 508 { 509 if( flowDescriptor == null ) 510 return Collections.emptyMap(); 511 512 return Collections.unmodifiableMap( flowDescriptor ); 513 } 514 515 @Override 516 public FlowStats getStats() 517 { 518 return getFlowStats(); 519 } 520 521 void addListeners( Collection listeners ) 522 { 523 for( Object listener : listeners ) 524 { 525 if( listener instanceof FlowListener ) 526 addListener( (FlowListener) listener ); 527 } 528 } 529 530 List<SafeFlowListener> getListeners() 531 { 532 if( listeners == null ) 533 listeners = new LinkedList<SafeFlowListener>(); 534 535 return listeners; 536 } 537 538 @Override 539 public boolean hasListeners() 540 { 541 return listeners != null && !listeners.isEmpty(); 542 } 543 544 @Override 545 public void addListener( FlowListener flowListener ) 546 { 547 getListeners().add( new SafeFlowListener( flowListener ) ); 548 } 549 550 @Override 551 public boolean removeListener( FlowListener flowListener ) 552 { 553 return getListeners().remove( new SafeFlowListener( flowListener ) ); 554 } 555 556 @Override 557 public boolean hasStepListeners() 558 { 559 boolean hasStepListeners = false; 560 561 for( FlowStep step : getFlowSteps() ) 562 hasStepListeners |= step.hasListeners(); 563 564 return hasStepListeners; 565 } 566 567 @Override 568 public void addStepListener( FlowStepListener flowStepListener ) 569 { 570 for( FlowStep step : getFlowSteps() ) 571 step.addListener( flowStepListener ); 572 } 573 574 @Override 575 public boolean removeStepListener( FlowStepListener flowStepListener ) 576 { 577 boolean listenerRemoved = true; 578 579 for( FlowStep step : getFlowSteps() ) 580 listenerRemoved &= step.removeListener( flowStepListener ); 581 582 return listenerRemoved; 583 } 584 585 @Override 586 public Map<String, Tap> getSources() 587 { 588 return Collections.unmodifiableMap( sources ); 589 } 590 591 @Override 592 public List<String> getSourceNames() 593 { 594 return new ArrayList<String>( sources.keySet() ); 595 } 596 597 @Override 598 public Tap getSource( String name ) 599 { 600 return sources.get( name ); 601 } 602 603 @Override 604 @DependencyIncoming 605 public Collection<Tap> getSourcesCollection() 606 { 607 return getSources().values(); 608 } 609 610 @Override 611 public Map<String, Tap> getSinks() 612 { 613 return Collections.unmodifiableMap( sinks ); 614 } 615 616 @Override 617 public List<String> getSinkNames() 618 { 619 return new ArrayList<String>( sinks.keySet() ); 620 } 621 622 @Override 623 public Tap getSink( String name ) 624 { 625 return sinks.get( name ); 626 } 627 628 @Override 629 @DependencyOutgoing 630 public Collection<Tap> getSinksCollection() 631 { 632 return getSinks().values(); 633 } 634 635 @Override 636 public Tap getSink() 637 { 638 return sinks.values().iterator().next(); 639 } 640 641 @Override 642 public Map<String, Tap> getTraps() 643 { 644 return Collections.unmodifiableMap( traps ); 645 } 646 647 @Override 648 public List<String> getTrapNames() 649 { 650 return new ArrayList<String>( traps.keySet() ); 651 } 652 653 @Override 654 public Collection<Tap> getTrapsCollection() 655 { 656 return getTraps().values(); 657 } 658 659 @Override 660 public Map<String, Tap> getCheckpoints() 661 { 662 return Collections.unmodifiableMap( checkpoints ); 663 } 664 665 @Override 666 public List<String> getCheckpointNames() 667 { 668 return new ArrayList<String>( checkpoints.keySet() ); 669 } 670 671 @Override 672 public Collection<Tap> getCheckpointsCollection() 673 { 674 return getCheckpoints().values(); 675 } 676 677 @Override 678 public boolean isStopJobsOnExit() 679 { 680 return stopJobsOnExit; 681 } 682 683 @Override 684 public FlowSkipStrategy getFlowSkipStrategy() 685 { 686 return flowSkipStrategy; 687 } 688 689 @Override 690 public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy ) 691 { 692 if( flowSkipStrategy == null ) 693 throw new IllegalArgumentException( "flowSkipStrategy may not be null" ); 694 695 try 696 { 697 return this.flowSkipStrategy; 698 } 699 finally 700 { 701 this.flowSkipStrategy = flowSkipStrategy; 702 } 703 } 704 705 @Override 706 public boolean isSkipFlow() throws IOException 707 { 708 return flowSkipStrategy.skipFlow( this ); 709 } 710 711 @Override 712 public boolean areSinksStale() throws IOException 713 { 714 return areSourcesNewer( getSinkModified() ); 715 } 716 717 @Override 718 public boolean areSourcesNewer( long sinkModified ) throws IOException 719 { 720 Config confCopy = getConfigCopy(); 721 Iterator<Tap> values = sources.values().iterator(); 722 723 long sourceModified = 0; 724 725 try 726 { 727 sourceModified = Util.getSourceModified( confCopy, values, sinkModified ); 728 729 if( sinkModified < sourceModified ) 730 return true; 731 732 return false; 733 } 734 finally 735 { 736 if( LOG.isInfoEnabled() ) 737 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 738 } 739 } 740 741 @Override 742 public long getSinkModified() throws IOException 743 { 744 long sinkModified = Util.getSinkModified( getConfigCopy(), sinks.values() ); 745 746 if( LOG.isInfoEnabled() ) 747 { 748 if( sinkModified == -1L ) 749 logInfo( "at least one sink is marked for delete" ); 750 if( sinkModified == 0L ) 751 logInfo( "at least one sink does not exist" ); 752 else 753 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 754 } 755 756 return sinkModified; 757 } 758 759 @Override 760 public FlowStepStrategy getFlowStepStrategy() 761 { 762 return flowStepStrategy; 763 } 764 765 @Override 766 public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy ) 767 { 768 this.flowStepStrategy = flowStepStrategy; 769 } 770 771 @Override 772 public List<FlowStep<Config>> getFlowSteps() 773 { 774 if( steps != null ) 775 return steps; 776 777 if( flowStepGraph == null ) 778 return Collections.EMPTY_LIST; 779 780 TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator(); 781 782 steps = new ArrayList<FlowStep<Config>>(); 783 784 while( topoIterator.hasNext() ) 785 steps.add( topoIterator.next() ); 786 787 return steps; 788 } 789 790 @Override 791 @ProcessPrepare 792 public void prepare() 793 { 794 try 795 { 796 deleteSinksIfNotUpdate(); 797 deleteTrapsIfNotUpdate(); 798 deleteCheckpointsIfNotUpdate(); 799 } 800 catch( IOException exception ) 801 { 802 throw new FlowException( "unable to prepare flow", exception ); 803 } 804 } 805 806 @Override 807 @ProcessStart 808 public synchronized void start() 809 { 810 if( thread != null ) 811 return; 812 813 if( stop ) 814 return; 815 816 registerShutdownHook(); 817 818 internalStart(); 819 820 String threadName = ( "flow " + Util.toNull( getName() ) ).trim(); 821 822 thread = createFlowThread( threadName ); 823 824 thread.start(); 825 } 826 827 protected Thread createFlowThread( String threadName ) 828 { 829 return new Thread( new Runnable() 830 { 831 @Override 832 public void run() 833 { 834 BaseFlow.this.run(); 835 } 836 }, threadName ); 837 } 838 839 protected abstract void internalStart(); 840 841 @Override 842 @ProcessStop 843 public synchronized void stop() 844 { 845 stopLock.lock(); 846 847 try 848 { 849 if( stop ) 850 return; 851 852 stop = true; 853 854 fireOnStopping(); 855 856 if( !flowStats.isFinished() ) 857 flowStats.markStopped(); 858 859 internalStopAllJobs(); 860 861 handleExecutorShutdown(); 862 863 internalClean( true ); 864 } 865 finally 866 { 867 flowStats.cleanup(); 868 stopLock.unlock(); 869 } 870 } 871 872 protected abstract void internalClean( boolean stop ); 873 874 @Override 875 @ProcessComplete 876 public void complete() 877 { 878 start(); 879 880 try 881 { 882 try 883 { 884 synchronized( this ) // prevent NPE on quick stop() & complete() after start() 885 { 886 while( thread == null && !stop ) 887 Util.safeSleep( 10 ); 888 } 889 890 if( thread != null ) 891 thread.join(); 892 } 893 catch( InterruptedException exception ) 894 { 895 throw new FlowException( getName(), "thread interrupted", exception ); 896 } 897 898 // if in #stop and stopping, lets wait till its done in this thread 899 try 900 { 901 stopLock.lock(); 902 } 903 finally 904 { 905 stopLock.unlock(); 906 } 907 908 if( throwable instanceof FlowException ) 909 ( (FlowException) throwable ).setFlowName( getName() ); 910 911 if( throwable instanceof CascadingException ) 912 throw (CascadingException) throwable; 913 914 if( throwable instanceof OutOfMemoryError ) 915 throw (OutOfMemoryError) throwable; 916 917 if( throwable != null ) 918 throw new FlowException( getName(), "unhandled exception", throwable ); 919 920 if( hasListeners() ) 921 { 922 for( SafeFlowListener safeFlowListener : getListeners() ) 923 { 924 if( safeFlowListener.throwable != null ) 925 throw new FlowException( getName(), "unhandled listener exception", throwable ); 926 } 927 } 928 } 929 finally 930 { 931 thread = null; 932 throwable = null; 933 } 934 } 935 936 private void commitTraps() 937 { 938 // commit all the traps, don't fail on an error 939 for( Tap tap : traps.values() ) 940 { 941 try 942 { 943 if( !tap.commitResource( getConfig() ) ) 944 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), null ); 945 } 946 catch( IOException exception ) 947 { 948 logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception ); 949 } 950 } 951 } 952 953 @Override 954 @ProcessCleanup 955 public void cleanup() 956 { 957 // do nothing 958 } 959 960 @Override 961 public TupleEntryIterator openSource() throws IOException 962 { 963 return sources.values().iterator().next().openForRead( getFlowProcess() ); 964 } 965 966 @Override 967 public TupleEntryIterator openSource( String name ) throws IOException 968 { 969 if( !sources.containsKey( name ) ) 970 throw new IllegalArgumentException( "source does not exist: " + name ); 971 972 return sources.get( name ).openForRead( getFlowProcess() ); 973 } 974 975 @Override 976 public TupleEntryIterator openSink() throws IOException 977 { 978 return sinks.values().iterator().next().openForRead( getFlowProcess() ); 979 } 980 981 @Override 982 public TupleEntryIterator openSink( String name ) throws IOException 983 { 984 if( !sinks.containsKey( name ) ) 985 throw new IllegalArgumentException( "sink does not exist: " + name ); 986 987 return sinks.get( name ).openForRead( getFlowProcess() ); 988 } 989 990 @Override 991 public TupleEntryIterator openTrap() throws IOException 992 { 993 return traps.values().iterator().next().openForRead( getFlowProcess() ); 994 } 995 996 @Override 997 public TupleEntryIterator openTrap( String name ) throws IOException 998 { 999 if( !traps.containsKey( name ) ) 1000 throw new IllegalArgumentException( "trap does not exist: " + name ); 1001 1002 return traps.get( name ).openForRead( getFlowProcess() ); 1003 } 1004 1005 /** 1006 * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}. 1007 * <p/> 1008 * Use with caution. 1009 * 1010 * @throws IOException when 1011 * @see BaseFlow#deleteSinksIfNotUpdate() 1012 */ 1013 public void deleteSinks() throws IOException 1014 { 1015 for( Tap tap : sinks.values() ) 1016 deleteOrFail( tap ); 1017 } 1018 1019 private void deleteOrFail( Tap tap ) throws IOException 1020 { 1021 if( !tap.resourceExists( getConfig() ) ) 1022 return; 1023 1024 if( !tap.deleteResource( getConfig() ) ) 1025 throw new FlowException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) ); 1026 } 1027 1028 /** 1029 * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag. 1030 * <p/> 1031 * Typically used by a {@link Cascade} before executing the flow if the sinks are stale. 1032 * <p/> 1033 * Use with caution. 1034 * 1035 * @throws IOException when 1036 */ 1037 public void deleteSinksIfNotUpdate() throws IOException 1038 { 1039 for( Tap tap : sinks.values() ) 1040 { 1041 if( !tap.isUpdate() ) 1042 deleteOrFail( tap ); 1043 } 1044 } 1045 1046 public void deleteSinksIfReplace() throws IOException 1047 { 1048 for( Tap tap : sinks.values() ) 1049 { 1050 if( tap.isReplace() ) 1051 deleteOrFail( tap ); 1052 } 1053 } 1054 1055 public void deleteTrapsIfNotUpdate() throws IOException 1056 { 1057 for( Tap tap : traps.values() ) 1058 { 1059 if( !tap.isUpdate() ) 1060 deleteOrFail( tap ); 1061 } 1062 } 1063 1064 public void deleteCheckpointsIfNotUpdate() throws IOException 1065 { 1066 for( Tap tap : checkpoints.values() ) 1067 { 1068 if( !tap.isUpdate() ) 1069 deleteOrFail( tap ); 1070 } 1071 } 1072 1073 public void deleteTrapsIfReplace() throws IOException 1074 { 1075 for( Tap tap : traps.values() ) 1076 { 1077 if( tap.isReplace() ) 1078 deleteOrFail( tap ); 1079 } 1080 } 1081 1082 public void deleteCheckpointsIfReplace() throws IOException 1083 { 1084 for( Tap tap : checkpoints.values() ) 1085 { 1086 if( tap.isReplace() ) 1087 deleteOrFail( tap ); 1088 } 1089 } 1090 1091 @Override 1092 public boolean resourceExists( Tap tap ) throws IOException 1093 { 1094 return tap.resourceExists( getConfig() ); 1095 } 1096 1097 @Override 1098 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 1099 { 1100 return tap.openForRead( getFlowProcess() ); 1101 } 1102 1103 @Override 1104 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 1105 { 1106 return tap.openForWrite( getFlowProcess() ); 1107 } 1108 1109 /** Method run implements the Runnable run method and should not be called by users. */ 1110 private void run() 1111 { 1112 if( thread == null ) 1113 throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" ); 1114 1115 Version.printBanner(); 1116 Update.checkForUpdate( getPlatformInfo() ); 1117 1118 try 1119 { 1120 if( stop ) 1121 return; 1122 1123 flowStats.markStarted(); 1124 1125 fireOnStarting(); 1126 1127 if( LOG.isInfoEnabled() ) 1128 { 1129 logInfo( "starting" ); 1130 1131 for( Tap source : getSourcesCollection() ) 1132 logInfo( " source: " + source ); 1133 for( Tap sink : getSinksCollection() ) 1134 logInfo( " sink: " + sink ); 1135 } 1136 1137 // if jobs are run local, then only use one thread to force execution serially 1138 //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() ); 1139 int numThreads = getMaxNumParallelSteps(); 1140 1141 if( numThreads == 0 ) 1142 numThreads = jobsMap.size(); 1143 1144 if( numThreads == 0 ) 1145 throw new IllegalStateException( "no jobs rendered for flow: " + getName() ); 1146 1147 if( LOG.isInfoEnabled() ) 1148 { 1149 logInfo( " parallel execution is enabled: " + ( getMaxNumParallelSteps() != 1 ) ); 1150 logInfo( " starting jobs: " + jobsMap.size() ); 1151 logInfo( " allocating threads: " + numThreads ); 1152 } 1153 1154 List<Future<Throwable>> futures = spawnJobs( numThreads ); 1155 1156 for( Future<Throwable> future : futures ) 1157 { 1158 throwable = future.get(); 1159 1160 if( throwable != null ) 1161 { 1162 if( !stop ) 1163 internalStopAllJobs(); 1164 1165 handleExecutorShutdown(); 1166 break; 1167 } 1168 } 1169 } 1170 catch( Throwable throwable ) 1171 { 1172 this.throwable = throwable; 1173 } 1174 finally 1175 { 1176 handleThrowableAndMarkFailed(); 1177 1178 if( !stop && !flowStats.isFinished() ) 1179 flowStats.markSuccessful(); 1180 1181 internalClean( stop ); // cleaning temp taps may be determined by success/failure 1182 1183 commitTraps(); 1184 1185 try 1186 { 1187 fireOnCompleted(); 1188 } 1189 finally 1190 { 1191 flowStats.cleanup(); 1192 internalShutdown(); 1193 deregisterShutdownHook(); 1194 } 1195 } 1196 } 1197 1198 protected abstract int getMaxNumParallelSteps(); 1199 1200 protected abstract void internalShutdown(); 1201 1202 private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException 1203 { 1204 if( stop ) 1205 return new ArrayList<Future<Throwable>>(); 1206 1207 List<Callable<Throwable>> list = new ArrayList<Callable<Throwable>>(); 1208 1209 for( FlowStepJob<Config> job : jobsMap.values() ) 1210 list.add( job ); 1211 1212 return spawnStrategy.start( this, numThreads, list ); 1213 } 1214 1215 private void handleThrowableAndMarkFailed() 1216 { 1217 if( throwable != null && !stop ) 1218 { 1219 flowStats.markFailed( throwable ); 1220 1221 fireOnThrowable(); 1222 } 1223 } 1224 1225 Map<String, FlowStepJob<Config>> getJobsMap() 1226 { 1227 return jobsMap; 1228 } 1229 1230 protected void initializeNewJobsMap() 1231 { 1232 // keep topo order 1233 jobsMap = new LinkedHashMap<String, FlowStepJob<Config>>(); 1234 TopologicalOrderIterator<FlowStep<Config>, Integer> topoIterator = flowStepGraph.getTopologicalIterator(); 1235 1236 while( topoIterator.hasNext() ) 1237 { 1238 BaseFlowStep<Config> step = (BaseFlowStep<Config>) topoIterator.next(); 1239 FlowStepJob<Config> flowStepJob = step.getFlowStepJob( getFlowProcess(), getConfig() ); 1240 1241 jobsMap.put( step.getName(), flowStepJob ); 1242 1243 List<FlowStepJob<Config>> predecessors = new ArrayList<FlowStepJob<Config>>(); 1244 1245 for( Object flowStep : predecessorListOf( flowStepGraph, step ) ) 1246 predecessors.add( jobsMap.get( ( (FlowStep<Config>) flowStep ).getName() ) ); 1247 1248 flowStepJob.setPredecessors( predecessors ); 1249 1250 flowStats.addStepStats( flowStepJob.getStepStats() ); 1251 } 1252 } 1253 1254 protected void internalStopAllJobs() 1255 { 1256 logInfo( "stopping all jobs" ); 1257 1258 try 1259 { 1260 if( jobsMap == null ) 1261 return; 1262 1263 List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() ); 1264 1265 Collections.reverse( jobs ); 1266 1267 for( FlowStepJob<Config> job : jobs ) 1268 job.stop(); 1269 } 1270 finally 1271 { 1272 logInfo( "stopped all jobs" ); 1273 } 1274 } 1275 1276 protected void handleExecutorShutdown() 1277 { 1278 if( spawnStrategy.isCompleted( this ) ) 1279 return; 1280 1281 logInfo( "shutting down job executor" ); 1282 1283 try 1284 { 1285 spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS ); 1286 } 1287 catch( InterruptedException exception ) 1288 { 1289 // ignore 1290 } 1291 1292 logInfo( "shutdown complete" ); 1293 } 1294 1295 protected void fireOnCompleted() 1296 { 1297 if( hasListeners() ) 1298 { 1299 if( LOG.isDebugEnabled() ) 1300 logDebug( "firing onCompleted event: " + getListeners().size() ); 1301 1302 for( FlowListener flowListener : getListeners() ) 1303 flowListener.onCompleted( this ); 1304 } 1305 } 1306 1307 protected void fireOnThrowable( Throwable throwable ) 1308 { 1309 this.throwable = throwable; 1310 fireOnThrowable(); 1311 } 1312 1313 protected void fireOnThrowable() 1314 { 1315 if( hasListeners() ) 1316 { 1317 if( LOG.isDebugEnabled() ) 1318 logDebug( "firing onThrowable event: " + getListeners().size() ); 1319 1320 boolean isHandled = false; 1321 1322 for( FlowListener flowListener : getListeners() ) 1323 isHandled = flowListener.onThrowable( this, throwable ) || isHandled; 1324 1325 if( isHandled ) 1326 throwable = null; 1327 } 1328 } 1329 1330 protected void fireOnStopping() 1331 { 1332 if( hasListeners() ) 1333 { 1334 if( LOG.isDebugEnabled() ) 1335 logDebug( "firing onStopping event: " + getListeners().size() ); 1336 1337 for( FlowListener flowListener : getListeners() ) 1338 flowListener.onStopping( this ); 1339 } 1340 } 1341 1342 protected void fireOnStarting() 1343 { 1344 if( hasListeners() ) 1345 { 1346 if( LOG.isDebugEnabled() ) 1347 logDebug( "firing onStarting event: " + getListeners().size() ); 1348 1349 for( FlowListener flowListener : getListeners() ) 1350 flowListener.onStarting( this ); 1351 } 1352 } 1353 1354 @Override 1355 public String toString() 1356 { 1357 StringBuffer buffer = new StringBuffer(); 1358 1359 if( getName() != null ) 1360 buffer.append( getName() ).append( ": " ); 1361 1362 for( FlowStep step : getFlowSteps() ) 1363 buffer.append( step ); 1364 1365 return buffer.toString(); 1366 } 1367 1368 protected void logInfo( String message ) 1369 { 1370 LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 1371 } 1372 1373 private void logDebug( String message ) 1374 { 1375 LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message ); 1376 } 1377 1378 private void logWarn( String message, Throwable throwable ) 1379 { 1380 LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 1381 } 1382 1383 private void logError( String message, Throwable throwable ) 1384 { 1385 LOG.error( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable ); 1386 } 1387 1388 @Override 1389 public void writeDOT( String filename ) 1390 { 1391 if( pipeGraph == null ) 1392 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1393 1394 pipeGraph.writeDOT( filename ); 1395 } 1396 1397 @Override 1398 public void writeStepsDOT( String filename ) 1399 { 1400 if( flowStepGraph == null ) 1401 throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" ); 1402 1403 flowStepGraph.writeDOT( filename ); 1404 } 1405 1406 /** 1407 * Used to return a simple wrapper for use as an edge in a graph where there can only be 1408 * one instance of every edge. 1409 * 1410 * @return FlowHolder 1411 */ 1412 public FlowHolder getHolder() 1413 { 1414 return new FlowHolder( this ); 1415 } 1416 1417 public void setCascade( Cascade cascade ) 1418 { 1419 setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() ); 1420 flowStats.recordInfo(); 1421 } 1422 1423 @Override 1424 public String getCascadeID() 1425 { 1426 return getProperty( "cascading.cascade.id" ); 1427 } 1428 1429 @Override 1430 public String getRunID() 1431 { 1432 return runID; 1433 } 1434 1435 protected List<String> getClassPath() 1436 { 1437 return classPath; 1438 } 1439 1440 @Override 1441 public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy ) 1442 { 1443 this.spawnStrategy = spawnStrategy; 1444 } 1445 1446 @Override 1447 public UnitOfWorkSpawnStrategy getSpawnStrategy() 1448 { 1449 return spawnStrategy; 1450 } 1451 1452 protected void registerShutdownHook() 1453 { 1454 if( !isStopJobsOnExit() ) 1455 return; 1456 1457 shutdownHook = new ShutdownUtil.Hook() 1458 { 1459 @Override 1460 public Priority priority() 1461 { 1462 return Priority.WORK_CHILD; 1463 } 1464 1465 @Override 1466 public void execute() 1467 { 1468 logInfo( "shutdown hook calling stop on flow" ); 1469 1470 BaseFlow.this.stop(); 1471 } 1472 }; 1473 1474 ShutdownUtil.addHook( shutdownHook ); 1475 } 1476 1477 private void deregisterShutdownHook() 1478 { 1479 if( !isStopJobsOnExit() || stop ) 1480 return; 1481 1482 ShutdownUtil.removeHook( shutdownHook ); 1483 } 1484 1485 /** Class FlowHolder is a helper class for wrapping Flow instances. */ 1486 public static class FlowHolder 1487 { 1488 /** Field flow */ 1489 public Flow flow; 1490 1491 public FlowHolder() 1492 { 1493 } 1494 1495 public FlowHolder( Flow flow ) 1496 { 1497 this.flow = flow; 1498 } 1499 } 1500 1501 /** 1502 * Class SafeFlowListener safely calls a wrapped FlowListener. 1503 * <p/> 1504 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1505 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1506 * which in turn is run in a new Thread. 1507 */ 1508 private class SafeFlowListener implements FlowListener 1509 { 1510 /** Field flowListener */ 1511 final FlowListener flowListener; 1512 /** Field throwable */ 1513 Throwable throwable; 1514 1515 private SafeFlowListener( FlowListener flowListener ) 1516 { 1517 this.flowListener = flowListener; 1518 } 1519 1520 public void onStarting( Flow flow ) 1521 { 1522 try 1523 { 1524 flowListener.onStarting( flow ); 1525 } 1526 catch( Throwable throwable ) 1527 { 1528 handleThrowable( throwable ); 1529 } 1530 } 1531 1532 public void onStopping( Flow flow ) 1533 { 1534 try 1535 { 1536 flowListener.onStopping( flow ); 1537 } 1538 catch( Throwable throwable ) 1539 { 1540 handleThrowable( throwable ); 1541 } 1542 } 1543 1544 public void onCompleted( Flow flow ) 1545 { 1546 try 1547 { 1548 flowListener.onCompleted( flow ); 1549 } 1550 catch( Throwable throwable ) 1551 { 1552 handleThrowable( throwable ); 1553 } 1554 } 1555 1556 public boolean onThrowable( Flow flow, Throwable flowThrowable ) 1557 { 1558 try 1559 { 1560 return flowListener.onThrowable( flow, flowThrowable ); 1561 } 1562 catch( Throwable throwable ) 1563 { 1564 handleThrowable( throwable ); 1565 } 1566 1567 return false; 1568 } 1569 1570 private void handleThrowable( Throwable throwable ) 1571 { 1572 this.throwable = throwable; 1573 1574 logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable ); 1575 1576 // stop this flow 1577 stop(); 1578 } 1579 1580 public boolean equals( Object object ) 1581 { 1582 if( object instanceof BaseFlow.SafeFlowListener ) 1583 return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener ); 1584 1585 return flowListener.equals( object ); 1586 } 1587 1588 public int hashCode() 1589 { 1590 return flowListener.hashCode(); 1591 } 1592 } 1593 }