/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.flow.FlowStep;
import cascading.flow.hadoop.planner.HadoopStepGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * topologically according to their dependencies.
 * <p/>
 * Set {@code deleteSinkOnInit} to {@code true} if the output path configured in the JobConf should be deleted before
 * the MapReduce job executes.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to resolve the configured identifiers into
 * usable {@link Tap} instances. By default, createTraps returns an empty collection and can usually be left as-is.
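 * <p/>
 * The following is only a minimal usage sketch, not code taken from this project; the job configuration, the paths,
 * and the {@code otherFlow} instance are illustrative placeholders:
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "custom-mr" );
 * // configure mapper, reducer, and input/output formats as for any plain MapReduce job
 * FileInputFormat.setInputPaths( jobConf, new Path( "some/input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "some/output/path" ) );
 *
 * Flow flow = new MapReduceFlow( "custom-mr", jobConf, true ); // deleteSinkOnInit == true
 *
 * CascadeConnector connector = new CascadeConnector();
 * connector.connect( flow, otherFlow ).complete();
 * }</pre>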
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( name, jobConf, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean, true if the configured output path should be deleted before the job runs
   * @param stopJobsOnExit   of type boolean, true if running jobs should be stopped when the JVM exits
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo(), new Properties(), jobConf, name );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

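    // derive source, sink, and trap Taps from the pre-configured JobConf, then wrap the job in a single-step graph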
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

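  // wraps the pre-configured JobConf in a single MapReduceFlowStep, using the sink Tap's string representation as the step name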
  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph<JobConf> flowStepGraph = new HadoopStepGraph();

    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

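  /**
   * Method createSources creates one {@link Hfs} Tap, keyed by path, for every input path registered on the JobConf
   * via {@link FileInputFormat}, using a {@link NullScheme}.
   * <p/>
   * Subclasses may override this method when the JobConf identifies its inputs by something other than Hadoop FS
   * paths. The following is only a sketch of such an override; the property name {@code custom.input.uri} is an
   * illustrative placeholder, not a property this class knows about:
   * <pre>{@code
   * protected Map<String, Tap> createSources( JobConf jobConf )
   *   {
   *   Map<String, Tap> taps = new HashMap<String, Tap>();
   *
   *   String path = jobConf.get( "custom.input.uri" ); // hypothetical property holding the input location
   *
   *   taps.put( path, new Hfs( new NullScheme(), path ) );
   *
   *   return taps;
   *   }
   * }</pre>
   *
   * @param jobConf of type JobConf
   * @return Map
   */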
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    Map<String, Tap> taps = new HashMap<String, Tap>();

    for( Path path : paths )
      taps.put( path.toString(), new Hfs( new NullScheme(), path.toString() ) );

    return taps;
    }

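  /**
   * Method createSinks creates a single {@link Hfs} Tap, keyed by path, for the output path registered on the JobConf
   * via {@link FileOutputFormat}, using a {@link NullScheme}; the {@code deleteSinkOnInit} flag is passed through to
   * the {@link Hfs} constructor. Override this method when the JobConf identifies its output by something other than
   * a Hadoop FS path.
   *
   * @param jobConf of type JobConf
   * @return Map
   */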
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    String path = FileOutputFormat.getOutputPath( jobConf ).toString();

    taps.put( path, new Hfs( new NullScheme(), path, deleteSinkOnInit ) );

    return taps;
    }

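  /**
   * Method createTraps returns an empty Map by default. Override this method if the custom job provides trap
   * locations that should be resolved into {@link Tap} instances.
   *
   * @param jobConf of type JobConf
   * @return Map
   */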
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<String, Tap>();
    }
  }