001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.scheme.hadoop;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.nio.charset.Charset;
026import java.util.Arrays;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.hadoop.util.HadoopUtil;
030import cascading.management.annotation.Property;
031import cascading.management.annotation.PropertyDescription;
032import cascading.management.annotation.Visibility;
033import cascading.scheme.Scheme;
034import cascading.scheme.SinkCall;
035import cascading.scheme.SourceCall;
036import cascading.tap.Tap;
037import cascading.tuple.Fields;
038import cascading.tuple.Tuple;
039import cascading.tuple.TupleEntry;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.io.LongWritable;
043import org.apache.hadoop.io.Text;
044import org.apache.hadoop.mapred.FileInputFormat;
045import org.apache.hadoop.mapred.InputFormat;
046import org.apache.hadoop.mapred.OutputCollector;
047import org.apache.hadoop.mapred.OutputFormat;
048import org.apache.hadoop.mapred.RecordReader;
049import org.apache.hadoop.mapred.TextInputFormat;
050import org.apache.hadoop.mapred.TextOutputFormat;
051
052import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance;
053
054/**
055 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
 * lines. Either a line-feed or a carriage-return is used to signal the end of a line.
057 * <p/>
058 * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line".
059 * <p/>
060 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
061 * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
062 * Any available field names can be given if only a subset of the incoming fields should be used.
063 * <p/>
064 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples
065 * will simply be the "line" value using the given field name.
066 * <p/>
067 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
068 * writing out the line.
069 * <p/>
070 * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor
071 * for the compression value, it will remain disabled.
072 * <p/>
073 * If any of the input files end with ".zip", an error will be thrown.
 * <p/>
075 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
076 * argument.
077 */
078public class TextLine extends Scheme<Configuration, RecordReader, OutputCollector, Object[], Object[]>
079  {
080  public enum Compress
081    {
082      DEFAULT, ENABLE, DISABLE
083    }
084
085  public static final String DEFAULT_CHARSET = "UTF-8";
086
087  /** Field serialVersionUID */
088  private static final long serialVersionUID = 1L;
089  /** Field DEFAULT_SOURCE_FIELDS */
090  public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class );
091
092  /** Field sinkCompression */
093  Compress sinkCompression = Compress.DISABLE;
094
095  String charsetName = DEFAULT_CHARSET;
096
097  /**
098   * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
099   * "offset" is the byte offset in the input file.
100   */
101  public TextLine()
102    {
103    super( DEFAULT_SOURCE_FIELDS );
104    }
105
106  /**
107   * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
108   * "offset" is the byte offset in the input file.
109   *
110   * @param numSinkParts of type int
111   */
112  @ConstructorProperties({"numSinkParts"})
113  public TextLine( int numSinkParts )
114    {
115    super( DEFAULT_SOURCE_FIELDS, numSinkParts );
116    }
117
118  /**
119   * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where
120   * "offset" is the byte offset in the input file.
121   *
122   * @param sinkCompression of type Compress
123   */
124  @ConstructorProperties({"sinkCompression"})
125  public TextLine( Compress sinkCompression )
126    {
127    super( DEFAULT_SOURCE_FIELDS );
128
129    setSinkCompression( sinkCompression );
130    }
131
132  /**
133   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
134   * subsequent tuples.
135   *
136   * @param sourceFields the source fields for this scheme
137   * @param sinkFields   the sink fields for this scheme
138   */
139  @ConstructorProperties({"sourceFields", "sinkFields"})
140  public TextLine( Fields sourceFields, Fields sinkFields )
141    {
142    super( sourceFields, sinkFields );
143
144    verify( sourceFields );
145    }
146
147  /**
148   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
149   * subsequent tuples.
150   *
151   * @param sourceFields the source fields for this scheme
152   * @param sinkFields   the sink fields for this scheme
153   * @param charsetName  of type String
154   */
155  @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
156  public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
157    {
158    super( sourceFields, sinkFields );
159
160    // throws an exception if not found
161    setCharsetName( charsetName );
162
163    verify( sourceFields );
164    }
165
166  /**
167   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
168   * subsequent tuples.
169   *
170   * @param sourceFields the source fields for this scheme
171   * @param sinkFields   the sink fields for this scheme
172   * @param numSinkParts of type int
173   */
174  @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"})
175  public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts )
176    {
177    super( sourceFields, sinkFields, numSinkParts );
178
179    verify( sourceFields );
180    }
181
182  /**
183   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
184   * subsequent tuples.
185   *
186   * @param sourceFields    of type Fields
187   * @param sinkFields      of type Fields
188   * @param sinkCompression of type Compress
189   */
190  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"})
191  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression )
192    {
193    super( sourceFields, sinkFields );
194
195    setSinkCompression( sinkCompression );
196
197    verify( sourceFields );
198    }
199
200  /**
201   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
202   * subsequent tuples.
203   *
204   * @param sourceFields    of type Fields
205   * @param sinkFields      of type Fields
206   * @param sinkCompression of type Compress
207   * @param charsetName     of type String
208   */
209  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"})
210  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName )
211    {
212    super( sourceFields, sinkFields );
213
214    setSinkCompression( sinkCompression );
215
216    // throws an exception if not found
217    setCharsetName( charsetName );
218
219    verify( sourceFields );
220    }
221
222  /**
223   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
224   * subsequent tuples.
225   *
226   * @param sourceFields    of type Fields
227   * @param sinkFields      of type Fields
228   * @param sinkCompression of type Compress
229   * @param numSinkParts    of type int
230   */
231  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"})
232  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts )
233    {
234    super( sourceFields, sinkFields, numSinkParts );
235
236    setSinkCompression( sinkCompression );
237
238    verify( sourceFields );
239    }
240
241  /**
242   * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
243   * subsequent tuples.
244   *
245   * @param sourceFields    of type Fields
246   * @param sinkFields      of type Fields
247   * @param sinkCompression of type Compress
248   * @param numSinkParts    of type int
249   * @param charsetName     of type String
250   */
251  @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"})
252  public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName )
253    {
254    super( sourceFields, sinkFields, numSinkParts );
255
256    setSinkCompression( sinkCompression );
257
258    // throws an exception if not found
259    setCharsetName( charsetName );
260
261    verify( sourceFields );
262    }
263
264  /**
265   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
266   * subsequent tuples.
267   *
268   * @param sourceFields the source fields for this scheme
269   */
270  @ConstructorProperties({"sourceFields"})
271  public TextLine( Fields sourceFields )
272    {
273    super( sourceFields );
274
275    verify( sourceFields );
276    }
277
278  /**
279   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
280   * subsequent tuples.
281   *
282   * @param sourceFields the source fields for this scheme
283   * @param charsetName  of type String
284   */
285  @ConstructorProperties({"sourceFields", "charsetName"})
286  public TextLine( Fields sourceFields, String charsetName )
287    {
288    super( sourceFields );
289
290    // throws an exception if not found
291    setCharsetName( charsetName );
292
293    verify( sourceFields );
294    }
295
296  /**
297   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
298   * subsequent tuples. The resulting data set will have numSinkParts.
299   *
300   * @param sourceFields the source fields for this scheme
301   * @param numSinkParts of type int
302   */
303  @ConstructorProperties({"sourceFields", "numSinkParts"})
304  public TextLine( Fields sourceFields, int numSinkParts )
305    {
306    super( sourceFields, numSinkParts );
307
308    verify( sourceFields );
309    }
310
311  protected void setCharsetName( String charsetName )
312    {
313    if( charsetName != null )
314      this.charsetName = charsetName;
315
316    Charset.forName( this.charsetName );
317    }
318
319  @Property(name = "charset", visibility = Visibility.PUBLIC)
320  @PropertyDescription(value = "character set used in this scheme.")
321  public String getCharsetName()
322    {
323    return charsetName;
324    }
325
326  protected void verify( Fields sourceFields )
327    {
328    if( sourceFields.size() < 1 || sourceFields.size() > 2 )
329      throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
330    }
331
332  /**
333   * Method getSinkCompression returns the sinkCompression of this TextLine object.
334   *
335   * @return the sinkCompression (type Compress) of this TextLine object.
336   */
337  @Property(name = "sinkCompression", visibility = Visibility.PUBLIC)
338  @PropertyDescription(value = "The compression of the scheme when used in a sink.")
339  public Compress getSinkCompression()
340    {
341    return sinkCompression;
342    }
343
344  /**
345   * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled.
346   *
347   * @param sinkCompression the sinkCompression of this TextLine object.
348   */
349  public void setSinkCompression( Compress sinkCompression )
350    {
351    if( sinkCompression != null ) // leave disabled if null
352      this.sinkCompression = sinkCompression;
353    }
354
355  @Override
356  public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
357    {
358    if( hasZippedFiles( FileInputFormat.getInputPaths( asJobConfInstance( conf ) ) ) )
359      throw new IllegalStateException( "cannot read zip files: " + Arrays.toString( FileInputFormat.getInputPaths( asJobConfInstance( conf ) ) ) );
360
361    conf.setBoolean( "mapred.mapper.new-api", false );
362    conf.setClass( "mapred.input.format.class", TextInputFormat.class, InputFormat.class );
363    }
364
365  private boolean hasZippedFiles( Path[] paths )
366    {
367    if( paths == null || paths.length == 0 )
368      return false;
369
370    boolean isZipped = paths[ 0 ].getName().endsWith( ".zip" );
371
372    for( int i = 1; i < paths.length; i++ )
373      {
374      if( isZipped != paths[ i ].getName().endsWith( ".zip" ) )
375        throw new IllegalStateException( "cannot mix zipped and upzipped files" );
376      }
377
378    return isZipped;
379    }
380
381  @Override
382  public void presentSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields )
383    {
384    // do nothing to change TextLine state
385    }
386
387  @Override
388  public void presentSinkFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields )
389    {
390    // do nothing to change TextLine state
391    }
392
393  @Override
394  public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
395    {
396    if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) )
397      throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) );
398
399    conf.setBoolean( "mapred.mapper.new-api", false );
400
401    if( getSinkCompression() == Compress.DISABLE )
402      conf.setBoolean( "mapred.output.compress", false );
403    else if( getSinkCompression() == Compress.ENABLE )
404      conf.setBoolean( "mapred.output.compress", true );
405
406    conf.setClass( "mapred.output.key.class", Text.class, Object.class );
407    conf.setClass( "mapred.output.value.class", Text.class, Object.class );
408    conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class );
409    }
410
411  @Override
412  public void sourcePrepare( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
413    {
414    if( sourceCall.getContext() == null )
415      sourceCall.setContext( new Object[ 3 ] );
416
417    sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey();
418    sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue();
419    sourceCall.getContext()[ 2 ] = Charset.forName( charsetName );
420    }
421
422  @Override
423  public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
424    {
425    if( !sourceReadInput( sourceCall ) )
426      return false;
427
428    sourceHandleInput( sourceCall );
429
430    return true;
431    }
432
433  private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException
434    {
435    Object[] context = sourceCall.getContext();
436
437    return sourceCall.getInput().next( context[ 0 ], context[ 1 ] );
438    }
439
440  protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall )
441    {
442    TupleEntry result = sourceCall.getIncomingEntry();
443
444    int index = 0;
445    Object[] context = sourceCall.getContext();
446
447    // coerce into canonical forms
448    if( getSourceFields().size() == 2 )
449      result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() );
450
451    result.setString( index, makeEncodedString( context ) );
452    }
453
454  protected String makeEncodedString( Object[] context )
455    {
456    Text text = (Text) context[ 1 ];
457    return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] );
458    }
459
460  @Override
461  public void sourceCleanup( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
462    {
463    sourceCall.setContext( null );
464    }
465
466  @Override
467  public void sinkPrepare( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
468    {
469    sinkCall.setContext( new Object[ 2 ] );
470
471    sinkCall.getContext()[ 0 ] = new Text();
472    sinkCall.getContext()[ 1 ] = Charset.forName( charsetName );
473    }
474
475  @Override
476  public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
477    {
478    Text text = (Text) sinkCall.getContext()[ 0 ];
479    Charset charset = (Charset) sinkCall.getContext()[ 1 ];
480    String line = sinkCall.getOutgoingEntry().getTuple().toString();
481
482    text.set( line.getBytes( charset ) );
483
484    // it's ok to use NULL here so the collector does not write anything
485    sinkCall.getOutput().collect( null, text );
486    }
487  }