001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.cascade;
023
024import java.io.FileWriter;
025import java.io.IOException;
026import java.io.Writer;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashSet;
031import java.util.LinkedHashMap;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.Callable;
037import java.util.concurrent.CountDownLatch;
038import java.util.concurrent.Future;
039import java.util.concurrent.TimeUnit;
040
041import cascading.CascadingException;
042import cascading.cascade.planner.FlowGraph;
043import cascading.cascade.planner.IdentifierGraph;
044import cascading.cascade.planner.TapGraph;
045import cascading.flow.BaseFlow;
046import cascading.flow.Flow;
047import cascading.flow.FlowException;
048import cascading.flow.FlowSkipStrategy;
049import cascading.flow.Flows;
050import cascading.management.CascadingServices;
051import cascading.management.UnitOfWorkExecutorStrategy;
052import cascading.management.UnitOfWorkSpawnStrategy;
053import cascading.management.state.ClientState;
054import cascading.stats.CascadeStats;
055import cascading.tap.Tap;
056import cascading.util.ProcessLogger;
057import cascading.util.ShutdownUtil;
058import cascading.util.Util;
059import cascading.util.Version;
060import cascading.util.jgrapht.EdgeNameProvider;
061import cascading.util.jgrapht.IntegerNameProvider;
062import cascading.util.jgrapht.VertexNameProvider;
063import org.jgrapht.Graphs;
064import org.jgrapht.graph.SimpleDirectedGraph;
065import org.jgrapht.traverse.TopologicalOrderIterator;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import static cascading.property.PropertyUtil.getProperty;
070
071public class BaseCascade implements ProcessLogger, Cascade
072  {
  /** Field LOG is the shared logger; it is registered under the {@link Cascade} interface, not this class. */
  private static final Logger LOG = LoggerFactory.getLogger( Cascade.class );

  /** Field id is the unique identifier of this instance, created lazily by {@link #getID()}. */
  private String id;
  /** Field name is the cascade name taken from the CascadeDef. */
  private final String name;
  /** Field tags holds the tags taken from the CascadeDef. */
  private String tags;
  /** Field properties holds the properties handed to the constructor; used for services and thread configuration. */
  private final Map<Object, Object> properties;
  /** Field listeners holds the registered listeners, each wrapped in a SafeCascadeListener; created lazily. */
  private List<SafeCascadeListener> listeners;
  /** Field flowGraph is the graph of Flow instances, iterated in topological order. */
  private final FlowGraph flowGraph;
  /** Field identifierGraph is the graph of String identifiers connected by BaseFlow.FlowHolder edges. */
  private final IdentifierGraph identifierGraph;
  /** Field cascadeStats tracks the lifecycle status of this cascade. */
  private final CascadeStats cascadeStats;
  /** Field cascadingServices is created lazily from the properties. */
  private CascadingServices cascadingServices;
  /** Field thread is the thread created by {@link #start()}; reset to null by {@link #complete()}. */
  private Thread thread;
  /** Field throwable is the failure captured while running, rethrown by {@link #complete()}. */
  private Throwable throwable;
  /** Field spawnStrategy starts and shuts down the per-flow jobs; defaults to UnitOfWorkExecutorStrategy. */
  private transient UnitOfWorkSpawnStrategy spawnStrategy = new UnitOfWorkExecutorStrategy();
  /** Field shutdownHook is the hook registered with ShutdownUtil while the cascade is running. */
  private ShutdownUtil.Hook shutdownHook;
  /** Field jobsMap maps each flow name to its job, in the order the jobs were inserted. */
  private final Map<String, Callable<Throwable>> jobsMap = new LinkedHashMap<>();
  /** Field stop is set to true once {@link #stop()} has been called. */
  private boolean stop;
  /** Field flowSkipStrategy, when not null, decides whether a flow is skipped instead of the flow itself. */
  private FlowSkipStrategy flowSkipStrategy = null;
  /** Field maxConcurrentFlows is the configured thread count; 0 means one thread per job. */
  private int maxConcurrentFlows = 0;

  /** Field tapGraph is built lazily from the flows by {@link #getTapGraph()}. */
  private transient TapGraph tapGraph;
112
113  static int getMaxConcurrentFlows( Map<Object, Object> properties, int maxConcurrentFlows )
114    {
115    if( maxConcurrentFlows != -1 ) // CascadeDef is -1 by default
116      return maxConcurrentFlows;
117
118    return Integer.parseInt( getProperty( properties, CascadeProps.MAX_CONCURRENT_FLOWS, "0" ) );
119    }
120
121  /** for testing */
122  protected BaseCascade()
123    {
124    this.name = null;
125    this.tags = null;
126    this.properties = null;
127    this.flowGraph = null;
128    this.identifierGraph = null;
129    this.cascadeStats = null;
130    }
131
132  BaseCascade( CascadeDef cascadeDef, Map<Object, Object> properties, FlowGraph flowGraph, IdentifierGraph identifierGraph )
133    {
134    this.name = cascadeDef.getName();
135    this.tags = cascadeDef.getTags();
136    this.properties = properties;
137    this.flowGraph = flowGraph;
138    this.identifierGraph = identifierGraph;
139    this.cascadeStats = createPrepareCascadeStats();
140    setIDOnFlow();
141    this.maxConcurrentFlows = cascadeDef.getMaxConcurrentFlows();
142
143    addListeners( getAllTaps() );
144    }
145
146  private CascadeStats createPrepareCascadeStats()
147    {
148    CascadeStats cascadeStats = new CascadeStats( this, getClientState() );
149
150    cascadeStats.prepare();
151    cascadeStats.markPending();
152
153    return cascadeStats;
154    }
155
156  /**
157   * Method getName returns the name of this Cascade object.
158   *
159   * @return the name (type String) of this Cascade object.
160   */
161  @Override
162  public String getName()
163    {
164    return name;
165    }
166
167  /**
168   * Method getID returns the ID of this Cascade object.
169   * <p>
170   * The ID value is a long HEX String used to identify this instance globally. Subsequent Cascade
171   * instances created with identical parameters will not return the same ID.
172   *
173   * @return the ID (type String) of this Cascade object.
174   */
175  @Override
176  public String getID()
177    {
178    if( id == null )
179      id = Util.createUniqueID();
180
181    return id;
182    }
183
184  /**
185   * Method getTags returns the tags associated with this Cascade object.
186   *
187   * @return the tags (type String) of this Cascade object.
188   */
189  @Override
190  public String getTags()
191    {
192    return tags;
193    }
194
195  void addListeners( Collection listeners )
196    {
197    for( Object listener : listeners )
198      {
199      if( listener instanceof CascadeListener )
200        addListener( (CascadeListener) listener );
201      }
202    }
203
204  List<SafeCascadeListener> getListeners()
205    {
206    if( listeners == null )
207      listeners = new LinkedList<SafeCascadeListener>();
208
209    return listeners;
210    }
211
212  @Override
213  public boolean hasListeners()
214    {
215    return listeners != null && !listeners.isEmpty();
216    }
217
218  @Override
219  public void addListener( CascadeListener cascadeListener )
220    {
221    getListeners().add( new SafeCascadeListener( cascadeListener ) );
222    }
223
224  @Override
225  public boolean removeListener( CascadeListener flowListener )
226    {
227    return getListeners().remove( new SafeCascadeListener( flowListener ) );
228    }
229
230  private void fireOnCompleted()
231    {
232    if( hasListeners() )
233      {
234      if( isDebugEnabled() )
235        logDebug( "firing onCompleted event: " + getListeners().size() );
236
237      for( CascadeListener cascadeListener : getListeners() )
238        cascadeListener.onCompleted( this );
239      }
240    }
241
242  private void fireOnThrowable()
243    {
244    if( hasListeners() )
245      {
246      if( isDebugEnabled() )
247        logDebug( "firing onThrowable event: " + getListeners().size() );
248
249      boolean isHandled = false;
250
251      for( CascadeListener cascadeListener : getListeners() )
252        isHandled = cascadeListener.onThrowable( this, throwable ) || isHandled;
253
254      if( isHandled )
255        throwable = null;
256      }
257    }
258
259  protected void fireOnStopping()
260    {
261    if( hasListeners() )
262      {
263      if( isDebugEnabled() )
264        logDebug( "firing onStopping event: " + getListeners().size() );
265
266      for( CascadeListener cascadeListener : getListeners() )
267        cascadeListener.onStopping( this );
268      }
269    }
270
271  protected void fireOnStarting()
272    {
273    if( hasListeners() )
274      {
275      if( isDebugEnabled() )
276        logDebug( "firing onStarting event: " + getListeners().size() );
277
278      for( CascadeListener cascadeListener : getListeners() )
279        cascadeListener.onStarting( this );
280      }
281    }
282
283  private CascadingServices getCascadingServices()
284    {
285    if( cascadingServices == null )
286      cascadingServices = new CascadingServices( properties );
287
288    return cascadingServices;
289    }
290
291  private ClientState getClientState()
292    {
293    return getCascadingServices().createClientState( getID() );
294    }
295
296  @Override
297  public CascadeStats getCascadeStats()
298    {
299    return cascadeStats;
300    }
301
302  @Override
303  public CascadeStats getStats()
304    {
305    return getCascadeStats();
306    }
307
308  private void setIDOnFlow()
309    {
310    for( Flow<?> flow : getFlows() )
311      ( (BaseFlow<?>) flow ).setCascade( this );
312    }
313
314  protected FlowGraph getFlowGraph()
315    {
316    return flowGraph;
317    }
318
319  protected IdentifierGraph getIdentifierGraph()
320    {
321    return identifierGraph;
322    }
323
324  @Override
325  public List<Flow> getFlows()
326    {
327    List<Flow> flows = new LinkedList<Flow>();
328    TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
329
330    while( topoIterator.hasNext() )
331      flows.add( topoIterator.next() );
332
333    return flows;
334    }
335
336  @Override
337  public List<Flow> findFlows( String regex )
338    {
339    List<Flow> flows = new ArrayList<Flow>();
340
341    for( Flow flow : getFlows() )
342      {
343      if( flow.getName().matches( regex ) )
344        flows.add( flow );
345      }
346
347    return flows;
348    }
349
350  @Override
351  public Collection<Flow> getHeadFlows()
352    {
353    Set<Flow> flows = new HashSet<Flow>();
354
355    for( Flow flow : flowGraph.vertexSet() )
356      {
357      if( flowGraph.inDegreeOf( flow ) == 0 )
358        flows.add( flow );
359      }
360
361    return flows;
362    }
363
364  @Override
365  public Collection<Flow> getTailFlows()
366    {
367    Set<Flow> flows = new HashSet<Flow>();
368
369    for( Flow flow : flowGraph.vertexSet() )
370      {
371      if( flowGraph.outDegreeOf( flow ) == 0 )
372        flows.add( flow );
373      }
374
375    return flows;
376    }
377
378  @Override
379  public Collection<Flow> getIntermediateFlows()
380    {
381    Set<Flow> flows = new HashSet<Flow>( flowGraph.vertexSet() );
382
383    flows.removeAll( getHeadFlows() );
384    flows.removeAll( getTailFlows() );
385
386    return flows;
387    }
388
389  protected TapGraph getTapGraph()
390    {
391    if( tapGraph == null )
392      tapGraph = new TapGraph( flowGraph.vertexSet() );
393
394    return tapGraph;
395    }
396
397  @Override
398  public Collection<Tap> getSourceTaps()
399    {
400    TapGraph tapGraph = getTapGraph();
401    Set<Tap> taps = new HashSet<Tap>();
402
403    for( Tap tap : tapGraph.vertexSet() )
404      {
405      if( tapGraph.inDegreeOf( tap ) == 0 )
406        taps.add( tap );
407      }
408
409    return taps;
410    }
411
412  @Override
413  public Collection<Tap> getSinkTaps()
414    {
415    TapGraph tapGraph = getTapGraph();
416    Set<Tap> taps = new HashSet<Tap>();
417
418    for( Tap tap : tapGraph.vertexSet() )
419      {
420      if( tapGraph.outDegreeOf( tap ) == 0 )
421        taps.add( tap );
422      }
423
424    return taps;
425    }
426
427  @Override
428  public Collection<Tap> getCheckpointsTaps()
429    {
430    Set<Tap> taps = new HashSet<Tap>();
431
432    for( Flow flow : getFlows() )
433      taps.addAll( flow.getCheckpointsCollection() );
434
435    return taps;
436    }
437
438  @Override
439  public Collection<Tap> getIntermediateTaps()
440    {
441    TapGraph tapGraph = getTapGraph();
442    Set<Tap> taps = new HashSet<Tap>( tapGraph.vertexSet() );
443
444    taps.removeAll( getSourceTaps() );
445    taps.removeAll( getSinkTaps() );
446
447    return taps;
448    }
449
450  @Override
451  public Collection<Tap> getAllTaps()
452    {
453    return new HashSet<Tap>( getTapGraph().vertexSet() );
454    }
455
456  @Override
457  public Collection<Flow> getSuccessorFlows( Flow flow )
458    {
459    return Graphs.successorListOf( flowGraph, flow );
460    }
461
462  @Override
463  public Collection<Flow> getPredecessorFlows( Flow flow )
464    {
465    return Graphs.predecessorListOf( flowGraph, flow );
466    }
467
468  @Override
469  public Collection<Flow> findFlowsSourcingFrom( String identifier )
470    {
471    try
472      {
473      return unwrapFlows( identifierGraph.outgoingEdgesOf( identifier ) );
474      }
475    catch( Exception exception )
476      {
477      return Collections.emptySet();
478      }
479    }
480
481  @Override
482  public Collection<Flow> findFlowsSinkingTo( String identifier )
483    {
484    try
485      {
486      return unwrapFlows( identifierGraph.incomingEdgesOf( identifier ) );
487      }
488    catch( Exception exception )
489      {
490      return Collections.emptySet();
491      }
492    }
493
494  private Collection<Flow> unwrapFlows( Set<BaseFlow.FlowHolder> flowHolders )
495    {
496    Set<Flow> flows = new HashSet<Flow>();
497
498    for( BaseFlow.FlowHolder flowHolder : flowHolders )
499      flows.add( flowHolder.flow );
500
501    return flows;
502    }
503
504  @Override
505  public FlowSkipStrategy getFlowSkipStrategy()
506    {
507    return flowSkipStrategy;
508    }
509
510  @Override
511  public FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy )
512    {
513    try
514      {
515      return this.flowSkipStrategy;
516      }
517    finally
518      {
519      this.flowSkipStrategy = flowSkipStrategy;
520      }
521    }
522
  /** Method prepare does nothing in this implementation. */
  @Override
  public void prepare()
    {
    }
527
528  @Override
529  public void start()
530    {
531    if( thread != null )
532      return;
533
534    thread = new Thread( new Runnable()
535      {
536      @Override
537      public void run()
538        {
539        BaseCascade.this.run();
540        }
541      }, ( "cascade " + Util.toNull( getName() ) ).trim() );
542
543    thread.start();
544    }
545
546  @Override
547  public void complete()
548    {
549    start();
550
551    try
552      {
553      try
554        {
555        thread.join();
556        }
557      catch( InterruptedException exception )
558        {
559        throw new FlowException( "thread interrupted", exception );
560        }
561
562      if( throwable instanceof CascadingException )
563        throw (CascadingException) throwable;
564
565      if( throwable != null )
566        throw new CascadeException( "unhandled exception", throwable );
567      }
568    finally
569      {
570      thread = null;
571      throwable = null;
572      shutdownHook = null;
573      cascadeStats.cleanup();
574      }
575    }
576
577  @Override
578  public synchronized void stop()
579    {
580    if( stop )
581      return;
582
583    stop = true;
584
585    fireOnStopping();
586
587    if( !cascadeStats.isFinished() )
588      cascadeStats.markStopped();
589
590    internalStopAllFlows();
591    handleExecutorShutdown();
592
593    cascadeStats.cleanup();
594    }
595
  /** Method cleanup does nothing in this implementation. */
  @Override
  public void cleanup()
    {
    }
600
601  /** Method run implements the Runnable run method. */
602  private void run()
603    {
604    Version.printBanner();
605
606    if( LOG.isInfoEnabled() )
607      logInfo( "starting" );
608
609    registerShutdownHook();
610
611    try
612      {
613      synchronized( this ) // prevent race on stopping
614        {
615        if( stop )
616          return;
617
618        // mark started, not submitted
619        cascadeStats.markStartedThenRunning();
620        }
621
622      fireOnStarting();
623
624      initializeNewJobsMap();
625
626      int numThreads = getMaxConcurrentFlows( properties, maxConcurrentFlows );
627
628      if( numThreads == 0 )
629        numThreads = jobsMap.size();
630
631      int numLocalFlows = numLocalFlows();
632
633      boolean runFlowsLocal = numLocalFlows > 1;
634
635      if( runFlowsLocal )
636        numThreads = 1;
637
638      if( isInfoEnabled() )
639        {
640        logInfo( " parallel execution of flows is enabled: " + ( numThreads != 1 ) );
641        logInfo( " executing total flows: " + jobsMap.size() );
642        logInfo( " allocating management threads: " + numThreads );
643        }
644
645      List<Future<Throwable>> futures = spawnStrategy.start( this, numThreads, jobsMap.values() );
646
647      for( Future<Throwable> future : futures )
648        {
649        throwable = future.get();
650
651        if( throwable != null )
652          {
653          if( !stop )
654            {
655            if( !cascadeStats.isFinished() )
656              cascadeStats.markFailed( throwable );
657            internalStopAllFlows();
658            fireOnThrowable();
659            }
660
661          handleExecutorShutdown();
662          break;
663          }
664        }
665      }
666    catch( Throwable throwable )
667      {
668      this.throwable = throwable;
669      }
670    finally
671      {
672      if( !cascadeStats.isFinished() )
673        cascadeStats.markSuccessful();
674
675      try
676        {
677        fireOnCompleted();
678        }
679      finally
680        {
681        deregisterShutdownHook();
682        }
683      }
684    }
685
686  private void registerShutdownHook()
687    {
688    if( !isStopJobsOnExit() )
689      return;
690
691    shutdownHook = new ShutdownUtil.Hook()
692      {
693      @Override
694      public Priority priority()
695        {
696        return Priority.WORK_PARENT;
697        }
698
699      @Override
700      public void execute()
701        {
702        logInfo( "shutdown hook calling stop on cascade" );
703
704        BaseCascade.this.stop();
705        }
706      };
707
708    ShutdownUtil.addHook( shutdownHook );
709    }
710
711  private void deregisterShutdownHook()
712    {
713    if( !isStopJobsOnExit() || stop )
714      return;
715
716    ShutdownUtil.removeHook( shutdownHook );
717    }
718
719  private boolean isStopJobsOnExit()
720    {
721    if( getFlows().isEmpty() )
722      return false; // don't bother registering hook
723
724    return getFlows().get( 0 ).isStopJobsOnExit();
725    }
726
727  /**
728   * If the number of flows that are local is greater than one, force the Cascade to run without parallelization.
729   *
730   * @return of type int
731   */
732  private int numLocalFlows()
733    {
734    int countLocalJobs = 0;
735
736    for( Flow flow : getFlows() )
737      {
738      if( flow.stepsAreLocal() )
739        countLocalJobs++;
740      }
741
742    return countLocalJobs;
743    }
744
745  private void initializeNewJobsMap()
746    {
747    synchronized( jobsMap )
748      {
749      // keep topo order
750      TopologicalOrderIterator<Flow, Integer> topoIterator = flowGraph.getTopologicalIterator();
751
752      while( topoIterator.hasNext() )
753        {
754        Flow flow = topoIterator.next();
755
756        cascadeStats.addFlowStats( flow.getFlowStats() );
757
758        CascadeJob job = new CascadeJob( flow );
759
760        jobsMap.put( flow.getName(), job );
761
762        List<CascadeJob> predecessors = new ArrayList<CascadeJob>();
763
764        for( Flow predecessor : Graphs.predecessorListOf( flowGraph, flow ) )
765          predecessors.add( (CascadeJob) jobsMap.get( predecessor.getName() ) );
766
767        job.init( predecessors );
768        }
769      }
770    }
771
772  private void handleExecutorShutdown()
773    {
774    if( spawnStrategy.isCompleted( this ) )
775      return;
776
777    logInfo( "shutting down flow executor" );
778
779    try
780      {
781      spawnStrategy.complete( this, 5 * 60, TimeUnit.SECONDS );
782      }
783    catch( InterruptedException exception )
784      {
785      // ignore
786      }
787
788    logInfo( "shutdown complete" );
789    }
790
791  private void internalStopAllFlows()
792    {
793    logInfo( "stopping all flows" );
794
795    synchronized( jobsMap )
796      {
797      List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>( jobsMap.values() );
798
799      Collections.reverse( jobs );
800
801      for( Callable<Throwable> callable : jobs )
802        ( (CascadeJob) callable ).stop();
803      }
804
805    logInfo( "stopped all flows" );
806    }
807
808  @Override
809  public void writeDOT( String filename )
810    {
811    printElementGraph( filename, identifierGraph );
812    }
813
814  protected void printElementGraph( String filename, SimpleDirectedGraph<String, BaseFlow.FlowHolder> graph )
815    {
816    try
817      {
818      Writer writer = new FileWriter( filename );
819
820      Util.writeDOT( writer, graph, new IntegerNameProvider<String>(), new VertexNameProvider<String>()
821        {
822        public String getVertexName( String object )
823          {
824          return object.toString().replaceAll( "\"", "\'" );
825          }
826        }, new EdgeNameProvider<BaseFlow.FlowHolder>()
827        {
828        public String getEdgeName( BaseFlow.FlowHolder object )
829          {
830          return object.flow.getName().replaceAll( "\"", "\'" ).replaceAll( "\n", "\\\\n" ); // fix for newlines in graphviz
831          }
832        }
833      );
834
835      writer.close();
836      }
837    catch( IOException exception )
838      {
839      logError( "failed printing graph to: {}, with exception: {}", filename, exception );
840      }
841    }
842
843  @Override
844  public String toString()
845    {
846    return getName();
847    }
848
849  @Override
850  public boolean isInfoEnabled()
851    {
852    return LOG.isInfoEnabled();
853    }
854
855  @Override
856  public boolean isDebugEnabled()
857    {
858    return LOG.isDebugEnabled();
859    }
860
861  @Override
862  public void logInfo( String message, Object... arguments )
863    {
864    LOG.info( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
865    }
866
867  @Override
868  public void logDebug( String message, Object... arguments )
869    {
870    LOG.debug( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
871    }
872
873  @Override
874  public void logWarn( String message )
875    {
876    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message );
877    }
878
879  @Override
880  public void logWarn( String message, Throwable throwable )
881    {
882    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
883    }
884
885  @Override
886  public void logWarn( String message, Object... arguments )
887    {
888    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
889    }
890
891  @Override
892  public void logError( String message, Object... arguments )
893    {
894    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, arguments );
895    }
896
897  @Override
898  public void logError( String message, Throwable throwable )
899    {
900    LOG.warn( "[" + Util.truncate( getName(), 25 ) + "] " + message, throwable );
901    }
902
903  /** Class CascadeJob manages Flow execution in the current Cascade instance. */
904  protected class CascadeJob implements Callable<Throwable>
905    {
906    /** Field flow */
907    final Flow flow;
908    /** Field predecessors */
909    private List<CascadeJob> predecessors;
910    /** Field latch */
911    private final CountDownLatch latch = new CountDownLatch( 1 );
912    /** Field stop */
913    private boolean stop = false;
914    /** Field failed */
915    private boolean failed = false;
916
917    public CascadeJob( Flow flow )
918      {
919      this.flow = flow;
920      }
921
922    public String getName()
923      {
924      return flow.getName();
925      }
926
927    public Throwable call()
928      {
929      try
930        {
931        for( CascadeJob predecessor : predecessors )
932          {
933          if( !predecessor.isSuccessful() )
934            return null;
935          }
936
937        if( stop || cascadeStats.isFinished() )
938          return null;
939
940        try
941          {
942          if( LOG.isInfoEnabled() )
943            logInfo( "starting flow: " + flow.getName() );
944
945          if( flowSkipStrategy == null ? flow.isSkipFlow() : flowSkipStrategy.skipFlow( flow ) )
946            {
947            if( LOG.isInfoEnabled() )
948              logInfo( "skipping flow: " + flow.getName() );
949
950            flow.getFlowStats().markSkipped();
951            Flows.fireOnCompleted( flow );
952
953            return null;
954            }
955
956          flow.prepare(); // do not delete append/update mode taps
957          flow.complete();
958
959          if( LOG.isInfoEnabled() )
960            logInfo( "completed flow: " + flow.getName() );
961          }
962        catch( Throwable exception )
963          {
964          failed = true;
965          logWarn( "flow failed: " + flow.getName(), exception );
966
967          CascadeException cascadeException = new CascadeException( "flow failed: " + flow.getName(), exception );
968
969          if( !cascadeStats.isFinished() )
970            cascadeStats.markFailed( cascadeException );
971
972          return cascadeException;
973          }
974        finally
975          {
976          flow.cleanup();
977          }
978        }
979      catch( Throwable throwable )
980        {
981        failed = true;
982        return throwable;
983        }
984      finally
985        {
986        latch.countDown();
987        }
988
989      return null;
990      }
991
992    public void init( List<CascadeJob> predecessors )
993      {
994      this.predecessors = predecessors;
995      }
996
997    public void stop()
998      {
999      if( LOG.isInfoEnabled() )
1000        logInfo( "stopping flow: " + flow.getName() );
1001
1002      stop = true;
1003
1004      if( flow != null )
1005        flow.stop();
1006      }
1007
1008    public boolean isSuccessful()
1009      {
1010      try
1011        {
1012        latch.await();
1013
1014        return flow != null && !failed && !stop;
1015        }
1016      catch( InterruptedException exception )
1017        {
1018        logWarn( "latch interrupted", exception );
1019        }
1020
1021      return false;
1022      }
1023    }
1024
1025  @Override
1026  public UnitOfWorkSpawnStrategy getSpawnStrategy()
1027    {
1028    return spawnStrategy;
1029    }
1030
1031  @Override
1032  public void setSpawnStrategy( UnitOfWorkSpawnStrategy spawnStrategy )
1033    {
1034    this.spawnStrategy = spawnStrategy;
1035    }
1036
1037  /**
1038   * Class SafeCascadeListener safely calls a wrapped CascadeListener.
1039   * <p>
1040   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1041   * can be caught by the calling Thread. Since Cascade is asynchronous, much of the work is done in the run() method
1042   * which in turn is run in a new Thread.
1043   */
1044  private class SafeCascadeListener implements CascadeListener
1045    {
1046    /** Field flowListener */
1047    final CascadeListener cascadeListener;
1048    /** Field throwable */
1049    Throwable throwable;
1050
1051    private SafeCascadeListener( CascadeListener cascadeListener )
1052      {
1053      this.cascadeListener = cascadeListener;
1054      }
1055
1056    public void onStarting( Cascade cascade )
1057      {
1058      try
1059        {
1060        cascadeListener.onStarting( cascade );
1061        }
1062      catch( Throwable throwable )
1063        {
1064        handleThrowable( throwable );
1065        }
1066      }
1067
1068    public void onStopping( Cascade cascade )
1069      {
1070      try
1071        {
1072        cascadeListener.onStopping( cascade );
1073        }
1074      catch( Throwable throwable )
1075        {
1076        handleThrowable( throwable );
1077        }
1078      }
1079
1080    public void onCompleted( Cascade cascade )
1081      {
1082      try
1083        {
1084        cascadeListener.onCompleted( cascade );
1085        }
1086      catch( Throwable throwable )
1087        {
1088        handleThrowable( throwable );
1089        }
1090      }
1091
1092    public boolean onThrowable( Cascade cascade, Throwable flowThrowable )
1093      {
1094      try
1095        {
1096        return cascadeListener.onThrowable( cascade, flowThrowable );
1097        }
1098      catch( Throwable throwable )
1099        {
1100        handleThrowable( throwable );
1101        }
1102
1103      return false;
1104      }
1105
1106    private void handleThrowable( Throwable throwable )
1107      {
1108      this.throwable = throwable;
1109
1110      logWarn( String.format( "cascade listener %s threw throwable", cascadeListener ), throwable );
1111
1112      // stop this flow
1113      stop();
1114      }
1115
1116    public boolean equals( Object object )
1117      {
1118      if( object instanceof SafeCascadeListener )
1119        return cascadeListener.equals( ( (SafeCascadeListener) object ).cascadeListener );
1120
1121      return cascadeListener.equals( object );
1122      }
1123
1124    public int hashCode()
1125      {
1126      return cascadeListener.hashCode();
1127      }
1128    }
1129  }