001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow;
023
024import java.io.IOException;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.Map;
029
030import cascading.flow.stream.duct.DuctException;
031import cascading.tap.Tap;
032import cascading.tap.type.FileType;
033import cascading.tuple.TupleEntryCollector;
034import cascading.tuple.TupleEntryIterator;
035
036/**
037 * FlowProcess implementations provide a call-back interface into the current computing system. Each
038 * {@link cascading.operation.Operation} is given a reference to a particular implementation, allowing it
039 * to get configuration properties, send a "keep alive" ping, or to set a counter value.
040 * <p>
041 * Depending on the underlying system, FlowProcess instances are not continuous across all operations in a {@link Flow}.
042 * Thus, a call to {@link #increment(Enum, long)} may start incrementing from zero if the operation making the call
043 * belongs to a subsequent 'job' or 'step' from any previous operations calling increment.
044 * <p>
045 * A FlowProcess is roughly a child of {@link FlowSession}. FlowSession is roughly one to one with a particular {@link Flow}.
046 * And every FlowSession will have one or more FlowProcesses.
047 *
048 * @see FlowSession
049 */
050public abstract class FlowProcess<Config>
051  {
052  /** Field NULL is a noop implementation of FlowProcess. */
053  @SuppressWarnings("StaticInitializerReferencesSubClass")
054  public static FlowProcess NULL = new NullFlowProcess();
055
056  /**
057   * A noop implementation of a FlowProcess.
058   *
059   * @return A noop implementation of a FlowProcess.
060   */
061  @SuppressWarnings("unchecked")
062  public static <C> FlowProcess<C> nullFlowProcess()
063    {
064    return (FlowProcess<C>) NULL;
065    }
066
067  public static class NullFlowProcess extends FlowProcess<Object>
068    {
069    protected NullFlowProcess()
070      {
071      }
072
073    @Override
074    public FlowProcess copyWith( Object object )
075      {
076      return new NullFlowProcess();
077      }
078
079    public Object getProperty( String key )
080      {
081      return null;
082      }
083
084    @Override
085    public Collection<String> getPropertyKeys()
086      {
087      return Collections.EMPTY_SET;
088      }
089
090    @Override
091    public Object newInstance( String className )
092      {
093      return null;
094      }
095
096    public void keepAlive()
097      {
098      }
099
100    public void increment( Enum counter, long amount )
101      {
102      }
103
104    public void increment( String group, String counter, long amount )
105      {
106      }
107
108    @Override
109    public long getCounterValue( Enum counter )
110      {
111      return 0;
112      }
113
114    @Override
115    public long getCounterValue( String group, String counter )
116      {
117      return 0;
118      }
119
120    public void setStatus( String status )
121      {
122      }
123
124    @Override
125    public boolean isCounterStatusInitialized()
126      {
127      return true;
128      }
129
130    @Override
131    public int getNumProcessSlices()
132      {
133      return 1;
134      }
135
136    @Override
137    public int getCurrentSliceNum()
138      {
139      return 0;
140      }
141
142    public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
143      {
144      return tap.openForRead( this );
145      }
146
147    public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
148      {
149      return tap.openForWrite( this );
150      }
151
152    @Override
153    public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
154      {
155      return trap.openForWrite( this );
156      }
157
158    @Override
159    public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
160      {
161      return null;
162      }
163
164    @Override
165    public Object getConfig()
166      {
167      return null;
168      }
169
170    @Override
171    public Object getConfigCopy()
172      {
173      return null;
174      }
175
176    @Override
177    public Object copyConfig( Object config )
178      {
179      return config;
180      }
181
182    @Override
183    public Map<String, String> diffConfigIntoMap( Object defaultConfig, Object updatedConfig )
184      {
185      return null;
186      }
187
188    @Override
189    public Object mergeMapIntoConfig( Object defaultConfig, Map<String, String> map )
190      {
191      return null;
192      }
193    }
194
195  public class FlowProcessContext
196    {
197    String sourcePath;
198
199    public String getSourcePath()
200      {
201      return FlowProcess.this.getStringProperty( FileType.CASCADING_SOURCE_PATH, sourcePath );
202      }
203
204    public void setSourcePath( String sourcePath )
205      {
206      this.sourcePath = sourcePath;
207      }
208    }
209
210  private FlowSession currentSession = FlowSession.NULL;
211  private Map<Tap, TupleEntryCollector> trapCollectors;
212  private FlowProcessContext flowProcessContext = new FlowProcessContext();
213
214  protected FlowProcess()
215    {
216    }
217
218  protected FlowProcess( FlowSession currentSession )
219    {
220    setCurrentSession( currentSession );
221    }
222
223  /**
224   * Copy constructor.
225   * <p>
226   * Shares the underlying trap collector collection across copies to avoid a static collection.
227   *
228   * @param flowProcess
229   */
230  protected FlowProcess( FlowProcess<Config> flowProcess )
231    {
232    setCurrentSession( flowProcess.getCurrentSession() );
233
234    // lazy initialize trap collectors collection and share across copies
235    this.trapCollectors = flowProcess.getTrapCollectors();
236    this.flowProcessContext = flowProcess.getFlowProcessContext();
237    }
238
239  public FlowProcessContext getFlowProcessContext()
240    {
241    return flowProcessContext;
242    }
243
244  public abstract FlowProcess<Config> copyWith( Config config );
245
246  /**
247   * Method getID() returns the current
248   *
249   * @return of type String
250   */
251  public String getID()
252    {
253    return getStringProperty( FlowStep.CASCADING_FLOW_STEP_ID );
254    }
255
256  /**
257   * Method getCurrentSession returns the currentSession of this FlowProcess object.
258   *
259   * @return the currentSession (type FlowSession) of this FlowProcess object.
260   */
261  public FlowSession getCurrentSession()
262    {
263    return currentSession;
264    }
265
266  /**
267   * Method setCurrentSession sets the currentSession of this FlowProcess object.
268   *
269   * @param currentSession the currentSession of this FlowProcess object.
270   */
271  public void setCurrentSession( FlowSession currentSession )
272    {
273    this.currentSession = currentSession;
274
275    currentSession.setCurrentProcess( this );
276    }
277
278  /**
279   * Method getNumProcessSlices returns the number of parallel slices or tasks allocated
280   * for this process execution.
281   * <p>
282   * For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job.
283   *
284   * @return an int
285   */
286  public abstract int getNumProcessSlices();
287
288  /**
289   * Method getCurrentSliceNum returns an integer representing which slice instance currently running.
290   * <p>
291   * {@code 0} (zero) is the first slice instance.
292   *
293   * @return an int
294   */
295  public abstract int getCurrentSliceNum();
296
297  /**
298   * Method getProperty should be used to return configuration parameters from the underlying system.
299   * <p>
300   * In the case of Hadoop, the current Configuration will be queried.
301   *
302   * @param key of type String
303   * @return an Object
304   */
305  public abstract Object getProperty( String key );
306
307  /**
308   * Method getStringProperty should be used to return configuration parameters from the underlying system.
309   * <p>
310   * In the case of Hadoop, the current Configuration will be queried.
311   *
312   * @param key of type String,
313   * @return null if property is not set
314   */
315  public String getStringProperty( String key )
316    {
317    Object value = getProperty( key );
318
319    if( value == null )
320      return null;
321
322    return value.toString();
323    }
324
325  /**
326   * Method getStringProperty should be used to return configuration parameters from the underlying system.
327   * <p>
328   * In the case of Hadoop, the current Configuration will be queried.
329   *
330   * @param key          of type String,
331   * @param defaultValue of type String,
332   * @return {@code defaultValue} if property is not set
333   */
334  public String getStringProperty( String key, String defaultValue )
335    {
336    Object value = getProperty( key );
337
338    if( value == null )
339      return defaultValue;
340
341    return value.toString();
342    }
343
344  /**
345   * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
346   * <p>
347   * In the case of Hadoop, the current Configuration will be queried.
348   *
349   * @param key of type String,
350   * @return null if property is not set
351   */
352  public Integer getIntegerProperty( String key )
353    {
354    String value = getStringProperty( key );
355
356    if( value == null || value.isEmpty() )
357      return null;
358
359    return Integer.valueOf( value );
360    }
361
362  /**
363   * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
364   * <p>
365   * In the case of Hadoop, the current Configuration will be queried.
366   *
367   * @param key          of type String,
368   * @param defaultValue of type int,
369   * @return {@code defaultValue} if property is not set
370   */
371  public int getIntegerProperty( String key, int defaultValue )
372    {
373    String value = getStringProperty( key );
374
375    if( value == null || value.isEmpty() )
376      return defaultValue;
377
378    return Integer.valueOf( value );
379    }
380
381  /**
382   * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
383   * <p>
384   * In the case of Hadoop, the current Configuration will be queried.
385   *
386   * @param key of type Boolean, null if property is not set
387   * @return an Object
388   */
389  public Boolean getBooleanProperty( String key )
390    {
391    String value = getStringProperty( key );
392
393    if( value == null || value.isEmpty() )
394      return null;
395
396    return Boolean.valueOf( value );
397    }
398
399  /**
400   * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
401   * <p>
402   * In the case of Hadoop, the current Configuration will be queried.
403   *
404   * @param key          of type String
405   * @param defaultValue of type boolean
406   * @return {@code defaultValue} if property is not set
407   */
408  public boolean getBooleanProperty( String key, boolean defaultValue )
409    {
410    String value = getStringProperty( key );
411
412    if( value == null || value.isEmpty() )
413      return defaultValue;
414
415    return Boolean.valueOf( value );
416    }
417
418  /**
419   * Method getPropertyKeys returns an immutable collection of all available property key values.
420   *
421   * @return a Collection
422   */
423  public abstract Collection<String> getPropertyKeys();
424
425  /**
426   * Method newInstance creates a new object instance from the given className argument delegating to any
427   * platform specific instantiation and configuration routines.
428   *
429   * @param className
430   * @return an instance of className
431   */
432  public abstract Object newInstance( String className );
433
434  /**
435   * Method keepAlive notifies the system that the current process is still alive. Use this method if a particular
436   * {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling
437   * ping every few seconds to every minute or so would be best.
438   * <p>
439   * This method will fail silently if the underlying mechanism to notify keepAlive status are not initialized.
440   */
441  public abstract void keepAlive();
442
443  /**
444   * Method increment is used to increment a custom counter. Counters must be of type Enum. The amount
445   * to increment must be a integer value.
446   * <p>
447   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
448   *
449   * @param counter of type Enum
450   * @param amount  of type int
451   */
452  public abstract void increment( Enum counter, long amount );
453
454  /**
455   * Method increment is used to increment a custom counter. The amount to increment must be a integer value.
456   * <p>
457   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
458   *
459   * @param group   of type String
460   * @param counter of type String
461   * @param amount  of type int
462   */
463  public abstract void increment( String group, String counter, long amount );
464
465  /**
466   * Method getCounterValue is used to retrieve a counter value.
467   * <p>
468   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
469   *
470   * @param counter of type Enum
471   */
472  public abstract long getCounterValue( Enum counter );
473
474  /**
475   * Method getCounterValue is used to retrieve a counter value.
476   * <p>
477   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
478   *
479   * @param group   of type String
480   * @param counter of type String
481   */
482  public abstract long getCounterValue( String group, String counter );
483
484  /**
485   * Method setStatus is used to set the status of the current operation.
486   * <p>
487   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
488   *
489   * @param status of type String
490   */
491  public abstract void setStatus( String status );
492
493  /**
494   * Method isCounterStatusInitialized returns true if it is safe to increment a counter or set a status.
495   *
496   * @return boolean
497   */
498  public abstract boolean isCounterStatusInitialized();
499
500  /**
501   * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
502   * <p>
503   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
504   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
505   * stored in a Collection.
506   *
507   * @param tap of type Tap
508   * @return TupleIterator
509   * @throws java.io.IOException when there is a failure opening the resource
510   */
511  public abstract TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
512
513  /**
514   * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance.
515   *
516   * @param tap of type Tap
517   * @return TupleCollector
518   * @throws java.io.IOException when there is a failure opening the resource
519   */
520  public abstract TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
521
522  /**
523   * Method openTrapForWrite returns a (@link TupleCollector} for the given Tap instance.
524   *
525   * @param trap of type Tap
526   * @return TupleCollector
527   * @throws java.io.IOException when there is a failure opening the resource
528   */
529  public abstract TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException;
530
531  public abstract TupleEntryCollector openSystemIntermediateForWrite() throws IOException;
532
533  /**
534   * Method getConfig returns the actual instance of the underlying configuration instance.
535   * <p>
536   * This instance should not be modified or cached, see {@link #getConfigCopy()} for a modifiable instance.
537   *
538   * @return an instance of the configuration
539   */
540  public abstract Config getConfig();
541
542  public abstract Config getConfigCopy();
543
544  public abstract <C> C copyConfig( C config );
545
546  public abstract <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig );
547
548  public abstract Config mergeMapIntoConfig( Config defaultConfig, Map<String, String> map );
549
550  /**
551   * Method getTrapCollectorFor will return a new {@link TupleEntryCollector} if one hasn't previously
552   * been created for the given trap Tap.
553   *
554   * @param trap
555   * @return TupleEntryCollector
556   */
557  public TupleEntryCollector getTrapCollectorFor( Tap trap )
558    {
559    Map<Tap, TupleEntryCollector> trapCollectors = getTrapCollectors();
560
561    TupleEntryCollector trapCollector = trapCollectors.get( trap );
562
563    if( trapCollector == null )
564      {
565      try
566        {
567        trapCollector = openTrapForWrite( trap );
568        trapCollectors.put( trap, trapCollector );
569        }
570      catch( IOException exception )
571        {
572        throw new DuctException( exception );
573        }
574      }
575
576    return trapCollector;
577    }
578
579  protected synchronized Map<Tap, TupleEntryCollector> getTrapCollectors()
580    {
581    if( trapCollectors == null )
582      trapCollectors = Collections.synchronizedMap( new HashMap<Tap, TupleEntryCollector>() );
583
584    return trapCollectors;
585    }
586
587  public synchronized void closeTrapCollectors()
588    {
589    if( trapCollectors == null )
590      return;
591
592    for( TupleEntryCollector trapCollector : trapCollectors.values() )
593      {
594      try
595        {
596        trapCollector.close();
597        }
598      catch( Exception exception )
599        {
600        // do nothing
601        }
602      }
603
604    trapCollectors.clear();
605    }
606  }