001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.nio.charset.Charset;
026    import java.util.Arrays;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.scheme.Scheme;
030    import cascading.scheme.SinkCall;
031    import cascading.scheme.SourceCall;
032    import cascading.tap.Tap;
033    import cascading.tuple.Fields;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    import org.apache.hadoop.fs.Path;
037    import org.apache.hadoop.io.LongWritable;
038    import org.apache.hadoop.io.Text;
039    import org.apache.hadoop.mapred.FileInputFormat;
040    import org.apache.hadoop.mapred.FileOutputFormat;
041    import org.apache.hadoop.mapred.JobConf;
042    import org.apache.hadoop.mapred.OutputCollector;
043    import org.apache.hadoop.mapred.RecordReader;
044    import org.apache.hadoop.mapred.TextInputFormat;
045    import org.apache.hadoop.mapred.TextOutputFormat;
046    
047    /**
048     * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
049     * lines. Either a line-feed or a carriage-return is used to signal the end of a line.
050     * <p/>
051     * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line".
052     * <p/>
053     * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
054     * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
055     * Any available field names can be given if only a subset of the incoming fields should be used.
056     * <p/>
057     * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples
058     * will simply be the "line" value using the given field name.
059     * <p/>
060     * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
061     * writing out the line.
062     * <p/>
063     * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor
064     * for the compression value, it will remain disabled.
065     * <p/>
066     * If any of the input files end with ".zip", an error will be thrown.
067     * <p/>
068     * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
069     * argument.
070     */
071    public class TextLine extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>
072      {
073      public enum Compress
074        {
075          DEFAULT, ENABLE, DISABLE
076        }
077    
078      public static final String DEFAULT_CHARSET = "UTF-8";
079    
080      /** Field serialVersionUID */
081      private static final long serialVersionUID = 1L;
082      /** Field DEFAULT_SOURCE_FIELDS */
083      public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" );
084    
085      /** Field sinkCompression */
086      Compress sinkCompression = Compress.DISABLE;
087    
088      String charsetName = DEFAULT_CHARSET;
089    
090      /**
091       * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
092       * "offset" is the byte offset in the input file.
093       */
094      public TextLine()
095        {
096        super( DEFAULT_SOURCE_FIELDS );
097        }
098    
099      /**
100       * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
101       * "offset" is the byte offset in the input file.
102       *
103       * @param numSinkParts of type int
104       */
105      @ConstructorProperties({"numSinkParts"})
106      public TextLine( int numSinkParts )
107        {
108        super( DEFAULT_SOURCE_FIELDS, numSinkParts );
109        }
110    
111      /**
112       * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
113       * "offset" is the byte offset in the input file.
114       *
115       * @param sinkCompression of type Compress
116       */
117      @ConstructorProperties({"sinkCompression"})
118      public TextLine( Compress sinkCompression )
119        {
120        super( DEFAULT_SOURCE_FIELDS );
121    
122        setSinkCompression( sinkCompression );
123        }
124    
125      /**
126       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
127       * subsequent tuples.
128       *
129       * @param sourceFields the source fields for this scheme
130       * @param sinkFields   the sink fields for this scheme
131       */
132      @ConstructorProperties({"sourceFields", "sinkFields"})
133      public TextLine( Fields sourceFields, Fields sinkFields )
134        {
135        super( sourceFields, sinkFields );
136    
137        verify( sourceFields );
138        }
139    
140      /**
141       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
142       * subsequent tuples.
143       *
144       * @param sourceFields the source fields for this scheme
145       * @param sinkFields   the sink fields for this scheme
146       * @param charsetName  of type String
147       */
148      @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
149      public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
150        {
151        super( sourceFields, sinkFields );
152    
153        // throws an exception if not found
154        setCharsetName( charsetName );
155    
156        verify( sourceFields );
157        }
158    
159      /**
160       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
161       * subsequent tuples.
162       *
163       * @param sourceFields the source fields for this scheme
164       * @param sinkFields   the sink fields for this scheme
165       * @param numSinkParts of type int
166       */
167      @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"})
168      public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts )
169        {
170        super( sourceFields, sinkFields, numSinkParts );
171    
172        verify( sourceFields );
173        }
174    
175      /**
176       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
177       * subsequent tuples.
178       *
179       * @param sourceFields    of type Fields
180       * @param sinkFields      of type Fields
181       * @param sinkCompression of type Compress
182       */
183      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"})
184      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression )
185        {
186        super( sourceFields, sinkFields );
187    
188        setSinkCompression( sinkCompression );
189    
190        verify( sourceFields );
191        }
192    
193      /**
194       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
195       * subsequent tuples.
196       *
197       * @param sourceFields    of type Fields
198       * @param sinkFields      of type Fields
199       * @param sinkCompression of type Compress
200       * @param charsetName     of type String
201       */
202      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"})
203      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName )
204        {
205        super( sourceFields, sinkFields );
206    
207        setSinkCompression( sinkCompression );
208    
209        // throws an exception if not found
210        setCharsetName( charsetName );
211    
212        verify( sourceFields );
213        }
214    
215      /**
216       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
217       * subsequent tuples.
218       *
219       * @param sourceFields    of type Fields
220       * @param sinkFields      of type Fields
221       * @param sinkCompression of type Compress
222       * @param numSinkParts    of type int
223       */
224      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"})
225      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts )
226        {
227        super( sourceFields, sinkFields, numSinkParts );
228    
229        setSinkCompression( sinkCompression );
230    
231        verify( sourceFields );
232        }
233    
234      /**
235       * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
236       * subsequent tuples.
237       *
238       * @param sourceFields    of type Fields
239       * @param sinkFields      of type Fields
240       * @param sinkCompression of type Compress
241       * @param numSinkParts    of type int
242       * @param charsetName     of type String
243       */
244      @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"})
245      public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName )
246        {
247        super( sourceFields, sinkFields, numSinkParts );
248    
249        setSinkCompression( sinkCompression );
250    
251        // throws an exception if not found
252        setCharsetName( charsetName );
253    
254        verify( sourceFields );
255        }
256    
257      /**
258       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
259       * subsequent tuples.
260       *
261       * @param sourceFields the source fields for this scheme
262       */
263      @ConstructorProperties({"sourceFields"})
264      public TextLine( Fields sourceFields )
265        {
266        super( sourceFields );
267    
268        verify( sourceFields );
269        }
270    
271      /**
272       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
273       * subsequent tuples.
274       *
275       * @param sourceFields the source fields for this scheme
276       * @param charsetName  of type String
277       */
278      @ConstructorProperties({"sourceFields", "charsetName"})
279      public TextLine( Fields sourceFields, String charsetName )
280        {
281        super( sourceFields );
282    
283        // throws an exception if not found
284        setCharsetName( charsetName );
285    
286        verify( sourceFields );
287        }
288    
289      /**
290       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
291       * subsequent tuples. The resulting data set will have numSinkParts.
292       *
293       * @param sourceFields the source fields for this scheme
294       * @param numSinkParts of type int
295       */
296      @ConstructorProperties({"sourceFields", "numSinkParts"})
297      public TextLine( Fields sourceFields, int numSinkParts )
298        {
299        super( sourceFields, numSinkParts );
300    
301        verify( sourceFields );
302        }
303    
304      protected void setCharsetName( String charsetName )
305        {
306        if( charsetName != null )
307          this.charsetName = charsetName;
308    
309        Charset.forName( this.charsetName );
310        }
311    
312      protected void verify( Fields sourceFields )
313        {
314        if( sourceFields.size() < 1 || sourceFields.size() > 2 )
315          throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
316        }
317    
318      /**
319       * Method getSinkCompression returns the sinkCompression of this TextLine object.
320       *
321       * @return the sinkCompression (type Compress) of this TextLine object.
322       */
323      public Compress getSinkCompression()
324        {
325        return sinkCompression;
326        }
327    
328      /**
329       * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled.
330       *
331       * @param sinkCompression the sinkCompression of this TextLine object.
332       */
333      public void setSinkCompression( Compress sinkCompression )
334        {
335        if( sinkCompression != null ) // leave disabled if null
336          this.sinkCompression = sinkCompression;
337        }
338    
339      @Override
340      public void sourceConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
341        {
342        if( hasZippedFiles( FileInputFormat.getInputPaths( conf ) ) )
343          throw new IllegalStateException( "cannot read zip files: " + Arrays.toString( FileInputFormat.getInputPaths( conf ) ) );
344    
345        conf.setInputFormat( TextInputFormat.class );
346        }
347    
348      private boolean hasZippedFiles( Path[] paths )
349        {
350        if( paths == null || paths.length == 0 )
351          return false;
352    
353        boolean isZipped = paths[ 0 ].getName().endsWith( ".zip" );
354    
355        for( int i = 1; i < paths.length; i++ )
356          {
357          if( isZipped != paths[ i ].getName().endsWith( ".zip" ) )
358            throw new IllegalStateException( "cannot mix zipped and upzipped files" );
359          }
360    
361        return isZipped;
362        }
363    
364      @Override
365      public void presentSourceFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
366        {
367        // do nothing to change TextLine state
368        }
369    
370      @Override
371      public void presentSinkFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
372        {
373        // do nothing to change TextLine state
374        }
375    
376      @Override
377      public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
378        {
379        if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) )
380          throw new IllegalStateException( "cannot write zip files: " + FileOutputFormat.getOutputPath( conf ) );
381    
382        if( getSinkCompression() == Compress.DISABLE )
383          conf.setBoolean( "mapred.output.compress", false );
384        else if( getSinkCompression() == Compress.ENABLE )
385          conf.setBoolean( "mapred.output.compress", true );
386    
387        conf.setOutputKeyClass( Text.class ); // be explicit
388        conf.setOutputValueClass( Text.class ); // be explicit
389        conf.setOutputFormat( TextOutputFormat.class );
390        }
391    
392      @Override
393      public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
394        {
395        if( sourceCall.getContext() == null )
396          sourceCall.setContext( new Object[ 3 ] );
397    
398        sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey();
399        sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue();
400        sourceCall.getContext()[ 2 ] = Charset.forName( charsetName );
401        }
402    
403      @Override
404      public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
405        {
406        if( !sourceReadInput( sourceCall ) )
407          return false;
408    
409        sourceHandleInput( sourceCall );
410    
411        return true;
412        }
413    
414      private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException
415        {
416        Object[] context = sourceCall.getContext();
417    
418        return sourceCall.getInput().next( context[ 0 ], context[ 1 ] );
419        }
420    
421      protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall )
422        {
423        TupleEntry result = sourceCall.getIncomingEntry();
424    
425        int index = 0;
426        Object[] context = sourceCall.getContext();
427    
428        // coerce into canonical forms
429        if( getSourceFields().size() == 2 )
430          result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() );
431    
432        result.setString( index, makeEncodedString( context ) );
433        }
434    
435      protected String makeEncodedString( Object[] context )
436        {
437        Text text = (Text) context[ 1 ];
438        return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] );
439        }
440    
441      @Override
442      public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
443        {
444        sourceCall.setContext( null );
445        }
446    
447      @Override
448      public void sinkPrepare( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
449        {
450        sinkCall.setContext( new Object[ 2 ] );
451    
452        sinkCall.getContext()[ 0 ] = new Text();
453        sinkCall.getContext()[ 1 ] = Charset.forName( charsetName );
454        }
455    
456      @Override
457      public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
458        {
459        Text text = (Text) sinkCall.getContext()[ 0 ];
460        Charset charset = (Charset) sinkCall.getContext()[ 1 ];
461        String line = sinkCall.getOutgoingEntry().getTuple().toString();
462    
463        text.set( line.getBytes( charset ) );
464    
465        // it's ok to use NULL here so the collector does not write anything
466        sinkCall.getOutput().collect( null, text );
467        }
468      }