/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tap.type.TapWith;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.LazyIterable;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p>
 * Paths typically should point to a directory, where in turn all the "part" files immediately in that directory will
 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p>
 * To include sub-directories, Hadoop supports "globbing". Globbing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
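 * <p>
 * For example (with illustrative paths), {@code new Hfs( scheme, "/logs/2018" )} reads the part files
 * immediately within {@code /logs/2018}, while {@code new GlobHfs( scheme, "/logs/2018/day-*" )} gathers
 * every matching directory into a single source.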
 * <p>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
 * {@code jobConf.getSize} will behave properly or reliably. Nor can an Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p>
 * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed file system or
 * the Local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where {@code hdfs://...} denotes Dfs and {@code file://...} denotes Lfs.
 * <p>
 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default path.
 * <p>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started. Subsequently Cascading will force any MapReduce jobs reading or writing to {@code file://} resources
 * to run in Hadoop "standalone mode" so that the file can be read.
 * <p>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme
 * value, or set it to "none" to disable this entirely, for the case where the file to be read is available at the
 * exact same path on every Hadoop processing node.
 * <p>
 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into
 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application
 * performance.
 * <p>
 * This is enabled by setting {@link HfsProps#setUseCombinedInput(boolean)} to {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
 * <p>
 * The Apache Tez planner does not require this setting; combining is supported by default and enabled by the
 * application manager.
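 * <p>
 * A minimal usage sketch (the paths, scheme, and connector choices below are illustrative):
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * HfsProps.setTemporaryDirectory( properties, "/tmp/cascading" );
 *
 * Tap source = new Hfs( new TextLine(), "hdfs://namenode/logs/input" );
 * Tap sink = new Hfs( new TextLine(), "/logs/output", SinkMode.REPLACE );
 *
 * FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
 * }</pre>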
 */
public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration>, TapWith<Configuration, RecordReader, OutputCollector>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field statuses */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private transient String cachedPath = null;

  private static final PathFilter HIDDEN_FILES_FILTER = path ->
    {
    String name = path.getName();

    if( name.isEmpty() ) // should never happen
      return true;

    char first = name.charAt( 0 );

    return first != '_' && first != '.';
    };

  protected static String getLocalModeScheme( Configuration conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

  protected static boolean getUseCombinedInput( Configuration conf )
    {
    boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false );

    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) == null && !combineEnabled )
      return false;

    if( !combineEnabled ) // enable if set in FlowRuntimeProps
      combineEnabled = conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, false );

    String platform = conf.get( "cascading.flow.platform", "" );

    // only supported by these platforms
    if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) )
      return combineEnabled;

    // we are on a platform that supports combining, just not through the combiner
    // do not enable it here locally
    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
      return false;

    if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) )
      {
      LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform );
      System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" );
      }

    return false;
    }

  protected static boolean getCombinedInputSafeMode( Configuration conf )
    {
    return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties({"scheme"})
  protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }
  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme of type Scheme
   * @param path   of type Path
   */
  @ConstructorProperties({"scheme", "path"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, Path path )
    {
    super( scheme );
    setStringPath( path.toString() );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme   of type Scheme
   * @param path     of type Path
   * @param sinkMode of type SinkMode
   */
  @ConstructorProperties({"scheme", "path", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, Path path, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( path.toString() );
    }

  @Override
  public TapWith<Configuration, RecordReader, OutputCollector> withChildIdentifier( String identifier )
    {
    Path path = new Path( identifier );

    if( !path.toString().startsWith( getPath().toString() ) )
      path = new Path( getPath(), path );

    return create( getScheme(), path, getSinkMode() );
    }

  @Override
  public TapWith<Configuration, RecordReader, OutputCollector> withScheme( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    return create( scheme, getPath(), getSinkMode() );
    }

  @Override
  public TapWith<Configuration, RecordReader, OutputCollector> withSinkMode( SinkMode sinkMode )
    {
    return create( getScheme(), getPath(), sinkMode );
    }

  protected TapWith<Configuration, RecordReader, OutputCollector> create( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, Path path, SinkMode sinkMode )
    {
    try
      {
      return Util.newInstance( getClass(), new Object[]{scheme, path, sinkMode} );
      }
    catch( CascadingException exception )
      {
      throw new TapException( "unable to create a new instance of: " + getClass().getName(), exception );
      }
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( Configuration jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

  protected URI makeURIScheme( Configuration configuration )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( configuration );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }
  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param configuration of type Configuration
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( Configuration configuration )
    {
    return getDefaultFileSystem( configuration ).getUri();
    }

  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    try
      {
      return FileSystem.get( configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( Configuration configuration )
    {
    URI scheme = getURIScheme( configuration );

    try
      {
      return FileSystem.get( scheme, configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    if( cachedPath == null )
      cachedPath = getPath().toString();

    return cachedPath;
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( Configuration conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( Configuration conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, final String... fullIdentifiers )
    {
    sourceConfInitAddInputPaths( conf, new LazyIterable<String, Path>( fullIdentifiers )
      {
      @Override
      protected Path convert( String next )
        {
        return new Path( next );
        }
      } );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPaths( Configuration conf, Iterable<Path> qualifiedPaths )
    {
    HadoopUtil.addInputPaths( conf, qualifiedPaths );

    for( Path qualifiedPath : qualifiedPaths )
      {
      boolean stop = !makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );

      if( stop )
        break;
      }
    }

  @Deprecated
  protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath )
    {
    HadoopUtil.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );
    }
  protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }

  /**
   * If enabled in the given configuration, sets {@link CombineFileInputFormat} as the input
   * format, wrapping the input format prescribed by the underlying scheme.
   */
  private void handleCombineFileInputFormat( Configuration conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format, but the underlying input format is missing" );

    // if safe mode is on (the default), throw an exception if the InputFormat is not a FileInputFormat,
    // otherwise log a warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class );
      }
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    HadoopUtil.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private boolean makeLocal( Configuration conf, Path qualifiedPath, String infoMessage )
    {
    // don't change the conf or log any messages if running cluster side
    if( HadoopUtil.isInflow( conf ) )
      return false;

    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally

      return false; // only need to set local once
      }

    return true;
    }
  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }

  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException
    {
    resetFileStatuses();

    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the PartitionTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( Configuration conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( Configuration conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    resetFileStatuses();

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around npe thrown when fs reaches root directory
      // removes coupling to the new aws hadoop artifacts that may not be deployed
      if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) )
        throw exception;
      }

    return true;
    }

  public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException
    {
    return deleteChildResource( flowProcess.getConfig(), childIdentifier );
    }

  public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException
    {
    resetFileStatuses();

    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( Configuration conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ) does not account for "/*" etc.,
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  @Override
  public boolean isDirectory( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }
  @Override
  public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  @Override
  public long getSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return long
   * @throws IOException when the underlying file system cannot be accessed
   */
  public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getBlockSize( flowProcess.getConfig() );
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type Configuration
   * @return long
   * @throws IOException when the underlying file system cannot be accessed
   */
  public long getBlockSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return int
   * @throws IOException when the underlying file system cannot be accessed
   */
  public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getReplication( flowProcess.getConfig() );
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type Configuration
   * @return int
   * @throws IOException when the underlying file system cannot be accessed
   */
  public int getReplication( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), 1, false );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }
  @Override
  public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

  private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // statuses is empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub-directories
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

  public FileStatus getFileStatus( Configuration conf ) throws IOException
    {
    return getFileSystem( conf ).getFileStatus( getPath() );
    }

  public static Path getTempPath( Configuration conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

  protected String makeTemporaryPathDirString( String name )
    {
    // _ is treated as a hidden file, so wipe them out
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Method makeStatuses populates the cached array of child {@link FileStatus} instances for this path.
   *
   * @param conf of type Configuration
   * @throws IOException on failure
   */
  private void makeStatuses( Configuration conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /**
   * Method resetFileStatuses removes the status cache, if any.
   */
  public void resetFileStatuses()
    {
    statuses = null;
    }

  /**
   * Combined input format that uses the underlying individual input format to combine multiple files into a
   * single split.
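   * <p>
   * Activated via the {@code cascading.hadoop.hfs.combine.files} property (see
   * {@link HfsProps#setUseCombinedInput(boolean)}); the maximum combined split size may be capped
   * with the {@code cascading.hadoop.hfs.combine.max.size} property.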
   */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    @Override
    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value, if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }