001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.cascade;
022
023import java.io.FileWriter;
024import java.io.IOException;
025import java.io.Writer;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.HashSet;
030import java.util.LinkedHashMap;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.Callable;
036import java.util.concurrent.CountDownLatch;
037import java.util.concurrent.Future;
038import java.util.concurrent.TimeUnit;
039
040import cascading.CascadingException;
041import cascading.cascade.planner.FlowGraph;
042import cascading.cascade.planner.IdentifierGraph;
043import cascading.cascade.planner.TapGraph;
044import cascading.flow.BaseFlow;
045import cascading.flow.Flow;
046import cascading.flow.FlowException;
047import cascading.flow.FlowSkipStrategy;
048import cascading.flow.Flows;
049import cascading.management.CascadingServices;
050import cascading.management.UnitOfWorkExecutorStrategy;
051import cascading.management.UnitOfWorkSpawnStrategy;
052import cascading.management.state.ClientState;
053import cascading.stats.CascadeStats;
054import cascading.tap.Tap;
055import cascading.util.ProcessLogger;
056import cascading.util.ShutdownUtil;
057import cascading.util.Util;
058import cascading.util.Version;
059import cascading.util.jgrapht.EdgeNameProvider;
060import cascading.util.jgrapht.IntegerNameProvider;
061import cascading.util.jgrapht.VertexNameProvider;
062import org.jgrapht.Graphs;
063import org.jgrapht.graph.SimpleDirectedGraph;
064import org.jgrapht.traverse.TopologicalOrderIterator;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import static cascading.property.PropertyUtil.getProperty;
069
070public class BaseCascade implements ProcessLogger, Cascade
071  {
072  /** Field LOG */
073  private static final Logger LOG = LoggerFactory.getLogger( Cascade.class );
074
075  /** Field id */
076  private String id;
077  /** Field name */
078  private final String name;
079  /** Field tags */
080  private String tags;
081  /** Field properties */
082  private final Map<Object, Object> properties;
083  /** Fields listeners */
084  private List<SafeCascadeListener> listeners;
085  /** Field jobGraph */
086  private final FlowGraph flowGraph;
087  /** Field tapGraph */
088  private final IdentifierGraph identifierGraph;
089  /** Field cascadeStats */
090  private final CascadeStats cascadeStats;
091  /** Field cascadingServices */
092  private CascadingServices cascadingServices;
093  /** Field thread */
094  private Thread thread;
095  /** Field throwable */
096  private Throwable throwable;
097  private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
098  /** Field shutdownHook */
099  private ShutdownUtil.Hook shutdownHook;
100  /** Field jobsMap */
101  private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<>();
102  /** Field stop */
103  private boolean stop;
104  /** Field flowSkipStrategy */
105  private FlowSkipStrategy flowSkipStrategy = null;
106  /** Field maxConcurrentFlows */
107  private int maxConcurrentFlows = 0;
108
109  /** Field tapGraph * */
110  private transient TapGraph tapGraph;
111
112  static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows )
113    {
114    if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default
115      return maxConcurrentFlows;
116
117    return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) );
118    }
119
120  /** for testing */
121  protected BaseCascade()
122    {
123    this.name = null;
124    this.tags = null;
125    this.properties = null;
126    this.flowGraph = null;
127    this.identifierGraph = null;
128    this.cascadeStats = null;
129    }
130
131  BaseCascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph )
132    {
133    this.name = cascadeDef.getName();
134    this.tags = cascadeDef.getTags();
135    this.properties = properties;
136    this.flowGraph = flowGraph;
137    this.identifierGraph = identifierGraph;
138    this.cascadeStats = createPrepareCascadeStats();
139    setIDOnFlow();
140    this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows();
141
142    addListeners( getAllTaps() );
143    }
144
145  private CascadeStats createPrepareCascadeStats()
146    {
147    CascadeStats cascadeStats = new CascadeStats( this, getClientState() );
148
149    cascadeStats.prepare();
150    cascadeStats.markPending();
151
152    return cascadeStats;
153    }
154
155  /**
156   * Method getName returns the name of this Cascade object.
157   *
158   * @return the name (type String) of this Cascade object.
159   */
160  @Override
161  public String getName()
162    {
163    return name;
164    }
165
166  /**
167   * Method getID returns the ID of this Cascade object.
168   * <p/>
169   * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade
170   * instances created with identical parameters will not return the same ID.
171   *
172   * @return the ID (type String) of this Cascade object.
173   */
174  @Override
175  public String getID()
176    {
177    if( id == null )
178      id = Util.createUniqueID();
179
180    return id;
181    }
182
183  /**
184   * Method getTags returns the tags associated with this Cascade object.
185   *
186   * @return the tags (type String) of this Cascade object.
187   */
188  @Override
189  public String getTags()
190    {
191    return tags;
192    }
193
194  void addListeners( Collection listeners )
195    {
196    for( Object listener : listeners )
197      {
198      if( listener instanceof CascadeListener )
199        addListener( (CascadeListener) listener );
200      }
201    }
202
203  List<SafeCascadeListener> getListeners()
204    {
205    if( listeners == null )
206      listeners = new LinkedList<SafeCascadeListener>();
207
208    return listeners;
209    }
210
211  @Override
212  public boolean hasListeners()
213    {
214    return listeners != null && !listeners.isEmpty();
215    }
216
217  @Override
218  public void addListener( CascadeListener cascadeListener )
219    {
220    getListeners().add( new SafeCascadeListener( cascadeListener ) );
221    }
222
223  @Override
224  public boolean removeListener( CascadeListener flowListener )
225    {
226    return getListeners().remove( new SafeCascadeListener( flowListener ) );
227    }
228
229  private void fireOnCompleted()
230    {
231    if( hasListeners() )
232      {
233      if( isDebugEnabled() )
234        logDebug( "firing onCompleted event: " + getListeners().size() );
235
236      for( CascadeListener cascadeListener : getListeners() )
237        cascadeListener.onCompleted( this );
238      }
239    }
240
241  private void fireOnThrowable()
242    {
243    if( hasListeners() )
244      {
245      if( isDebugEnabled() )
246        logDebug( "firing onThrowable event: " + getListeners().size() );
247
248      boolean isHandled = false;
249
250      for( CascadeListener cascadeListener : getListeners() )
251        isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled;
252
253      if( isHandled )
254        throwable = null;
255      }
256    }
257
258  protected void fireOnStopping()
259    {
260    if( hasListeners() )
261      {
262      if( isDebugEnabled() )
263        logDebug( "firing onStopping event: " + getListeners().size() );
264
265      for( CascadeListener cascadeListener : getListeners() )
266        cascadeListener.onStopping( this );
267      }
268    }
269
270  protected void fireOnStarting()
271    {
272    if( hasListeners() )
273      {
274      if( isDebugEnabled() )
275        logDebug( "firing onStarting event: " + getListeners().size() );
276
277      for( CascadeListener cascadeListener : getListeners() )
278        cascadeListener.onStarting( this );
279      }
280    }
281
282  private CascadingServices getCascadingServices()
283    {
284    if( cascadingServices == null )
285      cascadingServices = new CascadingServices( properties );
286
287    return cascadingServices;
288    }
289
290  private ClientState getClientState()
291    {
292    return getCascadingServices().createClientState( getID() );
293    }
294
295  @Override
296  public CascadeStats getCascadeStats()
297    {
298    return cascadeStats;
299    }
300
301  @Override
302  public CascadeStats getStats()
303    {
304    return getCascadeStats();
305    }
306
307  private void setIDOnFlow()
308    {
309    for( Flow<?> flow : getFlows() )
310      ( (BaseFlow<?>) flow ).setCascade( this );
311    }
312
313  protected FlowGraph getFlowGraph()
314    {
315    return flowGraph;
316    }
317
318  protected IdentifierGraph getIdentifierGraph()
319    {
320    return identifierGraph;
321    }
322
323  @Override
324  public List<Flow> getFlows()
325    {
326    List<Flow> flows = new LinkedList<Flow>();
327    TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
328
329    while( topoIterator.hasNext() )
330      flows.add( topoIterator.next() );
331
332    return flows;
333    }
334
335  @Override
336  public List<Flow> findFlows( String regex )
337    {
338    List<Flow> flows = new ArrayList<Flow>();
339
340    for( Flow flow : getFlows() )
341      {
342      if( flow.getName().matches( regex ) )
343        flows.add( flow );
344      }
345
346    return flows;
347    }
348
349  @Override
350  public Collection<Flow> getHeadFlows()
351    {
352    Set<Flow> flows = new HashSet<Flow>();
353
354    for( Flow flow : flowGraph.vertexSet() )
355      {
356      if( flowGraph.inDegreeOf( flow ) == 0 )
357        flows.add( flow );
358      }
359
360    return flows;
361    }
362
363  @Override
364  public Collection<Flow> getTailFlows()
365    {
366    Set<Flow> flows = new HashSet<Flow>();
367
368    for( Flow flow : flowGraph.vertexSet() )
369      {
370      if( flowGraph.outDegreeOf( flow ) == 0 )
371        flows.add( flow );
372      }
373
374    return flows;
375    }
376
377  @Override
378  public Collection<Flow> getIntermediateFlows()
379    {
380    Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() );
381
382    flows.removeAll( getHeadFlows() );
383    flows.removeAll( getTailFlows() );
384
385    return flows;
386    }
387
388  protected TapGraph getTapGraph()
389    {
390    if( tapGraph == null )
391      tapGraph = new TapGraph( flowGraph.vertexSet() );
392
393    return tapGraph;
394    }
395
396  @Override
397  public Collection<Tap> getSourceTaps()
398    {
399    TapGraph tapGraph = getTapGraph();
400    Set<Tap> taps = new HashSet<Tap>();
401
402    for( Tap tap : tapGraph.vertexSet() )
403      {
404      if( tapGraph.inDegreeOf( tap ) == 0 )
405        taps.add( tap );
406      }
407
408    return taps;
409    }
410
411  @Override
412  public Collection<Tap> getSinkTaps()
413    {
414    TapGraph tapGraph = getTapGraph();
415    Set<Tap> taps = new HashSet<Tap>();
416
417    for( Tap tap : tapGraph.vertexSet() )
418      {
419      if( tapGraph.outDegreeOf( tap ) == 0 )
420        taps.add( tap );
421      }
422
423    return taps;
424    }
425
426  @Override
427  public Collection<Tap> getCheckpointsTaps()
428    {
429    Set<Tap> taps = new HashSet<Tap>();
430
431    for( Flow flow : getFlows() )
432      taps.addAll( flow.getCheckpointsCollection() );
433
434    return taps;
435    }
436
437  @Override
438  public Collection<Tap> getIntermediateTaps()
439    {
440    TapGraph tapGraph = getTapGraph();
441    Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() );
442
443    taps.removeAll( getSourceTaps() );
444    taps.removeAll( getSinkTaps() );
445
446    return taps;
447    }
448
449  @Override
450  public Collection<Tap> getAllTaps()
451    {
452    return new HashSet<Tap>( getTapGraph().vertexSet() );
453    }
454
455  @Override
456  public Collection<Flow> getSuccessorFlows( Flow flow )
457    {
458    return Graphs.successorListOf( flowGraph, flow );
459    }
460
461  @Override
462  public Collection<Flow> getPredecessorFlows( Flow flow )
463    {
464    return Graphs.predecessorListOf( flowGraph, flow );
465    }
466
467  @Override
468  public Collection<Flow> findFlowsSourcingFrom( String identifier )
469    {
470    try
471      {
472      return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) );
473      }
474    catch( Exception exception )
475      {
476      return Collections.emptySet();
477      }
478    }
479
480  @Override
481  public Collection<Flow> findFlowsSinkingTo( String identifier )
482    {
483    try
484      {
485      return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) );
486      }
487    catch( Exception exception )
488      {
489      return Collections.emptySet();
490      }
491    }
492
493  private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders )
494    {
495    Set<Flow> flows = new HashSet<Flow>();
496
497    for( BaseFlow.FlowHolder flowHolder : flowHolders )
498      flows.add( flowHolder.flow );
499
500    return flows;
501    }
502
503  @Override
504  public FlowSkipStrategy getFlowSkipStrategy()
505    {
506    return flowSkipStrategy;
507    }
508
509  @Override
510  public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
511    {
512    try
513      {
514      return this.flowSkipStrategy;
515      }
516    finally
517      {
518      this.flowSkipStrategy = flowSkipStrategy;
519      }
520    }
521
522  @Override
523  public void prepare()
524    {
525    }
526
527  @Override
528  public void start()
529    {
530    if( thread != null )
531      return;
532
533    thread = new Thread( new Runnable()
534    {
535    @Override
536    public void run()
537      {
538      BaseCascade.this.run();
539      }
540    }, ( "cascade " + Util.toNull( getName() ) ).trim() );
541
542    thread.start();
543    }
544
545  @Override
546  public void complete()
547    {
548    start();
549
550    try
551      {
552      try
553        {
554        thread.join();
555        }
556      catch( InterruptedException exception )
557        {
558        throw new FlowException( "thread interrupted", exception );
559        }
560
561      if( throwable instanceof CascadingException )
562        throw (CascadingException) throwable;
563
564      if( throwable != null )
565        throw new CascadeException( "unhandled exception", throwable );
566      }
567    finally
568      {
569      thread = null;
570      throwable = null;
571      shutdownHook = null;
572      cascadeStats.cleanup();
573      }
574    }
575
576  @Override
577  public synchronized void stop()
578    {
579    if( stop )
580      return;
581
582    stop = true;
583
584    fireOnStopping();
585
586    if( !cascadeStats.isFinished() )
587      cascadeStats.markStopped();
588
589    internalStopAllFlows();
590    handleExecutorShutdown();
591
592    cascadeStats.cleanup();
593    }
594
595  @Override
596  public void cleanup()
597    {
598    }
599
600  /** Method run implements the Runnable run method. */
601  private void run()
602    {
603    Version.printBanner();
604
605    if( LOG.isInfoEnabled() )
606      logInfo( "starting" );
607
608    registerShutdownHook();
609
610    try
611      {
612      if( stop )
613        return;
614
615      // mark started, not submitted
616      cascadeStats.markStartedThenRunning();
617
618      fireOnStarting();
619
620      initializeNewJobsMap();
621
622      int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows );
623
624      if( numThreads == 0 )
625        numThreads = jobsMap.size();
626
627      int numLocalFlows = numLocalFlows();
628
629      boolean runFlowsLocal = numLocalFlows > 1;
630
631      if( runFlowsLocal )
632        numThreads = 1;
633
634      if( isInfoEnabled() )
635        {
636        logInfo( " parallel execution of flows is enabled: " + ( numThreads != 1 ) );
637        logInfo( " executing total flows: " + jobsMap.size() );
638        logInfo( " allocating management threads: " + numThreads );
639        }
640
641      List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() );
642
643      for( Future<Throwable> future : futures )
644        {
645        throwable = future.get();
646
647        if( throwable != null )
648          {
649          if( !stop )
650            {
651            if( !cascadeStats.isFinished() )
652              cascadeStats.markFailed( throwable );
653            internalStopAllFlows();
654            fireOnThrowable();
655            }
656
657          handleExecutorShutdown();
658          break;
659          }
660        }
661      }
662    catch( Throwable throwable )
663      {
664      this.throwable = throwable;
665      }
666    finally
667      {
668      if( !cascadeStats.isFinished() )
669        cascadeStats.markSuccessful();
670
671      try
672        {
673        fireOnCompleted();
674        }
675      finally
676        {
677        deregisterShutdownHook();
678        }
679      }
680    }
681
682  private void registerShutdownHook()
683    {
684    if( !isStopJobsOnExit() )
685      return;
686
687    shutdownHook = new ShutdownUtil.Hook()
688    {
689    @Override
690    public Priority priority()
691      {
692      return Priority.WORK_PARENT;
693      }
694
695    @Override
696    public void execute()
697      {
698      logInfo( "shutdown hook calling stop on cascade" );
699
700      BaseCascade.this.stop();
701      }
702    };
703
704    ShutdownUtil.addHook( shutdownHook );
705    }
706
707  private void deregisterShutdownHook()
708    {
709    if( !isStopJobsOnExit() || stop )
710      return;
711
712    ShutdownUtil.removeHook( shutdownHook );
713    }
714
715  private boolean isStopJobsOnExit()
716    {
717    if( getFlows().isEmpty() )
718      return false; // don't bother registering hook
719
720    return getFlows().get( 0 ).isStopJobsOnExit();
721    }
722
723  /**
724   * If the number of flows that are local is greater than one, force the Cascade to run without parallelization.
725   *
726   * @return of type int
727   */
728  private int numLocalFlows()
729    {
730    int countLocalJobs = 0;
731
732    for( Flow flow : getFlows() )
733      {
734      if( flow.stepsAreLocal() )
735        countLocalJobs++;
736      }
737
738    return countLocalJobs;
739    }
740
741  private void initializeNewJobsMap()
742    {
743    synchronized( jobsMap )
744      {
745      // keep topo order
746      TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
747
748      while( topoIterator.hasNext() )
749        {
750        Flow flow = topoIterator.next();
751
752        cascadeStats.addFlowStats( flow.getFlowStats() );
753
754        CascadeJob job = new CascadeJob( flow );
755
756        jobsMap.put( flow.getName(), job );
757
758        List<CascadeJob> predecessors = new ArrayList<CascadeJob>();
759
760        for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) )
761          predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) );
762
763        job.init( predecessors );
764        }
765      }
766    }
767
768  private void handleExecutorShutdown()
769    {
770    if( spawnStrategy.isCompleted( this ) )
771      return;
772
773    logInfo( "shutting down flow executor" );
774
775    try
776      {
777      spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
778      }
779    catch( InterruptedException exception )
780      {
781      // ignore
782      }
783
784    logInfo( "shutdown complete" );
785    }
786
787  private void internalStopAllFlows()
788    {
789    logInfo( "stopping all flows" );
790
791    synchronized( jobsMap )
792      {
793      List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() );
794
795      Collections.reverse( jobs );
796
797      for( Callable<Throwable> callable : jobs )
798        ( (CascadeJob) callable ).stop();
799      }
800
801    logInfo( "stopped all flows" );
802    }
803
804  @Override
805  public void writeDOT( String filename )
806    {
807    printElementGraph( filename, identifierGraph );
808    }
809
810  protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph )
811    {
812    try
813      {
814      Writer writer = new FileWriter( filename );
815
816      Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>()
817        {
818        public String getVertexName( String object )
819          {
820          return object.toString().replaceAll( "\"", "\'" );
821          }
822        }, new EdgeNameProvider<BaseFlow.FlowHolder>()
823        {
824        public String getEdgeName( BaseFlow.FlowHolder object )
825          {
826          return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
827          }
828        }
829      );
830
831      writer.close();
832      }
833    catch( IOException exception )
834      {
835      logError( "failed printing graph to: {}, with exception: {}", filename, exception );
836      }
837    }
838
839  @Override
840  public String toString()
841    {
842    return getName();
843    }
844
845  @Override
846  public boolean isInfoEnabled()
847    {
848    return LOG.isInfoEnabled();
849    }
850
851  @Override
852  public boolean isDebugEnabled()
853    {
854    return LOG.isDebugEnabled();
855    }
856
857  @Override
858  public void logInfo( String message, Object... arguments )
859    {
860    LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
861    }
862
863  @Override
864  public void logDebug( String message, Object... arguments )
865    {
866    LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
867    }
868
869  @Override
870  public void logWarn( String message )
871    {
872    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message );
873    }
874
875  @Override
876  public void logWarn( String message, Throwable throwable )
877    {
878    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
879    }
880
881  @Override
882  public void logWarn( String message, Object... arguments )
883    {
884    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
885    }
886
887  @Override
888  public void logError( String message, Object... arguments )
889    {
890    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
891    }
892
893  @Override
894  public void logError( String message, Throwable throwable )
895    {
896    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
897    }
898
899  /** Class CascadeJob manages Flow execution in the current Cascade instance. */
900  protected class CascadeJob implements Callable<Throwable>
901    {
902    /** Field flow */
903    final Flow flow;
904    /** Field predecessors */
905    private List<CascadeJob> predecessors;
906    /** Field latch */
907    private final CountDownLatch latch = new CountDownLatch( 1 );
908    /** Field stop */
909    private boolean stop = false;
910    /** Field failed */
911    private boolean failed = false;
912
913    public CascadeJob( Flow flow )
914      {
915      this.flow = flow;
916      }
917
918    public String getName()
919      {
920      return flow.getName();
921      }
922
923    public Throwable call()
924      {
925      try
926        {
927        for( CascadeJob predecessor : predecessors )
928          {
929          if( !predecessor.isSuccessful() )
930            return null;
931          }
932
933        if( stop || cascadeStats.isFinished() )
934          return null;
935
936        try
937          {
938          if( LOG.isInfoEnabled() )
939            logInfo( "starting flow: " + flow.getName() );
940
941          if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) )
942            {
943            if( LOG.isInfoEnabled() )
944              logInfo( "skipping flow: " + flow.getName() );
945
946            flow.getFlowStats().markSkipped();
947            Flows.fireOnCompleted( flow );
948
949            return null;
950            }
951
952          flow.prepare(); // do not delete append/update mode taps
953          flow.complete();
954
955          if( LOG.isInfoEnabled() )
956            logInfo( "completed flow: " + flow.getName() );
957          }
958        catch( Throwable exception )
959          {
960          failed = true;
961          logWarn( "flow failed: " + flow.getName(), exception );
962
963          CascadeException cascadeException = new CascadeException( "flow failed: " + flow.getName(), exception );
964
965          if( !cascadeStats.isFinished() )
966            cascadeStats.markFailed( cascadeException );
967
968          return cascadeException;
969          }
970        finally
971          {
972          flow.cleanup();
973          }
974        }
975      catch( Throwable throwable )
976        {
977        failed = true;
978        return throwable;
979        }
980      finally
981        {
982        latch.countDown();
983        }
984
985      return null;
986      }
987
988    public void init( List<CascadeJob> predecessors )
989      {
990      this.predecessors = predecessors;
991      }
992
993    public void stop()
994      {
995      if( LOG.isInfoEnabled() )
996        logInfo( "stopping flow: " + flow.getName() );
997
998      stop = true;
999
1000      if( flow != null )
1001        flow.stop();
1002      }
1003
1004    public boolean isSuccessful()
1005      {
1006      try
1007        {
1008        latch.await();
1009
1010        return flow != null && !failed && !stop;
1011        }
1012      catch( InterruptedException exception )
1013        {
1014        logWarn( "latch interrupted", exception );
1015        }
1016
1017      return false;
1018      }
1019    }
1020
1021  @Override
1022  public UnitOfWorkSpawnStrategy getSpawnStrategy()
1023    {
1024    return spawnStrategy;
1025    }
1026
1027  @Override
1028  public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
1029    {
1030    this.spawnStrategy = spawnStrategy;
1031    }
1032
1033  /**
1034   * Class SafeCascadeListener safely calls a wrapped CascadeListener.
1035   * <p/>
1036   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1037   * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method
1038   * which in turn is run in a new Thread.
1039   */
1040  private class SafeCascadeListener implements CascadeListener
1041    {
1042    /** Field flowListener */
1043    final CascadeListener cascadeListener;
1044    /** Field throwable */
1045    Throwable throwable;
1046
1047    private SafeCascadeListener( CascadeListener cascadeListener )
1048      {
1049      this.cascadeListener = cascadeListener;
1050      }
1051
1052    public void onStarting( Cascade cascade )
1053      {
1054      try
1055        {
1056        cascadeListener.onStarting( cascade );
1057        }
1058      catch( Throwable throwable )
1059        {
1060        handleThrowable( throwable );
1061        }
1062      }
1063
1064    public void onStopping( Cascade cascade )
1065      {
1066      try
1067        {
1068        cascadeListener.onStopping( cascade );
1069        }
1070      catch( Throwable throwable )
1071        {
1072        handleThrowable( throwable );
1073        }
1074      }
1075
1076    public void onCompleted( Cascade cascade )
1077      {
1078      try
1079        {
1080        cascadeListener.onCompleted( cascade );
1081        }
1082      catch( Throwable throwable )
1083        {
1084        handleThrowable( throwable );
1085        }
1086      }
1087
1088    public boolean onThrowable( Cascade cascade, Throwable flowThrowable )
1089      {
1090      try
1091        {
1092        return cascadeListener.onThrowable( cascade, flowThrowable );
1093        }
1094      catch( Throwable throwable )
1095        {
1096        handleThrowable( throwable );
1097        }
1098
1099      return false;
1100      }
1101
1102    private void handleThrowable( Throwable throwable )
1103      {
1104      this.throwable = throwable;
1105
1106      logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable );
1107
1108      // stop this flow
1109      stop();
1110      }
1111
1112    public boolean equals( Object object )
1113      {
1114      if( object instanceof SafeCascadeListener )
1115        return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener );
1116
1117      return cascadeListener.equals( object );
1118      }
1119
1120    public int hashCode()
1121      {
1122      return cascadeListener.hashCode();
1123      }
1124    }
1125  }