001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.util.ArrayList;
026    import java.util.List;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.scheme.Scheme;
030    import cascading.tap.MultiSourceTap;
031    import cascading.tap.TapException;
032    import org.apache.hadoop.fs.FileStatus;
033    import org.apache.hadoop.fs.FileSystem;
034    import org.apache.hadoop.fs.Path;
035    import org.apache.hadoop.fs.PathFilter;
036    import org.apache.hadoop.mapred.JobConf;
037    import org.apache.hadoop.mapred.RecordReader;
038    
039    /**
040     * Class GlobHfs is a type of {@link cascading.tap.MultiSourceTap} that accepts Hadoop style 'file globing' expressions so
041     * multiple files that match the given pattern may be used as the input sources for a given {@link cascading.flow.Flow}.
042     * <p/>
 * See {@link FileSystem#globStatus(org.apache.hadoop.fs.Path)} for details on the globbing syntax. In short,
 * it is similar to standard regular expressions except alternation is done via {foo,bar} instead of (foo|bar).
045     * <p/>
046     * Note that a {@link cascading.flow.Flow} sourcing from GlobHfs is not currently compatible with the {@link cascading.cascade.Cascade}
047     * scheduler. GlobHfs expects the files and paths to exist so the wildcards can be resolved into concrete values so
048     * that the scheduler can order the Flows properly.
049     * <p/>
 * Note that globbing can match files or directories. It may consume fewer resources to match directories and let
 * Hadoop include all sub-files immediately contained in the directory instead of enumerating every individual file.
 * Ending the glob path with a {@code /} should match only directories.
053     *
054     * @see Hfs
055     * @see cascading.tap.MultiSourceTap
056     * @see FileSystem
057     */
058    public class GlobHfs extends MultiSourceTap<Hfs, JobConf, RecordReader>
059      {
060      /** Field pathPattern */
061      private final String pathPattern;
062      /** Field pathFilter */
063      private final PathFilter pathFilter;
064    
065      /**
066       * Constructor GlobHfs creates a new GlobHfs instance.
067       *
068       * @param scheme      of type Scheme
069       * @param pathPattern of type String
070       */
071      @ConstructorProperties({"scheme", "pathPattern"})
072      public GlobHfs( Scheme<JobConf, RecordReader, ?, ?, ?> scheme, String pathPattern )
073        {
074        this( scheme, pathPattern, null );
075        }
076    
077      /**
078       * Constructor GlobHfs creates a new GlobHfs instance.
079       *
080       * @param scheme      of type Scheme
081       * @param pathPattern of type String
082       * @param pathFilter  of type PathFilter
083       */
084      @ConstructorProperties({"scheme", "pathPattern", "pathFilter"})
085      public GlobHfs( Scheme<JobConf, RecordReader, ?, ?, ?> scheme, String pathPattern, PathFilter pathFilter )
086        {
087        super( scheme );
088        this.pathPattern = pathPattern;
089        this.pathFilter = pathFilter;
090        }
091    
092      @Override
093      public String getIdentifier()
094        {
095        return pathPattern;
096        }
097    
098      @Override
099      protected Hfs[] getTaps()
100        {
101        return initTapsInternal( new JobConf() );
102        }
103    
104      private Hfs[] initTapsInternal( JobConf conf )
105        {
106        if( taps != null )
107          return taps;
108    
109        try
110          {
111          taps = makeTaps( conf );
112          }
113        catch( IOException exception )
114          {
115          throw new TapException( "unable to resolve taps for globing path: " + pathPattern );
116          }
117    
118        return taps;
119        }
120    
121      private Hfs[] makeTaps( JobConf conf ) throws IOException
122        {
123        FileStatus[] statusList;
124    
125        Path path = new Path( pathPattern );
126    
127        FileSystem fileSystem = path.getFileSystem( conf );
128    
129        if( pathFilter == null )
130          statusList = fileSystem.globStatus( path );
131        else
132          statusList = fileSystem.globStatus( path, pathFilter );
133    
134        if( statusList == null || statusList.length == 0 )
135          throw new TapException( "unable to find paths matching path pattern: " + pathPattern );
136    
137        List<Hfs> notEmpty = new ArrayList<Hfs>();
138    
139        for( int i = 0; i < statusList.length; i++ )
140          {
141          // remove empty files. some hadoop versions return non-zero for dirs
142          // so this jives with the expectations set in the above javadoc
143          if( statusList[ i ].isDir() || statusList[ i ].getLen() != 0 )
144            notEmpty.add( new Hfs( getScheme(), statusList[ i ].getPath().toString() ) );
145          }
146    
147        if( notEmpty.isEmpty() )
148          throw new TapException( "all paths matching path pattern are zero length and not directories: " + pathPattern );
149    
150        return notEmpty.toArray( new Hfs[ notEmpty.size() ] );
151        }
152    
153      @Override
154      public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf )
155        {
156        Hfs[] taps = initTapsInternal( conf );
157    
158        for( Hfs tap : taps )
159          taps[ 0 ].sourceConfInitAddInputPath( conf, tap.getPath() ); // we are building fully qualified paths above
160    
161        taps[ 0 ].sourceConfInitComplete( process, conf );
162        }
163    
164      @Override
165      public boolean equals( Object object )
166        {
167        if( this == object )
168          return true;
169        if( object == null || getClass() != object.getClass() )
170          return false;
171    
172        GlobHfs globHfs = (GlobHfs) object;
173    
174        // do not compare tap arrays, these values should be sufficient to show identity
175        if( getScheme() != null ? !getScheme().equals( globHfs.getScheme() ) : globHfs.getScheme() != null )
176          return false;
177        if( pathFilter != null ? !pathFilter.equals( globHfs.pathFilter ) : globHfs.pathFilter != null )
178          return false;
179        if( pathPattern != null ? !pathPattern.equals( globHfs.pathPattern ) : globHfs.pathPattern != null )
180          return false;
181    
182        return true;
183        }
184    
185      @Override
186      public int hashCode()
187        {
188        int result = pathPattern != null ? pathPattern.hashCode() : 0;
189        result = 31 * result + ( pathFilter != null ? pathFilter.hashCode() : 0 );
190        return result;
191        }
192    
193      @Override
194      public String toString()
195        {
196        return "GlobHfs[" + pathPattern + ']';
197        }
198      }