001/*
002 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow;
023
024import java.io.IOException;
025import java.util.Collection;
026import java.util.List;
027import java.util.Map;
028import java.util.stream.Stream;
029
030import cascading.flow.planner.PlannerInfo;
031import cascading.flow.planner.PlatformInfo;
032import cascading.management.UnitOfWork;
033import cascading.stats.FlowStats;
034import cascading.tap.Tap;
035import cascading.tuple.Fields;
036import cascading.tuple.Tuple;
037import cascading.tuple.TupleEntry;
038import cascading.tuple.TupleEntryCollector;
039import cascading.tuple.TupleEntryIterator;
040import cascading.tuple.TupleEntryStream;
041import cascading.tuple.TupleStream;
042
043/**
044 * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source
045 * and sink {@link Tap} instances.
046 * <p>
047 * A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
048 * <p>
049 * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class
050 * for supported platforms.
051 * <p>
 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain
 * no state regarding the Flow execution. Consequently, {@link cascading.pipe.Pipe} assemblies can be given
 * parameters through their calling Flow so they can be built in a generic fashion.
055 * <p>
056 * When a Flow is created, an optimized internal representation is created that is then executed
057 * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances.
058 * <p>
 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more Flows
 * do not share the same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value
 * determines the order in which those Flows will be submitted for execution. The default submit priority is 5.
062 * <p>
063 * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any
064 * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the
065 * set of listeners.
066 *
067 * @see FlowListener
068 * @see cascading.flow.FlowConnector
069 */
070public interface Flow<Config> extends UnitOfWork<FlowStats>
071  {
072  String CASCADING_FLOW_ID = "cascading.flow.id";
073
074  /**
075   * Method getName returns the name of this Flow object.
076   *
077   * @return the name (type String) of this Flow object.
078   */
079  @Override
080  String getName();
081
082  /**
083   * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources
084   * necessary for {@link #start()} to be called successfully.
085   * <p>
086   * Specifically, this implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} &amp;&amp; {@link BaseFlow#deleteTrapsIfNotUpdate()}.
087   *
088   * @throws java.io.IOException when
089   */
090  @Override
091  void prepare();
092
093  /**
094   * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()}
095   * to block until this Flow completes.
096   */
097  @Override
098  void start();
099
100  /** Method stop stops all running jobs, killing any currently executing. */
101  @Override
102  void stop();
103
104  /** Method complete starts the current Flow instance if it has not be previously started, then block until completion. */
105  @Override
106  void complete();
107
108  @Override
109  void cleanup();
110
111  /**
112   * Returns any meta-data about the planner that created this Flow instance.
113   *
114   * @return an instance of PlannerInfo
115   */
116  PlannerInfo getPlannerInfo();
117
118  /**
119   * Returns any meta-data about the underlying platform this Flow instance will run against.
120   *
121   * @return an instance of PlatformInfo
122   */
123  PlatformInfo getPlatformInfo();
124
125  /**
126   * Method getConfig returns the internal configuration object.
127   * <p>
128   * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting
129   * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on
130   * individual steps before they are executed.
131   *
132   * @return the default configuration of this Flow
133   */
134  Config getConfig();
135
136  /**
137   * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely
138   * modified.
139   *
140   * @return a copy of the default configuration of this Flow
141   */
142  Config getConfigCopy();
143
144  /**
145   * Method getConfigAsProperties converts the internal configuration object into a {@link java.util.Map} of
146   * key value pairs.
147   *
148   * @return a Map of key/value pairs
149   */
150  Map<Object, Object> getConfigAsProperties();
151
152  /**
153   * Returns the String property associated with the given key from the current Configuration instance.
154   *
155   * @param key of type String
156   * @return the String value
157   */
158  String getProperty( String key );
159
160  /**
161   * Method getID returns the ID of this Flow object.
162   * <p>
163   * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
164   * instances created with identical parameters will not return the same ID.
165   *
166   * @return the ID (type String) of this Flow object.
167   */
168  @Override
169  String getID();
170
171  /**
172   * Returns an immutable map of properties giving more details about the Flow object.
173   * <p>
174   * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow.
175   * <p>
176   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
177   * For known description types, see {@link FlowDescriptors}.
178   *
179   * @return Map
180   */
181  Map<String, String> getFlowDescriptor();
182
183  @Override
184  String getTags();
185
186  /**
187   * Method getSubmitPriority returns the submitPriority of this Flow object.
188   * <p>
189   * 10 is lowest, 1 is the highest, 5 is the default.
190   *
191   * @return the submitPriority (type int) of this FlowStep object.
192   */
193  int getSubmitPriority();
194
195  /**
196   * Method setSubmitPriority sets the submitPriority of this Flow object.
197   * <p>
198   * 10 is lowest, 1 is the highest, 5 is the default.
199   *
200   * @param submitPriority the submitPriority of this FlowStep object.
201   */
202  void setSubmitPriority( int submitPriority );
203
204  FlowProcess<Config> getFlowProcess();
205
206  /**
207   * Method getFlowStats returns the flowStats of this Flow object.
208   *
209   * @return the flowStats (type FlowStats) of this Flow object.
210   */
211  FlowStats getFlowStats();
212
213  /**
214   * Method hasListeners returns true if {@link FlowListener} instances have been registered.
215   *
216   * @return boolean
217   */
218  boolean hasListeners();
219
220  /**
221   * Method addListener registers the given flowListener with this instance.
222   *
223   * @param flowListener of type FlowListener
224   */
225  void addListener( FlowListener flowListener );
226
227  /**
228   * Method removeListener removes the given flowListener from this instance.
229   *
230   * @param flowListener of type FlowListener
231   * @return true if the listener was removed
232   */
233  boolean removeListener( FlowListener flowListener );
234
235  /**
236   * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered
237   * with any of the {@link FlowStep}s belonging to this instance
238   *
239   * @return boolean
240   */
241  boolean hasStepListeners();
242
243  /**
244   * Method addStepListener registers the given flowStepListener with this instance.
245   *
246   * @param flowStepListener of type addStepListener
247   */
248  void addStepListener( FlowStepListener flowStepListener );
249
250  /**
251   * Method removeStepListener removes the given flowStepListener from this instance.
252   *
253   * @param flowStepListener of type FlowStepListener
254   * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance
255   */
256  boolean removeStepListener( FlowStepListener flowStepListener );
257
258  /**
259   * Method getSources returns the sources of this Flow object.
260   *
261   * @return the sources (type Map) of this Flow object.
262   */
263  Map<String, Tap> getSources();
264
265  List<String> getSourceNames();
266
267  Tap getSource( String name );
268
269  /**
270   * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object.
271   *
272   * @return the sourcesCollection (type Collection) of this Flow object.
273   */
274  Collection<Tap> getSourcesCollection();
275
276  /**
277   * Method getSinks returns the sinks of this Flow object.
278   *
279   * @return the sinks (type Map) of this Flow object.
280   */
281  Map<String, Tap> getSinks();
282
283  List<String> getSinkNames();
284
285  Tap getSink( String name );
286
287  /**
288   * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object.
289   *
290   * @return the sinkCollection (type Collection) of this Flow object.
291   */
292  Collection<Tap> getSinksCollection();
293
294  /**
295   * Method getSink returns the first sink of this Flow object.
296   *
297   * @return the sink (type Tap) of this Flow object.
298   */
299  Tap getSink();
300
301  /**
302   * Method getTraps returns the traps of this Flow object.
303   *
304   * @return the traps (type Map) of this Flow object.
305   */
306  Map<String, Tap> getTraps();
307
308  List<String> getTrapNames();
309
310  /**
311   * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object.
312   *
313   * @return the trapsCollection (type Collection) of this Flow object.
314   */
315  Collection<Tap> getTrapsCollection();
316
317  /**
318   * Method getCheckpoints returns the checkpoint taps of this Flow object.
319   *
320   * @return the traps (type Map) of this Flow object.
321   */
322  Map<String, Tap> getCheckpoints();
323
324  List<String> getCheckpointNames();
325
326  /**
327   * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object.
328   *
329   * @return the trapsCollection (type Collection) of this Flow object.
330   */
331  Collection<Tap> getCheckpointsCollection();
332
333  /**
334   * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
335   *
336   * @return FlowSkipStrategy
337   */
338  FlowSkipStrategy getFlowSkipStrategy();
339
340  /**
341   * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned.
342   * <p>
343   * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}.
344   * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
345   * <p>
346   * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only
347   * when the Flow is executed through a {@link cascading.cascade.Cascade} instance.
348   *
349   * @param flowSkipStrategy of type FlowSkipStrategy
350   * @return FlowSkipStrategy
351   */
352  FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy );
353
354  /**
355   * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned
356   * if the current {@link cascading.flow.FlowSkipStrategy} returns true.
357   *
358   * @return the skipFlow (type boolean) of this Flow object.
359   * @throws IOException when
360   */
361  boolean isSkipFlow() throws IOException;
362
363  /**
364   * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or
365   * if any sink method {@link cascading.tap.Tap#isReplace()} returns true.
366   *
367   * @return boolean
368   * @throws java.io.IOException when
369   */
370  boolean areSinksStale() throws IOException;
371
372  /**
373   * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
374   *
375   * @param sinkModified of type long
376   * @return boolean
377   * @throws java.io.IOException when
378   */
379  boolean areSourcesNewer( long sinkModified ) throws IOException;
380
381  /**
382   * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance.
383   * <p>
384   * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
385   * at least one of the sinks are marked for delete ({@link cascading.tap.Tap#isReplace() returns true}).
386   *
387   * @return the sinkModified (type long) of this Flow object.
388   * @throws java.io.IOException when
389   */
390  long getSinkModified() throws IOException;
391
392  /**
393   * Returns the current {@link FlowStepStrategy} instance.
394   *
395   * @return FlowStepStrategy
396   */
397  FlowStepStrategy getFlowStepStrategy();
398
399  /**
400   * Sets a default {@link FlowStepStrategy} instance.
401   * <p>
402   * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties
403   * before the properties are submitted to the underlying platform for the step
404   * unit of work.
405   *
406   * @param flowStepStrategy The FlowStepStrategy to use.
407   */
408  void setFlowStepStrategy( FlowStepStrategy flowStepStrategy );
409
410  /**
411   * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
412   *
413   * @return the steps (type List) of this Flow object.
414   */
415  List<FlowStep<Config>> getFlowSteps();
416
417  /**
418   * Method openSource opens the first source Tap.
419   *
420   * @return TupleIterator
421   * @throws IOException when
422   */
423  TupleEntryIterator openSource() throws IOException;
424
425  /**
426   * Method openSource opens the named source Tap.
427   *
428   * @param name of type String
429   * @return TupleIterator
430   * @throws IOException when
431   */
432  TupleEntryIterator openSource( String name ) throws IOException;
433
434  /**
435   * Method openSink opens the first sink Tap.
436   *
437   * @return TupleIterator
438   * @throws IOException when
439   */
440  TupleEntryIterator openSink() throws IOException;
441
442  /**
443   * Method openSink opens the named sink Tap.
444   *
445   * @param name of type String
446   * @return TupleIterator
447   * @throws IOException when
448   */
449  TupleEntryIterator openSink( String name ) throws IOException;
450
451  /**
452   * Method openTrap opens the first trap Tap.
453   *
454   * @return TupleIterator
455   * @throws IOException when
456   */
457  TupleEntryIterator openTrap() throws IOException;
458
459  /**
460   * Method openTrap opens the named trap Tap.
461   *
462   * @param name of type String
463   * @return TupleIterator
464   * @throws IOException when
465   */
466  TupleEntryIterator openTrap( String name ) throws IOException;
467
468  /**
469   * Method resourceExists returns true if the resource represented by the given Tap instance exists.
470   *
471   * @param tap of type Tap
472   * @return boolean
473   * @throws IOException when
474   */
475  boolean resourceExists( Tap tap ) throws IOException;
476
477  /**
478   * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
479   * <p>
480   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
481   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
482   * stored in a Collection.
483   *
484   * @param tap of type Tap
485   * @return TupleIterator
486   * @throws IOException when there is an error opening the resource
487   */
488  TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
489
490  /**
491   * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance.
492   *
493   * @param tap of type Tap
494   * @return TupleCollector
495   * @throws IOException when there is an error opening the resource
496   */
497  TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
498
499  /**
500   * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
501   *
502   * @param filename of type String
503   */
504  void writeDOT( String filename );
505
506  /**
507   * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package.
508   *
509   * @param filename of type String
510   */
511  void writeStepsDOT( String filename );
512
513  /**
514   * Returns the parent Cascade ID that owns this Flow instance.
515   *
516   * @return of type String
517   */
518  String getCascadeID();
519
520  /**
521   * Returns the run ID given when this Flow instance was defined in the FlowDef.
522   *
523   * @return of type String
524   */
525  String getRunID();
526
527  /**
528   * Method jobsAreLocal returns true if all jobs are executed in-process as a single map and reduce task.
529   *
530   * @return boolean
531   */
532  boolean stepsAreLocal();
533
534  /**
535   * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}.
536   *
537   * @return the stopJobsOnExit (type boolean) of this Flow object.
538   */
539  boolean isStopJobsOnExit();
540
541  default Stream<TupleEntry> getSourceEntryStream( String name )
542    {
543    return TupleEntryStream.entryStream( getSource( name ), getFlowProcess() );
544    }
545
546  default Stream<TupleEntry> getSourceEntryStream( String name, Fields selector )
547    {
548    return TupleEntryStream.entryStream( getSource( name ), getFlowProcess(), selector );
549    }
550
551  default Stream<TupleEntry> getSourceEntryStreamCopy( String name )
552    {
553    return TupleEntryStream.entryStreamCopy( getSource( name ), getFlowProcess() );
554    }
555
556  default Stream<TupleEntry> getSourceEntryStreamCopy( String name, Fields selector )
557    {
558    return TupleEntryStream.entryStreamCopy( getSource( name ), getFlowProcess(), selector );
559    }
560
561  default Stream<Tuple> getSourceTupleStream( String name )
562    {
563    return TupleStream.tupleStream( getSource( name ), getFlowProcess() );
564    }
565
566  default Stream<Tuple> getSourceTupleStream( String name, Fields selector )
567    {
568    return TupleStream.tupleStream( getSource( name ), getFlowProcess(), selector );
569    }
570
571  default Stream<Tuple> getSourceTupleStreamCopy( String name )
572    {
573    return TupleStream.tupleStream( getSource( name ), getFlowProcess() );
574    }
575
576  default Stream<Tuple> getSourceTupleStreamCopy( String name, Fields selector )
577    {
578    return TupleStream.tupleStream( getSource( name ), getFlowProcess(), selector );
579    }
580
581  default Stream<TupleEntry> getSinkEntryStream()
582    {
583    return TupleEntryStream.entryStream( getSink(), getFlowProcess() );
584    }
585
586  default Stream<TupleEntry> getSinkEntryStream( Fields selector )
587    {
588    return TupleEntryStream.entryStream( getSink(), getFlowProcess(), selector );
589    }
590
591  default Stream<TupleEntry> getSinkEntryStreamCopy()
592    {
593    return TupleEntryStream.entryStreamCopy( getSink(), getFlowProcess() );
594    }
595
596  default Stream<TupleEntry> getSinkEntryStreamCopy( Fields selector )
597    {
598    return TupleEntryStream.entryStreamCopy( getSink(), getFlowProcess(), selector );
599    }
600
601  default Stream<Tuple> getSinkTupleStream()
602    {
603    return TupleStream.tupleStream( getSink(), getFlowProcess() );
604    }
605
606  default Stream<Tuple> getSinkTupleStream( Fields selector )
607    {
608    return TupleStream.tupleStream( getSink(), getFlowProcess(), selector );
609    }
610
611  default Stream<Tuple> getSinkTupleStreamCopy()
612    {
613    return TupleStream.tupleStream( getSink(), getFlowProcess() );
614    }
615
616  default Stream<Tuple> getSinkTupleStreamCopy( Fields selector )
617    {
618    return TupleStream.tupleStream( getSink(), getFlowProcess(), selector );
619    }
620
621  default Stream<TupleEntry> getSinkEntryStream( String name )
622    {
623    return TupleEntryStream.entryStream( getSink( name ), getFlowProcess() );
624    }
625
626  default Stream<TupleEntry> getSinkEntryStream( String name, Fields selector )
627    {
628    return TupleEntryStream.entryStream( getSink( name ), getFlowProcess(), selector );
629    }
630
631  default Stream<TupleEntry> getSinkEntryStreamCopy( String name )
632    {
633    return TupleEntryStream.entryStreamCopy( getSink( name ), getFlowProcess() );
634    }
635
636  default Stream<TupleEntry> getSinkEntryStreamCopy( String name, Fields selector )
637    {
638    return TupleEntryStream.entryStreamCopy( getSink( name ), getFlowProcess(), selector );
639    }
640
641  default Stream<Tuple> getSinkTupleStream( String name )
642    {
643    return TupleStream.tupleStream( getSink( name ), getFlowProcess() );
644    }
645
646  default Stream<Tuple> getSinkTupleStream( String name, Fields selector )
647    {
648    return TupleStream.tupleStream( getSink( name ), getFlowProcess(), selector );
649    }
650
651  default Stream<Tuple> getSinkTupleStreamCopy( String name )
652    {
653    return TupleStream.tupleStream( getSink( name ), getFlowProcess() );
654    }
655
656  default Stream<Tuple> getSinkTupleStreamCopy( String name, Fields selector )
657    {
658    return TupleStream.tupleStream( getSink( name ), getFlowProcess(), selector );
659    }
660  }