/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.LazyIterable;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p/>
 * Paths typically should point to a directory, where in turn all the "part" files immediately in that directory will
 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p/>
 * To include sub-directories, Hadoop supports "globbing". Globbing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
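 * <p/>
 * For example, a glob source might look like the following (a sketch; the {@code TextLine} scheme and
 * path are illustrative, not prescribed by this class):
 * <pre>{@code
 * Tap source = new GlobHfs( new TextLine(), "/logs/2018-*" );
 * }</pre>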
 * <p/>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
 * {@code jobConf.getSize} will behave properly or reliably. Nor can an Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p/>
 * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed File System or
 * the local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where <code>hdfs://...</code> denotes Dfs and <code>file://...</code> denotes Lfs.
 * <p/>
 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default.
 * <p/>
 * By default, Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started. Subsequently, Cascading will force any MapReduce jobs reading or writing to {@code file://} resources
 * to run in Hadoop "standalone mode" so that the files can be read.
 * <p/>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} with a different scheme value,
 * or with "none" to disable it entirely for the case where the file to be read is available at the exact same path on
 * every Hadoop processing node.
 * <p/>
 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into
 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application
 * performance.
 * <p/>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
 * <p/>
 * The Apache Tez planner does not require this setting; combining is supported by default and enabled by the
 * application manager.
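 * <p/>
 * A minimal usage sketch (the {@code TextLine} scheme, paths, and temporary directory below are
 * illustrative, not requirements of this class):
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * HfsProps.setTemporaryDirectory( properties, "/tmp/cascading" ); // optional
 *
 * Tap source = new Hfs( new TextLine(), "hdfs://namenode/logs/input" );
 * Tap sink = new Hfs( new TextLine(), "file:///tmp/output", SinkMode.REPLACE );
 * }</pre>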
 */
public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field statuses */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private transient String cachedPath = null;

  private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter()
    {
    public boolean accept( Path path )
      {
      String name = path.getName();

      if( name.isEmpty() ) // should never happen
        return true;

      char first = name.charAt( 0 );

      return first != '_' && first != '.';
      }
    };

  protected static String getLocalModeScheme( Configuration conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

  protected static boolean getUseCombinedInput( Configuration conf )
    {
    boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false );

    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) == null && !combineEnabled )
      return false;

    if( !combineEnabled ) // enable if set in FlowRuntimeProps
      combineEnabled = conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, false );

    String platform = conf.get( "cascading.flow.platform", "" );

    // only supported by these platforms
    if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) )
      return combineEnabled;

    // we are on a platform that supports combining, just not through the combiner
    // do not enable it here locally
    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
      return false;

    if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) )
      {
      LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform );
      System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" );
      }

    return false;
    }

  protected static boolean getCombinedInputSafeMode( Configuration conf )
    {
    return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties({"scheme"})
  protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( Configuration jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

  protected URI makeURIScheme( Configuration configuration )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( configuration );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }

  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param configuration of type Configuration
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( Configuration configuration )
    {
    return getDefaultFileSystem( configuration ).getUri();
    }

  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    try
      {
      return FileSystem.get( configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( Configuration configuration )
    {
    URI scheme = getURIScheme( configuration );

    try
      {
      return FileSystem.get( scheme, configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    if( cachedPath == null )
      cachedPath = getPath().toString();

    return cachedPath;
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( Configuration conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }
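
  // Illustrative note (values are hypothetical): for a stringPath of "data/input" and a default
  // filesystem of hdfs://namenode:8020, getFullIdentifier( conf ) above yields something like
  // "hdfs://namenode:8020/user/<user>/data/input", since relative paths are qualified against
  // the filesystem's working directory.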
  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( Configuration conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, final String... fullIdentifiers )
    {
    sourceConfInitAddInputPaths( conf, new LazyIterable<String, Path>( fullIdentifiers )
      {
      @Override
      protected Path convert( String next )
        {
        return new Path( next );
        }
      } );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPaths( Configuration conf, Iterable<Path> qualifiedPaths )
    {
    HadoopUtil.addInputPaths( conf, qualifiedPaths );

    for( Path qualifiedPath : qualifiedPaths )
      {
      boolean stop = !makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );

      if( stop )
        break;
      }
    }

  @Deprecated
  protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath )
    {
    HadoopUtil.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );
    }

  protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }
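
  // A sketch of the configuration rewrite performed below (the TextInputFormat value is an
  // assumed example, not prescribed by this class):
  //
  //   before: mapred.input.format.class = org.apache.hadoop.mapred.TextInputFormat
  //   after:  mapred.input.format.class = cascading.tap.hadoop.Hfs$CombinedInputFormat
  //           with the original format class name preserved under
  //           CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT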

  /**
   * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input
   * format.
   */
  private void handleCombineFileInputFormat( Configuration conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );

    // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
    // warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class );
      }
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    HadoopUtil.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private boolean makeLocal( Configuration conf, Path qualifiedPath, String infoMessage )
    {
    // don't change the conf or log any messages if running cluster side
    if( HadoopUtil.isInflow( conf ) )
      return false;

    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally

      return false; // only need to set local once
      }

    return true;
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }
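
  // A client-side read sketch (hypothetical usage; Tap#openForRead( FlowProcess ) passes a null
  // RecordReader, which the iterator above tolerates):
  //
  //   TupleEntryIterator iterator = hfs.openForRead( new HadoopFlowProcess( new JobConf() ) );
  //
  //   while( iterator.hasNext() )
  //     System.out.println( iterator.next() );
  //
  //   iterator.close();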
  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException
    {
    resetFileStatuses();

    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the PartitionTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( Configuration conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( Configuration conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    resetFileStatuses();

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around the NPE thrown when the fs reaches the root directory
      // removes coupling to the new aws hadoop artifacts that may not be deployed
      if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) )
        throw exception;
      }

    return true;
    }

  public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException
    {
    return deleteChildResource( flowProcess.getConfig(), childIdentifier );
    }

  public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException
    {
    resetFileStatuses();

    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( Configuration conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ) does not account for "/*" etc,
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  @Override
  public boolean isDirectory( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }
  @Override
  public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  @Override
  public long getSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return long
   * @throws IOException on failure
   */
  public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getBlockSize( flowProcess.getConfig() );
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type Configuration
   * @return long
   * @throws IOException on failure
   */
  public long getBlockSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return int
   * @throws IOException on failure
   */
  public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getReplication( flowProcess.getConfig() );
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type Configuration
   * @return int
   * @throws IOException on failure
   */
  public int getReplication( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), 1, false );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }
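
  // Illustrative results (a hypothetical tap on "/data" holding part-00000 and part-00001):
  //
  //   getChildIdentifiers( conf, 1, false ) -> [ "/data/part-00000", "/data/part-00001" ]
  //   getChildIdentifiers( conf, 1, true )  -> the same children, fully qualified, e.g.
  //                                            "hdfs://namenode:8020/data/part-00000"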
  @Override
  public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

  private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // statuses is empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub-directories
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

  public FileStatus getFileStatus( Configuration conf ) throws IOException
    {
    return getFileSystem( conf ).getFileStatus( getPath() );
    }

  public static Path getTempPath( Configuration conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

  protected String makeTemporaryPathDirString( String name )
    {
    // _ is treated as a hidden file, so wipe them out
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Given the current file-system, populates the cached array of child {@link FileStatus} instances
   * for this path, if not already cached.
   *
   * @param conf of type Configuration
   * @throws IOException on failure
   */
  private void makeStatuses( Configuration conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /**
   * Method resetFileStatuses removes the status cache, if any.
   */
  public void resetFileStatuses()
    {
    statuses = null;
    }
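
  // A tuning sketch (the keys below are the ones consumed in this file; the values are illustrative):
  //
  //   conf.setBoolean( "cascading.hadoop.hfs.combine.files", true );
  //   conf.setLong( "cascading.hadoop.hfs.combine.max.size", 268435456L ); // target ~256 MB splits
  //   conf.setBoolean( "cascading.hadoop.hfs.combine.safemode", false ); // warn instead of fail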

  /**
   * Combined input format that uses the underlying individual input format to combine multiple
   * files into a single split.
   */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value, if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }