001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.io.IOException;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.flow.hadoop.util.HadoopUtil;
027    import cascading.tap.Tap;
028    import cascading.tap.TapException;
029    import cascading.tuple.Tuple;
030    import cascading.util.CloseableIterator;
031    import org.apache.hadoop.mapred.InputFormat;
032    import org.apache.hadoop.mapred.InputSplit;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.JobConfigurable;
035    import org.apache.hadoop.mapred.RecordReader;
036    import org.apache.hadoop.mapred.Reporter;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * Class TapIterator is an implementation of {@link cascading.util.CloseableIterator}. It is returned by {@link cascading.tap.Tap} instances when
042     * opening the taps resource for reading.
043     */
044    public class MultiRecordReaderIterator implements CloseableIterator<RecordReader>
045      {
046      /** Field LOG */
047      private static final Logger LOG = LoggerFactory.getLogger( MultiRecordReaderIterator.class );
048    
049      private final FlowProcess<JobConf> flowProcess;
050      /** Field tap */
051      private final Tap tap;
052      /** Field inputFormat */
053      private InputFormat inputFormat;
054      /** Field conf */
055      private JobConf conf;
056      /** Field splits */
057      private InputSplit[] splits;
058      /** Field reader */
059      private RecordReader reader;
060    
061      /** Field lastReader */
062      private RecordReader lastReader;
063    
064      /** Field currentSplit */
065      private int currentSplit = 0;
066      /** Field complete */
067      private boolean complete = false;
068    
069      /**
070       * Constructor TapIterator creates a new TapIterator instance.
071       *
072       * @throws IOException when
073       */
074      public MultiRecordReaderIterator( FlowProcess<JobConf> flowProcess, Tap tap ) throws IOException
075        {
076        this.flowProcess = flowProcess;
077        this.tap = tap;
078        this.conf = flowProcess.getConfigCopy();
079    
080        initialize();
081        }
082    
083      private void initialize() throws IOException
084        {
085        // prevent collisions of configuration properties set client side if now cluster side
086        String property = flowProcess.getStringProperty( "cascading.step.accumulated.source.conf." + Tap.id( tap ) );
087    
088        if( property == null )
089          {
090          // default behavior is to accumulate paths, so remove any set prior
091          conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
092          tap.sourceConfInit( flowProcess, conf );
093          }
094    
095        inputFormat = conf.getInputFormat();
096    
097        if( inputFormat instanceof JobConfigurable )
098          ( (JobConfigurable) inputFormat ).configure( conf );
099    
100        // do not test for existence, let hadoop decide how to handle the given path
101        // this delegates globbing to the inputformat on split generation.
102        splits = inputFormat.getSplits( conf, 1 );
103    
104        if( splits.length == 0 )
105          complete = true;
106        }
107    
108      private RecordReader makeReader( int currentSplit ) throws IOException
109        {
110        LOG.debug( "reading split: {}", currentSplit );
111    
112        return inputFormat.getRecordReader( splits[ currentSplit ], conf, Reporter.NULL );
113        }
114    
115      /**
116       * Method hasNext returns true if there more {@link Tuple} instances available.
117       *
118       * @return boolean
119       */
120      public boolean hasNext()
121        {
122        getNextReader();
123    
124        return !complete;
125        }
126    
127      /**
128       * Method next returns the next {@link Tuple}.
129       *
130       * @return Tuple
131       */
132      public RecordReader next()
133        {
134        if( complete )
135          throw new IllegalStateException( "no more values" );
136    
137        try
138          {
139          getNextReader();
140    
141          return reader;
142          }
143        finally
144          {
145          reader = null;
146          }
147        }
148    
149      private void getNextReader()
150        {
151        if( complete || reader != null )
152          return;
153    
154        try
155          {
156          if( currentSplit < splits.length )
157            {
158            if( lastReader != null )
159              lastReader.close();
160    
161            reader = makeReader( currentSplit++ );
162            lastReader = reader;
163            }
164          else
165            {
166            complete = true;
167            }
168          }
169        catch( IOException exception )
170          {
171          throw new TapException( "could not get next tuple", exception );
172          }
173        }
174    
175      public void remove()
176        {
177        throw new UnsupportedOperationException( "unimplemented" );
178        }
179    
180      @Override
181      public void close() throws IOException
182        {
183        if( lastReader != null )
184          lastReader.close();
185        }
186      }