001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Collection;
027    import java.util.Collections;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.LinkedHashMap;
033    import java.util.LinkedList;
034    import java.util.List;
035    import java.util.ListIterator;
036    import java.util.Map;
037    import java.util.Set;
038    
039    import cascading.flow.Flow;
040    import cascading.flow.FlowElement;
041    import cascading.flow.FlowException;
042    import cascading.flow.FlowProcess;
043    import cascading.flow.FlowStep;
044    import cascading.flow.FlowStepListener;
045    import cascading.management.CascadingServices;
046    import cascading.management.state.ClientState;
047    import cascading.operation.Operation;
048    import cascading.pipe.Group;
049    import cascading.pipe.HashJoin;
050    import cascading.pipe.Merge;
051    import cascading.pipe.Operator;
052    import cascading.pipe.Pipe;
053    import cascading.property.ConfigDef;
054    import cascading.stats.FlowStepStats;
055    import cascading.tap.Tap;
056    import cascading.util.Util;
057    import org.jgrapht.GraphPath;
058    import org.jgrapht.Graphs;
059    import org.jgrapht.alg.KShortestPaths;
060    import org.jgrapht.graph.SimpleDirectedGraph;
061    import org.jgrapht.traverse.TopologicalOrderIterator;
062    import org.slf4j.Logger;
063    import org.slf4j.LoggerFactory;
064    
065    /**
066     * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During
067     * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
068     * <p/>
069     * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
070     * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
071     * all steps will be submitted for execution. The default submit priority is 5.
072     * <p/>
073     * This class is for internal use, there are no stable public methods.
074     */
075    public abstract class BaseFlowStep<Config> implements Serializable, FlowStep<Config>
076      {
077      /** Field LOG */
078      private static final Logger LOG = LoggerFactory.getLogger( FlowStep.class );
079    
080      /** Field flow */
081      private transient Flow<Config> flow;
082      /** Field flowName */
083      private String flowName;
084      /** Field flowID */
085      private String flowID;
086    
087      private transient Config conf;
088    
089      /** Field submitPriority */
090      private int submitPriority = 5;
091    
092      /** Field name */
093      String name;
094      /** Field id */
095      private String id;
096      private final int stepNum;
097    
098      /** Field step listeners */
099      private List<SafeFlowStepListener> listeners;
100    
101      /** Field graph */
102      private final SimpleDirectedGraph<FlowElement, Scope> graph = new SimpleDirectedGraph<FlowElement, Scope>( Scope.class );
103    
104      /** Field sources */
105      protected final Map<Tap, Set<String>> sources = new HashMap<Tap, Set<String>>(); // all sources
106      /** Field sink */
107      protected final Map<Tap, Set<String>> sinks = new HashMap<Tap, Set<String>>(); // all sinks
108    
109      /** Field tempSink */
110      protected Tap tempSink; // used if we need to bypass the filesystem
111    
112      /** Field groups */
113      private final List<Group> groups = new ArrayList<Group>();
114    
115      // sources streamed into join - not necessarily all sources
116      protected final Map<HashJoin, Tap> streamedSourceByJoin = new LinkedHashMap<HashJoin, Tap>();
117      // sources accumulated by join
118      protected final Map<HashJoin, Set<Tap>> accumulatedSourcesByJoin = new LinkedHashMap<HashJoin, Set<Tap>>();
119    
120      private transient FlowStepJob<Config> flowStepJob;
121    
122      protected BaseFlowStep( String name, int stepNum )
123        {
124        setName( name );
125        this.stepNum = stepNum;
126        }
127    
128      @Override
129      public String getID()
130        {
131        if( id == null )
132          id = Util.createUniqueID();
133    
134        return id;
135        }
136    
137      @Override
138      public int getStepNum()
139        {
140        return stepNum;
141        }
142    
143      @Override
144      public String getName()
145        {
146        return name;
147        }
148    
149      void setName( String name )
150        {
151        if( name == null || name.isEmpty() )
152          throw new IllegalArgumentException( "step name may not be null or empty" );
153    
154        this.name = name;
155        }
156    
157      public void setFlow( Flow<Config> flow )
158        {
159        this.flow = flow;
160        this.flowID = flow.getID();
161        this.flowName = flow.getName();
162        }
163    
164      @Override
165      public Flow<Config> getFlow()
166        {
167        return flow;
168        }
169    
170      @Override
171      public String getFlowID()
172        {
173        return flowID;
174        }
175    
176      @Override
177      public String getFlowName()
178        {
179        return flowName;
180        }
181    
182      protected void setFlowName( String flowName )
183        {
184        this.flowName = flowName;
185        }
186    
187      @Override
188      public Config getConfig()
189        {
190        return conf;
191        }
192    
193      protected void setConf( Config conf )
194        {
195        this.conf = conf;
196        }
197    
198      @Override
199      public String getStepDisplayName()
200        {
201        return getStepDisplayName( Util.ID_LENGTH );
202        }
203    
204      protected String getStepDisplayName( int idLength )
205        {
206        if( idLength > Util.ID_LENGTH )
207          idLength = Util.ID_LENGTH;
208    
209        String flowID = getFlowID().substring( 0, idLength );
210        String stepID = getID().substring( 0, idLength );
211    
212        return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
213        }
214    
215      @Override
216      public int getSubmitPriority()
217        {
218        return submitPriority;
219        }
220    
221      @Override
222      public void setSubmitPriority( int submitPriority )
223        {
224        if( submitPriority < 1 || submitPriority > 10 )
225          throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
226    
227        this.submitPriority = submitPriority;
228        }
229    
230      @Override
231      public FlowStepStats getFlowStepStats()
232        {
233        return flowStepJob.getStepStats();
234        }
235    
236      public SimpleDirectedGraph<FlowElement, Scope> getGraph()
237        {
238        return graph;
239        }
240    
241      @Override
242      public Group getGroup()
243        {
244        if( groups.isEmpty() )
245          return null;
246    
247        if( groups.size() > 1 )
248          throw new IllegalStateException( "more than one group" );
249    
250        return groups.get( 0 );
251        }
252    
253      @Override
254      public List<Group> getGroups()
255        {
256        return groups;
257        }
258    
259      public void addGroup( Group group )
260        {
261        if( !groups.contains( group ) )
262          groups.add( group );
263        }
264    
265      @Override
266      public Map<HashJoin, Tap> getStreamedSourceByJoin()
267        {
268        return streamedSourceByJoin;
269        }
270    
271      public void addStreamedSourceFor( HashJoin join, Tap streamedSource )
272        {
273        streamedSourceByJoin.put( join, streamedSource );
274        }
275    
276      @Override
277      public Set<Tap> getAllAccumulatedSources()
278        {
279        HashSet<Tap> set = new HashSet<Tap>();
280    
281        for( Set<Tap> taps : accumulatedSourcesByJoin.values() )
282          set.addAll( taps );
283    
284        return set;
285        }
286    
287      public void addAccumulatedSourceFor( HashJoin join, Tap accumulatedSource )
288        {
289        if( !accumulatedSourcesByJoin.containsKey( join ) )
290          accumulatedSourcesByJoin.put( join, new HashSet<Tap>() );
291    
292        accumulatedSourcesByJoin.get( join ).add( accumulatedSource );
293        }
294    
295      public void addSource( String name, Tap source )
296        {
297        if( !sources.containsKey( source ) )
298          sources.put( source, new HashSet<String>() );
299    
300        sources.get( source ).add( name );
301        }
302    
303      public void addSink( String name, Tap sink )
304        {
305        if( !sinks.containsKey( sink ) )
306          sinks.put( sink, new HashSet<String>() );
307    
308        sinks.get( sink ).add( name );
309        }
310    
311      @Override
312      public Set<Tap> getSources()
313        {
314        return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
315        }
316    
317      @Override
318      public Set<Tap> getSinks()
319        {
320        return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
321        }
322    
323      @Override
324      public Tap getSink()
325        {
326        if( sinks.size() != 1 )
327          throw new IllegalStateException( "more than one sink" );
328    
329        return sinks.keySet().iterator().next();
330        }
331    
332      @Override
333      public Set<String> getSourceName( Tap source )
334        {
335        return Collections.unmodifiableSet( sources.get( source ) );
336        }
337    
338      @Override
339      public Set<String> getSinkName( Tap sink )
340        {
341        return Collections.unmodifiableSet( sinks.get( sink ) );
342        }
343    
344      @Override
345      public Tap getSourceWith( String identifier )
346        {
347        for( Tap tap : sources.keySet() )
348          {
349          if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
350            return tap;
351          }
352    
353        return null;
354        }
355    
356      @Override
357      public Tap getSinkWith( String identifier )
358        {
359        for( Tap tap : sinks.keySet() )
360          {
361          if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
362            return tap;
363          }
364    
365        return null;
366        }
367    
368      boolean allSourcesExist() throws IOException
369        {
370        for( Tap tap : sources.keySet() )
371          {
372          if( !tap.resourceExists( getConfig() ) )
373            return false;
374          }
375    
376        return true;
377        }
378    
379      boolean areSourcesNewer( long sinkModified ) throws IOException
380        {
381        Config config = getConfig();
382        Iterator<Tap> values = sources.keySet().iterator();
383    
384        long sourceModified = 0;
385    
386        try
387          {
388          sourceModified = Util.getSourceModified( config, values, sinkModified );
389    
390          if( sinkModified < sourceModified )
391            return true;
392    
393          return false;
394          }
395        finally
396          {
397          if( LOG.isInfoEnabled() )
398            logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
399          }
400        }
401    
402      long getSinkModified() throws IOException
403        {
404        long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );
405    
406        if( LOG.isInfoEnabled() )
407          {
408          if( sinkModified == -1L )
409            logInfo( "at least one sink is marked for delete" );
410          if( sinkModified == 0L )
411            logInfo( "at least one sink does not exist" );
412          else
413            logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
414          }
415    
416        return sinkModified;
417        }
418    
419      protected Throwable prepareResources()
420        {
421        Throwable throwable = prepareResources( getSources(), false );
422    
423        if( throwable == null )
424          throwable = prepareResources( getSinks(), true );
425    
426        if( throwable == null )
427          throwable = prepareResources( getTraps(), true );
428    
429        return throwable;
430        }
431    
432      private Throwable prepareResources( Collection<Tap> taps, boolean forWrite )
433        {
434        Throwable throwable = null;
435    
436        for( Tap tap : taps )
437          {
438          throwable = prepareResource( tap, forWrite );
439    
440          if( throwable != null )
441            break;
442          }
443    
444        return throwable;
445        }
446    
447      private Throwable prepareResource( Tap tap, boolean forWrite )
448        {
449        Throwable throwable = null;
450    
451        try
452          {
453          boolean result;
454    
455          if( forWrite )
456            result = tap.prepareResourceForWrite( getConfig() );
457          else
458            result = tap.prepareResourceForRead( getConfig() );
459    
460          if( !result )
461            {
462            String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
463    
464            logError( message, null );
465    
466            throwable = new FlowException( message );
467            }
468          }
469        catch( Throwable exception )
470          {
471          String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
472    
473          logError( message, exception );
474    
475          throwable = new FlowException( message, exception );
476          }
477    
478        return throwable;
479        }
480    
481      protected Throwable commitSinks()
482        {
483        Throwable throwable = null;
484    
485        for( Tap tap : sinks.keySet() )
486          {
487          if( throwable != null )
488            rollbackResource( tap );
489          else
490            throwable = commitResource( tap );
491          }
492    
493        return throwable;
494        }
495    
496      private Throwable commitResource( Tap tap )
497        {
498        Throwable throwable = null;
499    
500        try
501          {
502          if( !tap.commitResource( getConfig() ) )
503            {
504            String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
505    
506            logError( message, null );
507    
508            throwable = new FlowException( message );
509            }
510          }
511        catch( Throwable exception )
512          {
513          String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
514    
515          logError( message, exception );
516    
517          throwable = new FlowException( message, exception );
518          }
519    
520        return throwable;
521        }
522    
523      private Throwable rollbackResource( Tap tap )
524        {
525        Throwable throwable = null;
526    
527        try
528          {
529          if( !tap.rollbackResource( getConfig() ) )
530            {
531            String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
532    
533            logError( message, null );
534    
535            throwable = new FlowException( message );
536            }
537          }
538        catch( Throwable exception )
539          {
540          String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
541    
542          logError( message, exception );
543    
544          throwable = new FlowException( message, exception );
545          }
546    
547        return throwable;
548        }
549    
550      protected Throwable rollbackSinks()
551        {
552        Throwable throwable = null;
553    
554        for( Tap tap : sinks.keySet() )
555          {
556          if( throwable != null )
557            rollbackResource( tap );
558          else
559            throwable = rollbackResource( tap );
560          }
561    
562        return throwable;
563        }
564    
565      protected abstract Config getInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );
566    
567      /**
568       * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
569       * there will be more than one instance.
570       *
571       * @param flowElement of type FlowElement
572       * @return Set<Scope>
573       */
574      public Set<Scope> getPreviousScopes( FlowElement flowElement )
575        {
576        return getGraph().incomingEdgesOf( flowElement );
577        }
578    
579      /**
580       * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
581       *
582       * @param flowElement of type FlowElement
583       * @return Scope
584       */
585      public Scope getNextScope( FlowElement flowElement )
586        {
587        Set<Scope> set = getGraph().outgoingEdgesOf( flowElement );
588    
589        if( set.size() != 1 )
590          throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );
591    
592        return set.iterator().next();
593        }
594    
595      public Scope getScopeFor( FlowElement sourceElement, FlowElement targetElement )
596        {
597        return getGraph().getEdge( sourceElement, targetElement );
598        }
599    
600      public Set<Scope> getNextScopes( FlowElement flowElement )
601        {
602        return getGraph().outgoingEdgesOf( flowElement );
603        }
604    
605      public FlowElement getNextFlowElement( Scope scope )
606        {
607        return getGraph().getEdgeTarget( scope );
608        }
609    
610      public TopologicalOrderIterator<FlowElement, Scope> getTopologicalOrderIterator()
611        {
612        return new TopologicalOrderIterator<FlowElement, Scope>( graph );
613        }
614    
615      public List<FlowElement> getSuccessors( FlowElement element )
616        {
617        return Graphs.successorListOf( graph, element );
618        }
619    
620      public Set<Tap> getJoinTributariesBetween( FlowElement from, FlowElement to )
621        {
622        Set<HashJoin> joins = new HashSet<HashJoin>();
623        Set<Merge> merges = new HashSet<Merge>();
624    
625        List<GraphPath<FlowElement, Scope>> paths = getPathsBetween( from, to );
626    
627        for( GraphPath<FlowElement, Scope> path : paths )
628          {
629          for( FlowElement flowElement : Graphs.getPathVertexList( path ) )
630            {
631            if( flowElement instanceof HashJoin )
632              joins.add( (HashJoin) flowElement );
633    
634            if( flowElement instanceof Merge )
635              merges.add( (Merge) flowElement );
636            }
637          }
638    
639        Set<Tap> tributaries = new HashSet<Tap>();
640    
641        for( HashJoin join : joins )
642          {
643          for( Tap source : sources.keySet() )
644            {
645            List<GraphPath<FlowElement, Scope>> joinPaths = new LinkedList( getPathsBetween( source, join ) );
646    
647            ListIterator<GraphPath<FlowElement, Scope>> iterator = joinPaths.listIterator();
648    
649            while( iterator.hasNext() )
650              {
651              GraphPath<FlowElement, Scope> joinPath = iterator.next();
652    
653              if( !Collections.disjoint( Graphs.getPathVertexList( joinPath ), merges ) )
654                iterator.remove();
655              }
656    
657            if( !joinPaths.isEmpty() )
658              tributaries.add( source );
659            }
660          }
661    
662        return tributaries;
663        }
664    
665      private List<GraphPath<FlowElement, Scope>> getPathsBetween( FlowElement from, FlowElement to )
666        {
667        KShortestPaths<FlowElement, Scope> paths = new KShortestPaths<FlowElement, Scope>( graph, from, Integer.MAX_VALUE );
668        List<GraphPath<FlowElement, Scope>> results = paths.getPaths( to );
669    
670        if( results == null )
671          return Collections.EMPTY_LIST;
672    
673        return results;
674        }
675    
676      public Collection<Operation> getAllOperations()
677        {
678        Set<FlowElement> vertices = getGraph().vertexSet();
679        List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same
680    
681        for( FlowElement vertex : vertices )
682          {
683          if( vertex instanceof Operator )
684            operations.add( ( (Operator) vertex ).getOperation() );
685          }
686    
687        return operations;
688        }
689    
690      @Override
691      public boolean containsPipeNamed( String pipeName )
692        {
693        Set<FlowElement> vertices = getGraph().vertexSet();
694    
695        for( FlowElement vertex : vertices )
696          {
697          if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
698            return true;
699          }
700    
701        return false;
702        }
703    
704      public void clean()
705        {
706        // use step config by default
707        clean( getConfig() );
708        }
709    
710      public abstract void clean( Config config );
711    
712      List<SafeFlowStepListener> getListeners()
713        {
714        if( listeners == null )
715          listeners = new LinkedList<SafeFlowStepListener>();
716    
717        return listeners;
718        }
719    
720      @Override
721      public boolean hasListeners()
722        {
723        return listeners != null && !listeners.isEmpty();
724        }
725    
726      @Override
727      public void addListener( FlowStepListener flowStepListener )
728        {
729        getListeners().add( new SafeFlowStepListener( flowStepListener ) );
730        }
731    
732      @Override
733      public boolean removeListener( FlowStepListener flowStepListener )
734        {
735        return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
736        }
737    
738      protected void fireOnCompleted()
739        {
740    
741        if( hasListeners() )
742          {
743          if( LOG.isDebugEnabled() )
744            logDebug( "firing onCompleted event: " + getListeners().size() );
745    
746          for( Object flowStepListener : getListeners() )
747            ( (FlowStepListener) flowStepListener ).onStepCompleted( this );
748          }
749        }
750    
751      protected void fireOnThrowable( Throwable throwable )
752        {
753        if( hasListeners() )
754          {
755          if( LOG.isDebugEnabled() )
756            logDebug( "firing onThrowable event: " + getListeners().size() );
757    
758          for( Object flowStepListener : getListeners() )
759            ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable );
760          }
761        }
762    
763      protected void fireOnStopping()
764        {
765        if( hasListeners() )
766          {
767          if( LOG.isDebugEnabled() )
768            logDebug( "firing onStopping event: " + getListeners() );
769    
770          for( Object flowStepListener : getListeners() )
771            ( (FlowStepListener) flowStepListener ).onStepStopping( this );
772          }
773        }
774    
775      protected void fireOnStarting()
776        {
777        if( hasListeners() )
778          {
779          if( LOG.isDebugEnabled() )
780            logDebug( "firing onStarting event: " + getListeners().size() );
781    
782          for( Object flowStepListener : getListeners() )
783            ( (FlowStepListener) flowStepListener ).onStepStarting( this );
784          }
785        }
786    
787      protected void fireOnRunning()
788        {
789        if( hasListeners() )
790          {
791          if( LOG.isDebugEnabled() )
792            logDebug( "firing onRunning event: " + getListeners().size() );
793    
794          for( Object flowStepListener : getListeners() )
795            ( (FlowStepListener) flowStepListener ).onStepRunning( this );
796          }
797        }
798    
799      @Override
800      public boolean equals( Object object )
801        {
802        if( this == object )
803          return true;
804        if( object == null || getClass() != object.getClass() )
805          return false;
806    
807        BaseFlowStep flowStep = (BaseFlowStep) object;
808    
809        if( name != null ? !name.equals( flowStep.name ) : flowStep.name != null )
810          return false;
811    
812        return true;
813        }
814    
815      protected ClientState createClientState( FlowProcess flowProcess )
816        {
817        CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
818        return services.createClientState( getID() );
819        }
820    
821      public FlowStepJob<Config> getFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
822        {
823        if( flowStepJob != null )
824          return flowStepJob;
825    
826        if( flowProcess == null )
827          return null;
828    
829        flowStepJob = createFlowStepJob( flowProcess, parentConfig );
830    
831        return flowStepJob;
832        }
833    
834      protected abstract FlowStepJob createFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig );
835    
836      protected void initConfFromProcessConfigDef( ConfigDef.Setter setter )
837        {
838        // applies each mode in order, topologically
839        for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
840          {
841          TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalOrderIterator();
842    
843          while( iterator.hasNext() )
844            {
845            FlowElement element = iterator.next();
846    
847            while( element != null )
848              {
849              if( element.hasStepConfigDef() )
850                element.getStepConfigDef().apply( mode, setter );
851    
852              if( element instanceof Pipe )
853                element = ( (Pipe) element ).getParent();
854              else
855                element = null;
856              }
857            }
858          }
859        }
860    
861      @Override
862      public int hashCode()
863        {
864        return name != null ? name.hashCode() : 0;
865        }
866    
867      @Override
868      public String toString()
869        {
870        StringBuffer buffer = new StringBuffer();
871    
872        buffer.append( getClass().getSimpleName() );
873        buffer.append( "[name: " ).append( getName() ).append( "]" );
874    
875        return buffer.toString();
876        }
877    
878      public final boolean isInfoEnabled()
879        {
880        return LOG.isInfoEnabled();
881        }
882    
883      public final boolean isDebugEnabled()
884        {
885        return LOG.isDebugEnabled();
886        }
887    
888      public void logDebug( String message )
889        {
890        LOG.debug( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
891        }
892    
893      public void logInfo( String message )
894        {
895        LOG.info( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
896        }
897    
898      public void logWarn( String message )
899        {
900        LOG.warn( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message );
901        }
902    
903      public void logWarn( String message, Throwable throwable )
904        {
905        LOG.warn( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message, throwable );
906        }
907    
908      public void logError( String message, Throwable throwable )
909        {
910        LOG.error( "[" + Util.truncate( getFlowName(), 25 ) + "] " + message, throwable );
911        }
912    
913      /**
914       * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
915       * <p/>
916       * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
917       * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
918       * which in turn is run in a new Thread.
919       */
920      private class SafeFlowStepListener implements FlowStepListener
921        {
922        /** Field flowListener */
923        final FlowStepListener flowStepListener;
924        /** Field throwable */
925        Throwable throwable;
926    
927        private SafeFlowStepListener( FlowStepListener flowStepListener )
928          {
929          this.flowStepListener = flowStepListener;
930          }
931    
932        public void onStepStarting( FlowStep flowStep )
933          {
934          try
935            {
936            flowStepListener.onStepStarting( flowStep );
937            }
938          catch( Throwable throwable )
939            {
940            handleThrowable( throwable );
941            }
942          }
943    
944        public void onStepStopping( FlowStep flowStep )
945          {
946          try
947            {
948            flowStepListener.onStepStopping( flowStep );
949            }
950          catch( Throwable throwable )
951            {
952            handleThrowable( throwable );
953            }
954          }
955    
956        public void onStepCompleted( FlowStep flowStep )
957          {
958          try
959            {
960            flowStepListener.onStepCompleted( flowStep );
961            }
962          catch( Throwable throwable )
963            {
964            handleThrowable( throwable );
965            }
966          }
967    
968        public void onStepRunning( FlowStep flowStep )
969          {
970          try
971            {
972            flowStepListener.onStepRunning( flowStep );
973            }
974          catch( Throwable throwable )
975            {
976            handleThrowable( throwable );
977            }
978          }
979    
980        public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
981          {
982          try
983            {
984            return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
985            }
986          catch( Throwable throwable )
987            {
988            handleThrowable( throwable );
989            }
990    
991          return false;
992          }
993    
994        private void handleThrowable( Throwable throwable )
995          {
996          this.throwable = throwable;
997    
998          logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
999          }
1000    
1001        public boolean equals( Object object )
1002          {
1003          if( object instanceof BaseFlowStep.SafeFlowStepListener )
1004            return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );
1005    
1006          return flowStepListener.equals( object );
1007          }
1008    
1009        public int hashCode()
1010          {
1011          return flowStepListener.hashCode();
1012          }
1013        }
1014      }