001/*
002 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner;
023
024import java.io.IOException;
025import java.io.Serializable;
026import java.lang.reflect.Type;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Objects;
038import java.util.Set;
039import java.util.stream.Collectors;
040
041import cascading.flow.Flow;
042import cascading.flow.FlowElement;
043import cascading.flow.FlowException;
044import cascading.flow.FlowNode;
045import cascading.flow.FlowProcess;
046import cascading.flow.FlowStep;
047import cascading.flow.FlowStepListener;
048import cascading.flow.planner.graph.AnnotatedGraph;
049import cascading.flow.planner.graph.ElementGraph;
050import cascading.flow.planner.graph.ElementGraphs;
051import cascading.flow.planner.process.FlowNodeGraph;
052import cascading.flow.stream.annotations.StreamMode;
053import cascading.management.CascadingServices;
054import cascading.management.state.ClientState;
055import cascading.operation.Operation;
056import cascading.pipe.Group;
057import cascading.pipe.Operator;
058import cascading.pipe.Pipe;
059import cascading.pipe.Splice;
060import cascading.pipe.SubAssembly;
061import cascading.property.ConfigDef;
062import cascading.stats.FlowStepStats;
063import cascading.tap.Tap;
064import cascading.tuple.Fields;
065import cascading.tuple.type.SerializableType;
066import cascading.util.EnumMultiMap;
067import cascading.util.ProcessLogger;
068import cascading.util.Util;
069
070import static cascading.flow.planner.graph.ElementGraphs.findAllGroups;
071import static cascading.util.Util.narrowIdentitySet;
072
073/**
074 * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During
075 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
076 * <p>
077 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
078 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
079 * all steps will be submitted for execution. The default submit priority is 5.
080 * <p>
081 * This class is for internal use, there are no stable public methods.
082 */
083public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable
084  {
085  /** Field flow */
086  private transient Flow<Config> flow;
087  /** Field flowName */
088  private String flowName;
089  /** Field flowID */
090  private String flowID;
091
092  private transient Config flowStepConf;
093
094  /** Field submitPriority */
095  private int submitPriority = 5;
096
097  /** Field name */
098  String name;
099  private String id;
100  private int ordinal;
101  private Map<String, String> processAnnotations;
102
103  /** Field step listeners */
104  private List<SafeFlowStepListener> listeners;
105
106  /** Field elementGraph */
107  protected ElementGraph elementGraph;
108  /** Field flowNodeGraph */
109  protected FlowNodeGraph flowNodeGraph;
110
111  /** Field sources */
112  protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources
113  /** Field sink */
114  protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks
115  /** Field traps */
116  private final Map<String, Tap> traps = new HashMap<>();
117
118  /** Field tempSink */
119  protected Tap tempSink; // used if we need to bypass the filesystem
120
121  /** Field groups */
122  private final List<Group> groups = new ArrayList<Group>();
123
124  protected transient FlowStepStats flowStepStats;
125
126  private transient FlowStepJob<Config> flowStepJob;
127
128  /** optional metadata about the FlowStep */
129  private Map<String, String> flowStepDescriptor = Collections.emptyMap();
130
131  protected BaseFlowStep( String name, int ordinal )
132    {
133    this( name, ordinal, null );
134    }
135
136  protected BaseFlowStep( String name, int ordinal, Map<String, String> flowStepDescriptor )
137    {
138    this( name, ordinal, null, flowStepDescriptor );
139    }
140
141  protected BaseFlowStep( String name, int ordinal, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor )
142    {
143    this();
144    setName( name );
145    this.ordinal = ordinal;
146
147    this.elementGraph = null;
148    this.flowNodeGraph = flowNodeGraph;
149
150    setFlowStepDescriptor( flowStepDescriptor );
151    }
152
153  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph )
154    {
155    this( elementStepGraph, flowNodeGraph, null );
156    }
157
158  protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor )
159    {
160    this();
161    this.elementGraph = elementStepGraph;
162    this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs
163
164    setFlowStepDescriptor( flowStepDescriptor );
165
166    configure();
167    }
168
169  protected BaseFlowStep()
170    {
171    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
172    }
173
174  protected void configure()
175    {
176    addSources( this, getElementGraph(), getFlowNodeGraph().getSourceTaps() );
177    addSinks( this, getElementGraph(), getFlowNodeGraph().getSinkTaps() );
178
179    addAllGroups();
180
181    traps.putAll( getFlowNodeGraph().getTrapsMap() );
182    }
183
184  protected void addAllGroups()
185    {
186    addGroups( findAllGroups( getElementGraph() ) );
187    }
188
189  @Override
190  public String getID()
191    {
192    return id;
193    }
194
195  public void setOrdinal( int ordinal )
196    {
197    this.ordinal = ordinal;
198    }
199
200  @Override
201  public int getOrdinal()
202    {
203    return ordinal;
204    }
205
206  @Override
207  public String getName()
208    {
209    return name;
210    }
211
212  public void setName( String name )
213    {
214    if( name == null || name.isEmpty() )
215      throw new IllegalArgumentException( "step name may not be null or empty" );
216
217    this.name = name;
218    }
219
220  @Override
221  public Map<String, String> getFlowStepDescriptor()
222    {
223    return Collections.unmodifiableMap( flowStepDescriptor );
224    }
225
226  protected void setFlowStepDescriptor( Map<String, String> flowStepDescriptor )
227    {
228    if( flowStepDescriptor != null )
229      this.flowStepDescriptor = flowStepDescriptor;
230    }
231
232  @Override
233  public Map<String, String> getProcessAnnotations()
234    {
235    if( processAnnotations == null )
236      return Collections.emptyMap();
237
238    return Collections.unmodifiableMap( processAnnotations );
239    }
240
241  @Override
242  public void addProcessAnnotation( Enum annotation )
243    {
244    if( annotation == null )
245      return;
246
247    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
248    }
249
250  @Override
251  public void addProcessAnnotation( String key, String value )
252    {
253    if( processAnnotations == null )
254      processAnnotations = new HashMap<>();
255
256    processAnnotations.put( key, value );
257    }
258
259  public void setFlow( Flow<Config> flow )
260    {
261    this.flow = flow;
262    this.flowID = flow.getID();
263    this.flowName = flow.getName();
264    }
265
266  @Override
267  public Flow<Config> getFlow()
268    {
269    return flow;
270    }
271
272  @Override
273  public String getFlowID()
274    {
275    return flowID;
276    }
277
278  @Override
279  public String getFlowName()
280    {
281    return flowName;
282    }
283
284  protected void setFlowName( String flowName )
285    {
286    this.flowName = flowName;
287    }
288
289  @Override
290  public Config getConfig()
291    {
292    return flowStepConf;
293    }
294
295  @Override
296  public Map<Object, Object> getConfigAsProperties()
297    {
298    return Collections.emptyMap();
299    }
300
301  /**
302   * Set the initialized flowStepConf Config instance
303   *
304   * @param flowStepConf of type Config
305   */
306  protected void setConfig( Config flowStepConf )
307    {
308    this.flowStepConf = flowStepConf;
309    }
310
311  @Override
312  public String getStepDisplayName()
313    {
314    return getStepDisplayName( Util.ID_LENGTH );
315    }
316
317  protected String getStepDisplayName( int idLength )
318    {
319    if( idLength < 0 || idLength > Util.ID_LENGTH )
320      idLength = Util.ID_LENGTH;
321
322    if( idLength == 0 )
323      return String.format( "%s/%s", getFlowName(), getName() );
324
325    String flowID = getFlowID().substring( 0, idLength );
326    String stepID = getID().substring( 0, idLength );
327
328    return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
329    }
330
331  protected String getNodeDisplayName( FlowNode flowNode, int idLength )
332    {
333    if( idLength > Util.ID_LENGTH )
334      idLength = Util.ID_LENGTH;
335
336    String flowID = getFlowID().substring( 0, idLength );
337    String stepID = getID().substring( 0, idLength );
338    String nodeID = flowNode.getID().substring( 0, idLength );
339
340    return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() );
341    }
342
343  @Override
344  public int getSubmitPriority()
345    {
346    return submitPriority;
347    }
348
349  @Override
350  public void setSubmitPriority( int submitPriority )
351    {
352    if( submitPriority < 1 || submitPriority > 10 )
353      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );
354
355    this.submitPriority = submitPriority;
356    }
357
358  @Override
359  public void setFlowStepStats( FlowStepStats flowStepStats )
360    {
361    this.flowStepStats = flowStepStats;
362    }
363
364  @Override
365  public FlowStepStats getFlowStepStats()
366    {
367    return flowStepStats;
368    }
369
370  @Override
371  public ElementGraph getElementGraph()
372    {
373    return elementGraph;
374    }
375
376  protected EnumMultiMap getAnnotations()
377    {
378    return ( (AnnotatedGraph) elementGraph ).getAnnotations();
379    }
380
381  @Override
382  public FlowNodeGraph getFlowNodeGraph()
383    {
384    return flowNodeGraph;
385    }
386
387  @Override
388  public int getNumFlowNodes()
389    {
390    return flowNodeGraph.vertexSet().size();
391    }
392
393  public Set<FlowElement> getSourceElements()
394    {
395    return ElementGraphs.findSources( getElementGraph(), FlowElement.class );
396    }
397
398  public Set<FlowElement> getSinkElements()
399    {
400    return ElementGraphs.findSinks( getElementGraph(), FlowElement.class );
401    }
402
403  @Override
404  public Group getGroup()
405    {
406    if( groups.isEmpty() )
407      return null;
408
409    if( groups.size() > 1 )
410      throw new IllegalStateException( "more than one group" );
411
412    return groups.get( 0 );
413    }
414
415  @Override
416  public Collection<Group> getGroups()
417    {
418    return groups;
419    }
420
421  public void addGroups( Collection<Group> groups )
422    {
423    for( Group group : groups )
424      addGroup( group );
425    }
426
427  public void addGroup( Group group )
428    {
429    if( !groups.contains( group ) )
430      groups.add( group );
431    }
432
433  /**
434   * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Accumulated}.
435   *
436   * @return Set of accumulated Tap instances
437   */
438  public Set<Tap> getAllAccumulatedSources()
439    {
440    return narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) );
441    }
442
443  /**
444   * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Streamed}.
445   *
446   * @return Set of streamed Tap instances
447   */
448  public Set<Tap> getAllStreamedSources()
449    {
450    return narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Streamed ) );
451    }
452
453  public void addSource( String name, Tap source )
454    {
455    if( !sources.containsKey( source ) )
456      sources.put( source, new HashSet<String>() );
457
458    sources.get( source ).add( name );
459    }
460
461  public void addSink( String name, Tap sink )
462    {
463    if( !sinks.containsKey( sink ) )
464      sinks.put( sink, new HashSet<String>() );
465
466    sinks.get( sink ).add( name );
467    }
468
469  @Override
470  public Set<Tap> getSourceTaps()
471    {
472    return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
473    }
474
475  @Override
476  public Set<Tap> getSinkTaps()
477    {
478    return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
479    }
480
481  @Override
482  public Tap getSink()
483    {
484    if( sinks.size() == 0 )
485      return null;
486
487    if( sinks.size() > 1 )
488      throw new IllegalStateException( "more than one sink" );
489
490    return sinks.keySet().iterator().next();
491    }
492
493  @Override
494  public Set<String> getSourceName( Tap source )
495    {
496    return Collections.unmodifiableSet( sources.get( source ) );
497    }
498
499  @Override
500  public Set<String> getSinkName( Tap sink )
501    {
502    return Collections.unmodifiableSet( sinks.get( sink ) );
503    }
504
505  @Override
506  public Tap getSourceWith( String identifier )
507    {
508    if( Util.isEmpty( identifier ) )
509      return null;
510
511    for( Tap tap : sources.keySet() )
512      {
513      if( identifier.equalsIgnoreCase( tap.getIdentifier() ) )
514        return tap;
515      }
516
517    return null;
518    }
519
520  @Override
521  public Tap getSinkWith( String identifier )
522    {
523    if( Util.isEmpty( identifier ) )
524      return null;
525
526    for( Tap tap : sinks.keySet() )
527      {
528      if( identifier.equalsIgnoreCase( tap.getIdentifier() ) )
529        return tap;
530      }
531
532    return null;
533    }
534
535  @Override
536  public Map<String, Tap> getTrapMap()
537    {
538    return traps;
539    }
540
541  @Override
542  public Set<Tap> getTraps()
543    {
544    return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) );
545    }
546
547  public Tap getTrap( String name )
548    {
549    return getTrapMap().get( name );
550    }
551
552  boolean allSourcesExist() throws IOException
553    {
554    for( Tap tap : sources.keySet() )
555      {
556      if( !tap.resourceExists( getConfig() ) )
557        return false;
558      }
559
560    return true;
561    }
562
563  boolean areSourcesNewer( long sinkModified ) throws IOException
564    {
565    Config config = getConfig();
566    Iterator<Tap> values = sources.keySet().iterator();
567
568    long sourceModified = 0;
569
570    try
571      {
572      sourceModified = Util.getSourceModified( config, values, sinkModified );
573
574      if( sinkModified < sourceModified )
575        return true;
576
577      return false;
578      }
579    finally
580      {
581      if( isInfoEnabled() )
582        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
583      }
584    }
585
586  long getSinkModified() throws IOException
587    {
588    long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );
589
590    if( isInfoEnabled() )
591      {
592      if( sinkModified == -1L )
593        logInfo( "at least one sink is marked for delete" );
594      if( sinkModified == 0L )
595        logInfo( "at least one sink does not exist" );
596      else
597        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
598      }
599
600    return sinkModified;
601    }
602
603  protected Throwable prepareResources()
604    {
605    Throwable throwable = prepareResources( getSourceTaps(), false );
606
607    if( throwable == null )
608      throwable = prepareResources( getSinkTaps(), true );
609
610    if( throwable == null )
611      throwable = prepareResources( getTraps(), true );
612
613    return throwable;
614    }
615
616  private Throwable prepareResources( Collection<Tap> taps, boolean forWrite )
617    {
618    Throwable throwable = null;
619
620    for( Tap tap : taps )
621      {
622      throwable = prepareResource( tap, forWrite );
623
624      if( throwable != null )
625        break;
626      }
627
628    return throwable;
629    }
630
631  private Throwable prepareResource( Tap tap, boolean forWrite )
632    {
633    Throwable throwable = null;
634
635    try
636      {
637      boolean result;
638
639      if( forWrite )
640        result = tap.prepareResourceForWrite( getConfig() );
641      else
642        result = tap.prepareResourceForRead( getConfig() );
643
644      if( !result )
645        {
646        String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
647
648        logError( message );
649
650        throwable = new FlowException( message );
651        }
652      }
653    catch( Throwable exception )
654      {
655      String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
656
657      logError( message, exception );
658
659      throwable = new FlowException( message, exception );
660      }
661
662    return throwable;
663    }
664
665  protected Throwable commitSinks()
666    {
667    Throwable throwable = null;
668
669    for( Tap tap : sinks.keySet() )
670      {
671      if( throwable != null )
672        rollbackResource( tap );
673      else
674        throwable = commitResource( tap );
675      }
676
677    return throwable;
678    }
679
680  private Throwable commitResource( Tap tap )
681    {
682    Throwable throwable = null;
683
684    try
685      {
686      if( !tap.commitResource( getConfig() ) )
687        {
688        String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
689
690        logError( message );
691
692        throwable = new FlowException( message );
693        }
694      }
695    catch( Throwable exception )
696      {
697      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
698
699      logError( message, exception );
700
701      throwable = new FlowException( message, exception );
702      }
703
704    return throwable;
705    }
706
707  private Throwable rollbackResource( Tap tap )
708    {
709    Throwable throwable = null;
710
711    try
712      {
713      if( !tap.rollbackResource( getConfig() ) )
714        {
715        String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
716
717        logError( message );
718
719        throwable = new FlowException( message );
720        }
721      }
722    catch( Throwable exception )
723      {
724      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );
725
726      logError( message, exception );
727
728      throwable = new FlowException( message, exception );
729      }
730
731    return throwable;
732    }
733
734  protected Throwable rollbackSinks()
735    {
736    Throwable throwable = null;
737
738    for( Tap tap : sinks.keySet() )
739      {
740      if( throwable != null )
741        rollbackResource( tap );
742      else
743        throwable = rollbackResource( tap );
744      }
745
746    return throwable;
747    }
748
  /**
   * Creates the initialized Config instance for this step. {@link #getCreateFlowStepJob(FlowProcess, Object)}
   * stores the result through {@link #setConfig(Object)} and passes it on when creating the step job.
   * <p>
   * Public for testing.
   *
   * @param flowProcess  the FlowProcess the step is being initialized for
   * @param parentConfig the parent configuration
   * @return the initialized step configuration
   */
  public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );
757
758  protected Set<String> getFieldDeclaredSerializations( Class base )
759    {
760    Collection<SerializableType> serializableTypes = findAllSerializableTypes();
761
762    return serializableTypes.stream()
763      .map( type -> type.getSerializer( base ) )
764      .filter( Objects::nonNull )
765      .map( Class::getName )
766      .collect( Collectors.toSet() );
767    }
768
769  protected Collection<SerializableType> findAllSerializableTypes()
770    {
771    Set<FlowElement> elements = Util.createIdentitySet();
772
773    //todo mark FlowElements as relying on serialization w/ an annotation
774    elements.addAll( narrowIdentitySet( Tap.class, elementGraph.vertexSet() ) );
775    elements.addAll( narrowIdentitySet( Splice.class, elementGraph.vertexSet() ) );
776
777    Set<SerializableType> types = new HashSet<>();
778
779    for( FlowElement element : elements )
780      {
781      Fields fields = elementGraph.outgoingEdgesOf( element ).iterator().next().getOutValuesFields();
782
783      Type[] fieldTypes = fields.getTypes();
784
785      if( fieldTypes == null )
786        continue;
787
788      for( Type type : fieldTypes )
789        {
790        if( type instanceof SerializableType )
791          types.add( (SerializableType) type );
792        }
793      }
794
795    return types;
796    }
797
798  /**
799   * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
800   * there will be more than one instance.
801   *
802   * @param flowElement of type FlowElement
803   * @return Set
804   */
805  public Set<Scope> getPreviousScopes( FlowElement flowElement )
806    {
807    return getElementGraph().incomingEdgesOf( flowElement );
808    }
809
810  /**
811   * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
812   *
813   * @param flowElement of type FlowElement
814   * @return Scope
815   */
816  public Scope getNextScope( FlowElement flowElement )
817    {
818    Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement );
819
820    if( set.size() != 1 )
821      throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );
822
823    return set.iterator().next();
824    }
825
826  public FlowElement getNextFlowElement( Scope scope )
827    {
828    return getElementGraph().getEdgeTarget( scope );
829    }
830
831  public Collection<Operation> getAllOperations()
832    {
833    Set<FlowElement> vertices = getElementGraph().vertexSet();
834    List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same
835
836    for( FlowElement vertex : vertices )
837      {
838      if( vertex instanceof Operator )
839        operations.add( ( (Operator) vertex ).getOperation() );
840      }
841
842    return operations;
843    }
844
845  @Override
846  public boolean containsPipeNamed( String pipeName )
847    {
848    Set<FlowElement> vertices = getElementGraph().vertexSet();
849
850    for( FlowElement vertex : vertices )
851      {
852      if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
853        return true;
854      }
855
856    return false;
857    }
858
859  public void clean()
860    {
861    // use step config by default
862    clean( getConfig() );
863    }
864
  /**
   * Cleans this step using the given configuration. {@link #clean()} calls this with the
   * step's own config.
   *
   * @param config the configuration to clean with
   */
  public abstract void clean( Config config );
866
867  List<SafeFlowStepListener> getListeners()
868    {
869    if( listeners == null )
870      listeners = new LinkedList<SafeFlowStepListener>();
871
872    return listeners;
873    }
874
875  @Override
876  public boolean hasListeners()
877    {
878    return listeners != null && !listeners.isEmpty();
879    }
880
881  @Override
882  public void addListener( FlowStepListener flowStepListener )
883    {
884    getListeners().add( new SafeFlowStepListener( flowStepListener ) );
885    }
886
887  @Override
888  public boolean removeListener( FlowStepListener flowStepListener )
889    {
890    return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
891    }
892
893  protected void fireOnCompleted()
894    {
895    if( hasListeners() )
896      {
897      if( isDebugEnabled() )
898        logDebug( "firing onCompleted event: " + getListeners().size() );
899
900      for( Object flowStepListener : getListeners() )
901        ( (FlowStepListener) flowStepListener ).onStepCompleted( this );
902      }
903    }
904
905  protected void fireOnThrowable( Throwable throwable )
906    {
907    if( hasListeners() )
908      {
909      if( isDebugEnabled() )
910        logDebug( "firing onThrowable event: " + getListeners().size() );
911
912      for( Object flowStepListener : getListeners() )
913        ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable );
914      }
915    }
916
917  protected void fireOnStopping()
918    {
919    if( hasListeners() )
920      {
921      if( isDebugEnabled() )
922        logDebug( "firing onStopping event: " + getListeners() );
923
924      for( Object flowStepListener : getListeners() )
925        ( (FlowStepListener) flowStepListener ).onStepStopping( this );
926      }
927    }
928
929  protected void fireOnStarting()
930    {
931    if( hasListeners() )
932      {
933      if( isDebugEnabled() )
934        logDebug( "firing onStarting event: " + getListeners().size() );
935
936      for( Object flowStepListener : getListeners() )
937        ( (FlowStepListener) flowStepListener ).onStepStarting( this );
938      }
939    }
940
941  protected void fireOnRunning()
942    {
943    if( hasListeners() )
944      {
945      if( isDebugEnabled() )
946        logDebug( "firing onRunning event: " + getListeners().size() );
947
948      for( Object flowStepListener : getListeners() )
949        ( (FlowStepListener) flowStepListener ).onStepRunning( this );
950      }
951    }
952
953  protected ClientState createClientState( FlowProcess flowProcess )
954    {
955    CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
956
957    if( services == null )
958      return ClientState.NULL;
959
960    return services.createClientState( getID() );
961    }
962
963  public FlowStepJob<Config> getFlowStepJob()
964    {
965    return flowStepJob;
966    }
967
968  public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
969    {
970    if( flowStepJob != null )
971      return flowStepJob;
972
973    if( flowProcess == null )
974      return null;
975
976    Config initializedConfig = createInitializedConfig( flowProcess, parentConfig );
977
978    setConfig( initializedConfig );
979
980    ClientState clientState = createClientState( flowProcess );
981
982    flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig );
983
984    return flowStepJob;
985    }
986
  /**
   * Creates the job for this step. Called by {@link #getCreateFlowStepJob(FlowProcess, Object)}
   * after the step config has been initialized and stored.
   *
   * @param clientState           the client state created for this step
   * @param flowProcess           the process the job is created for
   * @param initializedStepConfig the initialized step configuration
   * @return the new step job
   */
  protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig );
988
989  protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter )
990    {
991    nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph );
992
993    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
994
995    // applies each mode in order, topologically
996    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
997      {
998      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph );
999
1000      while( iterator.hasNext() )
1001        {
1002        FlowElement element = iterator.next();
1003
1004        while( element != null )
1005          {
1006          // intentionally skip any element that spans downstream nodes, like a GroupBy
1007          // this way GroupBy is applied on the inbound side (where partitioning happens)
1008          // not the outbound side.
1009          // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe
1010          if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) )
1011            {
1012            element = null;
1013            continue;
1014            }
1015
1016          if( element instanceof ScopedElement && ( (ScopedElement) element ).hasNodeConfigDef() )
1017            ( (ScopedElement) element ).getNodeConfigDef().apply( mode, setter );
1018
1019          // walk up the sub-assembly parent hierarchy
1020          if( element instanceof Pipe )
1021            element = ( (Pipe) element ).getParent();
1022          else
1023            element = null;
1024          }
1025        }
1026      }
1027    }
1028
1029  private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element )
1030    {
1031    boolean spansNodes = !( element instanceof SubAssembly );
1032
1033    if( spansNodes )
1034      spansNodes = nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0;
1035
1036    return spansNodes;
1037    }
1038
1039  protected void initConfFromStepConfigDef( ConfigDef.Setter setter )
1040    {
1041    ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() );
1042
1043    // applies each mode in order, topologically
1044    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
1045      {
1046      Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph );
1047
1048      while( iterator.hasNext() )
1049        {
1050        FlowElement element = iterator.next();
1051
1052        while( element != null )
1053          {
1054          if( element instanceof ScopedElement && ( (ScopedElement) element ).hasStepConfigDef() )
1055            ( (ScopedElement) element ).getStepConfigDef().apply( mode, setter );
1056
1057          // walk up the sub-assembly parent hierarchy
1058          if( element instanceof Pipe )
1059            element = ( (Pipe) element ).getParent();
1060          else
1061            element = null;
1062          }
1063        }
1064      }
1065    }
1066
1067  protected static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources )
1068    {
1069    for( Tap tap : sources )
1070      {
1071      for( Scope scope : elementGraph.outgoingEdgesOf( tap ) )
1072        flowStep.addSource( scope.getName(), tap );
1073      }
1074    }
1075
1076  protected static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks )
1077    {
1078    for( Tap tap : sinks )
1079      {
1080      for( Scope scope : elementGraph.incomingEdgesOf( tap ) )
1081        flowStep.addSink( scope.getName(), tap );
1082      }
1083    }
1084
1085  @Override
1086  public boolean equals( Object object )
1087    {
1088    if( this == object )
1089      return true;
1090    if( object == null || getClass() != object.getClass() )
1091      return false;
1092
1093    BaseFlowStep flowStep = (BaseFlowStep) object;
1094
1095    if( id != null ? !id.equals( flowStep.id ) : flowStep.id != null )
1096      return false;
1097
1098    return true;
1099    }
1100
1101  @Override
1102  public int hashCode()
1103    {
1104    return id != null ? id.hashCode() : 0;
1105    }
1106
1107  @Override
1108  public String toString()
1109    {
1110    StringBuffer buffer = new StringBuffer();
1111
1112    buffer.append( getClass().getSimpleName() );
1113    buffer.append( "[name: " ).append( getName() ).append( "]" );
1114
1115    return buffer.toString();
1116    }
1117
1118  @Override
1119  public final boolean isInfoEnabled()
1120    {
1121    return getLogger().isInfoEnabled();
1122    }
1123
1124  private ProcessLogger getLogger()
1125    {
1126    if( flow != null && flow instanceof ProcessLogger )
1127      return (ProcessLogger) flow;
1128
1129    return ProcessLogger.NULL;
1130    }
1131
1132  @Override
1133  public final boolean isDebugEnabled()
1134    {
1135    return ( getLogger() ).isDebugEnabled();
1136    }
1137
1138  @Override
1139  public void logDebug( String message, Object... arguments )
1140    {
1141    getLogger().logDebug( message, arguments );
1142    }
1143
1144  @Override
1145  public void logInfo( String message, Object... arguments )
1146    {
1147    getLogger().logInfo( message, arguments );
1148    }
1149
1150  @Override
1151  public void logWarn( String message )
1152    {
1153    getLogger().logWarn( message );
1154    }
1155
1156  @Override
1157  public void logWarn( String message, Throwable throwable )
1158    {
1159    getLogger().logWarn( message, throwable );
1160    }
1161
1162  @Override
1163  public void logWarn( String message, Object... arguments )
1164    {
1165    getLogger().logWarn( message, arguments );
1166    }
1167
1168  @Override
1169  public void logError( String message, Object... arguments )
1170    {
1171    getLogger().logError( message, arguments );
1172    }
1173
1174  @Override
1175  public void logError( String message, Throwable throwable )
1176    {
1177    getLogger().logError( message, throwable );
1178    }
1179
1180  /**
1181   * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
1182   * <p>
1183   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
1184   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
1185   * which in turn is run in a new Thread.
1186   */
1187  private class SafeFlowStepListener implements FlowStepListener
1188    {
1189    /** Field flowListener */
1190    final FlowStepListener flowStepListener;
1191    /** Field throwable */
1192    Throwable throwable;
1193
1194    private SafeFlowStepListener( FlowStepListener flowStepListener )
1195      {
1196      this.flowStepListener = flowStepListener;
1197      }
1198
1199    public void onStepStarting( FlowStep flowStep )
1200      {
1201      try
1202        {
1203        flowStepListener.onStepStarting( flowStep );
1204        }
1205      catch( Throwable throwable )
1206        {
1207        handleThrowable( throwable );
1208        }
1209      }
1210
1211    public void onStepStopping( FlowStep flowStep )
1212      {
1213      try
1214        {
1215        flowStepListener.onStepStopping( flowStep );
1216        }
1217      catch( Throwable throwable )
1218        {
1219        handleThrowable( throwable );
1220        }
1221      }
1222
1223    public void onStepCompleted( FlowStep flowStep )
1224      {
1225      try
1226        {
1227        flowStepListener.onStepCompleted( flowStep );
1228        }
1229      catch( Throwable throwable )
1230        {
1231        handleThrowable( throwable );
1232        }
1233      }
1234
1235    public void onStepRunning( FlowStep flowStep )
1236      {
1237      try
1238        {
1239        flowStepListener.onStepRunning( flowStep );
1240        }
1241      catch( Throwable throwable )
1242        {
1243        handleThrowable( throwable );
1244        }
1245      }
1246
1247    public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
1248      {
1249      try
1250        {
1251        return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
1252        }
1253      catch( Throwable throwable )
1254        {
1255        handleThrowable( throwable );
1256        }
1257
1258      return false;
1259      }
1260
1261    private void handleThrowable( Throwable throwable )
1262      {
1263      this.throwable = throwable;
1264
1265      logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
1266      }
1267
1268    public boolean equals( Object object )
1269      {
1270      if( object instanceof BaseFlowStep.SafeFlowStepListener )
1271        return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );
1272
1273      return flowStepListener.equals( object );
1274      }
1275
1276    public int hashCode()
1277      {
1278      return flowStepListener.hashCode();
1279      }
1280    }
1281  }