/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.SequenceFile;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * {@link cascading.flow.hadoop.HadoopFlowConnector} when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p/>
 * Paths typically should point to a directory, where in turn all the "part" files immediately in that directory will
 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p/>
 * To include sub-directories, Hadoop supports "globing". Globing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
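 * <p/>
 * A typical source and sink pair might be constructed as follows. This is only an illustrative sketch; the
 * {@link cascading.scheme.hadoop.TextLine} scheme and the paths shown are placeholders, not requirements of Hfs.
 * <pre>{@code
 * // read all "part" files immediately under the given directory as lines of text
 * Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "hdfs://namenode:8020/logs/input" );
 *
 * // write results to another directory, replacing it if it already exists
 * Tap sink = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "output/results", SinkMode.REPLACE );
 * }</pre>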
 * <p/>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
 * {@link #getSize(org.apache.hadoop.mapred.JobConf)} will behave properly or reliably. Nor can the Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p/>
 * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed file system or
 * the Local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where <code>hdfs://...</code> will denote Dfs, and <code>file://...</code> will denote Lfs.
 * <p/>
 * Call {@link #setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default path.
 * <p/>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started, so the Tap will force any MapReduce jobs reading or writing to {@code file://} resources to run in
 * Hadoop "standalone mode" so that the file can be read.
 * <p/>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value,
 * or set it to "none" to disable this behavior entirely for the case where the file to be read is available on every
 * Hadoop processing node at the exact same path.
 * <p/>
 * Hfs can optionally combine multiple small files (or a series of small "blocks") into larger "splits". This reduces
 * the number of resulting map tasks created by Hadoop and can improve application performance.
 * <p/>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
 */
public class Hfs extends Tap<JobConf, RecordReader, OutputCollector> implements FileType<JobConf>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /**
   * Field TEMPORARY_DIRECTORY
   *
   * @deprecated see {@link HfsProps#TEMPORARY_DIRECTORY}
   */
  @Deprecated
  public static final String TEMPORARY_DIRECTORY = HfsProps.TEMPORARY_DIRECTORY;

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field statuses */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter()
    {
    public boolean accept( Path path )
      {
      String name = path.getName();

      if( name.isEmpty() ) // should never happen
        return true;

      char first = name.charAt( 0 );

      return first != '_' && first != '.';
      }
    };

  /**
   * Method setTemporaryDirectory sets the temporary directory on the given properties object.
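   * <p/>
   * A minimal sketch (the directory shown is illustrative only):
   * <pre>{@code
   * Properties properties = new Properties();
   *
   * // stage temporary job data under this path instead of the Hadoop default
   * Hfs.setTemporaryDirectory( properties, "/tmp/cascading-staging" );
   * }</pre>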
   *
   * @param properties of type Map<Object,Object>
   * @param tempDir    of type String
   * @deprecated see {@link HfsProps}
   */
  @Deprecated
  public static void setTemporaryDirectory( Map<Object, Object> properties, String tempDir )
    {
    properties.put( HfsProps.TEMPORARY_DIRECTORY, tempDir );
    }

  /**
   * Method getTemporaryDirectory returns the configured temporary directory from the given properties object.
   *
   * @param properties of type Map<Object,Object>
   * @return a String or null if not set
   * @deprecated see {@link HfsProps}
   */
  @Deprecated
  public static String getTemporaryDirectory( Map<Object, Object> properties )
    {
    return (String) properties.get( HfsProps.TEMPORARY_DIRECTORY );
    }

  protected static String getLocalModeScheme( JobConf conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

  protected static boolean getUseCombinedInput( JobConf conf )
    {
    return conf.getBoolean( HfsProps.COMBINE_INPUT_FILES, false );
    }

  protected static boolean getCombinedInputSafeMode( JobConf conf )
    {
    return conf.getBoolean( HfsProps.COMBINE_INPUT_FILES_SAFE_MODE, true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties( {"scheme"} )
  protected Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param fields     of type Fields
   * @param stringPath of type String
   */
  @Deprecated
  @ConstructorProperties( {"fields", "stringPath"} )
  public Hfs( Fields fields, String stringPath )
    {
    super( new SequenceFile( fields ) );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param fields     of type Fields
   * @param stringPath of type String
   * @param replace    of type boolean
   */
  @Deprecated
  @ConstructorProperties( {"fields", "stringPath", "replace"} )
  public Hfs( Fields fields, String stringPath, boolean replace )
    {
    super( new SequenceFile( fields ), replace ? SinkMode.REPLACE : SinkMode.KEEP );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param fields     of type Fields
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @Deprecated
  @ConstructorProperties( {"fields", "stringPath", "sinkMode"} )
  public Hfs( Fields fields, String stringPath, SinkMode sinkMode )
    {
    super( new SequenceFile( fields ), sinkMode );
    setStringPath( stringPath );

    if( sinkMode == SinkMode.UPDATE )
      throw new IllegalArgumentException( "updates are not supported" );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties( {"scheme", "stringPath"} )
  public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param replace    of type boolean
   */
  @Deprecated
  @ConstructorProperties( {"scheme", "stringPath", "replace"} )
  public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, boolean replace )
    {
    super( scheme, replace ? SinkMode.REPLACE : SinkMode.KEEP );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties( {"scheme", "stringPath", "sinkMode"} )
  public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( JobConf jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

  protected URI makeURIScheme( JobConf jobConf )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( jobConf );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }

  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param jobConf of type JobConf
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( JobConf jobConf )
    {
    return getDefaultFileSystem( jobConf ).getUri();
    }

  protected FileSystem getDefaultFileSystem( JobConf jobConf )
    {
    try
      {
      return FileSystem.get( jobConf );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( JobConf jobConf )
    {
    URI scheme = getURIScheme( jobConf );

    try
      {
      return FileSystem.get( scheme, jobConf );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    return getPath().toString();
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( JobConf conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }

  @Override
  public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( JobConf conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( conf );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<JobConf> process, JobConf conf, String... fullIdentifiers )
    {
    for( String fullIdentifier : fullIdentifiers )
      sourceConfInitAddInputPath( conf, new Path( fullIdentifier ) );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPath( JobConf conf, Path qualifiedPath )
    {
    FileInputFormat.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to local mode, via source: " );
    }

  protected void sourceConfInitComplete( FlowProcess<JobConf> process, JobConf conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }

  /**
   * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input
   * format.
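   * <p/>
   * A minimal sketch of the configuration this method reacts to (property values are illustrative only):
   * <pre>{@code
   * JobConf conf = new JobConf();
   *
   * conf.setBoolean( HfsProps.COMBINE_INPUT_FILES, true );             // enable combining of small files/blocks
   * conf.setLong( HfsProps.COMBINE_INPUT_FILES_SIZE_MAX, 268435456L ); // cap each combined split at roughly 256MB
   * conf.setBoolean( HfsProps.COMBINE_INPUT_FILES_SAFE_MODE, false );  // warn instead of fail when the InputFormat is not a FileInputFormat
   * }</pre>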
   */
  private void handleCombineFileInputFormat( JobConf conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );

    // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
    // warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setInputFormat( CombinedInputFormat.class );
      }
    }

  @Override
  public void sinkConfInit( FlowProcess<JobConf> process, JobConf conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    FileOutputFormat.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to local mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private void makeLocal( JobConf conf, Path qualifiedPath, String infoMessage )
    {
    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }

  @Override
  public TupleEntryCollector openForWrite( FlowProcess<JobConf> flowProcess, OutputCollector output ) throws IOException
    {
    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the TemplateTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( JobConf conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( JobConf conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( JobConf conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around npe thrown when fs reaches root directory
      if( !( fileSystem instanceof NativeS3FileSystem ) )
        throw exception;
      }

    return true;
    }

  public boolean deleteChildResource( JobConf conf, String childIdentifier ) throws IOException
    {
    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( JobConf conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ); does not account for "/*" etc
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( JobConf conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }

  @Override
  public long getSize( JobConf conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return getFileSystem( conf ).getFileStatus( getPath() ).getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type JobConf
   * @return long
   * @throws IOException when the underlying file system cannot be accessed
   */
  public long getBlockSize( JobConf conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type JobConf
   * @return int
   * @throws IOException when the underlying file system cannot be accessed
   */
  public int getReplication( JobConf conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( JobConf conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( JobConf conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

  private void getChildPaths( JobConf conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( JobConf conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // statuses is empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub-directories
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

  public static Path getTempPath( JobConf conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

  protected String makeTemporaryPathDirString( String name )
    {
    // a leading '_' is treated as a hidden file, so wipe them out
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Populates the {@code statuses} field with the {@link FileStatus} of each child of this resource, if not
   * already populated.
   *
   * @param conf of type JobConf
   * @throws IOException on failure
   */
  private void makeStatuses( JobConf conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value, if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( HfsProps.COMBINE_INPUT_FILES_SIZE_MAX, 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }