001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.HashSet;
028    import java.util.LinkedHashSet;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.flow.hadoop.util.HadoopUtil;
034    import cascading.scheme.Scheme;
035    import cascading.scheme.hadoop.SequenceFile;
036    import cascading.tap.SinkMode;
037    import cascading.tap.Tap;
038    import cascading.tap.TapException;
039    import cascading.tap.hadoop.io.CombineFileRecordReaderWrapper;
040    import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
041    import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
042    import cascading.tap.type.FileType;
043    import cascading.tuple.Fields;
044    import cascading.tuple.TupleEntryCollector;
045    import cascading.tuple.TupleEntryIterator;
046    import cascading.tuple.hadoop.TupleSerialization;
047    import cascading.util.Util;
048    import org.apache.hadoop.conf.Configurable;
049    import org.apache.hadoop.conf.Configuration;
050    import org.apache.hadoop.fs.FileStatus;
051    import org.apache.hadoop.fs.FileSystem;
052    import org.apache.hadoop.fs.Path;
053    import org.apache.hadoop.fs.PathFilter;
054    import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
055    import org.apache.hadoop.mapred.FileInputFormat;
056    import org.apache.hadoop.mapred.FileOutputFormat;
057    import org.apache.hadoop.mapred.InputSplit;
058    import org.apache.hadoop.mapred.JobConf;
059    import org.apache.hadoop.mapred.OutputCollector;
060    import org.apache.hadoop.mapred.RecordReader;
061    import org.apache.hadoop.mapred.Reporter;
062    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
063    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
064    import org.apache.hadoop.mapred.lib.CombineFileSplit;
065    import org.slf4j.Logger;
066    import org.slf4j.LoggerFactory;
067    
068    /**
069     * Class Hfs is the base class for all Hadoop file system access. Hfs may only be used with the
070     * {@link cascading.flow.hadoop.HadoopFlowConnector} when creating Hadoop executable {@link cascading.flow.Flow}
071     * instances.
072     * <p/>
 * Paths should typically point to a directory, in which case all "part" files immediately in that directory will be
 * included. This is the practice Hadoop expects. Sub-directories are not included and typically result in a failure.
075     * <p/>
 * To include sub-directories, Hadoop supports "globbing". Globbing is a frustrating feature and is supported more
 * robustly by {@link GlobHfs} and less so by Hfs.
078     * <p/>
079     * Hfs will accept {@code /*} (wildcard) paths, but not all convenience methods like
080     * {@link #getSize(org.apache.hadoop.mapred.JobConf)} will behave properly or reliably. Nor can the Hfs instance
081     * with a wildcard path be used as a sink to write data.
082     * <p/>
083     * In those cases use GlobHfs since it is a sub-class of {@link cascading.tap.MultiSourceTap}.
084     * <p/>
 * Optionally use {@link Dfs} or {@link Lfs} for resources specific to the Hadoop Distributed File System or
 * the local file system, respectively. Using Hfs is the best practice when possible; Lfs and Dfs are conveniences.
087     * <p/>
 * Use the Hfs class if the 'kind' of resource is unknown at design time. To use, prefix a scheme to the 'stringPath',
 * where <code>hdfs://...</code> denotes Dfs and <code>file://...</code> denotes Lfs.
090     * <p/>
091     * Call {@link #setTemporaryDirectory(java.util.Map, String)} to use a different temporary file directory path
092     * other than the current Hadoop default path.
093     * <p/>
 * By default Cascading on Hadoop will assume any source or sink Tap using the {@code file://} URI scheme
 * intends to read files from the local client filesystem (for example when using the {@code Lfs} Tap) where the Hadoop
 * job jar is started, and so will force any MapReduce jobs reading or writing to {@code file://} resources to run in
 * Hadoop "standalone mode" so that the files can be read.
098     * <p/>
 * To change this behavior, call {@link HfsProps#setLocalModeScheme(java.util.Map, String)} to set a different scheme value,
 * or set it to "none" to disable this behavior entirely for the case where the file to be read is available at the exact
 * same path on every Hadoop processing node.
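 * <p/>
 * For example, a minimal sketch (assuming the property is placed on the properties Map handed to the
 * {@link cascading.flow.hadoop.HadoopFlowConnector}):
 * <pre>{@code
 * Properties properties = new Properties();
 * properties.put( HfsProps.LOCAL_MODE_SCHEME, "none" ); // never force jobs into local "standalone" mode
 * FlowConnector flowConnector = new HadoopFlowConnector( properties );
 * }</pre>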
102     * <p/>
103     * Hfs can optionally combine multiple small files (or a series of small "blocks") into larger "splits". This reduces
104     * the number of resulting map tasks created by Hadoop and can improve application performance.
105     * <p/>
 * This is enabled by calling {@link HfsProps#setUseCombinedInput(boolean)} with {@code true}. By default, merging
 * or combining splits into larger ones is disabled.
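 * <p/>
 * A minimal usage sketch (assuming a {@code TextLine} scheme and an HDFS directory of part files):
 * <pre>{@code
 * Scheme scheme = new TextLine( new Fields( "offset", "line" ) );
 * Tap tap = new Hfs( scheme, "hdfs://namenode/logs/", SinkMode.KEEP );
 * }</pre>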
108     */
109    public class Hfs extends Tap<JobConf, RecordReader, OutputCollector> implements FileType<JobConf>
110      {
111      /** Field LOG */
112      private static final Logger LOG = LoggerFactory.getLogger( Hfs.class );
113    
114      /**
115       * Field TEMPORARY_DIRECTORY
116       *
117       * @deprecated see {@link HfsProps#TEMPORARY_DIRECTORY}
118       */
119      @Deprecated
120      public static final String TEMPORARY_DIRECTORY = HfsProps.TEMPORARY_DIRECTORY;
121    
122      /** Field stringPath */
123      protected String stringPath;
124      /** Field uriScheme */
125      transient URI uriScheme;
126      /** Field path */
127      transient Path path;
  /** Field statuses */
129      private transient FileStatus[] statuses; // only used by getModifiedTime
130    
131      private static final PathFilter HIDDEN_FILES_FILTER = new PathFilter()
132      {
133      public boolean accept( Path path )
134        {
135        String name = path.getName();
136    
137        if( name.isEmpty() ) // should never happen
138          return true;
139    
140        char first = name.charAt( 0 );
141    
142        return first != '_' && first != '.';
143        }
144      };
145    
146      /**
147       * Method setTemporaryDirectory sets the temporary directory on the given properties object.
148       *
149       * @param properties of type Map<Object,Object>
150       * @param tempDir    of type String
151       * @deprecated see {@link HfsProps}
152       */
153      @Deprecated
154      public static void setTemporaryDirectory( Map<Object, Object> properties, String tempDir )
155        {
156        properties.put( HfsProps.TEMPORARY_DIRECTORY, tempDir );
157        }
158    
159      /**
160       * Method getTemporaryDirectory returns the configured temporary directory from the given properties object.
161       *
162       * @param properties of type Map<Object,Object>
163       * @return a String or null if not set
164       * @deprecated see {@link HfsProps}
165       */
166      @Deprecated
167      public static String getTemporaryDirectory( Map<Object, Object> properties )
168        {
169        return (String) properties.get( HfsProps.TEMPORARY_DIRECTORY );
170        }
171    
172      protected static String getLocalModeScheme( JobConf conf, String defaultValue )
173        {
174        return conf.get( HfsProps.LOCAL_MODE_SCHEME, defaultValue );
175        }
176    
177      protected static boolean getUseCombinedInput( JobConf conf )
178        {
179        return conf.getBoolean( HfsProps.COMBINE_INPUT_FILES, false );
180        }
181    
182      protected static boolean getCombinedInputSafeMode( JobConf conf )
183        {
184        return conf.getBoolean( HfsProps.COMBINE_INPUT_FILES_SAFE_MODE, true );
185        }
186    
187      protected Hfs()
188        {
189        }
190    
191      @ConstructorProperties( {"scheme"} )
192      protected Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme )
193        {
194        super( scheme );
195        }
196    
197      /**
198       * Constructor Hfs creates a new Hfs instance.
199       *
200       * @param fields     of type Fields
201       * @param stringPath of type String
202       */
203      @Deprecated
204      @ConstructorProperties( {"fields", "stringPath"} )
205      public Hfs( Fields fields, String stringPath )
206        {
207        super( new SequenceFile( fields ) );
208        setStringPath( stringPath );
209        }
210    
211      /**
212       * Constructor Hfs creates a new Hfs instance.
213       *
214       * @param fields     of type Fields
215       * @param stringPath of type String
216       * @param replace    of type boolean
217       */
218      @Deprecated
219      @ConstructorProperties( {"fields", "stringPath", "replace"} )
220      public Hfs( Fields fields, String stringPath, boolean replace )
221        {
222        super( new SequenceFile( fields ), replace ? SinkMode.REPLACE : SinkMode.KEEP );
223        setStringPath( stringPath );
224        }
225    
226      /**
227       * Constructor Hfs creates a new Hfs instance.
228       *
229       * @param fields     of type Fields
230       * @param stringPath of type String
231       * @param sinkMode   of type SinkMode
232       */
233      @Deprecated
234      @ConstructorProperties( {"fields", "stringPath", "sinkMode"} )
235      public Hfs( Fields fields, String stringPath, SinkMode sinkMode )
236        {
237        super( new SequenceFile( fields ), sinkMode );
238        setStringPath( stringPath );
239    
240        if( sinkMode == SinkMode.UPDATE )
241          throw new IllegalArgumentException( "updates are not supported" );
242        }
243    
244      /**
245       * Constructor Hfs creates a new Hfs instance.
246       *
247       * @param scheme     of type Scheme
248       * @param stringPath of type String
249       */
250      @ConstructorProperties( {"scheme", "stringPath"} )
251      public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath )
252        {
253        super( scheme );
254        setStringPath( stringPath );
255        }
256    
257      /**
258       * Constructor Hfs creates a new Hfs instance.
259       *
260       * @param scheme     of type Scheme
261       * @param stringPath of type String
262       * @param replace    of type boolean
263       */
264      @Deprecated
265      @ConstructorProperties( {"scheme", "stringPath", "replace"} )
266      public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, boolean replace )
267        {
268        super( scheme, replace ? SinkMode.REPLACE : SinkMode.KEEP );
269        setStringPath( stringPath );
270        }
271    
272      /**
273       * Constructor Hfs creates a new Hfs instance.
274       *
275       * @param scheme     of type Scheme
276       * @param stringPath of type String
277       * @param sinkMode   of type SinkMode
278       */
279      @ConstructorProperties( {"scheme", "stringPath", "sinkMode"} )
280      public Hfs( Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, String stringPath, SinkMode sinkMode )
281        {
282        super( scheme, sinkMode );
283        setStringPath( stringPath );
284        }
285    
286      protected void setStringPath( String stringPath )
287        {
288        this.stringPath = Util.normalizeUrl( stringPath );
289        }
290    
291      protected void setUriScheme( URI uriScheme )
292        {
293        this.uriScheme = uriScheme;
294        }
295    
296      public URI getURIScheme( JobConf jobConf )
297        {
298        if( uriScheme != null )
299          return uriScheme;
300    
301        uriScheme = makeURIScheme( jobConf );
302    
303        return uriScheme;
304        }
305    
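  /**
   * Method makeURIScheme derives the URI scheme (and authority) for this tap from {@code stringPath}. For example
   * (illustrative values), {@code hdfs://namenode/logs} resolves to {@code hdfs://namenode}, {@code file:///tmp/data}
   * resolves to {@code file:///}, and a scheme-less path such as {@code /tmp/data} falls back to the URI of the
   * default file system in the given JobConf.
   *
   * @param jobConf of type JobConf
   * @return URI
   */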
306      protected URI makeURIScheme( JobConf jobConf )
307        {
308        try
309          {
310          URI uriScheme;
311    
312          LOG.debug( "handling path: {}", stringPath );
313    
314          URI uri = new Path( stringPath ).toUri(); // safer URI parsing
315          String schemeString = uri.getScheme();
316          String authority = uri.getAuthority();
317    
318          LOG.debug( "found scheme: {}, authority: {}", schemeString, authority );
319    
320          if( schemeString != null && authority != null )
321            uriScheme = new URI( schemeString + "://" + uri.getAuthority() );
322          else if( schemeString != null )
323            uriScheme = new URI( schemeString + ":///" );
324          else
325            uriScheme = getDefaultFileSystemURIScheme( jobConf );
326    
327          LOG.debug( "using uri scheme: {}", uriScheme );
328    
329          return uriScheme;
330          }
331        catch( URISyntaxException exception )
332          {
333          throw new TapException( "could not determine scheme from path: " + getPath(), exception );
334          }
335        }
336    
337      /**
338       * Method getDefaultFileSystemURIScheme returns the URI scheme for the default Hadoop FileSystem.
339       *
340       * @param jobConf of type JobConf
341       * @return URI
342       */
343      public URI getDefaultFileSystemURIScheme( JobConf jobConf )
344        {
345        return getDefaultFileSystem( jobConf ).getUri();
346        }
347    
348      protected FileSystem getDefaultFileSystem( JobConf jobConf )
349        {
350        try
351          {
352          return FileSystem.get( jobConf );
353          }
354        catch( IOException exception )
355          {
356          throw new TapException( "unable to get handle to underlying filesystem", exception );
357          }
358        }
359    
360      protected FileSystem getFileSystem( JobConf jobConf )
361        {
362        URI scheme = getURIScheme( jobConf );
363    
364        try
365          {
366          return FileSystem.get( scheme, jobConf );
367          }
368        catch( IOException exception )
369          {
      throw new TapException( "unable to get handle to underlying filesystem for: " + scheme.getScheme(), exception );
371          }
372        }
373    
374      @Override
375      public String getIdentifier()
376        {
377        return getPath().toString();
378        }
379    
380      public Path getPath()
381        {
382        if( path != null )
383          return path;
384    
385        if( stringPath == null )
386          throw new IllegalStateException( "path not initialized" );
387    
388        path = new Path( stringPath );
389    
390        return path;
391        }
392    
393      @Override
394      public String getFullIdentifier( JobConf conf )
395        {
396        return getPath().makeQualified( getFileSystem( conf ) ).toString();
397        }
398    
399      @Override
400      public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf )
401        {
402        String fullIdentifier = getFullIdentifier( conf );
403    
404        applySourceConfInitIdentifiers( process, conf, fullIdentifier );
405    
406        verifyNoDuplicates( conf );
407        }
408    
409      protected static void verifyNoDuplicates( JobConf conf )
410        {
411        Path[] inputPaths = FileInputFormat.getInputPaths( conf );
412        Set<Path> paths = new HashSet<Path>( (int) ( inputPaths.length / .75f ) );
413    
414        for( Path inputPath : inputPaths )
415          {
416          if( !paths.add( inputPath ) )
417            throw new TapException( "may not add duplicate paths, found: " + inputPath );
418          }
419        }
420    
421      protected void applySourceConfInitIdentifiers( FlowProcess<JobConf> process, JobConf conf, String... fullIdentifiers )
422        {
423        for( String fullIdentifier : fullIdentifiers )
424          sourceConfInitAddInputPath( conf, new Path( fullIdentifier ) );
425    
426        sourceConfInitComplete( process, conf );
427        }
428    
429      protected void sourceConfInitAddInputPath( JobConf conf, Path qualifiedPath )
430        {
431        FileInputFormat.addInputPath( conf, qualifiedPath );
432    
433        makeLocal( conf, qualifiedPath, "forcing job to local mode, via source: " );
434        }
435    
436      protected void sourceConfInitComplete( FlowProcess<JobConf> process, JobConf conf )
437        {
438        super.sourceConfInit( process, conf );
439    
440        TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
441    
442        // use CombineFileInputFormat if that is enabled
443        handleCombineFileInputFormat( conf );
444        }
445    
446      /**
   * Based on the configuration, optionally sets {@link CombineFileInputFormat} as the input format, wrapping the
   * input format prescribed by the underlying scheme.
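   * <p/>
   * Combining is only applied when {@link HfsProps#COMBINE_INPUT_FILES} is set to {@code true}. An illustrative
   * sketch of enabling it directly on a JobConf (normally this is done via the properties passed to the FlowConnector):
   * <pre>{@code
   * conf.setBoolean( HfsProps.COMBINE_INPUT_FILES, true );
   * conf.setLong( HfsProps.COMBINE_INPUT_FILES_SIZE_MAX, 256 * 1024 * 1024 ); // optional cap on combined split size
   * }</pre>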
449       */
450      private void handleCombineFileInputFormat( JobConf conf )
451        {
452        // if combining files, override the configuration to use CombineFileInputFormat
453        if( !getUseCombinedInput( conf ) )
454          return;
455    
456        // get the prescribed individual input format from the underlying scheme so it can be used by CombinedInputFormat
457        String individualInputFormat = conf.get( "mapred.input.format.class" );
458    
459        if( individualInputFormat == null )
460          throw new TapException( "input format is missing from the underlying scheme" );
461    
462        if( individualInputFormat.equals( CombinedInputFormat.class.getName() ) &&
463          conf.get( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT ) == null )
464          throw new TapException( "the input format class is already the combined input format but the underlying input format is missing" );
465    
466        // if safe mode is on (default) throw an exception if the InputFormat is not a FileInputFormat, otherwise log a
467        // warning and don't use the CombineFileInputFormat
468        boolean safeMode = getCombinedInputSafeMode( conf );
469    
470        if( !FileInputFormat.class.isAssignableFrom( conf.getClass( "mapred.input.format.class", null ) ) )
471          {
472          if( safeMode )
473            throw new TapException( "input format must be of type org.apache.hadoop.mapred.FileInputFormat, got: " + individualInputFormat );
474          else
475            LOG.warn( "not combining input splits with CombineFileInputFormat, {} is not of type org.apache.hadoop.mapred.FileInputFormat.", individualInputFormat );
476          }
477        else
478          {
479          // set the underlying individual input format
480          conf.set( CombineFileRecordReaderWrapper.INDIVIDUAL_INPUT_FORMAT, individualInputFormat );
481    
482          // override the input format class
483          conf.setInputFormat( CombinedInputFormat.class );
484          }
485        }
486    
487      @Override
488      public void sinkConfInit( FlowProcess<JobConf> process, JobConf conf )
489        {
490        Path qualifiedPath = new Path( getFullIdentifier( conf ) );
491    
492        FileOutputFormat.setOutputPath( conf, qualifiedPath );
493        super.sinkConfInit( process, conf );
494    
495        makeLocal( conf, qualifiedPath, "forcing job to local mode, via sink: " );
496    
497        TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
498        }
499    
500      private void makeLocal( JobConf conf, Path qualifiedPath, String infoMessage )
501        {
502        String scheme = getLocalModeScheme( conf, "file" );
503    
504        if( !HadoopUtil.isLocal( conf ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( scheme ) )
505          {
506          if( LOG.isInfoEnabled() )
507            LOG.info( infoMessage + toString() );
508    
509          HadoopUtil.setLocal( conf ); // force job to run locally
510          }
511        }
512    
513      @Override
514      public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException
515        {
516        // input may be null when this method is called on the client side or cluster side when accumulating
517        // for a HashJoin
518        return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
519        }
520    
521      @Override
522      public TupleEntryCollector openForWrite( FlowProcess<JobConf> flowProcess, OutputCollector output ) throws IOException
523        {
524        // output may be null when this method is called on the client side or cluster side when creating
525        // side files with the TemplateTap
526        return new HadoopTupleEntrySchemeCollector( flowProcess, this, output );
527        }
528    
529      @Override
530      public boolean createResource( JobConf conf ) throws IOException
531        {
532        if( LOG.isDebugEnabled() )
533          LOG.debug( "making dirs: {}", getFullIdentifier( conf ) );
534    
535        return getFileSystem( conf ).mkdirs( getPath() );
536        }
537    
538      @Override
539      public boolean deleteResource( JobConf conf ) throws IOException
540        {
541        String fullIdentifier = getFullIdentifier( conf );
542    
543        return deleteFullIdentifier( conf, fullIdentifier );
544        }
545    
546      private boolean deleteFullIdentifier( JobConf conf, String fullIdentifier ) throws IOException
547        {
548        if( LOG.isDebugEnabled() )
549          LOG.debug( "deleting: {}", fullIdentifier );
550    
551        Path fullPath = new Path( fullIdentifier );
552    
553        // do not delete the root directory
554        if( fullPath.depth() == 0 )
555          return true;
556    
557        FileSystem fileSystem = getFileSystem( conf );
558    
559        try
560          {
561          return fileSystem.delete( fullPath, true );
562          }
563        catch( NullPointerException exception )
564          {
565          // hack to get around npe thrown when fs reaches root directory
566          if( !( fileSystem instanceof NativeS3FileSystem ) )
567            throw exception;
568          }
569    
570        return true;
571        }
572    
573      public boolean deleteChildResource( JobConf conf, String childIdentifier ) throws IOException
574        {
575        Path childPath = new Path( childIdentifier ).makeQualified( getFileSystem( conf ) );
576    
577        if( !childPath.toString().startsWith( getFullIdentifier( conf ) ) )
578          return false;
579    
580        return deleteFullIdentifier( conf, childPath.toString() );
581        }
582    
583      @Override
584      public boolean resourceExists( JobConf conf ) throws IOException
585        {
586        // unfortunately getFileSystem( conf ).exists( getPath() ); does not account for "/*" etc
    // nor is there a more efficient means to test for existence
588        FileStatus[] fileStatuses = getFileSystem( conf ).globStatus( getPath() );
589    
590        return fileStatuses != null && fileStatuses.length > 0;
591        }
592    
593      @Override
594      public boolean isDirectory( JobConf conf ) throws IOException
595        {
596        if( !resourceExists( conf ) )
597          return false;
598    
599        return getFileSystem( conf ).getFileStatus( getPath() ).isDir();
600        }
601    
602      @Override
603      public long getSize( JobConf conf ) throws IOException
604        {
605        if( !resourceExists( conf ) )
606          return 0;
607    
608        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
609    
610        if( fileStatus.isDir() )
611          return 0;
612    
613        return getFileSystem( conf ).getFileStatus( getPath() ).getLen();
614        }
615    
616      /**
617       * Method getBlockSize returns the {@code blocksize} specified by the underlying file system for this resource.
618       *
   * @param conf of type JobConf
   * @return long
   * @throws IOException when the underlying file system cannot be accessed
622       */
623      public long getBlockSize( JobConf conf ) throws IOException
624        {
625        if( !resourceExists( conf ) )
626          return 0;
627    
628        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
629    
630        if( fileStatus.isDir() )
631          return 0;
632    
633        return fileStatus.getBlockSize();
634        }
635    
636      /**
637       * Method getReplication returns the {@code replication} specified by the underlying file system for
638       * this resource.
639       *
   * @param conf of type JobConf
   * @return int
   * @throws IOException when the underlying file system cannot be accessed
643       */
644      public int getReplication( JobConf conf ) throws IOException
645        {
646        if( !resourceExists( conf ) )
647          return 0;
648    
649        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
650    
651        if( fileStatus.isDir() )
652          return 0;
653    
654        return fileStatus.getReplication();
655        }
656    
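  /**
   * Method getChildIdentifiers returns the identifiers of this path's immediate, non-hidden children. For example
   * (illustrative), a tap on {@code logs/} might return {@code logs/part-00000} and {@code logs/part-00001}.
   */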
657      @Override
658      public String[] getChildIdentifiers( JobConf conf ) throws IOException
659        {
660        return getChildIdentifiers( conf, 1, false );
661        }
662    
663      @Override
664      public String[] getChildIdentifiers( JobConf conf, int depth, boolean fullyQualified ) throws IOException
665        {
666        if( !resourceExists( conf ) )
667          return new String[ 0 ];
668    
669        if( depth == 0 && !fullyQualified )
670          return new String[]{getIdentifier()};
671    
672        String fullIdentifier = getFullIdentifier( conf );
673    
674        int trim = fullyQualified ? 0 : fullIdentifier.length() + 1;
675    
676        Set<String> results = new LinkedHashSet<String>();
677    
678        getChildPaths( conf, results, trim, new Path( fullIdentifier ), depth );
679    
680        return results.toArray( new String[ results.size() ] );
681        }
682    
683      private void getChildPaths( JobConf conf, Set<String> results, int trim, Path path, int depth ) throws IOException
684        {
685        if( depth == 0 )
686          {
687          String substring = path.toString().substring( trim );
688          String identifier = getIdentifier();
689    
690          if( identifier == null || identifier.isEmpty() )
691            results.add( new Path( substring ).toString() );
692          else
693            results.add( new Path( identifier, substring ).toString() );
694    
695          return;
696          }
697    
698        FileStatus[] statuses = getFileSystem( conf ).listStatus( path, HIDDEN_FILES_FILTER );
699    
700        if( statuses == null )
701          return;
702    
703        for( FileStatus fileStatus : statuses )
704          getChildPaths( conf, results, trim, fileStatus.getPath(), depth - 1 );
705        }
706    
707      @Override
708      public long getModifiedTime( JobConf conf ) throws IOException
709        {
710        if( !resourceExists( conf ) )
711          return 0;
712    
713        FileStatus fileStatus = getFileSystem( conf ).getFileStatus( getPath() );
714    
715        if( !fileStatus.isDir() )
716          return fileStatus.getModificationTime();
717    
718        // todo: this should ignore the _temporary path, or not cache if found in the array
719        makeStatuses( conf );
720    
    // if no statuses are found, return 0
722        if( statuses == null || statuses.length == 0 )
723          return 0;
724    
725        long date = 0;
726    
    // filter out directories as we don't recurse into sub-directories
728        for( FileStatus status : statuses )
729          {
730          if( !status.isDir() )
731            date = Math.max( date, status.getModificationTime() );
732          }
733    
734        return date;
735        }
736    
737      public static Path getTempPath( JobConf conf )
738        {
739        String tempDir = conf.get( HfsProps.TEMPORARY_DIRECTORY );
740    
741        if( tempDir == null )
742          tempDir = conf.get( "hadoop.tmp.dir" );
743    
744        return new Path( tempDir );
745        }
746    
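  /**
   * Method makeTemporaryPathDirString sanitizes the given name for use as a temporary directory name. For example
   * (illustrative), {@code "_my temp!dir"} becomes {@code "my_temp_dir"} followed by a unique ID suffix.
   */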
747      protected String makeTemporaryPathDirString( String name )
748        {
    // names beginning with '_' are treated as hidden files, so strip any leading underscores and non-word characters
750        name = name.replaceAll( "^[_\\W\\s]+", "" );
751    
752        if( name.isEmpty() )
753          name = "temp-path";
754    
755        return name.replaceAll( "[\\W\\s]+", "_" ) + Util.createUniqueID();
756        }
757    
  /**
   * Method makeStatuses caches the {@link FileStatus} entries for the current path, if not already cached.
   *
   * @param conf of type JobConf
   * @throws IOException on failure
   */
764      private void makeStatuses( JobConf conf ) throws IOException
765        {
766        if( statuses != null )
767          return;
768    
769        statuses = getFileSystem( conf ).listStatus( getPath() );
770        }
771    
772      /** Combined input format that uses the underlying individual input format to combine multiple files into a single split. */
773      static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
774        {
775        private Configuration conf;
776    
777        public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
778          {
779          return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
780          }
781    
782        @Override
783        public void setConf( Configuration conf )
784          {
785          this.conf = conf;
786    
      // set the aliased property value; if zero, the super class will look up the hadoop property
788          setMaxSplitSize( conf.getLong( HfsProps.COMBINE_INPUT_FILES_SIZE_MAX, 0 ) );
789          }
790    
791        @Override
792        public Configuration getConf()
793          {
794          return conf;
795          }
796        }
797      }