/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.type.FileType;
import cascading.tap.type.TapWith;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.LazyIterable;
import cascading.util.Util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
 * Hadoop {@link cascading.flow.FlowConnector} sub-classes when creating Hadoop executable {@link cascading.flow.Flow}
 * instances.
 * <p>
 * Paths should typically point to a directory, where in turn all the "part" files immediately in that directory will
 * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
 * <p>
 * To include sub-directories, Hadoop supports "globbing". Globbing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
 * <p>
 * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
 * {@code jobConf.getSize} will behave properly or reliably. Nor can an Hfs instance
 * with a wildcard path be used as a sink to write data.
 * <p>
 * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
 * <p>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed File System or
 * the local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
 * <p>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where {@code hdfs://...} will denote Dfs, and {@code file://...} will denote Lfs.
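 * <p>
 * For example, a minimal sketch of constructing source and sink taps (the {@code TextLine} scheme, the field
 * names, and the paths below are illustrative only, not prescribed by this class):
 * <pre>{@code
 * Scheme scheme = new TextLine( new Fields( "offset", "line" ) );
 *
 * Tap source = new Hfs( scheme, "hdfs://namenode:8020/logs/input" );    // resolves to the Hadoop Distributed File System
 * Tap sink = new Hfs( scheme, "file:///tmp/output", SinkMode.REPLACE ); // resolves to the local file system
 * }</pre>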
 * <p>
 * Call {@link HfsProps#setTemporaryDirectory(java.util.Map, String)} to use a temporary file directory path
 * other than the current Hadoop default path.
 * <p>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started. Consequently, Cascading will force any MapReduce jobs reading or writing to {@code file://} resources
 * to run in Hadoop "standalone mode" so that the file can be read.
 * <p>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value,
 * or set it to "none" to disable this behavior entirely, for the case where the file to be read is available at the exact
 * same path on every Hadoop processing node.
 * <p>
 * When using a MapReduce planner, Hfs can optionally combine multiple small files (or a series of small "blocks") into
 * larger "splits". This reduces the number of resulting map tasks created by Hadoop and can improve application
 * performance.
 * <p>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
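 * <p>
 * A minimal sketch of applying these properties before creating a {@link cascading.flow.FlowConnector} follows;
 * the property values, the fluent {@code HfsProps.hfsProps()} builder usage, and the choice of
 * {@code HadoopFlowConnector} are illustrative assumptions, not requirements of this class:
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * // write temporary files under a custom directory instead of the Hadoop default
 * HfsProps.setTemporaryDirectory( properties, "/tmp/my-app" );
 *
 * // never force jobs into local "standalone" mode based on the URI scheme
 * HfsProps.setLocalModeScheme( properties, "none" );
 *
 * // opt in to combining small files into larger splits (MapReduce planners only)
 * properties.putAll( HfsProps.hfsProps().setUseCombinedInput( true ).buildProperties() );
 *
 * FlowConnector flowConnector = new HadoopFlowConnector( properties );
 * }</pre>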
 * <p>
 * The Apache Tez planner does not require this setting; split combining is supported by default and enabled by the
 * application manager.
 */
public class Hfs extends Tap<Configuration, RecordReader, OutputCollector> implements FileType<Configuration>, TapWith<Configuration, RecordReader, OutputCollector>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );

  /** Field stringPath */
  protected String stringPath;
  /** Field uriScheme */
  transient URI uriScheme;
  /** Field path */
  transient Path path;
  /** Field paths */
  private transient FileStatus[] statuses; // only used by getModifiedTime

  private transient String cachedPath = null;

  private static final PathFilter HIDDEN_FILES_FILTER = path ->
    {
    String name = path.getName();

    if( name.isEmpty() ) // should never happen
      return true;

    char first = name.charAt( 0 );

    return first != '_' && first != '.';
    };

  protected static String getLocalModeScheme( Configuration conf, String defaultValue )
    {
    return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
    }

  protected static boolean getUseCombinedInput( Configuration conf )
    {
    boolean combineEnabled = conf.getBoolean( "cascading.hadoop.hfs.combine.files", false );

    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) == null && !combineEnabled )
      return false;

    if( !combineEnabled ) // enable if set in FlowRuntimeProps
      combineEnabled = conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, false );

    String platform = conf.get( "cascading.flow.platform", "" );

    // only supported by these platforms
    if( platform.equals( "hadoop" ) || platform.equals( "hadoop2-mr1" ) )
      return combineEnabled;

    // we are on a platform that supports combining, just not through the combiner
    // do not enable it here locally
    if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
      return false;

    if( combineEnabled && !Boolean.getBoolean( "cascading.hadoop.hfs.combine.files.warned" ) )
      {
      LOG.warn( "'cascading.hadoop.hfs.combine.files' has been set to true, but is unsupported by this platform: {}, will be ignored to prevent failures", platform );
      System.setProperty( "cascading.hadoop.hfs.combine.files.warned", "true" );
      }

    return false;
    }

  protected static boolean getCombinedInputSafeMode( Configuration conf )
    {
    return conf.getBoolean( "cascading.hadoop.hfs.combine.safemode", true );
    }

  protected Hfs()
    {
    }

  @ConstructorProperties({"scheme"})
  protected Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   */
  @ConstructorProperties({"scheme", "stringPath"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
    {
    super( scheme );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme     of type Scheme
   * @param stringPath of type String
   * @param sinkMode   of type SinkMode
   */
  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( stringPath );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme of type Scheme
   * @param path   of type Path
   */
  @ConstructorProperties({"scheme", "path"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, Path path )
    {
    super( scheme );
    setStringPath( path.toString() );
    }

  /**
   * Constructor Hfs creates a new Hfs instance.
   *
   * @param scheme   of type Scheme
   * @param path     of type Path
   * @param sinkMode of type SinkMode
   */
  @ConstructorProperties({"scheme", "path", "sinkMode"})
  public Hfs( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, Path path, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    setStringPath( path.toString() );
    }

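  /**
   * Method withChildIdentifier returns a new Hfs instance for the given child identifier. If the given identifier
   * does not already begin with this Tap's path, it is resolved as a child of this Tap's path.
   *
   * @param identifier of type String
   * @return TapWith
   */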
  @Override
  public TapWith<Configuration, RecordReader, OutputCollector> withChildIdentifier( String identifier )
    {
    Path path = new Path( identifier );

    if( !path.toString().startsWith( getPath().toString() ) )
      path = new Path( getPath(), path );

    return create( getScheme(), path, getSinkMode() );
    }

  @Override
  public TapWith<Configuration, RecordReader, OutputCollector> withScheme( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme )
    {
    return create( scheme, getPath(), getSinkMode() );
    }

  @Override
  public TapWith<Configuration, RecordReader, OutputCollector> withSinkMode( SinkMode sinkMode )
    {
    return create( getScheme(), getPath(), sinkMode );
    }

  protected TapWith<Configuration, RecordReader, OutputCollector> create( Scheme<Configuration, RecordReader, OutputCollector, ?, ?> scheme, Path path, SinkMode sinkMode )
    {
    try
      {
      return Util.newInstance( getClass(), new Object[]{scheme, path, sinkMode} );
      }
    catch( CascadingException exception )
      {
      throw new TapException( "unable to create a new instance of: " + getClass().getName(), exception );
      }
    }

  protected void setStringPath( String stringPath )
    {
    this.stringPath = Util.normalizeUrl( stringPath );
    }

  protected void setUriScheme( URI uriScheme )
    {
    this.uriScheme = uriScheme;
    }

  public URI getURIScheme( Configuration jobConf )
    {
    if( uriScheme != null )
      return uriScheme;

    uriScheme = makeURIScheme( jobConf );

    return uriScheme;
    }

  protected URI makeURIScheme( Configuration configuration )
    {
    try
      {
      URI uriScheme;

      LOG.debug( "handling path: {}", stringPath );

      URI uri = new Path( stringPath ).toUri(); // safer URI parsing
      String schemeString = uri.getScheme();
      String authority = uri.getAuthority();

      LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );

      if( schemeString != null && authority != null )
        uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
      else if( schemeString != null )
        uriScheme = new URI( schemeString + ":///" );
      else
        uriScheme = getDefaultFileSystemURIScheme( configuration );

      LOG.debug( "using uri scheme: {}", uriScheme );

      return uriScheme;
      }
    catch( URISyntaxException exception )
      {
      throw new TapException( "could not determine scheme from path: " + getPath(), exception );
      }
    }

  /**
   * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
   *
   * @param configuration of type Configuration
   * @return URI
   */
  public URI getDefaultFileSystemURIScheme( Configuration configuration )
    {
    return getDefaultFileSystem( configuration ).getUri();
    }

  protected FileSystem getDefaultFileSystem( Configuration configuration )
    {
    try
      {
      return FileSystem.get( configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to underlying filesystem", exception );
      }
    }

  protected FileSystem getFileSystem( Configuration configuration )
    {
    URI scheme = getURIScheme( configuration );

    try
      {
      return FileSystem.get( scheme, configuration );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to get handle to filesystem for: " + scheme.getScheme(), exception );
      }
    }

  @Override
  public String getIdentifier()
    {
    if( cachedPath == null )
      cachedPath = getPath().toString();

    return cachedPath;
    }

  public Path getPath()
    {
    if( path != null )
      return path;

    if( stringPath == null )
      throw new IllegalStateException( "path not initialized" );

    path = new Path( stringPath );

    return path;
    }

  @Override
  public String getFullIdentifier( Configuration conf )
    {
    return getPath().makeQualified( getFileSystem( conf ) ).toString();
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    String fullIdentifier = getFullIdentifier( conf );

    applySourceConfInitIdentifiers( process, conf, fullIdentifier );

    verifyNoDuplicates( conf );
    }

  protected static void verifyNoDuplicates( Configuration conf )
    {
    Path[] inputPaths = FileInputFormat.getInputPaths( HadoopUtil.asJobConfInstance( conf ) );
    Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );

    for( Path inputPath : inputPaths )
      {
      if( !paths.add( inputPath ) )
        throw new TapException( "may not add duplicate paths, found: " + inputPath );
      }
    }

  protected void applySourceConfInitIdentifiers( FlowProcess<? extends Configuration> process, Configuration conf, final String... fullIdentifiers )
    {
    sourceConfInitAddInputPaths( conf, new LazyIterable<String, Path>( fullIdentifiers )
      {
      @Override
      protected Path convert( String next )
        {
        return new Path( next );
        }
      } );

    sourceConfInitComplete( process, conf );
    }

  protected void sourceConfInitAddInputPaths( Configuration conf, Iterable<Path> qualifiedPaths )
    {
    HadoopUtil.addInputPaths( conf, qualifiedPaths );

    for( Path qualifiedPath : qualifiedPaths )
      {
      boolean stop = !makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );

      if( stop )
        break;
      }
    }

  @Deprecated
  protected void sourceConfInitAddInputPath( Configuration conf, Path qualifiedPath )
    {
    HadoopUtil.addInputPath( conf, qualifiedPath );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via source: " );
    }

  protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    super.sourceConfInit( process, conf );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

    // use CombineFileInputFormat if that is enabled
    handleCombineFileInputFormat( conf );
    }

  /**
   * Based on the configuration, handles and sets {@link CombineFileInputFormat} as the input
   * format.
   */
  private void handleCombineFileInputFormat( Configuration conf )
    {
    // if combining files, override the configuration to use CombineFileInputFormat
    if( !getUseCombinedInput( conf ) )
      return;

    // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
    String individualInputFormat = conf.get( "mapred.input.format.class" );

    if( individualInputFormat == null )
      throw new TapException( "input format is missing from the underlying scheme" );

    if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
      conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
      throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );

    // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
    // warning and don't use the CombineFileInputFormat
    boolean safeMode = getCombinedInputSafeMode( conf );

    if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
      {
      if( safeMode )
        throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
      else
        LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
      }
    else
      {
      // set the underlying individual input format
      conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );

      // override the input format class
      conf.setClass( "mapred.input.format.class", CombinedInputFormat.class, InputFormat.class );
      }
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
    {
    Path qualifiedPath = new Path( getFullIdentifier( conf ) );

    HadoopUtil.setOutputPath( conf, qualifiedPath );
    super.sinkConfInit( process, conf );

    makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

    TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
    }

  private boolean makeLocal( Configuration conf, Path qualifiedPath, String infoMessage )
    {
    // don't change the conf or log any messages if running cluster side
    if( HadoopUtil.isInflow( conf ) )
      return false;

    String scheme = getLocalModeScheme( conf, "file" );

    if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
      {
      if( LOG.isInfoEnabled() )
        LOG.info( infoMessage + toString() );

      HadoopUtil.setLocal( conf ); // force job to run locally

      return false; // only need to set local once
      }

    return true;
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // input may be null when this method is called on the client side or cluster side when accumulating
    // for a HashJoin
    return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
    }

  @Override
  public TupleEntryCollector openForWrite( FlowProcess<? extends Configuration> flowProcess, OutputCollector output ) throws IOException
    {
    resetFileStatuses();

    // output may be null when this method is called on the client side or cluster side when creating
    // side files with the PartitionTap
    return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
    }

  @Override
  public boolean createResource( Configuration conf ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );

    return getFileSystem( conf ).mkdirs( getPath() );
    }

  @Override
  public boolean deleteResource( Configuration conf ) throws IOException
    {
    String fullIdentifier = getFullIdentifier( conf );

    return deleteFullIdentifier( conf, fullIdentifier );
    }

  private boolean deleteFullIdentifier( Configuration conf, String fullIdentifier ) throws IOException
    {
    if( LOG.isDebugEnabled() )
      LOG.debug( "deleting: {}", fullIdentifier );

    resetFileStatuses();

    Path fullPath = new Path( fullIdentifier );

    // do not delete the root directory
    if( fullPath.depth() == 0 )
      return true;

    FileSystem fileSystem = getFileSystem( conf );

    try
      {
      return fileSystem.delete( fullPath, true );
      }
    catch( NullPointerException exception )
      {
      // hack to get around npe thrown when fs reaches root directory
      // removes coupling to the new aws hadoop artifacts that may not be deployed
      if( !( fileSystem.getClass().getSimpleName().equals( "NativeS3FileSystem" ) ) )
        throw exception;
      }

    return true;
    }

  public boolean deleteChildResource( FlowProcess<? extends Configuration> flowProcess, String childIdentifier ) throws IOException
    {
    return deleteChildResource( flowProcess.getConfig(), childIdentifier );
    }

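  /**
   * Method deleteChildResource recursively deletes the given child identifier, but only if the fully qualified
   * child path begins with this Tap's full identifier, otherwise {@code false} is returned.
   *
   * @param conf            of type Configuration
   * @param childIdentifier of type String
   * @return boolean
   * @throws IOException when unable to access the underlying file system
   */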
  public boolean deleteChildResource( Configuration conf, String childIdentifier ) throws IOException
    {
    resetFileStatuses();

    Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );

    if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
      return false;

    return deleteFullIdentifier( conf, childPath.toString() );
    }

  @Override
  public boolean resourceExists( Configuration conf ) throws IOException
    {
    // unfortunately getFileSystem( conf ).exists( getPath() ); does not account for "/*" etc
    // nor is there a more efficient means to test for existence
    FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );

    return fileStatuses != null && fileStatuses.length > 0;
    }

  @Override
  public boolean isDirectory( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return isDirectory( flowProcess.getConfig() );
    }

  @Override
  public boolean isDirectory( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return false;

    return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
    }

  @Override
  public long getSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getSize( flowProcess.getConfig() );
    }

  @Override
  public long getSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return getFileSystem( conf ).getFileStatus( getPath() ).getLen();
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return long
   * @throws IOException when unable to access the underlying file system
   */
  public long getBlockSize( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getBlockSize( flowProcess.getConfig() );
    }

  /**
   * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
   *
   * @param conf of type Configuration
   * @return long
   * @throws IOException when unable to access the underlying file system
   */
  public long getBlockSize( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getBlockSize();
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param flowProcess of type FlowProcess
   * @return int
   * @throws IOException when unable to access the underlying file system
   */
  public int getReplication( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getReplication( flowProcess.getConfig() );
    }

  /**
   * Method getReplication returns the {@code replication} specified by the underlying file system for
   * this resource.
   *
   * @param conf of type Configuration
   * @return int
   * @throws IOException when unable to access the underlying file system
   */
  public int getReplication( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( fileStatus.isDir() )
      return 0;

    return fileStatus.getReplication();
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), 1, false );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf ) throws IOException
    {
    return getChildIdentifiers( conf, 1, false );
    }

  @Override
  public String[] getChildIdentifiers( FlowProcess<? extends Configuration> flowProcess, int depth, boolean fullyQualified ) throws IOException
    {
    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
    }

  @Override
  public String[] getChildIdentifiers( Configuration conf, int depth, boolean fullyQualified ) throws IOException
    {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    if( depth == 0 && !fullyQualified )
      return new String[]{getIdentifier()};

    String fullIdentifier = getFullIdentifier( conf );

    int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;

    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );

    return results.toArray( new String[ results.size() ] );
    }

  private void getChildPaths( Configuration conf, Set<String> results, int trim, Path path, int depth ) throws IOException
    {
    if( depth == 0 )
      {
      String substring = path.toString().substring( trim );
      String identifier = getIdentifier();

      if( identifier == null || identifier.isEmpty() )
        results.add( new Path( substring ).toString() );
      else
        results.add( new Path( identifier, substring ).toString() );

      return;
      }

    FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );

    if( statuses == null )
      return;

    for( FileStatus fileStatus : statuses )
      getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
    }

  @Override
  public long getModifiedTime( Configuration conf ) throws IOException
    {
    if( !resourceExists( conf ) )
      return 0;

    FileStatus fileStatus = getFileStatus( conf );

    if( !fileStatus.isDir() )
      return fileStatus.getModificationTime();

    // todo: this should ignore the _temporary path, or not cache if found in the array
    makeStatuses( conf );

    // statuses is empty, return 0
    if( statuses == null || statuses.length == 0 )
      return 0;

    long date = 0;

    // filter out directories as we don't recurse into sub-directories
    for( FileStatus status : statuses )
      {
      if( !status.isDir() )
        date = Math.max( date, status.getModificationTime() );
      }

    return date;
    }

  public FileStatus getFileStatus( Configuration conf ) throws IOException
    {
    return getFileSystem( conf ).getFileStatus( getPath() );
    }

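  /**
   * Method getTempPath returns the configured temporary directory as a Path, preferring the value of
   * {@link HfsProps#TEMPORARY_DIRECTORY} and falling back to the Hadoop {@code hadoop.tmp.dir} property.
   *
   * @param conf of type Configuration
   * @return Path
   */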
  public static Path getTempPath( Configuration conf )
    {
    String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );

    if( tempDir == null )
      tempDir = conf.get( "hadoop.tmp.dir" );

    return new Path( tempDir );
    }

  protected String makeTemporaryPathDirString( String name )
    {
    // _ is treated as a hidden file, so wipe them out
    name = name.replaceAll( "^[_\\W\\s]+", "" );

    if( name.isEmpty() )
      name = "temp-path";

    return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
    }

  /**
   * Populates the cached array of {@link FileStatus} instances for this Tap's path, if not already populated.
   *
   * @param conf of type Configuration
   * @throws IOException on failure
   */
  private void makeStatuses( Configuration conf ) throws IOException
    {
    if( statuses != null )
      return;

    statuses = getFileSystem( conf ).listStatus( getPath() );
    }

  /**
   * Method resetFileStatuses removes the status cache, if any.
   */
  public void resetFileStatuses()
    {
    statuses = null;
    }

  /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
    {
    private Configuration conf;

    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
      {
      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
      }

    @Override
    public void setConf( Configuration conf )
      {
      this.conf = conf;

      // set the aliased property value, if zero, the super class will look up the hadoop property
      setMaxSplitSize( conf.getLong( "cascading.hadoop.hfs.combine.max.size", 0 ) );
      }

    @Override
    public Configuration getConf()
      {
      return conf;
      }
    }
  }