001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.Map;
028
029import cascading.flow.stream.duct.DuctException;
030import cascading.tap.Tap;
031import cascading.tuple.TupleEntryCollector;
032import cascading.tuple.TupleEntryIterator;
033
034/**
035 * FlowProcess implementations provide a call-back interface into the current computing system. Each
036 * {@link cascading.operation.Operation} is given a reference to a particular implementation, allowing it
037 * to get configuration properties, send a "keep alive" ping, or to set a counter value.
038 * <p/>
039 * Depending on the underlying system, FlowProcess instances are not continuous across all operations in a {@link Flow}.
040 * Thus, a call to {@link #increment(Enum, long)} may start incrementing from zero if the operation making the call
041 * belongs to a subsequent 'job' or 'step' from any previous operations calling increment.
042 * <p/>
043 * A FlowProcess is roughly a child of {@link FlowSession}. FlowSession is roughly one to one with a particular {@link Flow}.
044 * And every FlowSession will have one or more FlowProcesses.
045 *
046 * @see FlowSession
047 */
048public abstract class FlowProcess<Config>
049  {
050  /** Field NULL is a noop implementation of FlowSession. */
051  public static FlowProcess NULL = new NullFlowProcess();
052
053  public static class NullFlowProcess extends FlowProcess<Object>
054    {
055    protected NullFlowProcess()
056      {
057      }
058
059    @Override
060    public FlowProcess copyWith( Object object )
061      {
062      return new NullFlowProcess();
063      }
064
065    public Object getProperty( String key )
066      {
067      return null;
068      }
069
070    @Override
071    public Collection<String> getPropertyKeys()
072      {
073      return Collections.EMPTY_SET;
074      }
075
076    @Override
077    public Object newInstance( String className )
078      {
079      return null;
080      }
081
082    public void keepAlive()
083      {
084      }
085
086    public void increment( Enum counter, long amount )
087      {
088      }
089
090    public void increment( String group, String counter, long amount )
091      {
092      }
093
094    @Override
095    public long getCounterValue( Enum counter )
096      {
097      return 0;
098      }
099
100    @Override
101    public long getCounterValue( String group, String counter )
102      {
103      return 0;
104      }
105
106    public void setStatus( String status )
107      {
108      }
109
110    @Override
111    public boolean isCounterStatusInitialized()
112      {
113      return true;
114      }
115
116    @Override
117    public int getNumProcessSlices()
118      {
119      return 1;
120      }
121
122    @Override
123    public int getCurrentSliceNum()
124      {
125      return 0;
126      }
127
128    public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
129      {
130      return tap.openForRead( this );
131      }
132
133    public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
134      {
135      return tap.openForWrite( this );
136      }
137
138    @Override
139    public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
140      {
141      return trap.openForWrite( this );
142      }
143
144    @Override
145    public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
146      {
147      return null;
148      }
149
150    @Override
151    public Object getConfig()
152      {
153      return null;
154      }
155
156    @Override
157    public Object getConfigCopy()
158      {
159      return null;
160      }
161
162    @Override
163    public Object copyConfig( Object config )
164      {
165      return config;
166      }
167
168    @Override
169    public Map<String, String> diffConfigIntoMap( Object defaultConfig, Object updatedConfig )
170      {
171      return null;
172      }
173
174    @Override
175    public Object mergeMapIntoConfig( Object defaultConfig, Map<String, String> map )
176      {
177      return null;
178      }
179    }
180
181  private FlowSession currentSession = FlowSession.NULL;
182  private Map<Tap, TupleEntryCollector> trapCollectors;
183
184  protected FlowProcess()
185    {
186    }
187
188  protected FlowProcess( FlowSession currentSession )
189    {
190    setCurrentSession( currentSession );
191    }
192
193  /**
194   * Copy constructor.
195   * <p/>
196   * Shares the underlying trap collector collection across copies to avoid a static collection.
197   *
198   * @param flowProcess
199   */
200  protected FlowProcess( FlowProcess<Config> flowProcess )
201    {
202    setCurrentSession( flowProcess.getCurrentSession() );
203
204    // lazy initialize trap collectors collection and share across copies
205    this.trapCollectors = flowProcess.getTrapCollectors();
206    }
207
208  public abstract FlowProcess<Config> copyWith( Config config );
209
210  /**
211   * Method getID() returns the current
212   *
213   * @return of type String
214   */
215  public String getID()
216    {
217    return getStringProperty( FlowStep.CASCADING_FLOW_STEP_ID );
218    }
219
220  /**
221   * Method getCurrentSession returns the currentSession of this FlowProcess object.
222   *
223   * @return the currentSession (type FlowSession) of this FlowProcess object.
224   */
225  public FlowSession getCurrentSession()
226    {
227    return currentSession;
228    }
229
230  /**
231   * Method setCurrentSession sets the currentSession of this FlowProcess object.
232   *
233   * @param currentSession the currentSession of this FlowProcess object.
234   */
235  public void setCurrentSession( FlowSession currentSession )
236    {
237    this.currentSession = currentSession;
238
239    currentSession.setCurrentProcess( this );
240    }
241
242  /**
243   * Method getNumProcessSlices returns the number of parallel slices or tasks allocated
244   * for this process execution.
245   * <p/>
246   * For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job.
247   *
248   * @return an int
249   */
250  public abstract int getNumProcessSlices();
251
252  /**
253   * Method getCurrentSliceNum returns an integer representing which slice instance currently running.
254   * <p/>
255   * {@code 0} (zero) is the first slice instance.
256   *
257   * @return an int
258   */
259  public abstract int getCurrentSliceNum();
260
261  /**
262   * Method getProperty should be used to return configuration parameters from the underlying system.
263   * <p/>
264   * In the case of Hadoop, the current Configuration will be queried.
265   *
266   * @param key of type String
267   * @return an Object
268   */
269  public abstract Object getProperty( String key );
270
271  /**
272   * Method getStringProperty should be used to return configuration parameters from the underlying system.
273   * <p/>
274   * In the case of Hadoop, the current Configuration will be queried.
275   *
276   * @param key of type String,
277   * @return null if property is not set
278   */
279  public String getStringProperty( String key )
280    {
281    Object value = getProperty( key );
282
283    if( value == null )
284      return null;
285
286    return value.toString();
287    }
288
289  /**
290   * Method getStringProperty should be used to return configuration parameters from the underlying system.
291   * <p/>
292   * In the case of Hadoop, the current Configuration will be queried.
293   *
294   * @param key          of type String,
295   * @param defaultValue of type String,
296   * @return {@code defaultValue} if property is not set
297   */
298  public String getStringProperty( String key, String defaultValue )
299    {
300    Object value = getProperty( key );
301
302    if( value == null )
303      return defaultValue;
304
305    return value.toString();
306    }
307
308  /**
309   * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
310   * <p/>
311   * In the case of Hadoop, the current Configuration will be queried.
312   *
313   * @param key of type String,
314   * @return null if property is not set
315   */
316  public Integer getIntegerProperty( String key )
317    {
318    String value = getStringProperty( key );
319
320    if( value == null || value.isEmpty() )
321      return null;
322
323    return Integer.valueOf( value );
324    }
325
326  /**
327   * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
328   * <p/>
329   * In the case of Hadoop, the current Configuration will be queried.
330   *
331   * @param key          of type String,
332   * @param defaultValue of type int,
333   * @return {@code defaultValue} if property is not set
334   */
335  public int getIntegerProperty( String key, int defaultValue )
336    {
337    String value = getStringProperty( key );
338
339    if( value == null || value.isEmpty() )
340      return defaultValue;
341
342    return Integer.valueOf( value );
343    }
344
345  /**
346   * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
347   * <p/>
348   * In the case of Hadoop, the current Configuration will be queried.
349   *
350   * @param key of type Boolean, null if property is not set
351   * @return an Object
352   */
353  public Boolean getBooleanProperty( String key )
354    {
355    String value = getStringProperty( key );
356
357    if( value == null || value.isEmpty() )
358      return null;
359
360    return Boolean.valueOf( value );
361    }
362
363  /**
364   * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
365   * <p/>
366   * In the case of Hadoop, the current Configuration will be queried.
367   *
368   * @param key          of type String
369   * @param defaultValue of type boolean
370   * @return {@code defaultValue} if property is not set
371   */
372  public boolean getBooleanProperty( String key, boolean defaultValue )
373    {
374    String value = getStringProperty( key );
375
376    if( value == null || value.isEmpty() )
377      return defaultValue;
378
379    return Boolean.valueOf( value );
380    }
381
382  /**
383   * Method getPropertyKeys returns an immutable collection of all available property key values.
384   *
385   * @return a Collection<String>
386   */
387  public abstract Collection<String> getPropertyKeys();
388
389  /**
390   * Method newInstance creates a new object instance from the given className argument delegating to any
391   * platform specific instantiation and configuration routines.
392   *
393   * @param className
394   * @return an instance of className
395   */
396  public abstract Object newInstance( String className );
397
398  /**
399   * Method keepAlive notifies the system that the current process is still alive. Use this method if a particular
400   * {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling
401   * ping every few seconds to every minute or so would be best.
402   * <p/>
403   * This method will fail silently if the underlying mechanism to notify keepAlive status are not initialized.
404   */
405  public abstract void keepAlive();
406
407  /**
408   * Method increment is used to increment a custom counter. Counters must be of type Enum. The amount
409   * to increment must be a integer value.
410   * <p/>
411   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
412   *
413   * @param counter of type Enum
414   * @param amount  of type int
415   */
416  public abstract void increment( Enum counter, long amount );
417
418  /**
419   * Method increment is used to increment a custom counter. The amount to increment must be a integer value.
420   * <p/>
421   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
422   *
423   * @param group   of type String
424   * @param counter of type String
425   * @param amount  of type int
426   */
427  public abstract void increment( String group, String counter, long amount );
428
429  /**
430   * Method getCounterValue is used to retrieve a counter value.
431   * <p/>
432   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
433   *
434   * @param counter of type Enum
435   */
436  public abstract long getCounterValue( Enum counter );
437
438  /**
439   * Method getCounterValue is used to retrieve a counter value.
440   * <p/>
441   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
442   *
443   * @param group   of type String
444   * @param counter of type String
445   */
446  public abstract long getCounterValue( String group, String counter );
447
448  /**
449   * Method setStatus is used to set the status of the current operation.
450   * <p/>
451   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
452   *
453   * @param status of type String
454   */
455  public abstract void setStatus( String status );
456
457  /**
458   * Method isCounterStatusInitialized returns true if it is safe to increment a counter or set a status.
459   *
460   * @return boolean
461   */
462  public abstract boolean isCounterStatusInitialized();
463
464  /**
465   * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
466   * <p/>
467   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
468   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
469   * stored in a Collection.
470   *
471   * @param tap of type Tap
472   * @return TupleIterator
473   * @throws java.io.IOException when there is a failure opening the resource
474   */
475  public abstract TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
476
477  /**
478   * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance.
479   *
480   * @param tap of type Tap
481   * @return TupleCollector
482   * @throws java.io.IOException when there is a failure opening the resource
483   */
484  public abstract TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
485
486  /**
487   * Method openTrapForWrite returns a (@link TupleCollector} for the given Tap instance.
488   *
489   * @param trap of type Tap
490   * @return TupleCollector
491   * @throws java.io.IOException when there is a failure opening the resource
492   */
493  public abstract TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException;
494
495  public abstract TupleEntryCollector openSystemIntermediateForWrite() throws IOException;
496
497  /**
498   * Method getConfig returns the actual instance of the underlying configuration instance.
499   * <p/>
500   * This instance should not be modified or cached, see {@link #getConfigCopy()} for a modifiable instance.
501   *
502   * @return an instance of the configuration
503   */
504  public abstract Config getConfig();
505
506  public abstract Config getConfigCopy();
507
508  public abstract <C> C copyConfig( C config );
509
510  public abstract <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig );
511
512  public abstract Config mergeMapIntoConfig( Config defaultConfig, Map<String, String> map );
513
514  /**
515   * Method getTrapCollectorFor will return a new {@link TupleEntryCollector} if one hasn't previously
516   * been created for the given trap Tap.
517   *
518   * @param trap
519   * @return TupleEntryCollector
520   */
521  public TupleEntryCollector getTrapCollectorFor( Tap trap )
522    {
523    Map<Tap, TupleEntryCollector> trapCollectors = getTrapCollectors();
524
525    TupleEntryCollector trapCollector = trapCollectors.get( trap );
526
527    if( trapCollector == null )
528      {
529      try
530        {
531        trapCollector = openTrapForWrite( trap );
532        trapCollectors.put( trap, trapCollector );
533        }
534      catch( IOException exception )
535        {
536        throw new DuctException( exception );
537        }
538      }
539
540    return trapCollector;
541    }
542
543  protected synchronized Map<Tap, TupleEntryCollector> getTrapCollectors()
544    {
545    if( trapCollectors == null )
546      trapCollectors = Collections.synchronizedMap( new HashMap<Tap, TupleEntryCollector>() );
547
548    return trapCollectors;
549    }
550
551  public synchronized void closeTrapCollectors()
552    {
553    if( trapCollectors == null )
554      return;
555
556    for( TupleEntryCollector trapCollector : trapCollectors.values() )
557      {
558      try
559        {
560        trapCollector.close();
561        }
562      catch( Exception exception )
563        {
564        // do nothing
565        }
566      }
567
568    trapCollectors.clear();
569    }
570  }