001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.HashSet;
028    import java.util.LinkedHashSet;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.flow.hadoop.util.HadoopUtil;
034    import cascading.scheme.Scheme;
035    import cascading.scheme.hadoop.SequenceFile;
036    import cascading.tap.SinkMode;
037    import cascading.tap.Tap;
038    import cascading.tap.TapException;
039    import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
040    import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
041    import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
042    import cascading.tap.type.FileType;
043    import cascading.tuple.Fields;
044    import cascading.tuple.TupleEntryCollector;
045    import cascading.tuple.TupleEntryIterator;
046    import cascading.tuple.hadoop.TupleSerialization;
047    import cascading.util.Util;
048    import org.apache.hadoop.conf.Configurable;
049    import org.apache.hadoop.conf.Configuration;
050    import org.apache.hadoop.fs.FileStatus;
051    import org.apache.hadoop.fs.FileSystem;
052    import org.apache.hadoop.fs.Path;
053    import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
054    import org.apache.hadoop.mapred.FileInputFormat;
055    import org.apache.hadoop.mapred.FileOutputFormat;
056    import org.apache.hadoop.mapred.InputSplit;
057    import org.apache.hadoop.mapred.JobConf;
058    import org.apache.hadoop.mapred.OutputCollector;
059    import org.apache.hadoop.mapred.RecordReader;
060    import org.apache.hadoop.mapred.Reporter;
061    import org.apache.hadoop.mapred.Utils;
062    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
063    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
064    import org.apache.hadoop.mapred.lib.CombineFileSplit;
065    import org.slf4j.Logger;
066    import org.slf4j.LoggerFactory;
067    
068    /**
069     * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
070     * {@link cascading.flow.hadoop.HadoopFlowConnector} when creating Hadoop executable {@link cascading.flow.Flow}
071     * instances.
072     * <p/>
073     * Paths typically should point to a directory, where in turn all the "part" files immediately in that directory will
074     * be included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
075     * <p/>
 * To include sub-directories, Hadoop supports "globbing". Globbing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
078     * <p/>
079     * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
080     * {@link #getSize(org.apache.hadoop.mapred.JobConf)} will behave properly or reliably. Nor can the Hfs instance
081     * with a wildcard path be used as a sink to write data.
082     * <p/>
083     * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
084     * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed File System or
 * the local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
087     * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where <code>hdfs://...</code> will denote Dfs, and <code>file://...</code> will denote Lfs.
090     * <p/>
091     * Call {@link #setTemporaryDirectory(java.util.Map, String)} to use a different temporary file directory path
092     * other than the current Hadoop default path.
093     * <p/>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started, so the Tap will force any MapReduce jobs reading or writing to {@code file://} resources to run
 * in Hadoop "standalone mode" so that the files can be read.
098     * <p/>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme
 * value, or set it to "none" to disable local mode entirely, for the case where the file to be read is available at
 * the exact same path on every Hadoop processing node.
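 * <p/>
 * For example, a minimal sketch of disabling local mode through the properties later handed to the FlowConnector
 * (the {@code properties} map here is illustrative):
 * <pre>{@code
 * Properties properties = new Properties();
 * HfsProps.setLocalModeScheme( properties, "none" ); // never force standalone/local mode
 * }</pre>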
102     * <p/>
103     * Hfs can optionally combine multiple small files (or a series of small "blocks") into larger "splits". This reduces
104     * the number of resulting map tasks created by Hadoop and can improve application performance.
105     * <p/>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
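 * <p/>
 * For example, a minimal sketch of a typical source and sink; the {@link Fields}, paths, and property value below are
 * illustrative placeholders:
 * <pre>{@code
 * Fields fields = new Fields( "line" );
 *
 * Tap source = new Hfs( new SequenceFile( fields ), "hdfs://namenode/data/input" );
 * Tap sink = new Hfs( new SequenceFile( fields ), "hdfs://namenode/data/output", SinkMode.REPLACE );
 *
 * // optionally combine small input files into larger splits
 * Properties properties = new Properties();
 * properties.put( HfsProps.COMBINE_INPUT_FILES, "true" );
 * }</pre>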
108     */
109    public class Hfs extends Tap<JobConf, RecordReader, OutputCollector> implements FileType<JobConf>
110      {
111      /** Field LOG */
112      private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );
113    
114      /**
115       * Field TEMPORARY_DIRECTORY
116       *
117       * @deprecated see {@link HfsProps#TEMPORARY_DIRECTORY}
118       */
119      @Deprecated
120      public static final String TEMPORARY_DIRECTORY = HfsProps.TEMPORARY_DIRECTORY;
121    
122      /** Field stringPath */
123      protected String stringPath;
124      /** Field uriScheme */
125      transient URI uriScheme;
126      /** Field path */
127      transient Path path;
  /** Field statuses */
129      private transient FileStatus[] statuses; // only used by getModifiedTime
130    
131      /**
132       * Method setTemporaryDirectory sets the temporary directory on the given properties object.
133       *
134       * @param properties of type Map<Object,Object>
135       * @param tempDir    of type String
136       * @deprecated see {@link HfsProps}
137       */
138      @Deprecated
139      public static void setTemporaryDirectory( Map<Object, Object> properties, String tempDir )
140        {
141        properties.put( HfsProps.TEMPORARY_DIRECTORY, tempDir );
142        }
143    
144      /**
145       * Method getTemporaryDirectory returns the configured temporary directory from the given properties object.
146       *
147       * @param properties of type Map<Object,Object>
148       * @return a String or null if not set
149       * @deprecated see {@link HfsProps}
150       */
151      @Deprecated
152      public static String getTemporaryDirectory( Map<Object, Object> properties )
153        {
154        return (String) properties.get( HfsProps.TEMPORARY_DIRECTORY );
155        }
156    
157      protected static String getLocalModeScheme( JobConf conf, String defaultValue )
158        {
159        return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
160        }
161    
162      protected static boolean getUseCombinedInput( JobConf conf )
163        {
164        return conf.getBoolean( HfsProps.COMBINE_INPUT_FILES, false );
165        }
166    
167      protected static boolean getCombinedInputSafeMode( JobConf conf )
168        {
169        return conf.getBoolean( HfsProps.COMBINE_INPUT_FILES_SAFE_MODE, true );
170        }
171    
172      protected Hfs()
173        {
174        }
175    
176      @ConstructorProperties({"scheme"})
177      protected Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme )
178        {
179        super( scheme );
180        }
181    
182      /**
183       * Constructor Hfs creates a new Hfs instance.
184       *
185       * @param fields     of type Fields
186       * @param stringPath of type String
187       */
188      @Deprecated
189      @ConstructorProperties({"fields", "stringPath"})
190      public Hfs( Fields fields, String stringPath )
191        {
192        super( new SequenceFile( fields ) );
193        setStringPath( stringPath );
194        }
195    
196      /**
197       * Constructor Hfs creates a new Hfs instance.
198       *
199       * @param fields     of type Fields
200       * @param stringPath of type String
201       * @param replace    of type boolean
202       */
203      @Deprecated
204      @ConstructorProperties({"fields", "stringPath", "replace"})
205      public Hfs( Fields fields, String stringPath, boolean replace )
206        {
207        super( new SequenceFile( fields ), replace ? SinkMode.REPLACE : SinkMode.KEEP );
208        setStringPath( stringPath );
209        }
210    
211      /**
212       * Constructor Hfs creates a new Hfs instance.
213       *
214       * @param fields     of type Fields
215       * @param stringPath of type String
216       * @param sinkMode   of type SinkMode
217       */
218      @Deprecated
219      @ConstructorProperties({"fields", "stringPath", "sinkMode"})
220      public Hfs( Fields fields, String stringPath, SinkMode sinkMode )
221        {
222        super( new SequenceFile( fields ), sinkMode );
223        setStringPath( stringPath );
224    
225        if( sinkMode == SinkMode.UPDATE )
226          throw new IllegalArgumentException( "updates are not supported" );
227        }
228    
229      /**
230       * Constructor Hfs creates a new Hfs instance.
231       *
232       * @param scheme     of type Scheme
233       * @param stringPath of type String
234       */
235      @ConstructorProperties({"scheme", "stringPath"})
236      public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
237        {
238        super( scheme );
239        setStringPath( stringPath );
240        }
241    
242      /**
243       * Constructor Hfs creates a new Hfs instance.
244       *
245       * @param scheme     of type Scheme
246       * @param stringPath of type String
247       * @param replace    of type boolean
248       */
249      @Deprecated
250      @ConstructorProperties({"scheme", "stringPath", "replace"})
251      public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, boolean replace )
252        {
253        super( scheme, replace ? SinkMode.REPLACE : SinkMode.KEEP );
254        setStringPath( stringPath );
255        }
256    
257      /**
258       * Constructor Hfs creates a new Hfs instance.
259       *
260       * @param scheme     of type Scheme
261       * @param stringPath of type String
262       * @param sinkMode   of type SinkMode
263       */
264      @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
265      public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
266        {
267        super( scheme, sinkMode );
268        setStringPath( stringPath );
269        }
270    
271      protected void setStringPath( String stringPath )
272        {
273        this.stringPath = Util.normalizeUrl( stringPath );
274        }
275    
276      protected void setUriScheme( URI uriScheme )
277        {
278        this.uriScheme = uriScheme;
279        }
280    
281      public URI getURIScheme( JobConf jobConf )
282        {
283        if( uriScheme != null )
284          return uriScheme;
285    
286        uriScheme = makeURIScheme( jobConf );
287    
288        return uriScheme;
289        }
290    
291      protected URI makeURIScheme( JobConf jobConf )
292        {
293        try
294          {
295          URI uriScheme;
296    
297          LOG.debug( "handling path: {}", stringPath );
298    
299          URI uri = new Path( stringPath ).toUri(); // safer URI parsing
300          String schemeString = uri.getScheme();
301          String authority = uri.getAuthority();
302    
303          LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );
304    
305          if( schemeString != null && authority != null )
306            uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
307          else if( schemeString != null )
308            uriScheme = new URI( schemeString + ":///" );
309          else
310            uriScheme = getDefaultFileSystemURIScheme( jobConf );
311    
312          LOG.debug( "using uri scheme: {}", uriScheme );
313    
314          return uriScheme;
315          }
316        catch( URISyntaxException exception )
317          {
318          throw new TapException( "could not determine scheme from path: " + getPath(), exception );
319          }
320        }
321    
322      /**
323       * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
324       *
325       * @param jobConf of type JobConf
326       * @return URI
327       */
328      public URI getDefaultFileSystemURIScheme( JobConf jobConf )
329        {
330        return getDefaultFileSystem( jobConf ).getUri();
331        }
332    
333      protected FileSystem getDefaultFileSystem( JobConf jobConf )
334        {
335        try
336          {
337          return FileSystem.get( jobConf );
338          }
339        catch( IOException exception )
340          {
341          throw new TapException( "unable to get handle to underlying filesystem", exception );
342          }
343        }
344    
345      protected FileSystem getFileSystem( JobConf jobConf )
346        {
347        URI scheme = getURIScheme( jobConf );
348    
349        try
350          {
351          return FileSystem.get( scheme, jobConf );
352          }
353        catch( IOException exception )
354          {
      throw new TapException( "unable to get handle to filesystem for: " + scheme.getScheme(), exception );
356          }
357        }
358    
359      @Override
360      public String getIdentifier()
361        {
362        return getPath().toString();
363        }
364    
365      public Path getPath()
366        {
367        if( path != null )
368          return path;
369    
370        if( stringPath == null )
371          throw new IllegalStateException( "path not initialized" );
372    
373        path = new Path( stringPath );
374    
375        return path;
376        }
377    
378      @Override
379      public String getFullIdentifier( JobConf conf )
380        {
381        return getPath().makeQualified( getFileSystem( conf ) ).toString();
382        }
383    
384      @Override
385      public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf )
386        {
387        String fullIdentifier = getFullIdentifier( conf );
388    
389        applySourceConfInitIdentifiers( process, conf, fullIdentifier );
390    
391        verifyNoDuplicates( conf );
392        }
393    
394      protected static void verifyNoDuplicates( JobConf conf )
395        {
396        Path[] inputPaths = FileInputFormat.getInputPaths( conf );
397        Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );
398    
399        for( Path inputPath : inputPaths )
400          {
401          if( !paths.add( inputPath ) )
402            throw new TapException( "may not add duplicate paths, found: " + inputPath );
403          }
404        }
405    
406      protected void applySourceConfInitIdentifiers( FlowProcess<JobConf> process, JobConf conf, String... fullIdentifiers )
407        {
408        for( String fullIdentifier : fullIdentifiers )
409          sourceConfInitAddInputPath( conf, new Path( fullIdentifier ) );
410    
411        sourceConfInitComplete( process, conf );
412        }
413    
414      protected void sourceConfInitAddInputPath( JobConf conf, Path qualifiedPath )
415        {
416        FileInputFormat.addInputPath( conf, qualifiedPath );
417    
418        makeLocal( conf, qualifiedPath, "forcing job to local mode, via source: " );
419        }
420    
421      protected void sourceConfInitComplete( FlowProcess<JobConf> process, JobConf conf )
422        {
423        super.sourceConfInit( process, conf );
424    
425        TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
426    
427        // use CombineFileInputFormat if that is enabled
428        handleCombineFileInputFormat( conf );
429        }
430    
  /**
   * Based on the configuration, optionally sets {@link CombineFileInputFormat} as the input
   * format, wrapping the input format prescribed by the underlying scheme.
   */
435      private void handleCombineFileInputFormat( JobConf conf )
436        {
437        // if combining files, override the configuration to use CombineFileInputFormat
438        if( !getUseCombinedInput( conf ) )
439          return;
440    
441        // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
442        String individualInputFormat = conf.get( "mapred.input.format.class" );
443    
444        if( individualInputFormat == null )
445          throw new TapException( "input format is missing from the underlying scheme" );
446    
447        if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
448          conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
449          throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );
450    
451        // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
452        // warning and don't use the CombineFileInputFormat
453        boolean safeMode = getCombinedInputSafeMode( conf );
454    
455        if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
456          {
457          if( safeMode )
458            throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
459          else
460            LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
461          }
462        else
463          {
464          // set the underlying individual input format
465          conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );
466    
467          // override the input format class
468          conf.setInputFormat( CombinedInputFormat.class );
469          }
470        }
471    
472      @Override
473      public void sinkConfInit( FlowProcess<JobConf> process, JobConf conf )
474        {
475        Path qualifiedPath = new Path( getFullIdentifier( conf ) );
476    
477        FileOutputFormat.setOutputPath( conf, qualifiedPath );
478        super.sinkConfInit( process, conf );
479    
480        makeLocal( conf, qualifiedPath, "forcing job to local mode, via sink: " );
481    
482        TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
483        }
484    
485      private void makeLocal( JobConf conf, Path qualifiedPath, String infoMessage )
486        {
487        String scheme = getLocalModeScheme( conf, "file" );
488    
489        if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
490          {
491          if( LOG.isInfoEnabled() )
492            LOG.info( infoMessage + toString() );
493    
494          HadoopUtil.setLocal( conf ); // force job to run locally
495          }
496        }
497    
498      @Override
499      public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException
500        {
501        // input may be null when this method is called on the client side or cluster side when accumulating
502        // for a HashJoin
503        return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
504        }
505    
506      @Override
507      public TupleEntryCollector openForWrite( FlowProcess<JobConf> flowProcess, OutputCollector output ) throws IOException
508        {
509        // output may be null when this method is called on the client side or cluster side when creating
510        // side files with the TemplateTap
511        return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
512        }
513    
514      @Override
515      public boolean createResource( JobConf conf ) throws IOException
516        {
517        if( LOG.isDebugEnabled() )
518          LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );
519    
520        return getFileSystem( conf ).mkdirs( getPath() );
521        }
522    
523      @Override
524      public boolean deleteResource( JobConf conf ) throws IOException
525        {
526        String fullIdentifier = getFullIdentifier( conf );
527    
528        return deleteFullIdentifier( conf, fullIdentifier );
529        }
530    
531      private boolean deleteFullIdentifier( JobConf conf, String fullIdentifier ) throws IOException
532        {
533        if( LOG.isDebugEnabled() )
534          LOG.debug( "deleting: {}", fullIdentifier );
535    
536        Path fullPath = new Path( fullIdentifier );
537    
538        // do not delete the root directory
539        if( fullPath.depth() == 0 )
540          return true;
541    
542        FileSystem fileSystem = getFileSystem( conf );
543    
544        try
545          {
546          return fileSystem.delete( fullPath, true );
547          }
548        catch( NullPointerException exception )
549          {
550          // hack to get around npe thrown when fs reaches root directory
551          if( !( fileSystem instanceof NativeS3FileSystem ) )
552            throw exception;
553          }
554    
555        return true;
556        }
557    
558      public boolean deleteChildResource( JobConf conf, String childIdentifier ) throws IOException
559        {
560        Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );
561    
562        if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
563          return false;
564    
565        return deleteFullIdentifier( conf, childPath.toString() );
566        }
567    
568    
569      @Override
570      public boolean resourceExists( JobConf conf ) throws IOException
571        {
    // unfortunately getFileSystem( conf ).exists( getPath() ) does not account for "/*" etc.,
    // nor is there a more efficient means to test for existence
574        FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );
575    
576        return fileStatuses != null && fileStatuses.length > 0;
577        }
578    
579      @Override
580      public boolean isDirectory( JobConf conf ) throws IOException
581        {
582        if( !resourceExists( conf ) )
583          return false;
584    
585        return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
586        }
587    
588      @Override
589      public long getSize( JobConf conf ) throws IOException
590        {
591        if( !resourceExists( conf ) )
592          return 0;
593    
594        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
595    
596        if( fileStatus.isDir() )
597          return 0;
598    
599        return getFileSystem( conf ).getFileStatus( getPath() ).getLen();
600        }
601    
602      /**
603       * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
604       *
   * @param conf of type JobConf
   * @return long
   * @throws IOException when the status of the resource cannot be read
608       */
609      public long getBlockSize( JobConf conf ) throws IOException
610        {
611        if( !resourceExists( conf ) )
612          return 0;
613    
614        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
615    
616        if( fileStatus.isDir() )
617          return 0;
618    
619        return fileStatus.getBlockSize();
620        }
621    
622      /**
623       * Method getReplication returns the {@code replication} specified by the underlying file system for
624       * this resource.
625       *
   * @param conf of type JobConf
   * @return int
   * @throws IOException when the status of the resource cannot be read
629       */
630      public int getReplication( JobConf conf ) throws IOException
631        {
632        if( !resourceExists( conf ) )
633          return 0;
634    
635        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
636    
637        if( fileStatus.isDir() )
638          return 0;
639    
640        return fileStatus.getReplication();
641        }
642    
643      @Override
644      public String[] getChildIdentifiers( JobConf conf ) throws IOException
645        {
646        return getChildIdentifiers( conf, 1, false );
647        }
648    
649      @Override
650      public String[] getChildIdentifiers( JobConf conf, int depth, boolean fullyQualified ) throws IOException
651        {
652        if( !resourceExists( conf ) )
653          return new String[ 0 ];
654    
655        if( depth == 0 && !fullyQualified )
656          return new String[]{getIdentifier()};
657    
658        String fullIdentifier = getFullIdentifier( conf );
659    
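    // when returning relative identifiers, trim off the parent identifier plus its trailing path separator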
660        int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;
661    
662        Set<String> results = new LinkedHashSet<String>();
663    
664        getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );
665    
666        return results.toArray( new String[ results.size() ] );
667        }
668    
669      private void getChildPaths( JobConf conf, Set<String> results, int trim, Path path, int depth ) throws IOException
670        {
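    // base case: at the requested depth, record the child path, trimmed and resolved against this tap's identifier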
671        if( depth == 0 )
672          {
673          String substring = path.toString().substring( trim );
674          String identifier = getIdentifier();
675    
676          if( identifier == null || identifier.isEmpty() )
677            results.add( new Path( substring ).toString() );
678          else
679            results.add( new Path( identifier, substring ).toString() );
680    
681          return;
682          }
683    
684        FileStatus[] statuses = getFileSystem( conf ).listStatus( path, new Utils.OutputFileUtils.OutputFilesFilter() );
685    
686        if( statuses == null )
687          return;
688    
689        for( FileStatus fileStatus : statuses )
690          getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
691        }
692    
693      @Override
694      public long getModifiedTime( JobConf conf ) throws IOException
695        {
696        if( !resourceExists( conf ) )
697          return 0;
698    
699        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
700    
701        if( !fileStatus.isDir() )
702          return fileStatus.getModificationTime();
703    
704        // todo: this should ignore the _temporary path, or not cache if found in the array
705        makeStatuses( conf );
706    
707        // statuses is empty, return 0
708        if( statuses == null || statuses.length == 0 )
709          return 0;
710    
711        long date = 0;
712    
    // filter out directories as we don't recurse into sub dirs
714        for( FileStatus status : statuses )
715          {
716          if( !status.isDir() )
717            date = Math.max( date, status.getModificationTime() );
718          }
719    
720        return date;
721        }
722    
723      public static Path getTempPath( JobConf conf )
724        {
725        String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );
726    
727        if( tempDir == null )
728          tempDir = conf.get( "hadoop.tmp.dir" );
729    
730        return new Path( tempDir );
731        }
732    
733      protected String makeTemporaryPathDirString( String name )
734        {
    // a leading _ marks a hidden file, so strip leading underscores and other non-word characters
736        name = name.replaceAll( "^[_\\W\\s]+", "" );
737    
738        if( name.isEmpty() )
739          name = "temp-path";
740    
741        return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
742        }
743    
744      /**
   * Populates the cached {@code statuses} array by listing the status of this resource's path, if not already populated.
746       *
747       * @param conf of type JobConf
748       * @throws IOException on failure
749       */
750      private void makeStatuses( JobConf conf ) throws IOException
751        {
752        if( statuses != null )
753          return;
754    
755        statuses = getFileSystem( conf ).listStatus( getPath() );
756        }
757    
758      /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
759      static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
760        {
761        private Configuration conf;
762    
763        public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
764          {
765          return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
766          }
767    
768        @Override
769        public void setConf( Configuration conf )
770          {
771          this.conf = conf;
772    
      // set the aliased property value; if zero, the super class will look up the hadoop property
774          setMaxSplitSize( conf.getLong( HfsProps.COMBINE_INPUT_FILES_SIZE_MAX, 0 ) );
775          }
776    
777        @Override
778        public Configuration getConf()
779          {
780          return conf;
781          }
782        }
783      }