/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
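 * <p/>
 * For example, to schedule a pre-configured job alongside another Flow (a minimal sketch; the job
 * setup and {@code otherFlow} are placeholders for a real configuration):
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "custom mr job" );
 * // ... set mapper, reducer, input/output formats, etc. as usual ...
 * FileInputFormat.setInputPaths( jobConf, new Path( "some/input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "some/output/path" ) );
 *
 * Flow mrFlow = new MapReduceFlow( "mr flow", jobConf, true );
 *
 * Cascade cascade = new CascadeConnector().connect( mrFlow, otherFlow );
 * cascade.complete();
 * }</pre>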
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the outputPath in the jobConf should be deleted before executing the MapReduce job.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to resolve the configured identifiers into
 * usable {@link Tap} instances, as sketched below. By default, createTraps returns an empty collection and can usually be left as is.
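 * <p/>
 * A minimal override sketch, assuming the JobConf encodes a custom URI (the {@code CustomTap} type
 * and the {@code "custom.input.uri"} property are hypothetical):
 * <pre>{@code
 * public class CustomMapReduceFlow extends MapReduceFlow
 *   {
 *   public CustomMapReduceFlow( String name, JobConf jobConf )
 *     {
 *     super( name, jobConf );
 *     }
 *
 *   protected Map<String, Tap> createSources( JobConf jobConf )
 *     {
 *     Map<String, Tap> taps = new HashMap<>();
 *
 *     taps.put( "custom", new CustomTap( jobConf.get( "custom.input.uri" ) ) );
 *
 *     return taps;
 *     }
 *   }
 * }</pre>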
 * <p/>
 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API Jobs.
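 * For a job configured through the newer mapreduce API, a JobConf can typically be derived from the
 * {@link Job}, e.g. {@code new MapReduceFlow( new JobConf( job.getConfiguration() ) )}.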
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( new Properties(), name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, flowDescriptor, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, jobConf, name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    initializeFrom( jobConf ); // push off initialization allowing for overrides
    }

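  /**
   * Method initializeFrom initializes all taps, the step graph, and the flow stats from the given
   * JobConf. It is called from the constructor, rather than inline, so that subclasses may override
   * the factory methods it delegates to, such as {@link #createSources(JobConf)}.
   */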
  protected void initializeFrom( JobConf jobConf )
    {
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );

    // this mirrors BaseFlow#initialize()

    initSteps();

    this.flowStats = createPrepareFlowStats(); // must be last

    initializeNewJobsMap();

    initializeChildStats();
    }

  protected FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph flowStepGraph = new FlowStepGraph();

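    // a MapReduceFlow wraps exactly one pre-configured job, so the step graph holds a single vertex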
    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = createFlowStep( jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

  protected FlowStep<JobConf> createFlowStep( JobConf jobConf, Tap sink )
    {
    return new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );
    }

  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    return fileInputToTaps( jobConf );
    }

  protected Map<String, Tap> fileInputToTaps( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

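    // fall back to the new mapreduce API when no input paths were set through the mapred API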
    if( paths == null || paths.length == 0 )
      {
      try
        {
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( paths == null )
      return taps;

    for( Path path : paths )
      toSourceTap( taps, path );

    return taps;
    }

  protected Tap toSourceTap( Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

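    // a NullScheme placeholder suffices; the pre-configured job performs its own record reading and writing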
    return taps.put( name, new Hfs( new NullScheme(), path.toString() ) );
    }

  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    return fileOutputToTaps( jobConf );
    }

  protected Map<String, Tap> fileOutputToTaps( JobConf jobConf )
    {
    Path path = FileOutputFormat.getOutputPath( jobConf );

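    // fall back to the new mapreduce API when no output path was set through the mapred API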
    if( path == null )
      {
      try
        {
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( path != null )
      toSinkTap( taps, path );

    return taps;
    }

  protected Tap toSinkTap( Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

    return taps.put( name, new Hfs( new NullScheme(), path.toString(), deleteSinkOnInit ? SinkMode.REPLACE : SinkMode.KEEP ) );
    }

  // find the least specific name that is still unique among the collected taps
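  // e.g. if "logs" is already taken, "/data/2017/logs" resolves to "2017/logs"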
  protected String makeNameFromPath( Map<String, Tap> taps, Path path )
    {
    Path parent = path.getParent();
    String name = path.getName();

    while( taps.containsKey( name ) )
      {
      name = new Path( parent.getName(), name ).toString();
      parent = parent.getParent();
      }

    return name;
    }

  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<>();
    }
  }