/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.hadoop.util.HadoopUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class MultiInputSplit is used by MultiInputFormat.
 * <p>
 * It wraps another {@link InputSplit} together with a map of configuration key/value pairs. Both
 * are serialized by {@link #write(java.io.DataOutput)} and restored by {@link #readFields(java.io.DataInput)}.
 */
public class MultiInputSplit implements InputSplit, JobConfigurable
  {
  /** Property set on the JobConf by {@link #readFields(java.io.DataInput)} when the wrapped split is a {@link FileSplit}. */
  public static final String CASCADING_SOURCE_PATH = "cascading.source.path";
  private static final Logger LOG = LoggerFactory.getLogger( MultiInputSplit.class );

  /** Field jobConf, supplied through {@link #configure(org.apache.hadoop.mapred.JobConf)}; never serialized */
  private transient JobConf jobConf;
  /** Field inputSplit, the wrapped split all InputSplit calls are delegated to */
  InputSplit inputSplit;
  /** Field config, the key/value pairs serialized alongside the wrapped split */
  Map<String, String> config;

  /**
   * Method getCurrentTapSourcePath finds and returns the current source Tap filename path, if any.
   * <p>
   * Use this method inside an Operation to find the current file being processed.
   *
   * @param jobConf the JobConf to read the {@link #CASCADING_SOURCE_PATH} property from
   * @return a String, the value of the property as returned by {@code jobConf.get()}
   */
  public static String getCurrentTapSourcePath( JobConf jobConf )
    {
    return jobConf.get( CASCADING_SOURCE_PATH );
    }

  /**
   * Creates a new MultiInputSplit wrapping the given split.
   *
   * @param inputSplit the split to wrap, may not be null
   * @param config     the key/value pairs to serialize with the split, may not be null
   * @throws IllegalArgumentException if either argument is null
   */
  public MultiInputSplit( InputSplit inputSplit, Map<String, String> config )
    {
    if( inputSplit == null )
      throw new IllegalArgumentException( "input split may not be null" );

    if( config == null )
      throw new IllegalArgumentException( "config may not be null" );

    this.inputSplit = inputSplit;
    this.config = config;
    }

  /**
   * This constructor is used internally by Hadoop. It is expected {@link #configure(org.apache.hadoop.mapred.JobConf)}
   * and {@link #readFields(java.io.DataInput)} are called to properly initialize.
   */
  public MultiInputSplit()
    {
    }

  /**
   * Stores the given JobConf for later use by {@link #readFields(java.io.DataInput)}.
   *
   * @param jobConf the current job configuration
   */
  @Override
  public void configure( JobConf jobConf )
    {
    this.jobConf = jobConf;
    }

  /**
   * Returns the length reported by the wrapped split.
   *
   * @return the wrapped split length
   * @throws IOException if thrown by the wrapped split
   */
  @Override
  public long getLength() throws IOException
    {
    return inputSplit.getLength();
    }

  /**
   * Returns the locations reported by the wrapped split.
   *
   * @return the wrapped split locations
   * @throws IOException if thrown by the wrapped split
   */
  @Override
  public String[] getLocations() throws IOException
    {
    return inputSplit.getLocations();
    }

  /**
   * Returns the split wrapped by this instance.
   *
   * @return the wrapped InputSplit
   */
  public InputSplit getWrappedInputSplit()
    {
    return inputSplit;
    }

  /**
   * Serializes this split. The layout is: the wrapped split class name, the config keys, the config
   * values (in the same order as the keys), then the wrapped split itself.
   *
   * @param out the stream to write to
   * @throws IOException if writing fails
   */
  @Override
  public void write( DataOutput out ) throws IOException
    {
    out.writeUTF( inputSplit.getClass().getName() );

    int size = config.size();
    String[] keys = new String[ size ];
    String[] values = new String[ size ];
    int index = 0;

    // a single pass over the entries keeps keys and values aligned without a lookup per key
    for( Map.Entry<String, String> entry : config.entrySet() )
      {
      keys[ index ] = entry.getKey();
      values[ index ] = entry.getValue();
      index++;
      }

    WritableUtils.writeStringArray( out, keys );
    WritableUtils.writeStringArray( out, values );

    inputSplit.write( out );
    }

  /**
   * Restores this split from the layout produced by {@link #write(java.io.DataOutput)}.
   * <p>
   * The config map is rebuilt and merged with the JobConf given to
   * {@link #configure(org.apache.hadoop.mapred.JobConf)}; the merged configuration is used to load and
   * instantiate the wrapped split class. If the wrapped split is a {@link FileSplit} with a non-null
   * path, that path is stored on the JobConf under {@link #CASCADING_SOURCE_PATH}.
   *
   * @param in the stream to read from
   * @throws IOException if reading fails, the key and value arrays differ in length, or the
   *                     wrapped split class cannot be found
   */
  @Override
  public void readFields( DataInput in ) throws IOException
    {
    String splitType = in.readUTF();
    config = new HashMap<String, String>();

    String[] keys = WritableUtils.readStringArray( in );
    String[] values = WritableUtils.readStringArray( in );

    // write() always emits arrays of equal length, a mismatch means the stream is corrupt
    if( keys.length != values.length )
      throw new IOException( "split config is corrupt, found " + keys.length + " keys and " + values.length + " values" );

    for( int i = 0; i < keys.length; i++ )
      config.put( keys[ i ], values[ i ] );

    if( LOG.isDebugEnabled() )
      {
      LOG.debug( "current split config diff:" );
      for( Map.Entry<String, String> entry : config.entrySet() )
        LOG.debug( "key: {}, value: {}", entry.getKey(), entry.getValue() );
      }

    // NOTE(review): presumably returns a JobConf with the config entries applied over jobConf - confirm against HadoopUtil
    JobConf currentConf = HadoopUtil.mergeConf( jobConf, config, false );

    try
      {
      inputSplit = (InputSplit) ReflectionUtils.newInstance( currentConf.getClassByName( splitType ), currentConf );
      }
    catch( ClassNotFoundException exp )
      {
      // keep the original exception as the cause so the failing class loader lookup stays diagnosable
      throw new IOException( "split class " + splitType + " not found", exp );
      }

    inputSplit.readFields( in );

    if( inputSplit instanceof FileSplit )
      {
      Path path = ( (FileSplit) inputSplit ).getPath();

      if( path != null )
        {
        // set on the JobConf handed to configure(), not on the merged copy
        jobConf.set( CASCADING_SOURCE_PATH, path.toString() );

        LOG.info( "current split input path: {}", path );
        }
      }
    }
  }