001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.Map;
027    
028    import cascading.tap.Tap;
029    import cascading.tuple.TupleEntryCollector;
030    import cascading.tuple.TupleEntryIterator;
031    
032    /**
033     * FlowProcess implementations provide a call-back interface into the current computing system. Each
034     * {@link cascading.operation.Operation} is given a reference to a particular implementation, allowing it
035     * to get configuration properties, send a "keep alive" ping, or to set a counter value.
036     * <p/>
037     * Depending on the underlying system, FlowProcess instances are not continuous across all operations in a {@link Flow}.
038     * Thus, a call to {@link #increment(Enum, long)} may start incrementing from zero if the operation making the call
039     * belongs to a subsequent 'job' or 'step' from any previous operations calling increment.
040     * <p/>
041     * A FlowProcess is roughly a child of {@link FlowSession}. FlowSession is roughly one to one with a particular {@link Flow}.
042     * And every FlowSession will have one or more FlowProcesses.
043     *
044     * @see FlowSession
045     */
046    public abstract class FlowProcess<Config>
047      {
048      /** Field NULL is a noop implementation of FlowSession. */
049      public static FlowProcess NULL = new NullFlowProcess();
050    
051      public static class NullFlowProcess extends FlowProcess<Object>
052        {
053        protected NullFlowProcess()
054          {
055          }
056    
057        @Override
058        public FlowProcess copyWith( Object object )
059          {
060          return new NullFlowProcess();
061          }
062    
063        public Object getProperty( String key )
064          {
065          return null;
066          }
067    
068        @Override
069        public Collection<String> getPropertyKeys()
070          {
071          return Collections.EMPTY_SET;
072          }
073    
074        @Override
075        public Object newInstance( String className )
076          {
077          return null;
078          }
079    
080        public void keepAlive()
081          {
082          }
083    
084        public void increment( Enum counter, long amount )
085          {
086          }
087    
088        public void increment( String group, String counter, long amount )
089          {
090          }
091    
092        public void setStatus( String status )
093          {
094          }
095    
096        @Override
097        public boolean isCounterStatusInitialized()
098          {
099          return true;
100          }
101    
102        @Override
103        public int getNumProcessSlices()
104          {
105          return 1;
106          }
107    
108        @Override
109        public int getCurrentSliceNum()
110          {
111          return 0;
112          }
113    
114        public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
115          {
116          return tap.openForRead( this );
117          }
118    
119        public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
120          {
121          return tap.openForWrite( this );
122          }
123    
124        @Override
125        public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
126          {
127          return trap.openForWrite( this );
128          }
129    
130        @Override
131        public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
132          {
133          return null;
134          }
135    
136        @Override
137        public Object getConfigCopy()
138          {
139          return null;
140          }
141    
142        @Override
143        public Object copyConfig( Object config )
144          {
145          return config;
146          }
147    
148        @Override
149        public Map<String, String> diffConfigIntoMap( Object defaultConfig, Object updatedConfig )
150          {
151          return null;
152          }
153    
154        @Override
155        public Object mergeMapIntoConfig( Object defaultConfig, Map<String, String> map )
156          {
157          return null;
158          }
159        }
160    
161      /** Field currentSession */
162      private FlowSession currentSession = FlowSession.NULL;
163    
164      protected FlowProcess()
165        {
166        }
167    
168      protected FlowProcess( FlowSession currentSession )
169        {
170        setCurrentSession( currentSession );
171        }
172    
173      public abstract FlowProcess copyWith( Config config );
174    
175      /**
176       * Method getID() returns the current
177       *
178       * @return of type String
179       */
180      public String getID()
181        {
182        return getStringProperty( FlowStep.CASCADING_FLOW_STEP_ID );
183        }
184    
185      /**
186       * Method getCurrentSession returns the currentSession of this FlowProcess object.
187       *
188       * @return the currentSession (type FlowSession) of this FlowProcess object.
189       */
190      public FlowSession getCurrentSession()
191        {
192        return currentSession;
193        }
194    
195      /**
196       * Method setCurrentSession sets the currentSession of this FlowProcess object.
197       *
198       * @param currentSession the currentSession of this FlowProcess object.
199       */
200      public void setCurrentSession( FlowSession currentSession )
201        {
202        this.currentSession = currentSession;
203    
204        currentSession.setCurrentProcess( this );
205        }
206    
207      /**
208       * Method getNumProcessSlices returns the number of parallel slices or tasks allocated
209       * for this process execution.
210       * <p/>
211       * For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job.
212       *
213       * @return an int
214       */
215      public abstract int getNumProcessSlices();
216    
217      /**
218       * Method getCurrentSliceNum returns an integer representing which slice instance currently running.
219       * <p/>
220       * {@code 0} (zero) is the first slice instance.
221       *
222       * @return an int
223       */
224      public abstract int getCurrentSliceNum();
225    
226      /**
227       * Method getProperty should be used to return configuration parameters from the underlying system.
228       * <p/>
229       * In the case of Hadoop, the current Configuration will be queried.
230       *
231       * @param key of type String
232       * @return an Object
233       */
234      public abstract Object getProperty( String key );
235    
236      /**
237       * Method getStringProperty should be used to return configuration parameters from the underlying system.
238       * <p/>
239       * In the case of Hadoop, the current Configuration will be queried.
240       *
241       * @param key of type String,
242       * @return null if property is not set
243       */
244      public String getStringProperty( String key )
245        {
246        Object value = getProperty( key );
247    
248        if( value == null )
249          return null;
250    
251        return value.toString();
252        }
253    
254      /**
255       * Method getStringProperty should be used to return configuration parameters from the underlying system.
256       * <p/>
257       * In the case of Hadoop, the current Configuration will be queried.
258       *
259       * @param key          of type String,
260       * @param defaultValue of type String,
261       * @return {@code defaultValue} if property is not set
262       */
263      public String getStringProperty( String key, String defaultValue )
264        {
265        Object value = getProperty( key );
266    
267        if( value == null )
268          return defaultValue;
269    
270        return value.toString();
271        }
272    
273      /**
274       * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
275       * <p/>
276       * In the case of Hadoop, the current Configuration will be queried.
277       *
278       * @param key of type String,
279       * @return null if property is not set
280       */
281      public Integer getIntegerProperty( String key )
282        {
283        String value = getStringProperty( key );
284    
285        if( value == null || value.isEmpty() )
286          return null;
287    
288        return Integer.valueOf( value );
289        }
290    
291      /**
292       * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
293       * <p/>
294       * In the case of Hadoop, the current Configuration will be queried.
295       *
296       * @param key          of type String,
297       * @param defaultValue of type int,
298       * @return {@code defaultValue} if property is not set
299       */
300      public int getIntegerProperty( String key, int defaultValue )
301        {
302        String value = getStringProperty( key );
303    
304        if( value == null || value.isEmpty() )
305          return defaultValue;
306    
307        return Integer.valueOf( value );
308        }
309    
310      /**
311       * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
312       * <p/>
313       * In the case of Hadoop, the current Configuration will be queried.
314       *
315       * @param key of type Boolean, null if property is not set
316       * @return an Object
317       */
318      public Boolean getBooleanProperty( String key )
319        {
320        String value = getStringProperty( key );
321    
322        if( value == null || value.isEmpty() )
323          return null;
324    
325        return Boolean.valueOf( value );
326        }
327    
328      /**
329       * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
330       * <p/>
331       * In the case of Hadoop, the current Configuration will be queried.
332       *
333       * @param key          of type String
334       * @param defaultValue of type boolean
335       * @return {@code defaultValue} if property is not set
336       */
337      public boolean getBooleanProperty( String key, boolean defaultValue )
338        {
339        String value = getStringProperty( key );
340    
341        if( value == null || value.isEmpty() )
342          return defaultValue;
343    
344        return Boolean.valueOf( value );
345        }
346    
347      /**
348       * Method getPropertyKeys returns an immutable collection of all available property key values.
349       *
350       * @return a Collection<String>
351       */
352      public abstract Collection<String> getPropertyKeys();
353    
354      /**
355       * Method newInstance creates a new object instance from the given className argument delegating to any
356       * platform specific instantiation and configuration routines.
357       *
358       * @param className
359       * @return an instance of className
360       */
361      public abstract Object newInstance( String className );
362    
363      /**
364       * Method keepAlive notifies the system that the current process is still alive. Use this method if a particular
365       * {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling
366       * ping every few seconds to every minute or so would be best.
367       * <p/>
368       * This method will fail silently if the underlying mechanism to notify keepAlive status are not initialized.
369       */
370      public abstract void keepAlive();
371    
372      /**
373       * Method increment is used to increment a custom counter. Counters must be of type Enum. The amount
374       * to increment must be a integer value.
375       * <p/>
376       * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
377       *
378       * @param counter of type Enum
379       * @param amount  of type int
380       */
381      public abstract void increment( Enum counter, long amount );
382    
383      /**
384       * Method increment is used to increment a custom counter. The amount to increment must be a integer value.
385       * <p/>
386       * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
387       *
388       * @param group   of type String
389       * @param counter of type String
390       * @param amount  of type int
391       */
392      public abstract void increment( String group, String counter, long amount );
393    
394      /**
395       * Method setStatus is used to set the status of the current operation.
396       * <p/>
397       * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
398       *
399       * @param status of type String
400       */
401      public abstract void setStatus( String status );
402    
403      /**
404       * Method isCounterStatusInitialized returns true if it is safe to increment a counter or set a status.
405       *
406       * @return boolean
407       */
408      public abstract boolean isCounterStatusInitialized();
409    
410      /**
411       * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
412       * <p/>
413       * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
414       * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
415       * stored in a Collection.
416       *
417       * @param tap of type Tap
418       * @return TupleIterator
419       * @throws java.io.IOException when there is a failure opening the resource
420       */
421      public abstract TupleEntryIterator openTapForRead( Tap tap ) throws IOException;
422    
423      /**
424       * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance.
425       *
426       * @param tap of type Tap
427       * @return TupleCollector
428       * @throws java.io.IOException when there is a failure opening the resource
429       */
430      public abstract TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;
431    
432      /**
433       * Method openTrapForWrite returns a (@link TupleCollector} for the given Tap instance.
434       *
435       * @param trap of type Tap
436       * @return TupleCollector
437       * @throws java.io.IOException when there is a failure opening the resource
438       */
439      public abstract TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException;
440    
441      public abstract TupleEntryCollector openSystemIntermediateForWrite() throws IOException;
442    
443      public abstract Config getConfigCopy();
444    
445      public abstract Config copyConfig( Config jobConf );
446    
447      public abstract Map<String, String> diffConfigIntoMap( Config defaultConfig, Config updatedConfig );
448    
449      public abstract Config mergeMapIntoConfig( Config defaultConfig, Map<String, String> map );
450      }