001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedHashMap;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Map;
036import java.util.Properties;
037import java.util.Set;
038import java.util.concurrent.Callable;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.Future;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.locks.ReentrantLock;
043
044import cascading.CascadingException;
045import cascading.cascade.Cascade;
046import cascading.flow.planner.BaseFlowNode;
047import cascading.flow.planner.BaseFlowStep;
048import cascading.flow.planner.FlowStepJob;
049import cascading.flow.planner.PlannerInfo;
050import cascading.flow.planner.PlatformInfo;
051import cascading.flow.planner.graph.ElementGraphs;
052import cascading.flow.planner.graph.FlowElementGraph;
053import cascading.flow.planner.process.FlowStepGraph;
054import cascading.flow.planner.process.ProcessGraphs;
055import cascading.management.CascadingServices;
056import cascading.management.UnitOfWorkExecutorStrategy;
057import cascading.management.UnitOfWorkSpawnStrategy;
058import cascading.management.state.ClientState;
059import cascading.property.AppProps;
060import cascading.property.PropertyUtil;
061import cascading.stats.FlowStats;
062import cascading.tap.Tap;
063import cascading.tuple.Fields;
064import cascading.tuple.TupleEntryCollector;
065import cascading.tuple.TupleEntryIterator;
066import cascading.util.ProcessLogger;
067import cascading.util.ShutdownUtil;
068import cascading.util.Update;
069import cascading.util.Util;
070import cascading.util.Version;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073import riffle.process.DependencyIncoming;
074import riffle.process.DependencyOutgoing;
075import riffle.process.ProcessCleanup;
076import riffle.process.ProcessComplete;
077import riffle.process.ProcessPrepare;
078import riffle.process.ProcessStart;
079import riffle.process.ProcessStop;
080
081import static cascading.util.Util.formatDurationFromMillis;
082
083@riffle.process.Process
084public abstract class BaseFlow<Config> implements Flow<Config>, ProcessLogger
085  {
  /** Logger created for {@code Flow.class}. */
  private static final Logger LOG = LoggerFactory.getLogger( Flow.class ); // wrapped by ProcessLogger interface methods
  // NOTE(review): not referenced in the visible part of this file, presumably caps the flow name length in log lines -- confirm
  private static final int LOG_FLOW_NAME_MAX = 25;

  /** Planner details, {@link PlannerInfo#NULL} until {@link #setPlannerInfo(PlannerInfo)} is called. */
  private PlannerInfo plannerInfo = PlannerInfo.NULL;
  /** Platform details, {@link PlatformInfo#NULL} unless a non-null value is handed to a constructor. */
  protected PlatformInfo platformInfo = PlatformInfo.NULL;

  /** Field id, created lazily by {@link #getID()} */
  private String id;
  /** Field name, assigned by the constructors or {@link #setName(String)} */
  private String name;
  /** Field runID, copied from the FlowDef when one is given */
  private String runID;
  /** Field classPath, copied from the FlowDef when one is given */
  private List<String> classPath; // may remain null
  /** Field tags, copied from the FlowDef when one is given */
  private String tags;
  /** Field listeners, created lazily by {@link #getListeners()} */
  private List<SafeFlowListener> listeners;
  /** Field flowSkipStrategy, consulted by {@link #isSkipFlow()}, never null */
  private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkNotStale();
  /** Field flowStats, created via {@link #createPrepareFlowStats()} */
  protected FlowStats flowStats; // don't use a listener to set values
  /** Field sources, keyed by name, empty until {@link #setSources(Map)} is called */
  protected Map<String, Tap> sources = Collections.emptyMap();
  /** Field sinks, keyed by name, empty until {@link #setSinks(Map)} is called */
  protected Map<String, Tap> sinks = Collections.emptyMap();
  /** Field traps, keyed by name, empty until {@link #setTraps(Map)} is called */
  private Map<String, Tap> traps = Collections.emptyMap();
  /** Field checkpoints, keyed by name, empty until {@link #setCheckpoints(Map)} is called */
  private Map<String, Tap> checkpoints = Collections.emptyMap();
  /** Field stopJobsOnExit, true by default, see {@link #initFromProperties(Map)} */
  protected boolean stopJobsOnExit = true;
  /** Field submitPriority, 5 by default, {@link #setSubmitPriority(int)} enforces 1 to 10 inclusive */
  private int submitPriority = 5;

  /** Field flowStepGraph, handed to {@link #initialize(FlowElementGraph, FlowStepGraph)} */
  protected FlowStepGraph flowStepGraph;
  /** Field thread, created in {@link #start()} and cleared when {@link #complete()} returns */
  protected transient Thread thread;
  /** Field throwable, rethrown (wrapped when necessary) by {@link #complete()} and cleared afterwards */
  protected Throwable throwable;
  /** Field stop, set to true by {@link #stop()}, {@link #start()} is a no-op once it is set */
  protected volatile boolean stop;
  /** Field completed, set to true when {@link #complete()} returns */
  protected volatile boolean completed = false;

  /** Field flowCanonicalHash, cached result of {@link #getFlowCanonicalHash()} */
  protected String flowCanonicalHash;
  /** Field flowElementGraph */
  protected FlowElementGraph flowElementGraph; // only used for documentation purposes

  /** Created lazily by {@link #getCascadingServices()}. */
  private transient CascadingServices cascadingServices;

  /** Optional strategy, null unless {@link #setFlowStepStrategy(FlowStepStrategy)} is called. */
  private FlowStepStrategy<Config> flowStepStrategy = null;
  /** Field steps, cached topological ordering built by {@link #getFlowSteps()} */
  protected transient List<FlowStep<Config>> steps;
  /** Field jobsMap -- NOTE(review): presumably populated by initializeNewJobsMap(), which is not visible here -- confirm */
  private transient Map<String, FlowStepJob<Config>> jobsMap;
  private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();

  /** Fair lock held by {@link #stop()} while stopping, {@link #complete()} acquires it to wait for a running stop. */
  private transient ReentrantLock stopLock = new ReentrantLock( true );
  // NOTE(review): presumably assigned by registerShutdownHook(), which is not visible here -- confirm
  protected ShutdownUtil.Hook shutdownHook;

  /** Copy of the descriptor handed in at construction, null when none was given. */
  protected HashMap<String, String> flowDescriptor;
151
152  /**
153   * Returns property stopJobsOnExit.
154   *
155   * @param properties of type Map
156   * @return a boolean
157   */
158  static boolean getStopJobsOnExit( Map<Object, Object> properties )
159    {
160    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, FlowProps.STOP_JOBS_ON_EXIT, "true" ) );
161    }
162
163  /** Used for testing. */
164  protected BaseFlow()
165    {
166    this.name = "NA";
167    this.flowStats = createPrepareFlowStats();
168    }
169
170  /**
171   * Does not initialize stats
172   *
173   * @param name
174   */
175  protected BaseFlow( PlatformInfo platformInfo, String name )
176    {
177    if( platformInfo != null )
178      this.platformInfo = platformInfo;
179
180    this.name = name;
181    }
182
183  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, String name, Map<String, String> flowDescriptor )
184    {
185    if( platformInfo != null )
186      this.platformInfo = platformInfo;
187
188    this.name = name;
189
190    if( flowDescriptor != null )
191      this.flowDescriptor = new LinkedHashMap<>( flowDescriptor );
192
193    addSessionProperties( properties );
194    initConfig( properties, defaultConfig );
195    }
196
197  protected BaseFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Config defaultConfig, FlowDef flowDef )
198    {
199    properties = PropertyUtil.asFlatMap( properties );
200
201    if( platformInfo != null )
202      this.platformInfo = platformInfo;
203
204    this.name = flowDef.getName();
205    this.tags = flowDef.getTags();
206    this.runID = flowDef.getRunID();
207    this.classPath = flowDef.getClassPath();
208
209    if( !flowDef.getFlowDescriptor().isEmpty() )
210      this.flowDescriptor = new LinkedHashMap<>( flowDef.getFlowDescriptor() );
211
212    addSessionProperties( properties );
213    initConfig( properties, defaultConfig );
214    setSources( flowDef.getSourcesCopy() );
215    setSinks( flowDef.getSinksCopy() );
216    setTraps( flowDef.getTrapsCopy() );
217    setCheckpoints( flowDef.getCheckpointsCopy() );
218    initFromTaps();
219
220    retrieveSourceFields();
221    retrieveSinkFields();
222    }
223
  /**
   * Replaces the planner details reported by {@link #getPlannerInfo()}.
   *
   * @param plannerInfo of type PlannerInfo
   */
  public void setPlannerInfo( PlannerInfo plannerInfo )
    {
    this.plannerInfo = plannerInfo;
    }
228
229  @Override
230  public PlannerInfo getPlannerInfo()
231    {
232    return plannerInfo;
233    }
234
235  @Override
236  public PlatformInfo getPlatformInfo()
237    {
238    return platformInfo;
239    }
240
  /**
   * Binds the planned graphs to this flow and prepares its runtime state.
   * <p/>
   * The planner properties are written to the config first, then the graphs are stored, the steps are
   * attached to this flow, fresh flow stats are created and finally the jobs map and child stats are initialized.
   *
   * @param flowElementGraph of type FlowElementGraph
   * @param flowStepGraph    of type FlowStepGraph
   */
  public void initialize( FlowElementGraph flowElementGraph, FlowStepGraph flowStepGraph )
    {
    addPlannerProperties();
    this.flowElementGraph = flowElementGraph;
    this.flowStepGraph = flowStepGraph;

    // requires the step graph assigned above
    initSteps();

    this.flowStats = createPrepareFlowStats();

    initializeNewJobsMap();

    initializeChildStats();
    }
255
256  public FlowElementGraph updateSchemes( FlowElementGraph pipeGraph )
257    {
258    presentSourceFields( pipeGraph );
259
260    presentSinkFields( pipeGraph );
261
262    return new FlowElementGraph( pipeGraph );
263    }
264
265  /** Force a Scheme to fetch any fields from a meta-data store */
266  protected void retrieveSourceFields()
267    {
268    for( Tap tap : sources.values() )
269      tap.retrieveSourceFields( getFlowProcess() );
270    }
271
272  /**
273   * Present the current resolved fields for the Tap
274   *
275   * @param pipeGraph
276   */
277  protected void presentSourceFields( FlowElementGraph pipeGraph )
278    {
279    for( Tap tap : sources.values() )
280      {
281      if( pipeGraph.containsVertex( tap ) )
282        tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
283      }
284
285    for( Tap tap : checkpoints.values() )
286      {
287      if( pipeGraph.containsVertex( tap ) )
288        tap.presentSourceFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
289      }
290    }
291
292  /** Force a Scheme to fetch any fields from a meta-data store */
293  protected void retrieveSinkFields()
294    {
295    for( Tap tap : sinks.values() )
296      tap.retrieveSinkFields( getFlowProcess() );
297    }
298
299  /**
300   * Present the current resolved fields for the Tap
301   *
302   * @param pipeGraph
303   */
304  protected void presentSinkFields( FlowElementGraph pipeGraph )
305    {
306    for( Tap tap : sinks.values() )
307      {
308      if( pipeGraph.containsVertex( tap ) )
309        tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
310      }
311
312    for( Tap tap : checkpoints.values() )
313      {
314      if( pipeGraph.containsVertex( tap ) )
315        tap.presentSinkFields( getFlowProcess(), getFieldsFor( pipeGraph, tap ) );
316      }
317    }
318
  /**
   * Returns the out values fields of the first outgoing edge of the given tap.
   * <p/>
   * Only the first edge returned by the iterator is consulted; the iterator throws if the tap has no outgoing edges.
   *
   * @param pipeGraph of type FlowElementGraph
   * @param tap       of type Tap
   * @return the Fields declared on the first outgoing edge
   */
  protected Fields getFieldsFor( FlowElementGraph pipeGraph, Tap tap )
    {
    return pipeGraph.outgoingEdgesOf( tap ).iterator().next().getOutValuesFields();
    }
323
324  protected void addSessionProperties( Map<Object, Object> properties )
325    {
326    if( properties == null )
327      return;
328
329    PropertyUtil.setProperty( properties, CASCADING_FLOW_ID, getID() );
330    PropertyUtil.setProperty( properties, "cascading.flow.tags", getTags() );
331    AppProps.setApplicationID( properties );
332    PropertyUtil.setProperty( properties, "cascading.app.name", makeAppName( properties ) );
333    PropertyUtil.setProperty( properties, "cascading.app.version", makeAppVersion( properties ) );
334    }
335
336  protected void addPlannerProperties()
337    {
338    setConfigProperty( getConfig(), "cascading.flow.planner", getPlannerInfo().name );
339    setConfigProperty( getConfig(), "cascading.flow.platform", getPlannerInfo().platform );
340    setConfigProperty( getConfig(), "cascading.flow.registry", getPlannerInfo().registry );
341    }
342
343  private String makeAppName( Map<Object, Object> properties )
344    {
345    if( properties == null )
346      return null;
347
348    String name = AppProps.getApplicationName( properties );
349
350    if( name != null )
351      return name;
352
353    return Util.findName( AppProps.getApplicationJarPath( properties ) );
354    }
355
356  private String makeAppVersion( Map<Object, Object> properties )
357    {
358    if( properties == null )
359      return null;
360
361    String name = AppProps.getApplicationVersion( properties );
362
363    if( name != null )
364      return name;
365
366    return Util.findVersion( AppProps.getApplicationJarPath( properties ) );
367    }
368
369  protected FlowStats createPrepareFlowStats()
370    {
371    FlowStats flowStats = createFlowStats();
372
373    flowStats.prepare();
374    flowStats.markPending();
375
376    return flowStats;
377    }
378
379  protected FlowStats createFlowStats()
380    {
381    return new FlowStats( this, getClientState() );
382    }
383
384  public CascadingServices getCascadingServices()
385    {
386    if( cascadingServices == null )
387      cascadingServices = new CascadingServices( getConfigAsProperties() );
388
389    return cascadingServices;
390    }
391
392  protected ClientState getClientState()
393    {
394    return getFlowSession().getCascadingServices().createClientState( getID() );
395    }
396
397  protected void initSteps()
398    {
399    if( flowStepGraph == null )
400      return;
401
402    Set<FlowStep> flowSteps = flowStepGraph.vertexSet();
403
404    for( FlowStep flowStep : flowSteps )
405      {
406      ( (BaseFlowStep) flowStep ).setFlow( this );
407
408      Set<FlowNode> flowNodes = flowStep.getFlowNodeGraph().vertexSet();
409
410      for( FlowNode flowNode : flowNodes )
411        ( (BaseFlowNode) flowNode ).setFlowStep( flowStep );
412      }
413    }
414
  /** Lets the source, sink and trap taps initialize themselves against this flow; checkpoint taps are not included here. */
  private void initFromTaps()
    {
    initFromTaps( sources );
    initFromTaps( sinks );
    initFromTaps( traps );
    }
421
422  private void initFromTaps( Map<String, Tap> taps )
423    {
424    for( Tap tap : taps.values() )
425      tap.flowConfInit( this );
426    }
427
428  @Override
429  public String getName()
430    {
431    return name;
432    }
433
  /**
   * Replaces the name of this flow.
   *
   * @param name of type String
   */
  protected void setName( String name )
    {
    this.name = name;
    }
438
439  @Override
440  public String getID()
441    {
442    if( id == null )
443      id = Util.createUniqueID();
444
445    return id;
446    }
447
448  @Override
449  public String getTags()
450    {
451    return tags;
452    }
453
454  @Override
455  public int getSubmitPriority()
456    {
457    return submitPriority;
458    }
459
460  @Override
461  public void setSubmitPriority( int submitPriority )
462    {
463    if( submitPriority < 1 || submitPriority > 10 )
464      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
465
466    this.submitPriority = submitPriority;
467    }
468
469  /**
470   * The hash value can be used to determine if two unique Flow instances performed the same work.
471   * <p/>
472   * The source and sink taps are not relevant to the hash.
473   *
474   * @return a String value
475   */
476  public String getFlowCanonicalHash()
477    {
478    if( flowCanonicalHash != null || flowElementGraph == null )
479      return flowCanonicalHash;
480
481    // synchronize on flowElementGraph to prevent duplicate hash creation - can be high overhead
482    // and to prevent deadlocks if the DocumentService calls back into the flow when transitioning
483    // into a running state
484    synchronized( flowElementGraph )
485      {
486      flowCanonicalHash = createFlowCanonicalHash( flowElementGraph );
487      }
488
489    return flowCanonicalHash;
490    }
491
492  // allow for sub-class overrides
493  protected String createFlowCanonicalHash( FlowElementGraph flowElementGraph )
494    {
495    if( flowElementGraph == null )
496      return null;
497
498    return ElementGraphs.canonicalHash( flowElementGraph );
499    }
500
501  protected FlowElementGraph getFlowElementGraph()
502    {
503    return flowElementGraph;
504    }
505
  /**
   * Replaces the flow element graph.
   *
   * @param flowElementGraph of type FlowElementGraph
   */
  protected void setFlowElementGraph( FlowElementGraph flowElementGraph )
    {
    this.flowElementGraph = flowElementGraph;
    }
510
511  protected FlowStepGraph getFlowStepGraph()
512    {
513    return flowStepGraph;
514    }
515
  /**
   * Replaces the flow step graph.
   *
   * @param flowStepGraph of type FlowStepGraph
   */
  protected void setFlowStepGraph( FlowStepGraph flowStepGraph )
    {
    this.flowStepGraph = flowStepGraph;
    }
520
521  protected void setSources( Map<String, Tap> sources )
522    {
523    if( sources == null )
524      return;
525
526    addListeners( sources.values() );
527    this.sources = sources;
528    }
529
530  protected void setSinks( Map<String, Tap> sinks )
531    {
532    if( sinks == null )
533      return;
534
535    addListeners( sinks.values() );
536    this.sinks = sinks;
537    }
538
539  protected void setTraps( Map<String, Tap> traps )
540    {
541    addListeners( traps.values() );
542    this.traps = traps;
543    }
544
545  protected void setCheckpoints( Map<String, Tap> checkpoints )
546    {
547    addListeners( checkpoints.values() );
548    this.checkpoints = checkpoints;
549    }
550
  /**
   * This method creates a new internal Config with the parentConfig as defaults using the properties to override
   * the defaults.
   * <p/>
   * Called from the constructors after the session properties have been added to the given properties.
   *
   * @param properties   of type Map
   * @param parentConfig of type Config
   */
  protected abstract void initConfig( Map<Object, Object> properties, Config parentConfig );
559
560  public Config createConfig( Map<Object, Object> properties, Config defaultConfig )
561    {
562    Config config = newConfig( defaultConfig );
563
564    if( properties == null )
565      return config;
566
567    Set<Object> keys = new HashSet<>( properties.keySet() );
568
569    // keys will only be grabbed if both key/value are String, so keep orig keys
570    if( properties instanceof Properties )
571      keys.addAll( ( (Properties) properties ).stringPropertyNames() );
572
573    for( Object key : keys )
574      {
575      Object value = properties.get( key );
576
577      if( value == null && properties instanceof Properties && key instanceof String )
578        value = ( (Properties) properties ).getProperty( (String) key );
579
580      if( value == null ) // don't stuff null values
581        continue;
582
583      setConfigProperty( config, key, value );
584      }
585
586    return config;
587    }
588
  /**
   * Stores a single key/value pair in the given config; {@link #createConfig(Map, Object)} never passes a null value.
   *
   * @param config of type Config
   * @param key    of type Object
   * @param value  of type Object
   */
  protected abstract void setConfigProperty( Config config, Object key, Object value );

  /**
   * Creates a new Config instance based on the given default config.
   *
   * @param defaultConfig of type Config
   * @return a new Config
   */
  protected abstract Config newConfig( Config defaultConfig );
592
593  protected void initFromProperties( Map<Object, Object> properties )
594    {
595    stopJobsOnExit = getStopJobsOnExit( properties );
596    }
597
598  public FlowSession getFlowSession()
599    {
600    return new FlowSession( getCascadingServices() );
601    }
602
603  @Override
604  public FlowStats getFlowStats()
605    {
606    return flowStats;
607    }
608
609  @Override
610  public Map<String, String> getFlowDescriptor()
611    {
612    if( flowDescriptor == null )
613      return Collections.emptyMap();
614
615    return Collections.unmodifiableMap( flowDescriptor );
616    }
617
618  @Override
619  public FlowStats getStats()
620    {
621    return getFlowStats();
622    }
623
624  void addListeners( Collection listeners )
625    {
626    for( Object listener : listeners )
627      {
628      if( listener instanceof FlowListener )
629        addListener( (FlowListener) listener );
630      }
631    }
632
633  protected void removeListeners( Collection listeners )
634    {
635    if( listeners == null || listeners.isEmpty() )
636      return;
637
638    for( Object listener : listeners )
639      {
640      if( listener instanceof FlowListener )
641        removeListener( (FlowListener) listener );
642      }
643    }
644
645  List<SafeFlowListener> getListeners()
646    {
647    if( listeners == null )
648      listeners = new LinkedList<SafeFlowListener>();
649
650    return listeners;
651    }
652
653  @Override
654  public boolean hasListeners()
655    {
656    return listeners != null && !listeners.isEmpty();
657    }
658
659  @Override
660  public void addListener( FlowListener flowListener )
661    {
662    getListeners().add( new SafeFlowListener( flowListener ) );
663    }
664
665  @Override
666  public boolean removeListener( FlowListener flowListener )
667    {
668    return getListeners().remove( new SafeFlowListener( flowListener ) );
669    }
670
671  @Override
672  public boolean hasStepListeners()
673    {
674    boolean hasStepListeners = false;
675
676    for( FlowStep step : getFlowSteps() )
677      hasStepListeners |= step.hasListeners();
678
679    return hasStepListeners;
680    }
681
682  @Override
683  public void addStepListener( FlowStepListener flowStepListener )
684    {
685    for( FlowStep step : getFlowSteps() )
686      step.addListener( flowStepListener );
687    }
688
689  @Override
690  public boolean removeStepListener( FlowStepListener flowStepListener )
691    {
692    boolean listenerRemoved = true;
693
694    for( FlowStep step : getFlowSteps() )
695      listenerRemoved &= step.removeListener( flowStepListener );
696
697    return listenerRemoved;
698    }
699
700  @Override
701  public Map<String, Tap> getSources()
702    {
703    return Collections.unmodifiableMap( sources );
704    }
705
706  @Override
707  public List<String> getSourceNames()
708    {
709    return new ArrayList<String>( sources.keySet() );
710    }
711
712  @Override
713  public Tap getSource( String name )
714    {
715    return sources.get( name );
716    }
717
718  @Override
719  @DependencyIncoming
720  public Collection<Tap> getSourcesCollection()
721    {
722    return getSources().values();
723    }
724
725  @Override
726  public Map<String, Tap> getSinks()
727    {
728    return Collections.unmodifiableMap( sinks );
729    }
730
731  @Override
732  public List<String> getSinkNames()
733    {
734    return new ArrayList<String>( sinks.keySet() );
735    }
736
737  @Override
738  public Tap getSink( String name )
739    {
740    return sinks.get( name );
741    }
742
743  @Override
744  @DependencyOutgoing
745  public Collection<Tap> getSinksCollection()
746    {
747    return getSinks().values();
748    }
749
750  @Override
751  public Tap getSink()
752    {
753    return sinks.values().iterator().next();
754    }
755
756  @Override
757  public Map<String, Tap> getTraps()
758    {
759    return Collections.unmodifiableMap( traps );
760    }
761
762  @Override
763  public List<String> getTrapNames()
764    {
765    return new ArrayList<String>( traps.keySet() );
766    }
767
768  @Override
769  public Collection<Tap> getTrapsCollection()
770    {
771    return getTraps().values();
772    }
773
774  @Override
775  public Map<String, Tap> getCheckpoints()
776    {
777    return Collections.unmodifiableMap( checkpoints );
778    }
779
780  @Override
781  public List<String> getCheckpointNames()
782    {
783    return new ArrayList<String>( checkpoints.keySet() );
784    }
785
786  @Override
787  public Collection<Tap> getCheckpointsCollection()
788    {
789    return getCheckpoints().values();
790    }
791
792  @Override
793  public boolean isStopJobsOnExit()
794    {
795    return stopJobsOnExit;
796    }
797
798  @Override
799  public FlowSkipStrategy getFlowSkipStrategy()
800    {
801    return flowSkipStrategy;
802    }
803
804  @Override
805  public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
806    {
807    if( flowSkipStrategy == null )
808      throw new IllegalArgumentException( "flowSkipStrategy may not be null" );
809
810    try
811      {
812      return this.flowSkipStrategy;
813      }
814    finally
815      {
816      this.flowSkipStrategy = flowSkipStrategy;
817      }
818    }
819
820  @Override
821  public boolean isSkipFlow() throws IOException
822    {
823    return flowSkipStrategy.skipFlow( this );
824    }
825
826  @Override
827  public boolean areSinksStale() throws IOException
828    {
829    return areSourcesNewer( getSinkModified() );
830    }
831
832  @Override
833  public boolean areSourcesNewer( long sinkModified ) throws IOException
834    {
835    Config config = getConfig();
836    Iterator<Tap> values = sources.values().iterator();
837
838    long sourceModified = 0;
839
840    try
841      {
842      sourceModified = Util.getSourceModified( config, values, sinkModified );
843
844      if( sinkModified < sourceModified )
845        return true;
846
847      return false;
848      }
849    finally
850      {
851      if( LOG.isInfoEnabled() )
852        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
853      }
854    }
855
856  @Override
857  public long getSinkModified() throws IOException
858    {
859    long sinkModified = Util.getSinkModified( getConfig(), sinks.values() );
860
861    if( LOG.isInfoEnabled() )
862      {
863      if( sinkModified == -1L )
864        logInfo( "at least one sink is marked for delete" );
865      if( sinkModified == 0L )
866        logInfo( "at least one sink does not exist" );
867      else
868        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
869      }
870
871    return sinkModified;
872    }
873
874  @Override
875  public FlowStepStrategy getFlowStepStrategy()
876    {
877    return flowStepStrategy;
878    }
879
  /**
   * Replaces the flow step strategy, null is accepted.
   *
   * @param flowStepStrategy of type FlowStepStrategy
   */
  @Override
  public void setFlowStepStrategy( FlowStepStrategy flowStepStrategy )
    {
    this.flowStepStrategy = flowStepStrategy;
    }
885
886  @Override
887  public List<FlowStep<Config>> getFlowSteps()
888    {
889    if( steps != null )
890      return steps;
891
892    if( flowStepGraph == null )
893      return Collections.emptyList();
894
895    Iterator<FlowStep> topoIterator = flowStepGraph.getTopologicalIterator();
896
897    steps = new ArrayList<>();
898
899    while( topoIterator.hasNext() )
900      steps.add( topoIterator.next() );
901
902    return steps;
903    }
904
905  @Override
906  @ProcessPrepare
907  public void prepare()
908    {
909    try
910      {
911      deleteSinksIfNotUpdate();
912      deleteTrapsIfNotUpdate();
913      deleteCheckpointsIfNotUpdate();
914      }
915    catch( IOException exception )
916      {
917      throw new FlowTapException( "unable to prepare flow", exception );
918      }
919    }
920
921  @Override
922  @ProcessStart
923  public synchronized void start()
924    {
925    if( thread != null )
926      return;
927
928    if( stop )
929      return;
930
931    registerShutdownHook();
932
933    internalStart();
934
935    String threadName = ( "flow " + Util.toNull( getName() ) ).trim();
936
937    thread = createFlowThread( threadName );
938
939    thread.start();
940    }
941
942  protected Thread createFlowThread( String threadName )
943    {
944    return new Thread( new Runnable()
945      {
946      @Override
947      public void run()
948        {
949        BaseFlow.this.run();
950        }
951      }, threadName );
952    }
953
  /** Hook called by {@link #start()} after the shutdown hook is registered and before the flow thread is created. */
  protected abstract void internalStart();
955
956  @Override
957  @ProcessStop
958  public synchronized void stop()
959    {
960    stopLock.lock();
961
962    try
963      {
964      if( stop )
965        return;
966
967      stop = true;
968
969      fireOnStopping();
970
971      if( !flowStats.isFinished() )
972        flowStats.markStopped();
973
974      internalStopAllJobs();
975
976      handleExecutorShutdown();
977
978      internalClean( true );
979      }
980    finally
981      {
982      flowStats.cleanup();
983      stopLock.unlock();
984      }
985    }
986
  /**
   * Hook for sub-class specific clean up; {@link #stop()} calls it with true.
   *
   * @param stop of type boolean
   */
  protected abstract void internalClean( boolean stop );
988
989  @Override
990  @ProcessComplete
991  public void complete()
992    {
993    if( !completed )
994      start();
995
996    try
997      {
998      try
999        {
1000        synchronized( this ) // prevent NPE on quick stop() & complete() after start()
1001          {
1002          while( !completed && thread == null && !stop )
1003            Util.safeSleep( 10 );
1004          }
1005
1006        if( thread != null )
1007          thread.join();
1008        }
1009      catch( InterruptedException exception )
1010        {
1011        throw new FlowException( getName(), "thread interrupted", exception );
1012        }
1013
1014      // if in #stop and stopping, lets wait till its done in this thread
1015      try
1016        {
1017        stopLock.lock();
1018        }
1019      finally
1020        {
1021        stopLock.unlock();
1022        }
1023
1024      // if multiple threads enter #complete, each will get a copy of the exception, if any
1025      if( throwable instanceof FlowException )
1026        ( (FlowException) throwable ).setFlowName( getName() );
1027
1028      if( throwable instanceof CascadingException )
1029        throw (CascadingException) throwable;
1030
1031      if( throwable instanceof OutOfMemoryError )
1032        throw (OutOfMemoryError) throwable;
1033
1034      if( throwable != null )
1035        throw new FlowException( getName(), "unhandled exception", throwable );
1036
1037      if( hasListeners() )
1038        {
1039        for( SafeFlowListener safeFlowListener : getListeners() )
1040          {
1041          if( safeFlowListener.throwable != null )
1042            throw new FlowException( getName(), "unhandled listener exception", throwable );
1043          }
1044        }
1045      }
1046    finally
1047      {
1048      completed = true;
1049      thread = null;
1050      throwable = null;
1051      }
1052    }
1053
1054  private void commitTraps()
1055    {
1056    // commit all the traps, don't fail on an error
1057    for( Tap tap : traps.values() )
1058      {
1059      try
1060        {
1061        if( !tap.commitResource( getConfig() ) )
1062          logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ) );
1063        }
1064      catch( IOException exception )
1065        {
1066        logError( "unable to commit trap: " + tap.getFullIdentifier( getConfig() ), exception );
1067        }
1068      }
1069    }
1070
  /** Intentionally empty; declared as the {@link ProcessCleanup} callback of this process. */
  @Override
  @ProcessCleanup
  public void cleanup()
    {
    // do nothing
    }
1077
1078  @Override
1079  public TupleEntryIterator openSource() throws IOException
1080    {
1081    return sources.values().iterator().next().openForRead( getFlowProcess() );
1082    }
1083
1084  @Override
1085  public TupleEntryIterator openSource( String name ) throws IOException
1086    {
1087    if( !sources.containsKey( name ) )
1088      throw new IllegalArgumentException( "source does not exist: " + name );
1089
1090    return sources.get( name ).openForRead( getFlowProcess() );
1091    }
1092
1093  @Override
1094  public TupleEntryIterator openSink() throws IOException
1095    {
1096    return sinks.values().iterator().next().openForRead( getFlowProcess() );
1097    }
1098
1099  @Override
1100  public TupleEntryIterator openSink( String name ) throws IOException
1101    {
1102    if( !sinks.containsKey( name ) )
1103      throw new IllegalArgumentException( "sink does not exist: " + name );
1104
1105    return sinks.get( name ).openForRead( getFlowProcess() );
1106    }
1107
1108  @Override
1109  public TupleEntryIterator openTrap() throws IOException
1110    {
1111    return traps.values().iterator().next().openForRead( getFlowProcess() );
1112    }
1113
1114  @Override
1115  public TupleEntryIterator openTrap( String name ) throws IOException
1116    {
1117    if( !traps.containsKey( name ) )
1118      throw new IllegalArgumentException( "trap does not exist: " + name );
1119
1120    return traps.get( name ).openForRead( getFlowProcess() );
1121    }
1122
1123  /**
1124   * Method deleteSinks deletes all sinks, whether or not they are configured for {@link cascading.tap.SinkMode#UPDATE}.
1125   * <p/>
1126   * Use with caution.
1127   *
1128   * @throws IOException when
1129   * @see BaseFlow#deleteSinksIfNotUpdate()
1130   */
1131  public void deleteSinks() throws IOException
1132    {
1133    for( Tap tap : sinks.values() )
1134      deleteOrFail( tap );
1135    }
1136
1137  private void deleteOrFail( Tap tap ) throws IOException
1138    {
1139    if( !tap.resourceExists( getConfig() ) )
1140      return;
1141
1142    if( !tap.deleteResource( getConfig() ) )
1143      throw new FlowTapException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
1144    }
1145
1146  /**
1147   * Method deleteSinksIfNotUpdate deletes all sinks if they are not configured with the {@link cascading.tap.SinkMode#UPDATE} flag.
1148   * <p/>
1149   * Typically used by a {@link Cascade} before executing the flow if the sinks are stale.
1150   * <p/>
1151   * Use with caution.
1152   *
1153   * @throws IOException when
1154   */
1155  public void deleteSinksIfNotUpdate() throws IOException
1156    {
1157    for( Tap tap : sinks.values() )
1158      {
1159      if( !tap.isUpdate() )
1160        deleteOrFail( tap );
1161      }
1162    }
1163
1164  /**
1165   * Method deleteSinksIfReplace deletes all sinks that are configured with the {@link cascading.tap.SinkMode#REPLACE} flag.
1166   *
1167   * @throws IOException
1168   */
1169  public void deleteSinksIfReplace() throws IOException
1170    {
1171    // verify all sinks before incrementally deleting for a replace
1172    for( Tap tap : sinks.values() )
1173      {
1174      if( tap.isKeep() && tap.resourceExists( getConfig() ) )
1175        throw new FlowTapException( "resource exists and sink mode is KEEP, cannot overwrite: " + tap.getFullIdentifier( getFlowProcess() ) );
1176      }
1177
1178    for( Tap tap : sinks.values() )
1179      {
1180      if( tap.isReplace() )
1181        deleteOrFail( tap );
1182      }
1183    }
1184
1185  public void deleteTrapsIfNotUpdate() throws IOException
1186    {
1187    for( Tap tap : traps.values() )
1188      {
1189      if( !tap.isUpdate() )
1190        deleteOrFail( tap );
1191      }
1192    }
1193
1194  public void deleteCheckpointsIfNotUpdate() throws IOException
1195    {
1196    for( Tap tap : checkpoints.values() )
1197      {
1198      if( !tap.isUpdate() )
1199        deleteOrFail( tap );
1200      }
1201    }
1202
1203  public void deleteTrapsIfReplace() throws IOException
1204    {
1205    for( Tap tap : traps.values() )
1206      {
1207      if( tap.isReplace() )
1208        deleteOrFail( tap );
1209      }
1210    }
1211
1212  public void deleteCheckpointsIfReplace() throws IOException
1213    {
1214    for( Tap tap : checkpoints.values() )
1215      {
1216      if( tap.isReplace() )
1217        deleteOrFail( tap );
1218      }
1219    }
1220
  /**
   * Method resourceExists asks the given Tap whether its resource exists, using this flow's config.
   *
   * @param tap the Tap to test
   * @return the value returned by the Tap
   * @throws IOException if thrown by the Tap
   */
  @Override
  public boolean resourceExists( Tap tap ) throws IOException
    {
    return tap.resourceExists( getConfig() );
    }
1226
  /**
   * Method openTapForRead opens the given Tap for reading, using this flow's FlowProcess.
   *
   * @param tap the Tap to open
   * @return the TupleEntryIterator returned by the Tap
   * @throws IOException if thrown by the Tap
   */
  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( getFlowProcess() );
    }
1232
  /**
   * Method openTapForWrite opens the given Tap for writing, using this flow's FlowProcess.
   *
   * @param tap the Tap to open
   * @return the TupleEntryCollector returned by the Tap
   * @throws IOException if thrown by the Tap
   */
  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( getFlowProcess() );
    }
1238
1239  /** Method run implements the Runnable run method and should not be called by users. */
1240  private void run()
1241    {
1242    if( thread == null )
1243      throw new IllegalStateException( "to start a Flow call start() or complete(), not Runnable#run()" );
1244
1245    Version.printBanner();
1246    Update.checkForUpdate( getPlatformInfo() );
1247
1248    try
1249      {
1250      if( stop )
1251        return;
1252
1253      flowStats.markStarted();
1254
1255      fireOnStarting();
1256
1257      if( isInfoEnabled() )
1258        {
1259        logInfo( "starting" );
1260
1261        for( Tap source : getSourcesCollection() )
1262          logInfo( " source: " + source );
1263        for( Tap sink : getSinksCollection() )
1264          logInfo( " sink: " + sink );
1265        }
1266
1267      spawnSteps();
1268      }
1269    catch( Throwable throwable )
1270      {
1271      this.throwable = throwable;
1272      }
1273    finally
1274      {
1275      handleThrowableAndMarkFailed();
1276
1277      if( !stop && !flowStats.isFinished() )
1278        flowStats.markSuccessful();
1279
1280      internalClean( stop ); // cleaning temp taps may be determined by success/failure
1281
1282      commitTraps();
1283
1284      try
1285        {
1286        fireOnCompleted();
1287        }
1288      finally
1289        {
1290        if( LOG.isInfoEnabled() )
1291          {
1292          long totalSliceCPUSeconds = getTotalSliceCPUMilliSeconds();
1293
1294          if( totalSliceCPUSeconds == -1 )
1295            logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) );
1296          else
1297            logInfo( " completed in: " + formatDurationFromMillis( flowStats.getDuration() ) + ", using cpu time: " + formatDurationFromMillis( totalSliceCPUSeconds ) );
1298          }
1299
1300        flowStats.cleanup();
1301        internalShutdown();
1302        deregisterShutdownHook();
1303        }
1304      }
1305    }
1306
1307  /**
1308   * Reentrant method to launch all step jobs, returns false if any of the current steps failed.
1309   *
1310   * @return
1311   * @throws InterruptedException
1312   * @throws ExecutionException
1313   */
1314  protected boolean spawnSteps() throws InterruptedException, ExecutionException
1315    {
1316    // if jobs are run local, then only use one thread to force execution serially
1317    //int numThreads = jobsAreLocal() ? 1 : getMaxConcurrentSteps( getJobConf() );
1318    int numThreads = getMaxNumParallelSteps();
1319    int eligibleJobsSize = getEligibleJobsSize(); // only jobs that have not previously been started
1320
1321    if( numThreads == 0 )
1322      numThreads = eligibleJobsSize;
1323
1324    if( numThreads == 0 )
1325      throw new IllegalStateException( "no jobs rendered for flow: " + getName() );
1326
1327    if( LOG.isInfoEnabled() )
1328      {
1329      logInfo( " parallel execution of steps is enabled: " + ( getMaxNumParallelSteps() != 1 ) );
1330      logInfo( " executing total steps: " + eligibleJobsSize );
1331      logInfo( " allocating management threads: " + numThreads );
1332      }
1333
1334    List<Future<Throwable>> futures = spawnJobs( numThreads );
1335
1336    for( Future<Throwable> future : futures )
1337      {
1338      throwable = future.get();
1339
1340      if( throwable != null )
1341        {
1342        if( !stop )
1343          internalStopAllJobs();
1344
1345        handleExecutorShutdown();
1346        return false;
1347        }
1348      }
1349
1350    return true;
1351    }
1352
  /**
   * Method getTotalSliceCPUMilliSeconds returns the value logged as cpu time when the flow completes.
   * <p/>
   * This implementation always returns -1, which the completion logging treats as "not available".
   *
   * @return -1
   */
  protected long getTotalSliceCPUMilliSeconds()
    {
    return -1;
    }
1357
  /**
   * Method getMaxNumParallelSteps returns the number of management threads used to run steps.
   * <p/>
   * When spawning steps, a value of zero is replaced by the number of eligible jobs, and a value
   * of one is reported as parallel execution being disabled.
   *
   * @return the number of threads, or zero
   */
  protected abstract int getMaxNumParallelSteps();
1359
  /** Method internalShutdown is called at the end of the flow's run, after the flow stats have been cleaned up. */
  protected abstract void internalShutdown();
1361
1362  private List<Future<Throwable>> spawnJobs( int numThreads ) throws InterruptedException
1363    {
1364    if( spawnStrategy == null )
1365      {
1366      logError( "no spawnStrategy set" );
1367      return new ArrayList<>();
1368      }
1369
1370    if( stop )
1371      return new ArrayList<>();
1372
1373    List<Callable<Throwable>> list = getJobMapCallables();
1374
1375    return spawnStrategy.start( this, numThreads, list );
1376    }
1377
1378  private void handleThrowableAndMarkFailed()
1379    {
1380    if( throwable != null && !stop )
1381      {
1382      flowStats.markFailed( throwable );
1383
1384      fireOnThrowable();
1385      }
1386    }
1387
  /**
   * Method getJobsMap returns the current jobs map, keyed by the ID of each flow step.
   *
   * @return the jobs map field, which is null until it has been initialized
   */
  Map<String, FlowStepJob<Config>> getJobsMap()
    {
    return jobsMap;
    }
1392
  /**
   * Method isJobsMapInitialized reports whether the jobs map has been assigned.
   *
   * @return true if the jobs map is not null
   */
  protected boolean isJobsMapInitialized()
    {
    return jobsMap != null;
    }
1397
  /**
   * Method getEligibleJobsSize returns the number of jobs returned by {@link #getJobMapCallables()},
   * that is, the jobs that have not been started.
   *
   * @return the number of eligible jobs
   */
  protected int getEligibleJobsSize()
    {
    return getJobMapCallables().size();
    }
1402
1403  protected List<Callable<Throwable>> getJobMapCallables()
1404    {
1405    List<Callable<Throwable>> list = new ArrayList<>();
1406
1407    for( FlowStepJob<Config> job : jobsMap.values() )
1408      {
1409      if( job.isCallableStarted() )
1410        continue;
1411
1412      list.add( job );
1413      }
1414
1415    return list;
1416    }
1417
  /** Method initializeNewJobsMap replaces the jobs map with a new, insertion ordered map populated from the flow step graph. */
  protected void initializeNewJobsMap()
    {
    jobsMap = updateJobsMap( flowStepGraph, new LinkedHashMap<String, FlowStepJob<Config>>() ); // keep topo order
    }
1422
  /**
   * Method updateJobsMap replaces the jobs map with an updated copy of itself, so that existing
   * jobs are retained and jobs are created for steps of the flow step graph not yet present.
   */
  protected void updateJobsMap()
    {
    jobsMap = updateJobsMap( flowStepGraph, new LinkedHashMap<>( jobsMap ) );
    }
1427
1428  Map<String, FlowStepJob<Config>> updateJobsMap( FlowStepGraph flowStepGraph, Map<String, FlowStepJob<Config>> jobsMap )
1429    {
1430    Iterator<FlowStep> iterator = flowStepGraph.getTopologicalIterator();
1431
1432    while( iterator.hasNext() )
1433      {
1434      BaseFlowStep<Config> step = (BaseFlowStep) iterator.next();
1435      FlowStepJob<Config> flowStepJob = jobsMap.get( step.getID() );
1436
1437      if( flowStepJob == null )
1438        {
1439        flowStepJob = step.getCreateFlowStepJob( getFlowProcess(), getConfig() );
1440
1441        jobsMap.put( step.getID(), flowStepJob );
1442        }
1443
1444      List<FlowStepJob<Config>> predecessors = new ArrayList<>();
1445
1446      for( Object flowStep : ProcessGraphs.predecessorListOf( flowStepGraph, step ) )
1447        predecessors.add( jobsMap.get( ( (FlowStep) flowStep ).getID() ) );
1448
1449      flowStepJob.setPredecessors( predecessors );
1450      }
1451
1452    return jobsMap;
1453    }
1454
  /** Method initializeChildStats adds the step stats of every job in the jobs map to the flow stats. */
  protected void initializeChildStats()
    {
    for( FlowStepJob<Config> flowStepJob : jobsMap.values() )
      flowStats.addStepStats( flowStepJob.getStepStats() );
    }
1460
1461  protected void internalStopAllJobs()
1462    {
1463    logInfo( "stopping all jobs" );
1464
1465    try
1466      {
1467      if( jobsMap == null )
1468        return;
1469
1470      List<FlowStepJob<Config>> jobs = new ArrayList<FlowStepJob<Config>>( jobsMap.values() );
1471
1472      Collections.reverse( jobs );
1473
1474      for( FlowStepJob<Config> job : jobs )
1475        job.stop();
1476      }
1477    finally
1478      {
1479      logInfo( "stopped all jobs" );
1480      }
1481    }
1482
1483  protected void handleExecutorShutdown()
1484    {
1485    if( spawnStrategy == null )
1486      return;
1487
1488    if( spawnStrategy.isCompleted( this ) )
1489      return;
1490
1491    logDebug( "shutting down job executor" );
1492
1493    try
1494      {
1495      spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
1496      }
1497    catch( InterruptedException exception )
1498      {
1499      // ignore
1500      }
1501
1502    logDebug( "shutdown of job executor complete" );
1503    }
1504
1505  protected void fireOnCompleted()
1506    {
1507    if( hasListeners() )
1508      {
1509      if( isDebugEnabled() )
1510        logDebug( "firing onCompleted event: " + getListeners().size() );
1511
1512      for( FlowListener flowListener : getListeners() )
1513        flowListener.onCompleted( this );
1514      }
1515    }
1516
  /**
   * Method fireOnThrowable records the given throwable on this flow, and then fires the throwable event.
   *
   * @param throwable the throwable to record and pass to the listeners
   */
  protected void fireOnThrowable( Throwable throwable )
    {
    this.throwable = throwable;
    fireOnThrowable();
    }
1522
1523  protected void fireOnThrowable()
1524    {
1525    if( hasListeners() )
1526      {
1527      if( isDebugEnabled() )
1528        logDebug( "firing onThrowable event: " + getListeners().size() );
1529
1530      boolean isHandled = false;
1531
1532      for( FlowListener flowListener : getListeners() )
1533        isHandled = flowListener.onThrowable( this, throwable ) || isHandled;
1534
1535      if( isHandled )
1536        throwable = null;
1537      }
1538    }
1539
1540  protected void fireOnStopping()
1541    {
1542    if( hasListeners() )
1543      {
1544      if( isDebugEnabled() )
1545        logDebug( "firing onStopping event: " + getListeners().size() );
1546
1547      for( FlowListener flowListener : getListeners() )
1548        flowListener.onStopping( this );
1549      }
1550    }
1551
1552  protected void fireOnStarting()
1553    {
1554    if( hasListeners() )
1555      {
1556      if( isDebugEnabled() )
1557        logDebug( "firing onStarting event: " + getListeners().size() );
1558
1559      for( FlowListener flowListener : getListeners() )
1560        flowListener.onStarting( this );
1561      }
1562    }
1563
1564  @Override
1565  public String toString()
1566    {
1567    StringBuffer buffer = new StringBuffer();
1568
1569    if( getName() != null )
1570      buffer.append( getName() ).append( ": " );
1571
1572    for( FlowStep step : getFlowSteps() )
1573      buffer.append( step );
1574
1575    return buffer.toString();
1576    }
1577
  /**
   * Method isInfoEnabled reports whether the logger of this class has INFO logging enabled.
   *
   * @return the value returned by the logger
   */
  @Override
  public final boolean isInfoEnabled()
    {
    return LOG.isInfoEnabled();
    }
1583
  /**
   * Method isDebugEnabled reports whether the logger of this class has DEBUG logging enabled.
   *
   * @return the value returned by the logger
   */
  @Override
  public final boolean isDebugEnabled()
    {
    return LOG.isDebugEnabled();
    }
1589
  /**
   * Method logInfo logs the message at INFO, prefixed with the flow name in brackets, where the
   * name is passed through {@code Util.truncate} with {@code LOG_FLOW_NAME_MAX}.
   *
   * @param message   the message, handed to the logger together with the arguments
   * @param arguments the arguments handed to the logger
   */
  @Override
  public void logInfo( String message, Object... arguments )
    {
    LOG.info( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
    }
1595
  /**
   * Method logDebug logs the message at DEBUG, prefixed with the flow name in brackets, where the
   * name is passed through {@code Util.truncate} with {@code LOG_FLOW_NAME_MAX}.
   *
   * @param message   the message, handed to the logger together with the arguments
   * @param arguments the arguments handed to the logger
   */
  @Override
  public void logDebug( String message, Object... arguments )
    {
    LOG.debug( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
    }
1601
  /**
   * Method logWarn logs the message at WARN, prefixed with the flow name in brackets, where the
   * name is passed through {@code Util.truncate} with {@code LOG_FLOW_NAME_MAX}.
   *
   * @param message the message to log
   */
  @Override
  public void logWarn( String message )
    {
    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message );
    }
1607
  /**
   * Method logWarn logs the message and the throwable at WARN, prefixed with the flow name in
   * brackets, where the name is passed through {@code Util.truncate} with {@code LOG_FLOW_NAME_MAX}.
   *
   * @param message   the message to log
   * @param throwable the throwable handed to the logger
   */
  @Override
  public void logWarn( String message, Throwable throwable )
    {
    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable );
    }
1613
  /**
   * Method logWarn logs the message at WARN, prefixed with the flow name in brackets, where the
   * name is passed through {@code Util.truncate} with {@code LOG_FLOW_NAME_MAX}.
   *
   * @param message   the message, handed to the logger together with the arguments
   * @param arguments the arguments handed to the logger
   */
  @Override
  public void logWarn( String message, Object... arguments )
    {
    LOG.warn( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
    }
1619
  /**
   * Method logError logs the message at ERROR, prefixed with the flow name in brackets, where the
   * name is passed through {@code Util.truncate} with {@code LOG_FLOW_NAME_MAX}.
   *
   * @param message   the message, handed to the logger together with the arguments
   * @param arguments the arguments handed to the logger
   */
  @Override
  public void logError( String message, Object... arguments )
    {
    LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, arguments );
    }
1625
  /**
   * Method logError logs the message and the throwable at ERROR, prefixed with the flow name in
   * brackets, where the name is passed through {@code Util.truncate} with {@code LOG_FLOW_NAME_MAX}.
   *
   * @param message   the message to log
   * @param throwable the throwable handed to the logger
   */
  @Override
  public void logError( String message, Throwable throwable )
    {
    LOG.error( "[" + Util.truncate( getName(), LOG_FLOW_NAME_MAX ) + "] " + message, throwable );
    }
1631
1632  @Override
1633  public void writeDOT( String filename )
1634    {
1635    if( flowElementGraph == null )
1636      throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1637
1638    flowElementGraph.writeDOT( filename );
1639    }
1640
1641  @Override
1642  public void writeStepsDOT( String filename )
1643    {
1644    if( flowStepGraph == null )
1645      throw new UnsupportedOperationException( "this flow instance cannot write a DOT file" );
1646
1647    flowStepGraph.writeDOT( filename );
1648    }
1649
  /**
   * Used to return a simple wrapper for use as an edge in a graph where there can only be
   * one instance of every edge.
   * <p/>
   * Every call returns a new {@link FlowHolder} wrapping this flow.
   *
   * @return FlowHolder
   */
  public FlowHolder getHolder()
    {
    return new FlowHolder( this );
    }
1660
  /**
   * Method setCascade stores the ID of the given Cascade in this flow's config, under the
   * {@code cascading.cascade.id} property, and then asks the flow stats to record their info.
   *
   * @param cascade the Cascade whose ID is stored
   */
  public void setCascade( Cascade cascade )
    {
    setConfigProperty( getConfig(), "cascading.cascade.id", cascade.getID() );
    flowStats.recordInfo();
    }
1666
  /**
   * Method getCascadeID returns the value of the {@code cascading.cascade.id} property.
   *
   * @return the property value
   */
  @Override
  public String getCascadeID()
    {
    return getProperty( "cascading.cascade.id" );
    }
1672
  /**
   * Method getRunID returns the run ID field of this flow.
   *
   * @return the run ID
   */
  @Override
  public String getRunID()
    {
    return runID;
    }
1678
  /**
   * Method getClassPath returns the class path field of this flow.
   *
   * @return the class path list itself, not a copy
   */
  public List<String> getClassPath()
    {
    return classPath;
    }
1683
  /**
   * Method setSpawnStrategy sets the strategy used to start and complete the jobs of this flow.
   *
   * @param spawnStrategy the strategy to use
   */
  @Override
  public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
    {
    this.spawnStrategy = spawnStrategy;
    }
1689
  /**
   * Method getSpawnStrategy returns the strategy used to start and complete the jobs of this flow.
   *
   * @return the spawn strategy field
   */
  @Override
  public UnitOfWorkSpawnStrategy getSpawnStrategy()
    {
    return spawnStrategy;
    }
1695
1696  protected void registerShutdownHook()
1697    {
1698    if( !isStopJobsOnExit() )
1699      return;
1700
1701    shutdownHook = new ShutdownUtil.Hook()
1702      {
1703      @Override
1704      public Priority priority()
1705        {
1706        return Priority.WORK_CHILD;
1707        }
1708
1709      @Override
1710      public void execute()
1711        {
1712        logInfo( "shutdown hook calling stop on flow" );
1713
1714        BaseFlow.this.stop();
1715        }
1716      };
1717
1718    ShutdownUtil.addHook( shutdownHook );
1719    }
1720
1721  private void deregisterShutdownHook()
1722    {
1723    if( !isStopJobsOnExit() || stop )
1724      return;
1725
1726    ShutdownUtil.removeHook( shutdownHook );
1727    }
1728
  /**
   * Class FlowHolder is a helper class for wrapping Flow instances.
   * <p/>
   * It does not override equals or hashCode, so every holder is a distinct instance even if
   * two holders wrap the same Flow.
   */
  public static class FlowHolder
    {
    /** Field flow is the wrapped Flow, null when the no-arg constructor was used and nothing was assigned */
    public Flow flow;

    /** Constructor FlowHolder creates a holder with no Flow. */
    public FlowHolder()
      {
      }

    /**
     * Constructor FlowHolder creates a holder wrapping the given Flow.
     *
     * @param flow the Flow to wrap
     */
    public FlowHolder( Flow flow )
      {
      this.flow = flow;
      }
    }
1744
1745  /**
1746   * Class SafeFlowListener safely calls a wrapped FlowListener.
1747   * <p/>
1748   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1749   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1750   * which in turn is run in a new Thread.
1751   */
1752  private class SafeFlowListener implements FlowListener
1753    {
1754    /** Field flowListener */
1755    final FlowListener flowListener;
1756    /** Field throwable */
1757    Throwable throwable;
1758
1759    private SafeFlowListener( FlowListener flowListener )
1760      {
1761      this.flowListener = flowListener;
1762      }
1763
1764    public void onStarting( Flow flow )
1765      {
1766      try
1767        {
1768        flowListener.onStarting( flow );
1769        }
1770      catch( Throwable throwable )
1771        {
1772        handleThrowable( throwable );
1773        }
1774      }
1775
1776    public void onStopping( Flow flow )
1777      {
1778      try
1779        {
1780        flowListener.onStopping( flow );
1781        }
1782      catch( Throwable throwable )
1783        {
1784        handleThrowable( throwable );
1785        }
1786      }
1787
1788    public void onCompleted( Flow flow )
1789      {
1790      try
1791        {
1792        flowListener.onCompleted( flow );
1793        }
1794      catch( Throwable throwable )
1795        {
1796        handleThrowable( throwable );
1797        }
1798      }
1799
1800    public boolean onThrowable( Flow flow, Throwable flowThrowable )
1801      {
1802      try
1803        {
1804        return flowListener.onThrowable( flow, flowThrowable );
1805        }
1806      catch( Throwable throwable )
1807        {
1808        handleThrowable( throwable );
1809        }
1810
1811      return false;
1812      }
1813
1814    private void handleThrowable( Throwable throwable )
1815      {
1816      this.throwable = throwable;
1817
1818      logWarn( String.format( "flow listener %s threw throwable", flowListener ), throwable );
1819
1820      // stop this flow
1821      stop();
1822      }
1823
1824    public boolean equals( Object object )
1825      {
1826      if( object instanceof BaseFlow.SafeFlowListener )
1827        return flowListener.equals( ( (BaseFlow.SafeFlowListener) object ).flowListener );
1828
1829      return flowListener.equals( object );
1830      }
1831
1832    public int hashCode()
1833      {
1834      return flowListener.hashCode();
1835      }
1836    }
1837  }