001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap.hadoop.util; 022 023 import java.io.IOException; 024 import java.net.URI; 025 import java.util.HashMap; 026 import java.util.Map; 027 import java.util.concurrent.atomic.AtomicInteger; 028 029 import cascading.tap.Tap; 030 import org.apache.hadoop.fs.FileStatus; 031 import org.apache.hadoop.fs.FileSystem; 032 import org.apache.hadoop.fs.Path; 033 import org.apache.hadoop.mapred.FileOutputFormat; 034 import org.apache.hadoop.mapred.JobConf; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 public class Hadoop18TapUtil 039 { 040 /** Field LOG */ 041 private static final Logger LOG = LoggerFactory.getLogger( Hadoop18TapUtil.class ); 042 043 /** The Hadoop temporary path used to prevent collisions */ 044 public static final String TEMPORARY_PATH = "_temporary"; 045 046 private static final Map<String, AtomicInteger> pathCounts = new HashMap<String, AtomicInteger>(); 047 048 /** 049 * should only be called if not in a Flow 050 * 051 * @param conf 052 * @throws IOException 053 */ 054 public static void setupJob( JobConf conf ) throws IOException 055 { 056 Path outputPath = FileOutputFormat.getOutputPath( conf ); 057 058 if( outputPath == null ) 059 return; 060 061 if( getFSSafe( conf, outputPath ) == null ) 062 return; 063 064 if( conf.get( "mapred.task.id" ) == null ) // need to stuff a fake id 065 { 066 String mapper = conf.getBoolean( "mapred.task.is.map", true ) ? "m" : "r"; 067 conf.set( "mapred.task.id", String.format( "attempt_%012d_0000_%s_000000_0", (int) Math.rint( System.currentTimeMillis() ), mapper ) ); 068 } 069 070 makeTempPath( conf ); 071 072 if( writeDirectlyToWorkingPath( conf, outputPath ) ) 073 { 074 LOG.info( "writing directly to output path: {}", outputPath ); 075 setWorkOutputPath( conf, outputPath ); 076 return; 077 } 078 079 // "mapred.work.output.dir" 080 Path taskOutputPath = getTaskOutputPath( conf ); 081 setWorkOutputPath( conf, taskOutputPath ); 082 } 083 084 public static synchronized void setupTask( JobConf conf ) throws IOException 085 { 086 String workpath = conf.get( "mapred.work.output.dir" ); 087 088 if( workpath == null ) 089 return; 090 091 FileSystem fs = getFSSafe( conf, new Path( workpath ) ); 092 093 if( fs == null ) 094 return; 095 096 String taskId = conf.get( "mapred.task.id" ); 097 098 LOG.info( "setting up task: '{}' - {}", taskId, workpath ); 099 100 AtomicInteger integer = pathCounts.get( workpath ); 101 102 if( integer == null ) 103 { 104 integer = new AtomicInteger(); 105 pathCounts.put( workpath, integer ); 106 } 107 108 integer.incrementAndGet(); 109 } 110 111 public static boolean needsTaskCommit( JobConf conf ) throws IOException 112 { 113 String workpath = conf.get( "mapred.work.output.dir" ); 114 115 if( workpath == null ) 116 return false; 117 118 Path taskOutputPath = new Path( workpath ); 119 120 if( taskOutputPath != null ) 121 { 122 FileSystem fs = getFSSafe( conf, taskOutputPath ); 123 124 if( fs == null ) 125 return false; 126 127 if( fs.exists( taskOutputPath ) ) 128 return true; 129 } 130 131 return false; 132 } 133 134 /** 135 * copies all files from the taskoutputpath to the outputpath 136 * 137 * @param conf 138 */ 139 public static void commitTask( JobConf conf ) throws IOException 140 { 141 Path taskOutputPath = new Path( conf.get( "mapred.work.output.dir" ) ); 142 143 FileSystem fs = getFSSafe( conf, taskOutputPath ); 144 145 if( fs == null ) 146 return; 147 148 AtomicInteger integer = pathCounts.get( taskOutputPath.toString() ); 149 150 if( integer.decrementAndGet() != 0 ) 151 return; 152 153 String taskId = conf.get( "mapred.task.id" ); 154 155 LOG.info( "committing task: '{}' - {}", taskId, taskOutputPath ); 156 157 if( taskOutputPath != null ) 158 { 159 if( writeDirectlyToWorkingPath( conf, taskOutputPath ) ) 160 return; 161 162 if( fs.exists( taskOutputPath ) ) 163 { 164 Path jobOutputPath = taskOutputPath.getParent().getParent(); 165 // Move the task outputs to their final place 166 moveTaskOutputs( conf, fs, jobOutputPath, taskOutputPath ); 167 168 // Delete the temporary task-specific output directory 169 if( !fs.delete( taskOutputPath, true ) ) 170 LOG.info( "failed to delete the temporary output directory of task: '{}' - {}", taskId, taskOutputPath ); 171 172 LOG.info( "saved output of task '{}' to {}", taskId, jobOutputPath ); 173 } 174 } 175 } 176 177 /** 178 * Called from flow step to remove temp dirs 179 * 180 * @param conf 181 * @throws IOException 182 */ 183 public static void cleanupTapMetaData( JobConf conf, Tap tap ) throws IOException 184 { 185 cleanTempPath( conf, new Path( tap.getIdentifier() ) ); 186 } 187 188 /** 189 * May only be called once. should only be called if not in a flow 190 * 191 * @param conf 192 */ 193 public static void cleanupJob( JobConf conf ) throws IOException 194 { 195 if( isInflow( conf ) ) 196 return; 197 198 Path outputPath = FileOutputFormat.getOutputPath( conf ); 199 200 cleanTempPath( conf, outputPath ); 201 } 202 203 private static synchronized void cleanTempPath( JobConf conf, Path outputPath ) throws IOException 204 { 205 // do the clean up of temporary directory 206 207 if( outputPath != null ) 208 { 209 FileSystem fileSys = getFSSafe( conf, outputPath ); 210 211 if( fileSys == null ) 212 return; 213 214 if( !fileSys.exists( outputPath ) ) 215 return; 216 217 Path tmpDir = new Path( outputPath, TEMPORARY_PATH ); 218 219 LOG.info( "deleting temp path {}", tmpDir ); 220 221 if( fileSys.exists( tmpDir ) ) 222 fileSys.delete( tmpDir, true ); 223 } 224 } 225 226 private static FileSystem getFSSafe( JobConf conf, Path tmpDir ) 227 { 228 try 229 { 230 return tmpDir.getFileSystem( conf ); 231 } 232 catch( IOException e ) 233 { 234 // ignore 235 } 236 237 return null; 238 } 239 240 static boolean isInflow( JobConf conf ) 241 { 242 return conf.get( "cascading.flow.step" ) != null || conf.get( "cascading.flow.step.path" ) != null; 243 } 244 245 private static Path getTaskOutputPath( JobConf conf ) 246 { 247 String taskId = conf.get( "mapred.task.id" ); 248 249 Path p = new Path( FileOutputFormat.getOutputPath( conf ), TEMPORARY_PATH + Path.SEPARATOR + "_" + taskId ); 250 251 try 252 { 253 FileSystem fs = p.getFileSystem( conf ); 254 return p.makeQualified( fs ); 255 } 256 catch( IOException ie ) 257 { 258 return p; 259 } 260 } 261 262 static void setWorkOutputPath( JobConf conf, Path outputDir ) 263 { 264 outputDir = new Path( conf.getWorkingDirectory(), outputDir ); 265 conf.set( "mapred.work.output.dir", outputDir.toString() ); 266 } 267 268 public static void makeTempPath( JobConf conf ) throws IOException 269 { 270 // create job specific temporary directory in output path 271 Path outputPath = FileOutputFormat.getOutputPath( conf ); 272 273 if( outputPath != null ) 274 { 275 Path tmpDir = new Path( outputPath, TEMPORARY_PATH ); 276 FileSystem fileSys = tmpDir.getFileSystem( conf ); 277 278 if( !fileSys.exists( tmpDir ) && !fileSys.mkdirs( tmpDir ) ) 279 LOG.error( "mkdirs failed to create {}", tmpDir ); 280 } 281 } 282 283 private static void moveTaskOutputs( JobConf conf, FileSystem fs, Path jobOutputDir, Path taskOutput ) throws IOException 284 { 285 String taskId = conf.get( "mapred.task.id" ); 286 287 if( fs.isFile( taskOutput ) ) 288 { 289 Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) ); 290 if( !fs.rename( taskOutput, finalOutputPath ) ) 291 { 292 if( !fs.delete( finalOutputPath, true ) ) 293 { 294 throw new IOException( "Failed to delete earlier output of task: " + taskId ); 295 } 296 if( !fs.rename( taskOutput, finalOutputPath ) ) 297 { 298 throw new IOException( "Failed to save output of task: " + taskId ); 299 } 300 } 301 302 LOG.debug( "Moved {} to {}", taskOutput, finalOutputPath ); 303 } 304 else if( fs.getFileStatus( taskOutput ).isDir() ) 305 { 306 FileStatus[] paths = fs.listStatus( taskOutput ); 307 Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, getTaskOutputPath( conf ) ); 308 fs.mkdirs( finalOutputPath ); 309 if( paths != null ) 310 { 311 for( FileStatus path : paths ) 312 moveTaskOutputs( conf, fs, jobOutputDir, path.getPath() ); 313 } 314 } 315 } 316 317 private static Path getFinalPath( Path jobOutputDir, Path taskOutput, Path taskOutputPath ) throws IOException 318 { 319 URI taskOutputUri = taskOutput.toUri(); 320 URI relativePath = taskOutputPath.toUri().relativize( taskOutputUri ); 321 if( taskOutputUri == relativePath ) 322 {//taskOutputPath is not a parent of taskOutput 323 throw new IOException( "Can not get the relative path: base = " + taskOutputPath + " child = " + taskOutput ); 324 } 325 if( relativePath.getPath().length() > 0 ) 326 { 327 return new Path( jobOutputDir, relativePath.getPath() ); 328 } 329 else 330 { 331 return jobOutputDir; 332 } 333 } 334 335 336 /** used in AWS EMR to disable temp paths on some file systems, s3. */ 337 private static boolean writeDirectlyToWorkingPath( JobConf conf, Path path ) 338 { 339 FileSystem fs = getFSSafe( conf, path ); 340 341 if( fs == null ) 342 return false; 343 344 boolean result = conf.getBoolean( "mapred.output.direct." + fs.getClass().getSimpleName(), false ); 345 346 if( result ) 347 LOG.info( "output direct is enabled for this fs: " + fs.getName() ); 348 349 return result; 350 } 351 352 }