001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop;
022
023import java.io.IOException;
024import java.net.URI;
025import java.util.ArrayList;
026import java.util.List;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.hadoop.util.HadoopUtil;
030import cascading.tap.DecoratorTap;
031import cascading.tap.MultiSourceTap;
032import cascading.tap.Tap;
033import cascading.tap.TapException;
034import cascading.tuple.TupleEntryIterator;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.mapred.OutputCollector;
040import org.apache.hadoop.mapred.RecordReader;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 *
046 */
047public abstract class BaseDistCacheTap extends DecoratorTap<Void, Configuration, RecordReader, OutputCollector>
048  {
049  /** logger. */
050  private static final Logger LOG = LoggerFactory.getLogger( DistCacheTap.class );
051
052  public BaseDistCacheTap( Tap<Configuration, RecordReader, OutputCollector> original )
053    {
054    super( original );
055    }
056
057  @Override
058  public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
059    {
060    if( HadoopUtil.isLocal( conf ) ||
061      Tap.id( this ).equals( conf.get( "cascading.node.source" ) ) ||
062      Tap.id( this ).equals( conf.get( "cascading.step.source" ) ) )
063      {
064      LOG.info( "can't use distributed cache. reading '{}' from hdfs", super.getIdentifier() );
065      super.sourceConfInit( process, conf );
066      return;
067      }
068    try
069      {
070      registerHfs( process, conf, getHfs() );
071      }
072    catch( IOException exception )
073      {
074      throw new TapException( exception );
075      }
076    }
077
078  @Override
079  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
080    {
081    // always read via Hadoop FileSystem if in standalone/local mode, or if an RecordReader is provided
082    if( HadoopUtil.isLocal( flowProcess.getConfig() ) || input != null )
083      {
084      LOG.info( "delegating to parent" );
085      return super.openForRead( flowProcess, input );
086      }
087
088    Path[] cachedFiles = getLocalCacheFiles( flowProcess );
089
090    if( cachedFiles == null || cachedFiles.length == 0 )
091      return super.openForRead( flowProcess, null );
092
093    List<Path> paths = new ArrayList<>();
094    List<Tap> taps = new ArrayList<>();
095
096    if( isSimpleGlob() )
097      {
098      FileSystem fs = FileSystem.get( flowProcess.getConfig() );
099      FileStatus[] statuses = fs.globStatus( getHfs().getPath() );
100
101      for( FileStatus status : statuses )
102        paths.add( status.getPath() );
103      }
104    else
105      {
106      paths.add( getHfs().getPath() );
107      }
108
109    for( Path pathToFind : paths )
110      {
111      for( Path path : cachedFiles )
112        {
113        if( path.toString().endsWith( pathToFind.getName() ) )
114          {
115          LOG.info( "found {} in distributed cache", path );
116          taps.add( new Lfs( getScheme(), path.toString() ) );
117          }
118        }
119      }
120
121    if( paths.isEmpty() ) // not in cache, read from HDFS
122      {
123      LOG.info( "could not find files in local resource path. delegating to parent: {}", super.getIdentifier() );
124      return super.openForRead( flowProcess, input );
125      }
126
127    return new MultiSourceTap( taps.toArray( new Tap[ taps.size() ] ) ).openForRead( flowProcess, input );
128    }
129
130  private void registerHfs( FlowProcess<? extends Configuration> process, Configuration conf, Hfs hfs ) throws IOException
131    {
132    if( isSimpleGlob() )
133      {
134      FileSystem fs = FileSystem.get( conf );
135      FileStatus[] statuses = fs.globStatus( getHfs().getPath() );
136
137      if( statuses == null || statuses.length == 0 )
138        throw new TapException( String.format( "glob expression %s does not match any files on the filesystem", getHfs().getPath() ) );
139
140      for( FileStatus fileStatus : statuses )
141        registerURI( conf, fileStatus.getPath() );
142      }
143    else
144      {
145      registerURI( conf, hfs.getPath() );
146      }
147
148    hfs.sourceConfInitComplete( process, conf );
149    }
150
151  private void registerURI( Configuration conf, Path path )
152    {
153    URI uri = path.toUri();
154    LOG.info( "adding {} to local resource configuration ", uri );
155    addLocalCacheFiles( conf, uri );
156    }
157
158  private Hfs getHfs()
159    {
160    return (Hfs) getOriginal();
161    }
162
163  private boolean isSimpleGlob()
164    {
165    return getHfs().getIdentifier().contains( "*" );
166    }
167
168  protected abstract Path[] getLocalCacheFiles( FlowProcess<? extends Configuration> flowProcess ) throws IOException;
169
170  protected abstract void addLocalCacheFiles( Configuration conf, URI uri );
171  }