/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.DecoratorTap;
import cascading.tap.MultiSourceTap;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class DistCacheTap is a Tap decorator for Hfs that places its file in the
 * {@link org.apache.hadoop.filecache.DistributedCache} so the file is read from the task-local cache when
 * accessed cluster side.
 * <p/>
 * This is useful for the small side of a {@link cascading.pipe.HashJoin}.
 * <p/>
 * The distributed cache is only used when the Tap is used as a source. If the DistCacheTap is used as a sink,
 * it will delegate to the provided parent instance and not use the DistributedCache.
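 * <p/>
 * A typical use wraps the small, accumulated side of a HashJoin. The sketch below is illustrative only; the
 * scheme, field names and paths are assumptions, and the taps are bound to the pipes through a FlowDef as usual:
 * <pre>{@code
 * Tap largeSource = new Hfs( new TextDelimited( new Fields( "id", "value" ) ), "big/input/path" );
 * Tap smallSource = new DistCacheTap( new Hfs( new TextDelimited( new Fields( "id", "name" ) ), "small/lookup/path" ) );
 *
 * Pipe largePipe = new Pipe( "large" );
 * Pipe smallPipe = new Pipe( "small" );
 * Pipe joined = new HashJoin( largePipe, new Fields( "id" ), smallPipe, new Fields( "id" ) );
 * }</pre>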
 */
public class DistCacheTap extends DecoratorTap<Void, JobConf, RecordReader, OutputCollector>
  {
  /** logger. */
  private static final Logger LOG = LoggerFactory.getLogger( DistCacheTap.class );

  /**
   * Constructs a new DistCacheTap instance with the given Hfs.
   *
   * @param parent an Hfs or GlobHfs instance representing a small file.
   */
  public DistCacheTap( Hfs parent )
    {
    super( parent );
    }

  @Override
  public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf )
    {
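    // the distributed cache cannot be used in local mode, or when this tap is the current step's direct source,
    // so fall back to reading straight from the filesystem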
    if( HadoopUtil.isLocal( conf ) || Tap.id( this ).equals( conf.get( "cascading.step.source" ) ) )
      {
      LOG.info( "can't use distributed cache. reading '{}' from hdfs.", getIdentifier() );
      super.sourceConfInit( process, conf );
      return;
      }
    try
      {
      registerHfs( process, conf, getHfs() );
      }
    catch( IOException exception )
      {
      throw new TapException( exception );
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException
    {
    // always read via the Hadoop FileSystem if in standalone/local mode, or if a RecordReader is provided
    if( HadoopUtil.isLocal( flowProcess.getConfigCopy() ) || input != null )
      {
      LOG.info( "delegating to parent." );
      return super.openForRead( flowProcess, input );
      }

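    // look up the files Hadoop has localized for this task via the distributed cache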
    Path[] cachedFiles = DistributedCache.getLocalCacheFiles( flowProcess.getConfigCopy() );

    if( cachedFiles == null || cachedFiles.length == 0 )
      return super.openForRead( flowProcess, input );

    List<Path> paths = new ArrayList<Path>();
    List<Tap> taps = new ArrayList<Tap>();

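    // resolve the expected source paths, expanding the glob expression if one was given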
    if( isSimpleGlob() )
      {
      FileSystem fs = FileSystem.get( flowProcess.getConfigCopy() );
      FileStatus[] statuses = fs.globStatus( getHfs().getPath() );

      for( FileStatus status : statuses )
        paths.add( status.getPath() );
      }
    else
      {
      paths.add( getHfs().getPath() );
      }

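    // match each expected path against the localized cache files by file name; matches are read as local files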
    for( Path pathToFind : paths )
      {
      for( Path path : cachedFiles )
        {
        if( path.toString().endsWith( pathToFind.getName() ) )
          {
          LOG.info( "found {} in distributed cache", path );
          taps.add( new Lfs( getScheme(), path.toString() ) );
          }
        }
      }

    if( taps.isEmpty() ) // not in cache, read from HDFS
      {
      LOG.info( "could not find files in distributed cache. delegating to parent: {}", super.getIdentifier() );
      return super.openForRead( flowProcess, input );
      }

    return new MultiSourceTap( taps.toArray( new Tap[ taps.size() ] ) ).openForRead( flowProcess, input );
    }

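  /** Adds the parent's file or files, expanding a glob expression if present, to the distributed cache before completing the parent's source configuration. */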
  private void registerHfs( FlowProcess<JobConf> process, JobConf conf, Hfs hfs ) throws IOException
    {
    if( isSimpleGlob() )
      {
      FileSystem fs = FileSystem.get( conf );
      FileStatus[] statuses = fs.globStatus( hfs.getPath() );

      if( statuses == null || statuses.length == 0 )
        throw new TapException( String.format( "glob expression %s does not match any files on the filesystem", hfs.getPath() ) );

      for( FileStatus fileStatus : statuses )
        registerURI( conf, fileStatus.getPath() );
      }
    else
      {
      registerURI( conf, hfs.getPath() );
      }

    hfs.sourceConfInitComplete( process, conf );
    }

  private void registerURI( JobConf conf, Path path )
    {
    URI uri = path.toUri();
    LOG.info( "adding {} to distributed cache", uri );
    DistributedCache.addCacheFile( uri, conf );
    }

  private Hfs getHfs()
    {
    return (Hfs) getOriginal();
    }

  private boolean isSimpleGlob()
    {
    return getHfs().getIdentifier().contains( "*" );
    }
  }