001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.LinkedHashMap;
026    import java.util.List;
027    import java.util.Map;
028    
029    import cascading.flow.planner.PlatformInfo;
030    import cascading.management.UnitOfWork;
031    import cascading.stats.FlowStats;
032    import cascading.tap.Tap;
033    import cascading.tuple.TupleEntryCollector;
034    import cascading.tuple.TupleEntryIterator;
035    
036    /**
037     * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source
038     * and sink {@link Tap} instances.
039     * <p/>
040     * A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
041     * <p/>
042     * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class
043     * for supported platforms.
044     * <p/>
 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain
 * no state regarding the Flow execution. Consequently, {@link cascading.pipe.Pipe} assemblies can be given
 * parameters through their calling Flow so they can be built in a generic fashion.
048     * <p/>
049     * When a Flow is created, an optimized internal representation is created that is then executed
050     * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances.
 * <p/>
 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more Flows do not share the
 * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines
 * the order in which they will be submitted for execution. The default submit priority is 5.
055     * <p/>
056     * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any
057     * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the
058     * set of listeners.
059     *
060     * @see FlowListener
061     * @see cascading.flow.FlowConnector
062     */
063    public interface Flow<Config> extends UnitOfWork<FlowStats>
064      {
065      String CASCADING_FLOW_ID = "cascading.flow.id";
066    
067      /**
068       * Method getName returns the name of this Flow object.
069       *
070       * @return the name (type String) of this Flow object.
071       */
072      @Override
073      String getName();
074    
075      /**
076       * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources
077       * necessary for {@link #start()} to be called successfully.
078       * <p/>
079       * Specifically, this implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} && {@link BaseFlow#deleteTrapsIfNotUpdate()}.
080       *
081       * @throws java.io.IOException when
082       */
083      @Override
084      void prepare();
085    
086      /**
087       * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()}
088       * to block until this Flow completes.
089       */
090      @Override
091      void start();
092    
093      /** Method stop stops all running jobs, killing any currently executing. */
094      @Override
095      void stop();
096    
097      /** Method complete starts the current Flow instance if it has not be previously started, then block until completion. */
098      @Override
099      void complete();
100    
101      @Override
102      void cleanup();
103    
104      /**
105       * Method getConfig returns the internal configuration object.
106       * <p/>
107       * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting
108       * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on
109       * individual steps before they are executed.
110       *
111       * @return the default configuration of this Flow
112       */
113      Config getConfig();
114    
115      /**
116       * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely
117       * modified.
118       *
119       * @return a copy of the default configuration of this Flow
120       */
121      Config getConfigCopy();
122    
123      /**
124       * Method getConfiAsProperties converts the internal configuration object into a {@link java.util.Map} of
125       * key value pairs.
126       *
127       * @return a Map of key/value pairs
128       */
129      Map<Object, Object> getConfigAsProperties();
130    
131      String getProperty( String key );
132    
133      /**
134       * Method getID returns the ID of this Flow object.
135       * <p/>
136       * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
137       * instances created with identical parameters will not return the same ID.
138       *
139       * @return the ID (type String) of this Flow object.
140       */
141      @Override
142      String getID();
143    
144      /**
145       * Returns an immutable map of properties giving more details about the Flow object.
146       * <p/>
147       * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow.
148       * <p/>
149       * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
150       * For known description types, see {@link cascading.flow.FLowDescriptors}.
151       *
152       * @return Map<String,String>
153       */
154      public Map<String, String> getFlowDescriptor();
155    
156      @Override
157      String getTags();
158    
159      /**
160       * Method getSubmitPriority returns the submitPriority of this Flow object.
161       * <p/>
162       * 10 is lowest, 1 is the highest, 5 is the default.
163       *
164       * @return the submitPriority (type int) of this FlowStep object.
165       */
166      int getSubmitPriority();
167    
168      /**
169       * Method setSubmitPriority sets the submitPriority of this Flow object.
170       * <p/>
171       * 10 is lowest, 1 is the highest, 5 is the default.
172       *
173       * @param submitPriority the submitPriority of this FlowStep object.
174       */
175      void setSubmitPriority( int submitPriority );
176    
177      FlowProcess<Config> getFlowProcess();
178    
179      /**
180       * Method getFlowStats returns the flowStats of this Flow object.
181       *
182       * @return the flowStats (type FlowStats) of this Flow object.
183       */
184      FlowStats getFlowStats();
185    
186      /**
187       * Method hasListeners returns true if {@link FlowListener} instances have been registered.
188       *
189       * @return boolean
190       */
191      boolean hasListeners();
192    
193      /**
194       * Method addListener registers the given flowListener with this instance.
195       *
196       * @param flowListener of type FlowListener
197       */
198      void addListener( FlowListener flowListener );
199    
200      /**
201       * Method removeListener removes the given flowListener from this instance.
202       *
203       * @param flowListener of type FlowListener
204       * @return true if the listener was removed
205       */
206      boolean removeListener( FlowListener flowListener );
207    
208      /**
209       * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered
210       * with any of the {@link FlowStep}s belonging to this instance
211       *
212       * @return boolean
213       */
214      boolean hasStepListeners();
215    
216      /**
217       * Method addStepListener registers the given flowStepListener with this instance.
218       *
219       * @param flowStepListener of type addStepListener
220       */
221      void addStepListener( FlowStepListener flowStepListener );
222    
223      /**
224       * Method removeStepListener removes the given flowStepListener from this instance.
225       *
226       * @param flowStepListener of type FlowStepListener
227       * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance
228       */
229      boolean removeStepListener( FlowStepListener flowStepListener );
230    
231      /**
232       * Method getSources returns the sources of this Flow object.
233       *
234       * @return the sources (type Map) of this Flow object.
235       */
236      Map<String, Tap> getSources();
237    
238      List<String> getSourceNames();
239    
240      Tap getSource( String name );
241    
242      /**
243       * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object.
244       *
245       * @return the sourcesCollection (type Collection<Tap>) of this Flow object.
246       */
247      Collection<Tap> getSourcesCollection();
248    
249      /**
250       * Method getSinks returns the sinks of this Flow object.
251       *
252       * @return the sinks (type Map) of this Flow object.
253       */
254      Map<String, Tap> getSinks();
255    
256      List<String> getSinkNames();
257    
258      Tap getSink( String name );
259    
260      /**
261       * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object.
262       *
263       * @return the sinkCollection (type Collection<Tap>) of this Flow object.
264       */
265      Collection<Tap> getSinksCollection();
266    
267      /**
268       * Method getSink returns the first sink of this Flow object.
269       *
270       * @return the sink (type Tap) of this Flow object.
271       */
272      Tap getSink();
273    
274      /**
275       * Method getTraps returns the traps of this Flow object.
276       *
277       * @return the traps (type Map<String, Tap>) of this Flow object.
278       */
279      Map<String, Tap> getTraps();
280    
281      List<String> getTrapNames();
282    
283      /**
284       * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object.
285       *
286       * @return the trapsCollection (type Collection<Tap>) of this Flow object.
287       */
288      Collection<Tap> getTrapsCollection();
289    
290      /**
291       * Method getCheckpoints returns the checkpoint taps of this Flow object.
292       *
293       * @return the traps (type Map<String, Tap>) of this Flow object.
294       */
295      Map<String, Tap> getCheckpoints();
296    
297      List<String> getCheckpointNames();
298    
299      /**
300       * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object.
301       *
302       * @return the trapsCollection (type Collection<Tap>) of this Flow object.
303       */
304      Collection<Tap> getCheckpointsCollection();
305    
306      /**
307       * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
308       *
309       * @return FlowSkipStrategy
310       */
311      FlowSkipStrategy getFlowSkipStrategy();
312    
313      /**
314       * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned.
315       * <p/>
316       * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}.
317       * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
318       * <p/>
319       * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only
320       * when the Flow is executed through a {@link cascading.cascade.Cascade} instance.
321       *
322       * @param flowSkipStrategy of type FlowSkipStrategy
323       * @return FlowSkipStrategy
324       */
325      FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy );
326    
327      /**
328       * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned
329       * if the current {@link cascading.flow.FlowSkipStrategy} returns true.
330       *
331       * @return the skipFlow (type boolean) of this Flow object.
332       * @throws IOException when
333       */
334      boolean isSkipFlow() throws IOException;
335    
336      /**
337       * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or
338       * if any sink method {@link cascading.tap.Tap#isReplace()} returns true.
339       *
340       * @return boolean
341       * @throws java.io.IOException when
342       */
343      boolean areSinksStale() throws IOException;
344    
345      /**
346       * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
347       *
348       * @param sinkModified of type long
349       * @return boolean
350       * @throws java.io.IOException when
351       */
352      boolean areSourcesNewer( long sinkModified ) throws IOException;
353    
354      /**
355       * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance.
356       * <p/>
357       * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
358       * atleast one of the sinks are marked for delete ({@link cascading.tap.Tap#isReplace() returns true}).
359       *
360       * @return the sinkModified (type long) of this Flow object.
361       * @throws java.io.IOException when
362       */
363      long getSinkModified() throws IOException;
364    
365      /**
366       * Returns the current {@link FlowStepStrategy} instance.
367       *
368       * @return FlowStepStrategy
369       */
370      FlowStepStrategy getFlowStepStrategy();
371    
372      /**
373       * Sets a default {@link FlowStepStrategy} instance.
374       * <p/>
375       * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties
376       * before the properties are submitted to the underlying platform for the step
377       * unit of work.
378       *
379       * @param flowStepStrategy The FlowStepStrategy to use.
380       */
381      void setFlowStepStrategy( FlowStepStrategy flowStepStrategy );
382    
383      /**
384       * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
385       *
386       * @return the steps (type List<FlowStep>) of this Flow object.
387       */
388      List<FlowStep<Config>> getFlowSteps();
389    
390      /**
391       * Method openSource opens the first source Tap.
392       *
393       * @return TupleIterator
394       * @throws IOException when
395       */
396      TupleEntryIterator openSource() throws IOException;
397    
398      /**
399       * Method openSource opens the named source Tap.
400       *
401       * @param name of type String
402       * @return TupleIterator
403       * @throws IOException when
404       */
405      TupleEntryIterator openSource( String name ) throws IOException;
406    
407      /**
408       * Method openSink opens the first sink Tap.
409       *
410       * @return TupleIterator
411       * @throws IOException when
412       */
413      TupleEntryIterator openSink() throws IOException;
414    
415      /**
416       * Method openSink opens the named sink Tap.
417       *
418       * @param name of type String
419       * @return TupleIterator
420       * @throws IOException when
421       */
422      TupleEntryIterator openSink( String name ) throws IOException;
423    
424      /**
425       * Method openTrap opens the first trap Tap.
426       *
427       * @return TupleIterator
428       * @throws IOException when
429       */
430      TupleEntryIterator openTrap() throws IOException;
431    
432      /**
433       * Method openTrap opens the named trap Tap.
434       *
435       * @param name of type String
436       * @return TupleIterator
437       * @throws IOException when
438       */
439      TupleEntryIterator openTrap( String name ) throws IOException;
440    
441      /**
442       * Method resourceExists returns true if the resource represented by the given Tap instance exists.
443       *
444       * @param tap of type Tap
445       * @return boolean
446       * @throws IOException when
447       */
448      boolean resourceExists( Tap tap ) throws IOException;
449    
450      /**
451       * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
452       * <p/>
453       * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
454       * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
455       * stored in a Collection.
456       *
457       * @param tap of type Tap
458       * @return TupleIterator
459       * @throws IOException when there is an error opening the resource
460       */
461      TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
462    
463      /**
464       * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance.
465       *
466       * @param tap of type Tap
467       * @return TupleCollector
468       * @throws IOException when there is an error opening the resource
469       */
470      TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
471    
472      /**
473       * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
474       *
475       * @param filename of type String
476       */
477      void writeDOT( String filename );
478    
479      /**
480       * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package.
481       *
482       * @param filename of type String
483       */
484      void writeStepsDOT( String filename );
485    
486      String getCascadeID();
487    
488      String getRunID();
489    
490      PlatformInfo getPlatformInfo();
491    
492      /**
493       * Method jobsAreLocal returns true if all jobs are executed in-process as a single map and reduce task.
494       *
495       * @return boolean
496       */
497      boolean stepsAreLocal();
498    
499      /**
500       * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}.
501       *
502       * @return the stopJobsOnExit (type boolean) of this Flow object.
503       */
504      boolean isStopJobsOnExit();
505      }