/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.util.Util;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.jets3t.service.S3ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class MultiInputFormat accepts multiple InputFormat class declarations, allowing a single MR job
 * to read data from incompatible file types.
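 * <p/>
 * A minimal usage sketch; the sub-job names and paths below are illustrative only:
 * <pre>{@code
 * JobConf mainJob = new JobConf();
 *
 * JobConf textJob = new JobConf( mainJob );
 * textJob.setInputFormat( TextInputFormat.class );
 * FileInputFormat.setInputPaths( textJob, new Path( "example/text" ) );
 *
 * JobConf seqJob = new JobConf( mainJob );
 * seqJob.setInputFormat( SequenceFileInputFormat.class );
 * FileInputFormat.setInputPaths( seqJob, new Path( "example/sequence" ) );
 *
 * // combines both source configurations and their input paths into the main job
 * MultiInputFormat.addInputFormat( mainJob, textJob, seqJob );
 * }</pre>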
 */
public class MultiInputFormat implements InputFormat
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( MultiInputFormat.class );

  /**
   * Used to set the current JobConf with the configurations of all sub jobs.
   *
   * @param toJob    the JobConf the sub job configurations and input paths are merged into
   * @param fromJobs the JobConf instances configured for each individual source
   */
  public static void addInputFormat( JobConf toJob, JobConf... fromJobs )
    {
    toJob.setInputFormat( MultiInputFormat.class );
    List<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    List<Path> allPaths = new ArrayList<Path>();

    boolean isLocal = false;

    for( JobConf fromJob : fromJobs )
      {
      if( fromJob.get( "mapred.input.format.class" ) == null )
        throw new CascadingException( "mapred.input.format.class is required, should be set in source Scheme#sourceConfInit" );

      configs.add( HadoopUtil.getConfig( toJob, fromJob ) );
      Collections.addAll( allPaths, FileInputFormat.getInputPaths( fromJob ) );

      if( !isLocal )
        isLocal = HadoopUtil.isLocal( fromJob );
      }

    if( !allPaths.isEmpty() ) // it's possible there aren't any
      FileInputFormat.setInputPaths( toJob, (Path[]) allPaths.toArray( new Path[ allPaths.size() ] ) );

    try
      {
      toJob.set( "cascading.multiinputformats", HadoopUtil.serializeBase64( configs, toJob, true ) );
      }
    catch( IOException exception )
      {
      throw new CascadingException( "unable to pack input formats", exception );
      }

    if( isLocal )
      HadoopUtil.setLocal( toJob );
    }

  static InputFormat[] getInputFormats( JobConf[] jobConfs )
    {
    InputFormat[] inputFormats = new InputFormat[ jobConfs.length ];

    for( int i = 0; i < jobConfs.length; i++ )
      inputFormats[ i ] = jobConfs[ i ].getInputFormat();

    return inputFormats;
    }

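  /**
   * Recovers the per-source configuration maps that {@link #addInputFormat(JobConf, JobConf...)}
   * packed into the "cascading.multiinputformats" property.
   */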
  private List<Map<String, String>> getConfigs( JobConf job ) throws IOException
    {
    return (List<Map<String, String>>)
      HadoopUtil.deserializeBase64( job.get( "cascading.multiinputformats" ), job, ArrayList.class, true );
    }

  public void validateInput( JobConf job ) throws IOException
    {
    // do nothing, this method is deprecated
    }

  /**
   * Method getSplits delegates to the appropriate InputFormat.
   *
   * @param job       of type JobConf
   * @param numSplits of type int
   * @return InputSplit[]
   * @throws IOException when the underlying InputFormats cannot compute their splits
   */
  public InputSplit[] getSplits( JobConf job, int numSplits ) throws IOException
    {
    numSplits = numSplits == 0 ? 1 : numSplits;

    List<Map<String, String>> configs = getConfigs( job );
    JobConf[] jobConfs = HadoopUtil.getJobConfs( job, configs );
    InputFormat[] inputFormats = getInputFormats( jobConfs );

    // if only one InputFormat, just return whatever it suggests
    if( inputFormats.length == 1 )
      return collapse( getSplits( inputFormats, jobConfs, new int[]{numSplits} ), configs );

    int[] indexedSplits = new int[ inputFormats.length ];

    // if we need only a few, then return one for each
    if( numSplits <= inputFormats.length )
      {
      Arrays.fill( indexedSplits, 1 );
      return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
      }

    // attempt to get splits proportionally sized per input format
    long[] inputSplitSizes = getInputSplitSizes( inputFormats, jobConfs, numSplits );
    long totalSplitSize = sum( inputSplitSizes );

    if( totalSplitSize == 0 )
      {
      Arrays.fill( indexedSplits, 1 );
      return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
      }

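    // allocate each input format's share of numSplits by its relative size; rounding up means
    // every input format receives at least one split, so the total may slightly exceed numSplits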
    for( int i = 0; i < inputSplitSizes.length; i++ )
      {
      int useSplits = (int) Math.ceil( (double) numSplits * inputSplitSizes[ i ] / (double) totalSplitSize );
      indexedSplits[ i ] = useSplits == 0 ? 1 : useSplits;
      }

    return collapse( getSplits( inputFormats, jobConfs, indexedSplits ), configs );
    }

  private long sum( long[] inputSizes )
    {
    long size = 0;

    for( long inputSize : inputSizes )
      size += inputSize;

    return size;
    }

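  /**
   * Wraps each raw InputSplit in a {@link MultiInputSplit} carrying its originating sub-job
   * configuration, so the task side can restore that configuration and instantiate the matching
   * InputFormat before reading the split.
   */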
  private InputSplit[] collapse( InputSplit[][] splits, List<Map<String, String>> configs )
    {
    List<InputSplit> splitsList = new ArrayList<InputSplit>();

    for( int i = 0; i < splits.length; i++ )
      {
      Map<String, String> config = configs.get( i );

      config.remove( "mapred.input.dir" ); // this is a redundant value, will show up cluster side
      config.remove( "mapreduce.input.fileinputformat.inputdir" ); // hadoop2

      InputSplit[] split = splits[ i ];

      for( int j = 0; j < split.length; j++ )
        splitsList.add( new MultiInputSplit( split[ j ], config ) );
      }

    return splitsList.toArray( new InputSplit[ splitsList.size() ] );
    }

  private InputSplit[][] getSplits( InputFormat[] inputFormats, JobConf[] jobConfs, int[] numSplits ) throws IOException
    {
    InputSplit[][] inputSplits = new InputSplit[ inputFormats.length ][];

    for( int i = 0; i < inputFormats.length; i++ )
      {
      inputSplits[ i ] = inputFormats[ i ].getSplits( jobConfs[ i ], numSplits[ i ] );

      // it's reasonable for the split array to be empty, but it really shouldn't be null
      if( inputSplits[ i ] == null )
        inputSplits[ i ] = new InputSplit[ 0 ];

      for( int j = 0; j < inputSplits[ i ].length; j++ )
        {
        if( inputSplits[ i ][ j ] == null )
          throw new IllegalStateException( "input format: " + inputFormats[ i ].getClass().getName() + ", returned a split array with nulls" );
        }
      }

    return inputSplits;
    }

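  /**
   * Approximates the "size" of each input by the number of splits its InputFormat proposes;
   * only the relative proportions matter, not actual byte counts.
   */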
  private long[] getInputSplitSizes( InputFormat[] inputFormats, JobConf[] jobConfs, int numSplits ) throws IOException
    {
    long[] inputSizes = new long[ inputFormats.length ];

    for( int i = 0; i < inputFormats.length; i++ )
      {
      InputFormat inputFormat = inputFormats[ i ];
      InputSplit[] splits = inputFormat.getSplits( jobConfs[ i ], numSplits );

      inputSizes[ i ] = splits.length;
      }

    return inputSizes;
    }

  /**
   * Method getRecordReader delegates to the appropriate InputFormat.
   *
   * @param split    of type InputSplit
   * @param job      of type JobConf
   * @param reporter of type Reporter
   * @return RecordReader
   * @throws IOException when the underlying InputFormat cannot create a record reader
   */
  public RecordReader getRecordReader( InputSplit split, JobConf job, final Reporter reporter ) throws IOException
    {
    final MultiInputSplit multiSplit = (MultiInputSplit) split;
    final JobConf currentConf = HadoopUtil.mergeConf( job, multiSplit.config, true );

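    // creating the reader is retried via Util.retry; rethrow() only allows a retry when the
    // failure was caused by an S3ServiceException, all other exceptions surface immediately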
    try
      {
      return Util.retry( LOG, 3, 20, "unable to get record reader", new Util.RetryOperator<RecordReader>()
      {

      @Override
      public RecordReader operate() throws Exception
        {
        return currentConf.getInputFormat().getRecordReader( multiSplit.inputSplit, currentConf, reporter );
        }

      @Override
      public boolean rethrow( Exception exception )
        {
        return !( exception.getCause() instanceof S3ServiceException );
        }
      } );
      }
    catch( Exception exception )
      {
      if( exception instanceof RuntimeException )
        throw (RuntimeException) exception;
      else
        throw (IOException) exception;
      }
    }
  }