001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.io.IOException;
024    
025    import org.apache.hadoop.conf.Configurable;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.mapred.FileInputFormat;
028    import org.apache.hadoop.mapred.FileSplit;
029    import org.apache.hadoop.mapred.JobConf;
030    import org.apache.hadoop.mapred.RecordReader;
031    import org.apache.hadoop.mapred.Reporter;
032    import org.apache.hadoop.mapred.lib.CombineFileSplit;
033    
034    /**
035     * A wrapper class for a record reader that handles a single file split. It delegates most of the
036     * methods to the wrapped instance. We need this wrapper to satisfy the constructor requirement to
037     * be used with hadoop's CombineFileRecordReader class.
038     *
039     * @see org.apache.hadoop.mapred.lib.CombineFileRecordReader
040     * @see org.apache.hadoop.mapred.lib.CombineFileInputFormat
041     */
042    public class CombineFileRecordReaderWrapper<K, V> implements RecordReader<K, V>
043      {
044      /** property that indicates how individual input format is to be interpreted */
045      public static final String INDIVIDUAL_INPUT_FORMAT = "cascading.individual.input.format";
046    
047      private final RecordReader<K, V> delegate;
048    
049      // this constructor signature is required by CombineFileRecordReader
050      public CombineFileRecordReaderWrapper( CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx ) throws Exception
051        {
052        FileSplit fileSplit = new FileSplit(
053          split.getPath( idx ),
054          split.getOffset( idx ),
055          split.getLength( idx ),
056          split.getLocations()
057        );
058    
059        Class<?> clz = conf.getClass( INDIVIDUAL_INPUT_FORMAT, null );
060        FileInputFormat<K, V> inputFormat = (FileInputFormat<K, V>) clz.newInstance();
061    
062        if( inputFormat instanceof Configurable )
063          ( (Configurable) inputFormat ).setConf( conf );
064    
065        delegate = inputFormat.getRecordReader( fileSplit, (JobConf) conf, reporter );
066        }
067    
068      public boolean next( K key, V value ) throws IOException
069        {
070        return delegate.next( key, value );
071        }
072    
073      public K createKey()
074        {
075        return delegate.createKey();
076        }
077    
078      public V createValue()
079        {
080        return delegate.createValue();
081        }
082    
083      public long getPos() throws IOException
084        {
085        return delegate.getPos();
086        }
087    
088      public void close() throws IOException
089        {
090        delegate.close();
091        }
092    
093      public float getProgress() throws IOException
094        {
095        return delegate.getProgress();
096        }
097      }