001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap.hadoop.util;
023
024import java.io.IOException;
025import java.net.URI;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.concurrent.atomic.AtomicInteger;
029
030import cascading.flow.hadoop.util.HadoopUtil;
031import cascading.tap.Tap;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.mapred.FileOutputFormat;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;
041
042public class Hadoop18TapUtil
043  {
044  /** Field LOG */
045  private static final Logger LOG = LoggerFactory.getLogger( Hadoop18TapUtil.class );
046
047  /** The Hadoop temporary path used to prevent collisions */
048  public static final String TEMPORARY_PATH = "_temporary";
049
050  private static final Map<String, AtomicInteger> pathCounts = new HashMap<String, AtomicInteger>();
051
052  /**
053   * should only be called if not in a Flow
054   *
055   * @param conf
056   * @throws IOException
057   */
058  public static void setupJob( Configuration conf ) throws IOException
059    {
060    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );
061
062    if( outputPath == null )
063      return;
064
065    if( getFSSafe( conf, outputPath ) == null )
066      return;
067
068    String taskID = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
069
070    if( taskID == null ) // need to stuff a fake id
071      {
072      String mapper = conf.getBoolean( "mapred.task.is.map", conf.getBoolean( "mapreduce.task.is.map", true ) ) ? "m" : "r";
073      String value = String.format( "attempt_%012d_0000_%s_000000_0", (int) Math.rint( System.currentTimeMillis() ), mapper );
074      conf.set( "mapred.task.id", value );
075      conf.set( "mapreduce.task.id", value );
076      }
077
078    makeTempPath( conf );
079
080    if( writeDirectlyToWorkingPath( conf, outputPath ) )
081      {
082      LOG.info( "writing directly to output path: {}", outputPath );
083      setWorkOutputPath( conf, outputPath );
084      return;
085      }
086
087    // "mapred.work.output.dir"
088    Path taskOutputPath = getTaskOutputPath( conf );
089    setWorkOutputPath( conf, taskOutputPath );
090    }
091
092  public static synchronized void setupTask( Configuration conf ) throws IOException
093    {
094    String workpath = conf.get( "mapred.work.output.dir" );
095
096    if( workpath == null )
097      return;
098
099    FileSystem fs = getFSSafe( conf, new Path( workpath ) );
100
101    if( fs == null )
102      return;
103
104    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
105
106    LOG.info( "setting up task: '{}' - {}", taskId, workpath );
107
108    AtomicInteger integer = pathCounts.get( workpath );
109
110    if( integer == null )
111      {
112      integer = new AtomicInteger();
113      pathCounts.put( workpath, integer );
114      }
115
116    integer.incrementAndGet();
117    }
118
119  public static boolean needsTaskCommit( Configuration conf ) throws IOException
120    {
121    String workpath = conf.get( "mapred.work.output.dir" );
122
123    if( workpath == null )
124      return false;
125
126    Path taskOutputPath = new Path( workpath );
127
128    if( taskOutputPath != null )
129      {
130      FileSystem fs = getFSSafe( conf, taskOutputPath );
131
132      if( fs == null )
133        return false;
134
135      if( fs.exists( taskOutputPath ) )
136        return true;
137      }
138
139    return false;
140    }
141
142  /**
143   * copies all files from the taskoutputpath to the outputpath
144   *
145   * @param conf
146   */
147  public static void commitTask( Configuration conf ) throws IOException
148    {
149    Path taskOutputPath = new Path( conf.get( "mapred.work.output.dir" ) );
150
151    FileSystem fs = getFSSafe( conf, taskOutputPath );
152
153    if( fs == null )
154      return;
155
156    AtomicInteger integer = pathCounts.get( taskOutputPath.toString() );
157
158    if( integer.decrementAndGet() != 0 )
159      return;
160
161    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
162
163    LOG.info( "committing task: '{}' - {}", taskId, taskOutputPath );
164
165    if( taskOutputPath != null )
166      {
167      if( writeDirectlyToWorkingPath( conf, taskOutputPath ) )
168        return;
169
170      if( fs.exists( taskOutputPath ) )
171        {
172        Path jobOutputPath = taskOutputPath.getParent().getParent();
173        // Move the task outputs to their final place
174        moveTaskOutputs( conf, fs, jobOutputPath, taskOutputPath );
175
176        // Delete the temporary task-specific output directory
177        if( !fs.delete( taskOutputPath, true ) )
178          LOG.info( "failed to delete the temporary output directory of task: '{}' - {}", taskId, taskOutputPath );
179
180        LOG.info( "saved output of task '{}' to {}", taskId, jobOutputPath );
181        }
182      }
183    }
184
185  /**
186   * Called from flow step to remove temp dirs
187   *
188   * @param conf
189   * @throws IOException
190   */
191  public static void cleanupTapMetaData( Configuration conf, Tap tap ) throws IOException
192    {
193    cleanTempPath( conf, new Path( tap.getIdentifier() ) );
194    }
195
196  public static void writeSuccessMarker( Configuration conf ) throws IOException
197    {
198    writeSuccessMarker( conf, FileOutputFormat.getOutputPath( asJobConfInstance( conf ) ) );
199    }
200
201  public static void writeSuccessMarker( Configuration conf, Path outputPath ) throws IOException
202    {
203    if( conf.getBoolean( "mapreduce.fileoutputcommitter.marksuccessfuljobs", true ) )
204      {
205      LOG.info( "writing success marker to {}", outputPath );
206
207      Path markerPath = new Path( outputPath, "_SUCCESS" );
208      FileSystem fs = markerPath.getFileSystem( conf );
209
210      fs.create( markerPath ).close();
211      }
212    }
213
214  /**
215   * May only be called once. should only be called if not in a flow
216   *
217   * @param conf
218   */
219  public static void cleanupJob( Configuration conf ) throws IOException
220    {
221    if( HadoopUtil.isInflow( conf ) )
222      return;
223
224    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );
225
226    cleanTempPath( conf, outputPath );
227    }
228
229  private static synchronized void cleanTempPath( Configuration conf, Path outputPath ) throws IOException
230    {
231    // do the clean up of temporary directory
232
233    if( outputPath != null )
234      {
235      FileSystem fileSys = getFSSafe( conf, outputPath );
236
237      if( fileSys == null )
238        return;
239
240      if( !fileSys.exists( outputPath ) )
241        return;
242
243      Path tmpDir = new Path( outputPath, TEMPORARY_PATH );
244
245      LOG.info( "deleting temp path {}", tmpDir );
246
247      if( fileSys.exists( tmpDir ) )
248        fileSys.delete( tmpDir, true );
249      }
250    }
251
252  private static FileSystem getFSSafe( Configuration conf, Path tmpDir )
253    {
254    try
255      {
256      return tmpDir.getFileSystem( conf );
257      }
258    catch( IOException e )
259      {
260      // ignore
261      }
262
263    return null;
264    }
265
266  private static Path getTaskOutputPath( Configuration conf )
267    {
268    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
269
270    Path p = new Path( FileOutputFormat.getOutputPath( asJobConfInstance( conf ) ), TEMPORARY_PATH + Path.SEPARATOR + "_" + taskId );
271
272    try
273      {
274      FileSystem fs = p.getFileSystem( conf );
275      return p.makeQualified( fs );
276      }
277    catch( IOException ie )
278      {
279      return p;
280      }
281    }
282
283  static void setWorkOutputPath( Configuration conf, Path outputDir )
284    {
285    outputDir = new Path( asJobConfInstance( conf ).getWorkingDirectory(), outputDir );
286    conf.set( "mapred.work.output.dir", outputDir.toString() );
287    }
288
289  public static void makeTempPath( Configuration conf ) throws IOException
290    {
291    // create job specific temporary directory in output path
292    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );
293
294    if( outputPath != null )
295      {
296      Path tmpDir = new Path( outputPath, TEMPORARY_PATH );
297      FileSystem fileSys = tmpDir.getFileSystem( conf );
298
299      if( !fileSys.exists( tmpDir ) && !fileSys.mkdirs( tmpDir ) )
300        LOG.error( "mkdirs failed to create {}", tmpDir );
301      }
302    }
303
304  private static void moveTaskOutputs( Configuration conf, FileSystem fs, Path jobOutputDir, Path taskOutput ) throws IOException
305    {
306    String taskId = conf.get( "mapred.task.id", conf.get( "mapreduce.task.id" ) );
307
308    if( fs.isFile( taskOutput ) )
309      {
310      Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );
311      if( !fs.rename( taskOutput, finalOutputPath ) )
312        {
313        if( !fs.delete( finalOutputPath, true ) )
314          throw new IOException( "Failed to delete earlier output of task: " + taskId );
315
316        if( !fs.rename( taskOutput, finalOutputPath ) )
317          throw new IOException( "Failed to save output of task: " + taskId );
318        }
319
320      LOG.debug( "Moved {} to {}", taskOutput, finalOutputPath );
321      }
322    else if( fs.getFileStatus( taskOutput ).isDir() )
323      {
324      FileStatus[] paths = fs.listStatus( taskOutput );
325      Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) );
326      fs.mkdirs( finalOutputPath );
327      if( paths != null )
328        {
329        for( FileStatus path : paths )
330          moveTaskOutputs( conf, fs, jobOutputDir, path.getPath() );
331        }
332      }
333    }
334
335  private static Path getFinalPath( Path jobOutputDir, Path taskOutput, Path taskOutputPath ) throws IOException
336    {
337    URI taskOutputUri = taskOutput.toUri();
338    URI relativePath = taskOutputPath.toUri().relativize( taskOutputUri );
339    if( taskOutputUri == relativePath )
340      {//taskOutputPath is not a parent of taskOutput
341      throw new IOException( "Can not get the relative path: base = " + taskOutputPath + " child = " + taskOutput );
342      }
343    if( relativePath.getPath().length() > 0 )
344      {
345      return new Path( jobOutputDir, relativePath.getPath() );
346      }
347    else
348      {
349      return jobOutputDir;
350      }
351    }
352
353  /** used in AWS EMR to disable temp paths on some file systems, s3. */
354  private static boolean writeDirectlyToWorkingPath( Configuration conf, Path path )
355    {
356    FileSystem fs = getFSSafe( conf, path );
357
358    if( fs == null )
359      return false;
360
361    boolean result = conf.getBoolean( "mapred.output.direct." + fs.getClass().getSimpleName(), false );
362
363    if( result )
364      LOG.info( "output direct is enabled for this fs: " + fs.getName() );
365
366    return result;
367    }
368  }