/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.hadoop.util.HadoopUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class MultiInputSplit is used by MultiInputFormat to wrap a child {@link InputSplit} along with
 * the configuration key/values specific to the source Tap it was created from.
 */
public class MultiInputSplit implements InputSplit, JobConfigurable
  {
  public static final String CASCADING_SOURCE_PATH = "cascading.source.path";
  private static final Logger LOG = LoggerFactory.getLogger( MultiInputSplit.class );

  /** Field jobConf */
  private transient JobConf jobConf;
  /** Field inputSplit */
  InputSplit inputSplit;
  /** Field config */
  Map<String, String> config;

  /**
   * Method getCurrentTapSourcePath finds and returns the file path of the current source Tap, if any.
   * <p/>
   * Use this method inside an Operation to find the current file being processed.
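   * <p/>
   * A minimal sketch of such a call site (assumes a Hadoop-mode flow, where {@code flowProcess} is
   * the FlowProcess handed to the Operation and may be cast to HadoopFlowProcess):
   * <pre>{@code
   * JobConf jobConf = ( (HadoopFlowProcess) flowProcess ).getJobConf();
   * String sourcePath = MultiInputSplit.getCurrentTapSourcePath( jobConf );
   * }</pre>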
   *
   * @param jobConf the JobConf of the current task
   * @return the source path as a String, or null if not set
   */
  public static String getCurrentTapSourcePath( JobConf jobConf )
    {
    return jobConf.get( CASCADING_SOURCE_PATH );
    }

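  /**
   * Constructor MultiInputSplit creates a new MultiInputSplit instance wrapping the given child
   * split and its split-specific config key/values. Neither argument may be null.
   *
   * @param inputSplit the InputSplit to wrap
   * @param config     the config key/values specific to this split
   */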
  public MultiInputSplit( InputSplit inputSplit, Map<String, String> config )
    {
    if( inputSplit == null )
      throw new IllegalArgumentException( "input split may not be null" );

    if( config == null )
      throw new IllegalArgumentException( "config may not be null" );

    this.inputSplit = inputSplit;
    this.config = config;
    }

  /**
   * This constructor is used internally by Hadoop. It is expected {@link #configure(org.apache.hadoop.mapred.JobConf)}
   * and {@link #readFields(java.io.DataInput)} are called to properly initialize.
   */
  public MultiInputSplit()
    {
    }

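  /**
   * Method configure is called by Hadoop via {@link JobConfigurable} and retains the local JobConf,
   * which {@link #readFields(DataInput)} later merges with the deserialized config key/values.
   */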
  public void configure( JobConf jobConf )
    {
    this.jobConf = jobConf;
    }

  public long getLength() throws IOException
    {
    return inputSplit.getLength();
    }

  public String[] getLocations() throws IOException
    {
    return inputSplit.getLocations();
    }

  public InputSplit getWrappedInputSplit()
    {
    return inputSplit;
    }

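  /**
   * Method write serializes this split as the child split class name, the config keys and values
   * as two parallel string arrays, and finally the child split's own serialized fields.
   */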
  public void write( DataOutput out ) throws IOException
    {
    out.writeUTF( inputSplit.getClass().getName() );

    String[] keys = config.keySet().toArray( new String[ config.size() ] );
    String[] values = new String[ keys.length ];

    for( int i = 0; i < keys.length; i++ )
      values[ i ] = config.get( keys[ i ] );

    WritableUtils.writeStringArray( out, keys );
    WritableUtils.writeStringArray( out, values );

    inputSplit.write( out );
    }

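  /**
   * Method readFields reverses {@link #write(DataOutput)}: it reads the child split class name,
   * rebuilds the config map, reflectively instantiates the child split, then delegates the
   * remaining bytes to the child split's own readFields.
   */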
  public void readFields( DataInput in ) throws IOException
    {
    String splitType = in.readUTF();
    config = new HashMap<String, String>();

    String[] keys = WritableUtils.readStringArray( in );
    String[] values = WritableUtils.readStringArray( in );

    for( int i = 0; i < keys.length; i++ )
      config.put( keys[ i ], values[ i ] );

    if( LOG.isDebugEnabled() )
      {
      LOG.debug( "current split config diff:" );
      for( Map.Entry<String, String> entry : config.entrySet() )
        LOG.debug( "key: {}, value: {}", entry.getKey(), entry.getValue() );
      }

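    // overlay the deserialized config diff onto the local JobConf so the child split class can be resolved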
    JobConf currentConf = HadoopUtil.mergeConf( jobConf, config, false );

    try
      {
      inputSplit = (InputSplit) ReflectionUtils.newInstance( currentConf.getClassByName( splitType ), currentConf );
      }
    catch( ClassNotFoundException exp )
      {
      throw new IOException( "split class " + splitType + " not found", exp );
      }

    inputSplit.readFields( in );

    if( inputSplit instanceof FileSplit )
      {
      Path path = ( (FileSplit) inputSplit ).getPath();

      if( path != null )
        {
        jobConf.set( CASCADING_SOURCE_PATH, path.toString() );

        LOG.info( "current split input path: {}", path );
        }
      }
    }
  }