/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.DecoratorTap;
import cascading.tap.MultiSourceTap;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class DistCacheTap is a Tap decorator for Hfs and can be used to move a file to the
 * {@link org.apache.hadoop.filecache.DistributedCache} on read when accessed cluster side.
 * <p/>
 * This is useful for {@link cascading.pipe.HashJoin}s.
 * <p/>
 * The distributed cache is only used when the Tap is used as a source. If the DistCacheTap is used as a sink,
 * it will delegate to the provided parent instance and not use the DistributedCache.
 */
public class DistCacheTap extends DecoratorTap<Void, JobConf, RecordReader, OutputCollector>
  {
  /** logger. */
  private static final Logger LOG = LoggerFactory.getLogger( DistCacheTap.class );

  /**
   * Constructs a new DistCacheTap instance with the given Hfs.
   *
   * @param parent an Hfs or GlobHfs instance representing a small file.
   */
  public DistCacheTap( Hfs parent )
    {
    super( parent );
    }

  /**
   * Registers the backing file(s) with the {@link DistributedCache} so they are localized on
   * the cluster nodes, unless running in local mode or this tap is the current step's source
   * (in which case the data is streamed from HDFS directly by the parent Hfs).
   *
   * @param process the current FlowProcess
   * @param conf    the JobConf being initialized
   */
  @Override
  public void sourceConfInit( FlowProcess<JobConf> process, JobConf conf )
    {
    // the distributed cache is pointless in local mode, and a tap feeding the step's main
    // input stream must be read normally, not via the cache
    if( HadoopUtil.isLocal( conf ) || Tap.id( this ).equals( conf.get( "cascading.step.source" ) ) )
      {
      LOG.info( "can't use distributed cache. reading '{}' from hdfs.", getIdentifier() );
      super.sourceConfInit( process, conf );
      return;
      }

    try
      {
      registerHfs( process, conf, getHfs() );
      }
    catch( IOException exception )
      {
      // wrap as a TapException so callers see a consistent failure type
      throw new TapException( exception );
      }
    }

  /**
   * Opens this tap for reading, preferring the locally cached copies of the backing file(s)
   * when they are present in the {@link DistributedCache}. Falls back to the parent Hfs when
   * running in local mode, when an explicit RecordReader is supplied, or when the files have
   * not been localized.
   *
   * @param flowProcess the current FlowProcess
   * @param input       an optional RecordReader; when non-null the parent is always used
   * @return a TupleEntryIterator over the tap's contents
   * @throws IOException when the underlying filesystem or cache access fails
   */
  @Override
  public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException
    {
    // always read via Hadoop FileSystem if in standalone/local mode, or if an RecordReader is provided
    if( HadoopUtil.isLocal( flowProcess.getConfigCopy() ) || input != null )
      {
      LOG.info( "delegating to parent." );
      return super.openForRead( flowProcess, input );
      }

    Path[] cachedFiles = DistributedCache.getLocalCacheFiles( flowProcess.getConfigCopy() );

    if( cachedFiles == null || cachedFiles.length == 0 )
      return super.openForRead( flowProcess, input );

    List<Path> paths = new ArrayList<Path>();
    List<Tap> taps = new ArrayList<Tap>();

    // expand a glob into its concrete matching paths; otherwise use the single path directly
    if( isSimpleGlob() )
      {
      FileSystem fs = FileSystem.get( flowProcess.getConfigCopy() );
      FileStatus[] statuses = fs.globStatus( getHfs().getPath() );

      for( FileStatus status : statuses )
        paths.add( status.getPath() );
      }
    else
      {
      paths.add( getHfs().getPath() );
      }

    // match each expected file against the localized cache files by file name, and read the
    // cached copy through a local-filesystem tap
    for( Path pathToFind : paths )
      {
      for( Path path : cachedFiles )
        {
        if( path.toString().endsWith( pathToFind.getName() ) )
          {
          LOG.info( "found {} in distributed cache", path );
          taps.add( new Lfs( getScheme(), path.toString() ) );
          }
        }
      }

    // fix: test the collected taps, not paths -- paths is never empty here (it always holds at
    // least the parent's path), so the old paths.isEmpty() check could hand an empty tap array
    // to MultiSourceTap when nothing was found in the cache
    if( taps.isEmpty() ) // not in cache, read from HDFS
      {
      LOG.info( "could not find files in distributed cache. delegating to parent: {}", super.getIdentifier() );
      return super.openForRead( flowProcess, input );
      }

    return new MultiSourceTap( taps.toArray( new Tap[ taps.size() ] ) ).openForRead( flowProcess, input );
    }

  /**
   * Adds the given Hfs tap's file(s) to the DistributedCache, expanding globs to their
   * concrete matches, then completes the parent's source configuration.
   *
   * @throws TapException when a glob expression matches no files
   */
  private void registerHfs( FlowProcess<JobConf> process, JobConf conf, Hfs hfs ) throws IOException
    {
    if( isSimpleGlob() )
      {
      FileSystem fs = FileSystem.get( conf );
      FileStatus[] statuses = fs.globStatus( hfs.getPath() );

      if( statuses == null || statuses.length == 0 )
        throw new TapException( String.format( "glob expression %s does not match any files on the filesystem", hfs.getPath() ) );

      for( FileStatus fileStatus : statuses )
        registerURI( conf, fileStatus.getPath() );
      }
    else
      {
      registerURI( conf, hfs.getPath() );
      }

    hfs.sourceConfInitComplete( process, conf );
    }

  /** Adds a single file URI to the DistributedCache for localization. */
  private void registerURI( JobConf conf, Path path )
    {
    URI uri = path.toUri();
    LOG.info( "adding {} to distributed cache ", uri );
    DistributedCache.addCacheFile( uri, conf );
    }

  /** Returns the decorated tap downcast to Hfs (the constructor guarantees the type). */
  private Hfs getHfs()
    {
    return (Hfs) getOriginal();
    }

  /** Returns true when the identifier contains a wildcard and must be glob-expanded. */
  private boolean isSimpleGlob()
    {
    return getHfs().getIdentifier().contains( "*" );
    }
  }