/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.hadoop.util.HadoopUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class MultiInputSplit wraps a child InputSplit, together with the config properties unique to its source Tap, for use by MultiInputFormat. */
public class MultiInputSplit implements InputSplit, JobConfigurable
  {
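  /** Property key under which the current source path is published; see {@link #getCurrentTapSourcePath(org.apache.hadoop.mapred.JobConf)}. */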
  public static final String CASCADING_SOURCE_PATH = "cascading.source.path";
  private static final Logger LOG = LoggerFactory.getLogger( MultiInputSplit.class );

  /** Field jobConf */
  private transient JobConf jobConf;
  /** Field inputSplit */
  InputSplit inputSplit;
  /** Field config */
  Map<String, String> config;

  /**
   * Method getCurrentTapSourcePath finds and returns the current source Tap filename path, if any.
   * <p/>
   * Use this method inside an Operation to find the current file being processed.
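   * <p/>
   * For example (a minimal sketch; {@code flowProcess} is assumed to be a Hadoop platform
   * {@code FlowProcess} whose {@code getConfigCopy()} yields a {@link JobConf}, and
   * {@code processCurrentFile} is a hypothetical handler):
   * <pre>{@code
   * JobConf jobConf = (JobConf) flowProcess.getConfigCopy();
   * String sourcePath = MultiInputSplit.getCurrentTapSourcePath( jobConf );
   *
   * if( sourcePath != null )
   *   processCurrentFile( sourcePath );
   * }</pre>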
   *
   * @param jobConf of type JobConf
   * @return a String, or null if no source path has been set
   */
  public static String getCurrentTapSourcePath( JobConf jobConf )
    {
    return jobConf.get( CASCADING_SOURCE_PATH );
    }

  public MultiInputSplit( InputSplit inputSplit, Map<String, String> config )
    {
    if( inputSplit == null )
      throw new IllegalArgumentException( "input split may not be null" );

    if( config == null )
      throw new IllegalArgumentException( "config may not be null" );

    this.inputSplit = inputSplit;
    this.config = config;
    }

  /**
   * This constructor is used internally by Hadoop. It is expected {@link #configure(org.apache.hadoop.mapred.JobConf)}
   * and {@link #readFields(java.io.DataInput)} are called to properly initialize.
   */
  public MultiInputSplit()
    {
    }

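  /**
   * Invoked by the framework via the {@link JobConfigurable} contract; the given JobConf is
   * retained so {@link #readFields(java.io.DataInput)} can merge the stored config diff into
   * it when re-creating the wrapped split.
   */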
  public void configure( JobConf jobConf )
    {
    this.jobConf = jobConf;
    }

  public long getLength() throws IOException
    {
    return inputSplit.getLength();
    }

  public String[] getLocations() throws IOException
    {
    return inputSplit.getLocations();
    }

  public InputSplit getWrappedInputSplit()
    {
    return inputSplit;
    }

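  /**
   * Serializes, in order: the wrapped split's class name, the config keys, the matching
   * config values, and finally the wrapped split's own fields.
   * {@link #readFields(java.io.DataInput)} assumes this exact order.
   */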
  public void write( DataOutput out ) throws IOException
    {
    out.writeUTF( inputSplit.getClass().getName() );

    String[] keys = config.keySet().toArray( new String[ config.size() ] );
    String[] values = new String[ keys.length ];

    for( int i = 0; i < keys.length; i++ )
      values[ i ] = config.get( keys[ i ] );

    WritableUtils.writeStringArray( out, keys );
    WritableUtils.writeStringArray( out, values );

    inputSplit.write( out );
    }

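  /**
   * Reads the fields written by {@link #write(java.io.DataOutput)}, re-creates the wrapped
   * split reflectively from the merged configuration, then delegates to its {@code readFields}.
   */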
  public void readFields( DataInput in ) throws IOException
    {
    String splitType = in.readUTF();
    config = new HashMap<String, String>();

    String[] keys = WritableUtils.readStringArray( in );
    String[] values = WritableUtils.readStringArray( in );

    for( int i = 0; i < keys.length; i++ )
      config.put( keys[ i ], values[ i ] );

    if( LOG.isDebugEnabled() )
      {
      LOG.debug( "current split config diff:" );
      for( Map.Entry<String, String> entry : config.entrySet() )
        LOG.debug( "key: {}, value: {}", entry.getKey(), entry.getValue() );
      }

    JobConf currentConf = HadoopUtil.mergeConf( jobConf, config, false );

    try
      {
      inputSplit = (InputSplit) ReflectionUtils.newInstance( currentConf.getClassByName( splitType ), currentConf );
      }
    catch( ClassNotFoundException exp )
      {
      throw new IOException( "split class " + splitType + " not found", exp );
      }

    inputSplit.readFields( in );

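    // when the wrapped split is a FileSplit, publish its path into the JobConf
    // so Operations can look it up via getCurrentTapSourcePath()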
    if( inputSplit instanceof FileSplit )
      {
      Path path = ( (FileSplit) inputSplit ).getPath();

      if( path != null )
        {
        jobConf.set( CASCADING_SOURCE_PATH, path.toString() );

        LOG.info( "current split input path: {}", path );
        }
      }
    }
  }