001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.List;
026    import java.util.Map;
027    
028    import cascading.flow.planner.PlatformInfo;
029    import cascading.management.UnitOfWork;
030    import cascading.stats.FlowStats;
031    import cascading.tap.Tap;
032    import cascading.tuple.TupleEntryCollector;
033    import cascading.tuple.TupleEntryIterator;
034    
035    /**
036     * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source
037     * and sink {@link Tap} instances.
038     * <p/>
039     * A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
040     * <p/>
041     * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class
042     * for supported platforms.
043     * <p/>
044     * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain
045     * no state regarding the Flow execution. Subsequently, {@link cascading.pipe.Pipe} assemblies can be given
046     * parameters through its calling Flow so they can be built in a generic fashion.
047     * <p/>
048     * When a Flow is created, an optimized internal representation is created that is then executed
049     * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances.
050     * </p>
051     * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more steps do not share the
052     * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines
053     * the order in which all steps will be submitted for execution. The default submit priority is 5.
054     * <p/>
055     * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any
056     * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the
057     * set of listeners.
058     *
059     * @see FlowListener
060     * @see cascading.flow.FlowConnector
061     */
062    public interface Flow<Config> extends UnitOfWork<FlowStats>
063      {
064      String CASCADING_FLOW_ID = "cascading.flow.id";
065    
066      /**
067       * Method getName returns the name of this Flow object.
068       *
069       * @return the name (type String) of this Flow object.
070       */
071      @Override
072      String getName();
073    
074      /**
075       * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources
076       * necessary for {@link #start()} to be called successfully.
077       * <p/>
078       * Specifically, this implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} && {@link BaseFlow#deleteTrapsIfNotUpdate()}.
079       *
080       * @throws java.io.IOException when
081       */
082      @Override
083      void prepare();
084    
085      /**
086       * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()}
087       * to block until this Flow completes.
088       */
089      @Override
090      void start();
091    
092      /** Method stop stops all running jobs, killing any currently executing. */
093      @Override
094      void stop();
095    
096      /** Method complete starts the current Flow instance if it has not be previously started, then block until completion. */
097      @Override
098      void complete();
099    
100      @Override
101      void cleanup();
102    
103      /**
104       * Method getConfig returns the internal configuration object.
105       * <p/>
106       * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting
107       * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on
108       * individual steps before they are executed.
109       *
110       * @return the default configuration of this Flow
111       */
112      Config getConfig();
113    
114      /**
115       * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely
116       * modified.
117       *
118       * @return a copy of the default configuration of this Flow
119       */
120      Config getConfigCopy();
121    
122      /**
123       * Method getConfiAsProperties converts the internal configuration object into a {@link java.util.Map} of
124       * key value pairs.
125       *
126       * @return a Map of key/value pairs
127       */
128      Map<Object, Object> getConfigAsProperties();
129    
130      String getProperty( String key );
131    
132      /**
133       * Method getID returns the ID of this Flow object.
134       * <p/>
135       * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
136       * instances created with identical parameters will not return the same ID.
137       *
138       * @return the ID (type String) of this Flow object.
139       */
140      @Override
141      String getID();
142    
143      /**
144       * Returns an immutable map of properties giving more details about the Flow object.
145       * <p/>
146       * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow.
147       * <p/>
148       * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
149       * For known description types, see {@link FlowDescriptors}.
150       *
151       * @return Map<String,String>
152       */
153      public Map<String, String> getFlowDescriptor();
154    
155      @Override
156      String getTags();
157    
158      /**
159       * Method getSubmitPriority returns the submitPriority of this Flow object.
160       * <p/>
161       * 10 is lowest, 1 is the highest, 5 is the default.
162       *
163       * @return the submitPriority (type int) of this FlowStep object.
164       */
165      int getSubmitPriority();
166    
167      /**
168       * Method setSubmitPriority sets the submitPriority of this Flow object.
169       * <p/>
170       * 10 is lowest, 1 is the highest, 5 is the default.
171       *
172       * @param submitPriority the submitPriority of this FlowStep object.
173       */
174      void setSubmitPriority( int submitPriority );
175    
176      FlowProcess<Config> getFlowProcess();
177    
178      /**
179       * Method getFlowStats returns the flowStats of this Flow object.
180       *
181       * @return the flowStats (type FlowStats) of this Flow object.
182       */
183      FlowStats getFlowStats();
184    
185      /**
186       * Method hasListeners returns true if {@link FlowListener} instances have been registered.
187       *
188       * @return boolean
189       */
190      boolean hasListeners();
191    
192      /**
193       * Method addListener registers the given flowListener with this instance.
194       *
195       * @param flowListener of type FlowListener
196       */
197      void addListener( FlowListener flowListener );
198    
199      /**
200       * Method removeListener removes the given flowListener from this instance.
201       *
202       * @param flowListener of type FlowListener
203       * @return true if the listener was removed
204       */
205      boolean removeListener( FlowListener flowListener );
206    
207      /**
208       * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered
209       * with any of the {@link FlowStep}s belonging to this instance
210       *
211       * @return boolean
212       */
213      boolean hasStepListeners();
214    
215      /**
216       * Method addStepListener registers the given flowStepListener with this instance.
217       *
218       * @param flowStepListener of type addStepListener
219       */
220      void addStepListener( FlowStepListener flowStepListener );
221    
222      /**
223       * Method removeStepListener removes the given flowStepListener from this instance.
224       *
225       * @param flowStepListener of type FlowStepListener
226       * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance
227       */
228      boolean removeStepListener( FlowStepListener flowStepListener );
229    
230      /**
231       * Method getSources returns the sources of this Flow object.
232       *
233       * @return the sources (type Map) of this Flow object.
234       */
235      Map<String, Tap> getSources();
236    
237      List<String> getSourceNames();
238    
239      Tap getSource( String name );
240    
241      /**
242       * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object.
243       *
244       * @return the sourcesCollection (type Collection<Tap>) of this Flow object.
245       */
246      Collection<Tap> getSourcesCollection();
247    
248      /**
249       * Method getSinks returns the sinks of this Flow object.
250       *
251       * @return the sinks (type Map) of this Flow object.
252       */
253      Map<String, Tap> getSinks();
254    
255      List<String> getSinkNames();
256    
257      Tap getSink( String name );
258    
259      /**
260       * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object.
261       *
262       * @return the sinkCollection (type Collection<Tap>) of this Flow object.
263       */
264      Collection<Tap> getSinksCollection();
265    
266      /**
267       * Method getSink returns the first sink of this Flow object.
268       *
269       * @return the sink (type Tap) of this Flow object.
270       */
271      Tap getSink();
272    
273      /**
274       * Method getTraps returns the traps of this Flow object.
275       *
276       * @return the traps (type Map<String, Tap>) of this Flow object.
277       */
278      Map<String, Tap> getTraps();
279    
280      List<String> getTrapNames();
281    
282      /**
283       * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object.
284       *
285       * @return the trapsCollection (type Collection<Tap>) of this Flow object.
286       */
287      Collection<Tap> getTrapsCollection();
288    
289      /**
290       * Method getCheckpoints returns the checkpoint taps of this Flow object.
291       *
292       * @return the traps (type Map<String, Tap>) of this Flow object.
293       */
294      Map<String, Tap> getCheckpoints();
295    
296      List<String> getCheckpointNames();
297    
298      /**
299       * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object.
300       *
301       * @return the trapsCollection (type Collection<Tap>) of this Flow object.
302       */
303      Collection<Tap> getCheckpointsCollection();
304    
305      /**
306       * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
307       *
308       * @return FlowSkipStrategy
309       */
310      FlowSkipStrategy getFlowSkipStrategy();
311    
312      /**
313       * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned.
314       * <p/>
315       * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}.
316       * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
317       * <p/>
318       * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only
319       * when the Flow is executed through a {@link cascading.cascade.Cascade} instance.
320       *
321       * @param flowSkipStrategy of type FlowSkipStrategy
322       * @return FlowSkipStrategy
323       */
324      FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy );
325    
326      /**
327       * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned
328       * if the current {@link cascading.flow.FlowSkipStrategy} returns true.
329       *
330       * @return the skipFlow (type boolean) of this Flow object.
331       * @throws IOException when
332       */
333      boolean isSkipFlow() throws IOException;
334    
335      /**
336       * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or
337       * if any sink method {@link cascading.tap.Tap#isReplace()} returns true.
338       *
339       * @return boolean
340       * @throws java.io.IOException when
341       */
342      boolean areSinksStale() throws IOException;
343    
344      /**
345       * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
346       *
347       * @param sinkModified of type long
348       * @return boolean
349       * @throws java.io.IOException when
350       */
351      boolean areSourcesNewer( long sinkModified ) throws IOException;
352    
353      /**
354       * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance.
355       * <p/>
356       * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
357       * atleast one of the sinks are marked for delete ({@link cascading.tap.Tap#isReplace() returns true}).
358       *
359       * @return the sinkModified (type long) of this Flow object.
360       * @throws java.io.IOException when
361       */
362      long getSinkModified() throws IOException;
363    
364      /**
365       * Returns the current {@link FlowStepStrategy} instance.
366       *
367       * @return FlowStepStrategy
368       */
369      FlowStepStrategy getFlowStepStrategy();
370    
371      /**
372       * Sets a default {@link FlowStepStrategy} instance.
373       * <p/>
374       * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties
375       * before the properties are submitted to the underlying platform for the step
376       * unit of work.
377       *
378       * @param flowStepStrategy The FlowStepStrategy to use.
379       */
380      void setFlowStepStrategy( FlowStepStrategy flowStepStrategy );
381    
382      /**
383       * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
384       *
385       * @return the steps (type List<FlowStep>) of this Flow object.
386       */
387      List<FlowStep<Config>> getFlowSteps();
388    
389      /**
390       * Method openSource opens the first source Tap.
391       *
392       * @return TupleIterator
393       * @throws IOException when
394       */
395      TupleEntryIterator openSource() throws IOException;
396    
397      /**
398       * Method openSource opens the named source Tap.
399       *
400       * @param name of type String
401       * @return TupleIterator
402       * @throws IOException when
403       */
404      TupleEntryIterator openSource( String name ) throws IOException;
405    
406      /**
407       * Method openSink opens the first sink Tap.
408       *
409       * @return TupleIterator
410       * @throws IOException when
411       */
412      TupleEntryIterator openSink() throws IOException;
413    
414      /**
415       * Method openSink opens the named sink Tap.
416       *
417       * @param name of type String
418       * @return TupleIterator
419       * @throws IOException when
420       */
421      TupleEntryIterator openSink( String name ) throws IOException;
422    
423      /**
424       * Method openTrap opens the first trap Tap.
425       *
426       * @return TupleIterator
427       * @throws IOException when
428       */
429      TupleEntryIterator openTrap() throws IOException;
430    
431      /**
432       * Method openTrap opens the named trap Tap.
433       *
434       * @param name of type String
435       * @return TupleIterator
436       * @throws IOException when
437       */
438      TupleEntryIterator openTrap( String name ) throws IOException;
439    
440      /**
441       * Method resourceExists returns true if the resource represented by the given Tap instance exists.
442       *
443       * @param tap of type Tap
444       * @return boolean
445       * @throws IOException when
446       */
447      boolean resourceExists( Tap tap ) throws IOException;
448    
449      /**
450       * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
451       * <p/>
452       * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
453       * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
454       * stored in a Collection.
455       *
456       * @param tap of type Tap
457       * @return TupleIterator
458       * @throws IOException when there is an error opening the resource
459       */
460      TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
461    
462      /**
463       * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance.
464       *
465       * @param tap of type Tap
466       * @return TupleCollector
467       * @throws IOException when there is an error opening the resource
468       */
469      TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
470    
471      /**
472       * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
473       *
474       * @param filename of type String
475       */
476      void writeDOT( String filename );
477    
478      /**
479       * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package.
480       *
481       * @param filename of type String
482       */
483      void writeStepsDOT( String filename );
484    
485      String getCascadeID();
486    
487      String getRunID();
488    
489      PlatformInfo getPlatformInfo();
490    
491      /**
492       * Method jobsAreLocal returns true if all jobs are executed in-process as a single map and reduce task.
493       *
494       * @return boolean
495       */
496      boolean stepsAreLocal();
497    
498      /**
499       * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}.
500       *
501       * @return the stopJobsOnExit (type boolean) of this Flow object.
502       */
503      boolean isStopJobsOnExit();
504      }