001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.util.Arrays;
026    import java.util.Collection;
027    import java.util.HashMap;
028    import java.util.Map;
029    import java.util.Properties;
030    
031    import cascading.flow.FlowException;
032    import cascading.flow.FlowProcess;
033    import cascading.flow.hadoop.util.HadoopUtil;
034    import cascading.scheme.Scheme;
035    import cascading.scheme.SinkCall;
036    import cascading.scheme.SourceCall;
037    import cascading.tap.Tap;
038    import cascading.tuple.TupleEntryCollector;
039    import cascading.tuple.TupleEntryIterator;
040    import riffle.process.scheduler.ProcessException;
041    import riffle.process.scheduler.ProcessWrapper;
042    
043    /**
044     * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs.
045     * <p/>
046     * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
047     * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
048     * according to their dependencies (topologically).
049     * <p/>
050     * Though this class sub-classes {@link HadoopFlow}, it does not support all the methods available or features.
051     * <p/>
052     * Currently {@link cascading.flow.FlowListener}s are supported but the
053     * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not.
054     */
055    public class ProcessFlow<P> extends HadoopFlow
056      {
057      /** Field process */
058      private final P process;
059      /** Field processWrapper */
060      private final ProcessWrapper processWrapper;
061    
062      private boolean isStarted = false; // only used for event handling
063    
064      /**
065       * Constructor ProcessFlow creates a new ProcessFlow instance.
066       *
067       * @param name    of type String
068       * @param process of type JobConf
069       */
070      @ConstructorProperties({"name", "process"})
071      public ProcessFlow( String name, P process )
072        {
073        this( new Properties(), name, process );
074        }
075    
076      /**
077       * Constructor ProcessFlow creates a new ProcessFlow instance.
078       *
079       * @param properties of type Map<Object, Object>
080       * @param name       of type String
081       * @param process    of type P
082       */
083      @ConstructorProperties({"properties", "name", "process"})
084      public ProcessFlow( Map<Object, Object> properties, String name, P process )
085        {
086        super( HadoopUtil.getPlatformInfo(), properties, null, name );
087        this.process = process;
088        this.processWrapper = new ProcessWrapper( this.process );
089    
090        setName( name );
091        setTapFromProcess();
092        }
093    
094      /**
095       * Method setTapFromProcess build {@link Tap} instance for the give process incoming and outgoing dependencies.
096       * <p/>
097       * This method may be called repeatedly to re-configure the source and sink taps.
098       */
099      public void setTapFromProcess()
100        {
101        setSources( createSources( this.processWrapper ) );
102        setSinks( createSinks( this.processWrapper ) );
103        setTraps( createTraps( this.processWrapper ) );
104        }
105    
106      /**
107       * Method getProcess returns the process of this ProcessFlow object.
108       *
109       * @return the process (type P) of this ProcessFlow object.
110       */
111      public P getProcess()
112        {
113        return process;
114        }
115    
116      @Override
117      public void prepare()
118        {
119        try
120          {
121          processWrapper.prepare();
122          }
123        catch( ProcessException exception )
124          {
125          if( exception.getCause() instanceof RuntimeException )
126            throw (RuntimeException) exception.getCause();
127    
128          throw new FlowException( "could not call prepare on process", exception.getCause() );
129          }
130        }
131    
132      @Override
133      public void start()
134        {
135        try
136          {
137          fireOnStarting();
138          processWrapper.start();
139          flowStats.markStarted();
140          isStarted = true;
141          }
142        catch( ProcessException exception )
143          {
144          if( exception.getCause() instanceof RuntimeException )
145            throw (RuntimeException) exception.getCause();
146    
147          throw new FlowException( "could not call start on process", exception.getCause() );
148          }
149        }
150    
151      @Override
152      public void stop()
153        {
154        try
155          {
156          fireOnStopping();
157          processWrapper.stop();
158          flowStats.markStopped();
159          }
160        catch( ProcessException exception )
161          {
162          flowStats.markFailed( exception );
163          if( exception.getCause() instanceof RuntimeException )
164            throw (RuntimeException) exception.getCause();
165    
166          throw new FlowException( "could not call stop on process", exception.getCause() );
167          }
168        }
169    
170      @Override
171      public void complete()
172        {
173        try
174          {
175          if( !isStarted )
176            {
177            fireOnStarting();
178            isStarted = true;
179            flowStats.markStarted();
180            }
181    
182          flowStats.markRunning();
183          processWrapper.complete();
184          fireOnCompleted();
185          flowStats.markSuccessful();
186          }
187        catch( ProcessException exception )
188          {
189          flowStats.markFailed( exception );
190          if( exception.getCause() instanceof RuntimeException )
191            throw (RuntimeException) exception.getCause();
192    
193          throw new FlowException( "could not call complete on process", exception.getCause() );
194          }
195        }
196    
197      @Override
198      public void cleanup()
199        {
200        try
201          {
202          processWrapper.cleanup();
203          }
204        catch( ProcessException exception )
205          {
206          if( exception.getCause() instanceof RuntimeException )
207            throw (RuntimeException) exception.getCause();
208    
209          throw new FlowException( "could not call cleanup on process", exception.getCause() );
210          }
211        }
212    
213      private Map<String, Tap> createSources( ProcessWrapper processParent )
214        {
215        try
216          {
217          return makeTapMap( processParent.getDependencyIncoming() );
218          }
219        catch( ProcessException exception )
220          {
221          if( exception.getCause() instanceof RuntimeException )
222            throw (RuntimeException) exception.getCause();
223    
224          throw new FlowException( "could not get process incoming dependency", exception.getCause() );
225          }
226        }
227    
228      private Map<String, Tap> createSinks( ProcessWrapper processParent )
229        {
230        try
231          {
232          return makeTapMap( processParent.getDependencyOutgoing() );
233          }
234        catch( ProcessException exception )
235          {
236          if( exception.getCause() instanceof RuntimeException )
237            throw (RuntimeException) exception.getCause();
238    
239          throw new FlowException( "could not get process outgoing dependency", exception.getCause() );
240          }
241        }
242    
243      private Map<String, Tap> makeTapMap( Object resource )
244        {
245        Collection paths = makeCollection( resource );
246    
247        Map<String, Tap> taps = new HashMap<String, Tap>();
248    
249        for( Object path : paths )
250          {
251          if( path instanceof Tap )
252            taps.put( ( (Tap) path ).getIdentifier(), (Tap) path );
253          else
254            taps.put( path.toString(), new ProcessTap( new NullScheme(), path.toString() ) );
255          }
256        return taps;
257        }
258    
259      private Collection makeCollection( Object resource )
260        {
261        if( resource instanceof Collection )
262          return (Collection) resource;
263        else if( resource instanceof Object[] )
264          return Arrays.asList( (Object[]) resource );
265        else
266          return Arrays.asList( resource );
267        }
268    
269      private Map<String, Tap> createTraps( ProcessWrapper processParent )
270        {
271        return new HashMap<String, Tap>();
272        }
273    
274      @Override
275      public String toString()
276        {
277        return getName() + ":" + process;
278        }
279    
280      static class NullScheme extends Scheme
281        {
282        public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf )
283          {
284          }
285    
286        public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf )
287          {
288          }
289    
290        public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException
291          {
292          throw new UnsupportedOperationException( "sourcing is not supported in the scheme" );
293          }
294    
295        @Override
296        public String toString()
297          {
298          return getClass().getSimpleName();
299          }
300    
301        public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException
302          {
303          throw new UnsupportedOperationException( "sinking is not supported in the scheme" );
304          }
305        }
306    
307      /**
308       *
309       */
310      static class ProcessTap extends Tap
311        {
312        private final String token;
313    
314        ProcessTap( NullScheme scheme, String token )
315          {
316          super( scheme );
317          this.token = token;
318          }
319    
320        @Override
321        public String getIdentifier()
322          {
323          return token;
324          }
325    
326        @Override
327        public TupleEntryIterator openForRead( FlowProcess flowProcess, Object input ) throws IOException
328          {
329          return null;
330          }
331    
332        @Override
333        public TupleEntryCollector openForWrite( FlowProcess flowProcess, Object output ) throws IOException
334          {
335          return null;
336          }
337    
338        @Override
339        public boolean createResource( Object conf ) throws IOException
340          {
341          return false;
342          }
343    
344        @Override
345        public boolean deleteResource( Object conf ) throws IOException
346          {
347          return false;
348          }
349    
350        @Override
351        public boolean resourceExists( Object conf ) throws IOException
352          {
353          return false;
354          }
355    
356        @Override
357        public long getModifiedTime( Object conf ) throws IOException
358          {
359          return 0;
360          }
361    
362        @Override
363        public String toString()
364          {
365          return token;
366          }
367        }
368      }