001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.util.Collection;
024    import java.util.Collections;
025    import java.util.HashMap;
026    import java.util.HashSet;
027    import java.util.Map;
028    import java.util.Properties;
029    import java.util.Set;
030    
031    import cascading.CascadingException;
032    import cascading.flow.planner.FlowPlanner;
033    import cascading.flow.planner.PlatformInfo;
034    import cascading.pipe.Pipe;
035    import cascading.property.AppProps;
036    import cascading.property.PropertyUtil;
037    import cascading.scheme.Scheme;
038    import cascading.tap.Tap;
039    import cascading.util.Util;
040    
041    import static cascading.flow.FlowDef.flowDef;
042    
043    /**
044     * Class FlowConnector is the base class for all platform planners.
045     * <p/>
046     * See the {@link FlowDef} class for a fluent way to define a new Flow.
047     * <p/>
048     * Use the FlowConnector to link source and sink {@link Tap} instances with an assembly of {@link Pipe} instances into
049     * an executable {@link cascading.flow.Flow}.
050     * <p/>
051     * FlowConnector invokes a planner for the target execution environment.
052     * <p/>
053     * For executing Flows in local memory against local files, see {@link cascading.flow.local.LocalFlowConnector}.
054     * <p/>
055     * For Apache Hadoop, see the {@link cascading.flow.hadoop.HadoopFlowConnector}.
056     * Or if you have a pre-existing custom Hadoop job to execute, see {@link cascading.flow.hadoop.MapReduceFlow}, which
057     * doesn't require a planner.
058     * <p/>
059     * Note that all {@code connect} methods take a single {@code tail} or an array of {@code tail} Pipe instances. "tail"
060     * refers to the last connected Pipe instances in a pipe-assembly. Pipe-assemblies are graphs of object with "heads"
061     * and "tails". From a given "tail", all connected heads can be found, but not the reverse. So "tails" must be
062     * supplied by the user.
063     * <p/>
064     * The FlowConnector and the underlying execution framework (Hadoop or local mode) can be configured via a
065     * {@link Map} or {@link Properties} instance given to the constructor.
066     * <p/>
067     * This properties map must be populated before constructing a FlowConnector instance. Many planner specific
068     * properties can be set through the {@link FlowConnectorProps} fluent interface.
069     * <p/>
070     * Some planners have required properties. Hadoop expects {@link AppProps#setApplicationJarPath(java.util.Map, String)} or
071     * {@link AppProps#setApplicationJarClass(java.util.Map, Class)} to be set.
072     * <p/>
073     * Any properties set and passed through the FlowConnector constructor will be global to all Flow instances created through
074     * the that FlowConnector instance. Some properties are on the {@link FlowDef} and would only be applicable to the
075     * resulting Flow instance.
076     * <p/>
077     * These properties are used to influence the current planner and are also passed down to the
078     * execution framework to override any default values. For example when using the Hadoop planner, the number of reducers
079     * or mappers can be set by using platform specific properties.
080     * <p/>
081     * Custom operations (Functions, Filter, etc) may also retrieve these property values at runtime through calls to
082     * {@link cascading.flow.FlowProcess#getProperty(String)} or {@link FlowProcess#getStringProperty(String)}.
083     * <p/>
084     * Most applications will need to call {@link cascading.property.AppProps#setApplicationJarClass(java.util.Map, Class)} or
085     * {@link cascading.property.AppProps#setApplicationJarPath(java.util.Map, String)} so that
086     * the correct application jar file is passed through to all child processes. The Class or path must reference
087     * the custom application jar, not a Cascading library class or jar. The easiest thing to do is give setApplicationJarClass
088     * the Class with your static main function and let Cascading figure out which jar to use.
089     * <p/>
090     * Note that Map<Object,Object> is compatible with the {@link Properties} class, so properties can be loaded at
091     * runtime from a configuration file.
092     * <p/>
093     * By default, all {@link cascading.operation.Assertion}s are planned into the resulting Flow instance. This can be
094     * changed for a given Flow by calling {@link FlowDef#setAssertionLevel(cascading.operation.AssertionLevel)} or globally
095     * via {@link FlowConnectorProps#setAssertionLevel(cascading.operation.AssertionLevel)}.
096     * <p/>
097     * Also by default, all {@link cascading.operation.Debug}s are planned into the resulting Flow instance. This can be
098     * changed for a given flow by calling {@link FlowDef#setDebugLevel(cascading.operation.DebugLevel)} or globally via
099     * {@link FlowConnectorProps#setDebugLevel(cascading.operation.DebugLevel)}.
100     *
101     * @see cascading.flow.local.LocalFlowConnector
102     * @see cascading.flow.hadoop.HadoopFlowConnector
103     */
104    public abstract class FlowConnector
105      {
106      /** Field properties */
107      protected Map<Object, Object> properties; // may be a Map or Properties instance. see PropertyUtil
108    
109      /**
110       * Method getIntermediateSchemeClass is used for debugging.
111       *
112       * @param properties of type Map<Object, Object>
113       * @return Class
114       */
115      public Class getIntermediateSchemeClass( Map<Object, Object> properties )
116        {
117        // supporting stuffed classes to overcome classloading issue
118        Object type = PropertyUtil.getProperty( properties, FlowConnectorProps.INTERMEDIATE_SCHEME_CLASS, null );
119    
120        if( type == null )
121          return getDefaultIntermediateSchemeClass();
122    
123        if( type instanceof Class )
124          return (Class) type;
125    
126        try
127          {
128          return FlowConnector.class.getClassLoader().loadClass( type.toString() );
129          }
130        catch( ClassNotFoundException exception )
131          {
132          throw new CascadingException( "unable to load class: " + type.toString(), exception );
133          }
134        }
135    
136      /**
137       * This has moved to {@link cascading.property.AppProps#setApplicationJarClass(java.util.Map, Class)}.
138       *
139       * @param properties
140       * @param type
141       */
142      @Deprecated
143      public static void setApplicationJarClass( Map<Object, Object> properties, Class type )
144        {
145        AppProps.setApplicationJarClass( properties, type );
146        }
147    
148      /**
149       * This has moved to {@link cascading.property.AppProps#setApplicationJarPath(java.util.Map, String)}.
150       *
151       * @param properties
152       * @param path
153       */
154      @Deprecated
155      public static void setApplicationJarPath( Map<Object, Object> properties, String path )
156        {
157        AppProps.setApplicationJarPath( properties, path );
158        }
159    
160      protected abstract Class<? extends Scheme> getDefaultIntermediateSchemeClass();
161    
162      protected FlowConnector()
163        {
164        this.properties = new HashMap<Object, Object>();
165        }
166    
167      protected FlowConnector( Map<Object, Object> properties )
168        {
169        if( properties == null )
170          this.properties = new HashMap<Object, Object>();
171        else if( properties instanceof Properties )
172          this.properties = new Properties( (Properties) properties );
173        else
174          this.properties = new HashMap<Object, Object>( properties );
175        }
176    
177      /**
178       * Method getProperties returns the properties of this FlowConnector object. The returned Map instance
179       * is immutable to prevent changes to the underlying property values in this FlowConnector instance.
180       * <p/>
181       * If a {@link Properties} instance was passed to the constructor, the returned object will be a flattened
182       * {@link Map} instance.
183       *
184       * @return the properties (type Map<Object, Object>) of this FlowConnector object.
185       */
186      public Map<Object, Object> getProperties()
187        {
188        // Sub-classes of FlowConnector should rely on PropertyUtil to manage access to properties objects internally.
189        return Collections.unmodifiableMap( PropertyUtil.asFlatMap( properties ) );
190        }
191    
192      /**
193       * Method connect links the given source and sink Taps to the given pipe assembly.
194       *
195       * @param source source Tap to bind to the head of the given tail Pipe
196       * @param sink   sink Tap to bind to the given tail Pipe
197       * @param tail   tail end of a pipe assembly
198       * @return Flow
199       */
200      public Flow connect( Tap source, Tap sink, Pipe tail )
201        {
202        return connect( null, source, sink, tail );
203        }
204    
205      /**
206       * Method connect links the given source and sink Taps to the given pipe assembly.
207       *
208       * @param name   name to give the resulting Flow
209       * @param source source Tap to bind to the head of the given tail Pipe
210       * @param sink   sink Tap to bind to the given tail Pipe
211       * @param tail   tail end of a pipe assembly
212       * @return Flow
213       */
214      public Flow connect( String name, Tap source, Tap sink, Pipe tail )
215        {
216        Map<String, Tap> sources = new HashMap<String, Tap>();
217    
218        sources.put( tail.getHeads()[ 0 ].getName(), source );
219    
220        return connect( name, sources, sink, tail );
221        }
222    
223      /**
224       * Method connect links the given source, sink, and trap Taps to the given pipe assembly. The given trap will
225       * be linked to the assembly head along with the source.
226       *
227       * @param name   name to give the resulting Flow
228       * @param source source Tap to bind to the head of the given tail Pipe
229       * @param sink   sink Tap to bind to the given tail Pipe
230       * @param trap   trap Tap to sink all failed Tuples into
231       * @param tail   tail end of a pipe assembly
232       * @return Flow
233       */
234      public Flow connect( String name, Tap source, Tap sink, Tap trap, Pipe tail )
235        {
236        Map<String, Tap> sources = new HashMap<String, Tap>();
237    
238        sources.put( tail.getHeads()[ 0 ].getName(), source );
239    
240        Map<String, Tap> traps = new HashMap<String, Tap>();
241    
242        traps.put( tail.getHeads()[ 0 ].getName(), trap );
243    
244        return connect( name, sources, sink, traps, tail );
245        }
246    
247      /**
248       * Method connect links the named source Taps and sink Tap to the given pipe assembly.
249       *
250       * @param sources all head names and source Taps to bind to the heads of the given tail Pipe
251       * @param sink    sink Tap to bind to the given tail Pipe
252       * @param tail    tail end of a pipe assembly
253       * @return Flow
254       */
255      public Flow connect( Map<String, Tap> sources, Tap sink, Pipe tail )
256        {
257        return connect( null, sources, sink, tail );
258        }
259    
260      /**
261       * Method connect links the named source Taps and sink Tap to the given pipe assembly.
262       *
263       * @param name    name to give the resulting Flow
264       * @param sources all head names and source Taps to bind to the heads of the given tail Pipe
265       * @param sink    sink Tap to bind to the given tail Pipe
266       * @param tail    tail end of a pipe assembly
267       * @return Flow
268       */
269      public Flow connect( String name, Map<String, Tap> sources, Tap sink, Pipe tail )
270        {
271        Map<String, Tap> sinks = new HashMap<String, Tap>();
272    
273        sinks.put( tail.getName(), sink );
274    
275        return connect( name, sources, sinks, tail );
276        }
277    
278      /**
279       * Method connect links the named source and trap Taps and sink Tap to the given pipe assembly.
280       *
281       * @param name    name to give the resulting Flow
282       * @param sources all head names and source Taps to bind to the heads of the given tail Pipe
283       * @param sink    sink Tap to bind to the given tail Pipe
284       * @param traps   all pipe names and trap Taps to sink all failed Tuples into
285       * @param tail    tail end of a pipe assembly
286       * @return Flow
287       */
288      public Flow connect( String name, Map<String, Tap> sources, Tap sink, Map<String, Tap> traps, Pipe tail )
289        {
290        Map<String, Tap> sinks = new HashMap<String, Tap>();
291    
292        sinks.put( tail.getName(), sink );
293    
294        return connect( name, sources, sinks, traps, tail );
295        }
296    
297      /**
298       * Method connect links the named trap Taps, source and sink Tap to the given pipe assembly.
299       *
300       * @param name   name to give the resulting Flow
301       * @param source source Tap to bind to the head of the given tail Pipe
302       * @param sink   sink Tap to bind to the given tail Pipe
303       * @param traps  all pipe names and trap Taps to sink all failed Tuples into
304       * @param tail   tail end of a pipe assembly
305       * @return Flow
306       */
307      public Flow connect( String name, Tap source, Tap sink, Map<String, Tap> traps, Pipe tail )
308        {
309        Map<String, Tap> sources = new HashMap<String, Tap>();
310    
311        sources.put( tail.getHeads()[ 0 ].getName(), source );
312    
313        Map<String, Tap> sinks = new HashMap<String, Tap>();
314    
315        sinks.put( tail.getName(), sink );
316    
317        return connect( name, sources, sinks, traps, tail );
318        }
319    
320      /**
321       * Method connect links the named source Taps and sink Tap to the given pipe assembly.
322       * <p/>
323       * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
324       * So the head pipe does not need to be included as an argument.
325       *
326       * @param source source Tap to bind to the head of the given tail Pipes
327       * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
328       * @param tails  all tail ends of a pipe assembly
329       * @return Flow
330       */
331      public Flow connect( Tap source, Map<String, Tap> sinks, Collection<Pipe> tails )
332        {
333        return connect( null, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) );
334        }
335    
336      /**
337       * Method connect links the named source Taps and sink Tap to the given pipe assembly.
338       * <p/>
339       * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
340       * So the head pipe does not need to be included as an argument.
341       *
342       * @param name   name to give the resulting Flow
343       * @param source source Tap to bind to the head of the given tail Pipes
344       * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
345       * @param tails  all tail ends of a pipe assembly
346       * @return Flow
347       */
348      public Flow connect( String name, Tap source, Map<String, Tap> sinks, Collection<Pipe> tails )
349        {
350        return connect( name, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) );
351        }
352    
353      /**
354       * Method connect links the named source Taps and sink Tap to the given pipe assembly.
355       * <p/>
356       * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
357       * So the head pipe does not need to be included as an argument.
358       *
359       * @param source source Tap to bind to the head of the given tail Pipes
360       * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
361       * @param tails  all tail ends of a pipe assembly
362       * @return Flow
363       */
364      public Flow connect( Tap source, Map<String, Tap> sinks, Pipe... tails )
365        {
366        return connect( null, source, sinks, tails );
367        }
368    
369      /**
370       * Method connect links the named source Taps and sink Tap to the given pipe assembly.
371       * <p/>
372       * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe.
373       * So the head pipe does not need to be included as an argument.
374       *
375       * @param name   name to give the resulting Flow
376       * @param source source Tap to bind to the head of the given tail Pipes
377       * @param sinks  all tail names and sink Taps to bind to the given tail Pipes
378       * @param tails  all tail ends of a pipe assembly
379       * @return Flow
380       */
381      public Flow connect( String name, Tap source, Map<String, Tap> sinks, Pipe... tails )
382        {
383        Set<Pipe> heads = new HashSet<Pipe>();
384    
385        for( Pipe pipe : tails )
386          Collections.addAll( heads, pipe.getHeads() );
387    
388        if( heads.isEmpty() )
389          throw new IllegalArgumentException( "no pipe instance found" );
390    
391        if( heads.size() != 1 )
392          throw new IllegalArgumentException( "there may be only 1 head pipe instance, found " + heads.size() );
393    
394        Map<String, Tap> sources = new HashMap<String, Tap>();
395    
396        for( Pipe pipe : heads )
397          sources.put( pipe.getName(), source );
398    
399        return connect( name, sources, sinks, tails );
400        }
401    
402      /**
403       * Method connect links the named sources and sinks to the given pipe assembly.
404       *
405       * @param sources all head names and source Taps to bind to the heads of the given tail Pipes
406       * @param sinks   all tail names and sink Taps to bind to the given tail Pipes
407       * @param tails   all tail ends of a pipe assembly
408       * @return Flow
409       */
410      public Flow connect( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails )
411        {
412        return connect( null, sources, sinks, tails );
413        }
414    
415      /**
416       * Method connect links the named sources and sinks to the given pipe assembly.
417       *
418       * @param name    name to give the resulting Flow
419       * @param sources all head names and source Taps to bind to the heads of the given tail Pipes
420       * @param sinks   all tail names and sink Taps to bind to the given tail Pipes
421       * @param tails   all tail ends of a pipe assembly
422       * @return Flow
423       */
424      public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails )
425        {
426        return connect( name, sources, sinks, new HashMap<String, Tap>(), tails );
427        }
428    
429      /**
430       * Method connect links the named sources, sinks and traps to the given pipe assembly.
431       *
432       * @param name    name to give the resulting Flow
433       * @param sources all head names and source Taps to bind to the heads of the given tail Pipes
434       * @param sinks   all tail names and sink Taps to bind to the given tail Pipes
435       * @param traps   all pipe names and trap Taps to sink all failed Tuples into
436       * @param tails   all tail ends of a pipe assembly
437       * @return Flow
438       */
439      public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Pipe... tails )
440        {
441        name = name == null ? makeName( tails ) : name;
442    
443        FlowDef flowDef = flowDef()
444          .setName( name )
445          .addSources( sources )
446          .addSinks( sinks )
447          .addTraps( traps )
448          .addTails( tails );
449    
450        return connect( flowDef );
451        }
452    
453      public Flow connect( FlowDef flowDef )
454        {
455        FlowPlanner flowPlanner = createFlowPlanner();
456    
457        flowPlanner.initialize( this, properties );
458    
459        return flowPlanner.buildFlow( flowDef );
460        }
461    
462      protected abstract FlowPlanner createFlowPlanner();
463    
464      /**
465       * Method getPlatformInfo returns an instance of {@link PlatformInfo} for the underlying platform.
466       *
467       * @return of type PlatformInfo
468       */
469      public PlatformInfo getPlatformInfo()
470        {
471        return createFlowPlanner().getPlatformInfo();
472        }
473    
474      /////////
475      // UTIL
476      /////////
477    
478      private String makeName( Pipe[] pipes )
479        {
480        String[] names = new String[ pipes.length ];
481    
482        for( int i = 0; i < pipes.length; i++ )
483          names[ i ] = pipes[ i ].getName();
484    
485        String name = Util.join( names, "+" );
486    
487        if( name.length() > 32 )
488          name = name.substring( 0, 32 );
489    
490        return name;
491        }
492      }