001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.scheme.hadoop;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.nio.charset.Charset;
026
027import cascading.flow.FlowProcess;
028import cascading.management.annotation.Property;
029import cascading.management.annotation.PropertyDescription;
030import cascading.management.annotation.Visibility;
031import cascading.scheme.SinkCall;
032import cascading.scheme.SourceCall;
033import cascading.scheme.util.DelimitedParser;
034import cascading.tap.CompositeTap;
035import cascading.tap.Tap;
036import cascading.tap.TapException;
037import cascading.tap.hadoop.Hfs;
038import cascading.tuple.Fields;
039import cascading.tuple.Tuple;
040import cascading.tuple.TupleEntry;
041import cascading.tuple.util.TupleViews;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.io.LongWritable;
044import org.apache.hadoop.io.Text;
045import org.apache.hadoop.mapred.OutputCollector;
046import org.apache.hadoop.mapred.RecordReader;
047
048/**
049 * Class TextDelimited is a sub-class of {@link TextLine}. It provides direct support for delimited text files, like
050 * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values.
051 * <p/>
052 * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
053 * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will
054 * be skipped.
055 * <p/>
056 * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and
057 * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the
 * file and used during planning. The header will be parsed with the same rules as the body of the file.
059 * <p/>
060 * By default headers are not skipped.
061 * <p/>
062 * TextDelimited may also be used to write a "header" in a file. The fields names for the header are taken directly
063 * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
064 * resolved field names will be used, if any.
065 * <p/>
066 * By default headers are not written.
067 * <p/>
 * If {@code hasHeader} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
069 * be set to {@code true}.
070 * <p/>
071 * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
072 * <p/>
073 * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a
074 * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values
075 * for the missing fields.
076 * <p/>
077 * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
078 * If safe is {@code false}, a {@link TapException} will be thrown.
079 * <p/>
080 * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is
 * COMMA delimited but may have COMMAs in a value, the whole value should be surrounded by the quote string, typically
082 * double quotes ({@literal "}).
083 * <p/>
084 * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
085 * <p/>
 * This Scheme may source/sink {@link Fields#ALL}. When it is given on the constructor, the new instance will automatically
 * default to strict == false, as the number of fields parsed is arbitrary or unknown. A type array may not be given
 * either, so all values will be returned as Strings.
089 * <p/>
090 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
091 * argument.
092 * <p/>
093 * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
094 * {@link cascading.scheme.util.FieldTypeResolver} implementation.
095 * <p/>
096 * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
097 * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
098 * <p/>
 * Further, corrupted files are not supported. Corrupted files may result in exceptions or could trigger edge cases
 * in the underlying Java regular expression engine.
101 * <p/>
 * A large part of Cascading was designed to help users clean data. Thus the recommendation is to create Flows that
 * are responsible for cleansing large data-sets when faced with this problem.
104 * <p/>
 * DelimitedParser may be sub-classed and extended if necessary.
106 *
107 * @see TextLine
108 */
109public class TextDelimited extends TextLine
110  {
  /** Name of the charset (UTF-8) used to encode and decode text unless a {@code charsetName} is supplied. */
  public static final String DEFAULT_CHARSET = "UTF-8";

  /** Parser that performs the field and line parsing for this scheme; sub-class it to override parsing. */
  protected final DelimitedParser delimitedParser;
  /**
   * Field skipHeader: whether the first line of each input file is treated as a header and skipped.
   * NOTE(review): unlike writeHeader this is not final; presumably it is reassigned elsewhere in the class — verify
   * before changing its modifiers.
   */
  private boolean skipHeader;
  /** Field writeHeader: whether a header of field names is written to sunk output. */
  private final boolean writeHeader;
118
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using TAB as the default delimiter.
   * <p/>
   * No sink compression, quote string, or field types are given, and headers are neither skipped nor written.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   */
  public TextDelimited()
    {
    this( Fields.ALL, null, "\t", null, null );
    }
130
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given {@code delimiter}.
   * <p/>
   * No sink compression, quote string, or field types are given.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter the string separating field values on a line
   */
  @ConstructorProperties({"hasHeader", "delimiter"})
  public TextDelimited( boolean hasHeader, String delimiter )
    {
    // the (Class[]) cast selects the "types" overload; a bare null would be ambiguous with the "charsetName" overload
    this( Fields.ALL, null, hasHeader, delimiter, null, (Class[]) null );
    }
146
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given {@code delimiter} and {@code quote} strings.
   * <p/>
   * No sink compression or field types are given.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter the string separating field values on a line
   * @param quote     the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"hasHeader", "delimiter", "quote"})
  public TextDelimited( boolean hasHeader, String delimiter, String quote )
    {
    // the (Class[]) cast selects the "types" overload; a bare null would be ambiguous with the "charsetName" overload
    this( Fields.ALL, null, hasHeader, delimiter, quote, (Class[]) null );
    }
163
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * No sink compression is given.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimitedParser the parser that performs field and line parsing
   */
  @ConstructorProperties({"hasHeader", "delimitedParser"})
  public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser )
    {
    this( Fields.ALL, null, hasHeader, hasHeader, delimitedParser );
    }
179
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   * <p/>
   * This constructor will set {@code skipHeader} and {@code writeHeader} values to true. No sink compression is given.
   *
   * @param delimitedParser the parser that performs field and line parsing
   */
  @ConstructorProperties({"delimitedParser"})
  public TextDelimited( DelimitedParser delimitedParser )
    {
    this( Fields.ALL, null, true, true, delimitedParser );
    }
196
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimitedParser the parser that performs field and line parsing
   */
  @ConstructorProperties({"sinkCompression", "hasHeader", "delimitedParser"})
  public TextDelimited( Compress sinkCompression, boolean hasHeader, DelimitedParser delimitedParser )
    {
    this( Fields.ALL, sinkCompression, hasHeader, hasHeader, delimitedParser );
    }
213
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   * <p/>
   * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
   *
   * @param sinkCompression the compression setting to use when sinking
   * @param delimitedParser the parser that performs field and line parsing
   */
  @ConstructorProperties({"sinkCompression", "delimitedParser"})
  public TextDelimited( Compress sinkCompression, DelimitedParser delimitedParser )
    {
    this( Fields.ALL, sinkCompression, true, true, delimitedParser );
    }
230
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given {@code delimiter} and {@code quote} strings.
   * <p/>
   * No field types are given.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   * @param quote           the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"sinkCompression", "hasHeader", "delimiter", "quote"})
  public TextDelimited( Compress sinkCompression, boolean hasHeader, String delimiter, String quote )
    {
    // the (Class[]) cast selects the "types" overload; a bare null would be ambiguous with the "charsetName" overload
    this( Fields.ALL, sinkCompression, hasHeader, delimiter, quote, (Class[]) null );
    }
248
  /**
   * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter.
   * <p/>
   * No sink compression, quote string, or field types are given, and headers are neither skipped nor written.
   *
   * @param fields the fields to source and sink
   */
  @ConstructorProperties({"fields"})
  public TextDelimited( Fields fields )
    {
    this( fields, null, "\t", null, null );
    }
259
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression, quote string, or field types are given, and headers are neither skipped nor written.
   *
   * @param fields    the fields to source and sink
   * @param delimiter the string separating field values on a line
   */
  @ConstructorProperties({"fields", "delimiter"})
  public TextDelimited( Fields fields, String delimiter )
    {
    this( fields, null, delimiter, null, null );
    }
271
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression, quote string, or field types are given.
   *
   * @param fields    the fields to source and sink
   * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter the string separating field values on a line
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, null, null );
    }
284
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression, quote string, or field types are given.
   *
   * @param fields      the fields to source and sink
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header of field names is written
   * @param delimiter   the string separating field values on a line
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, null, null );
    }
298
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression or quote string is given, and headers are neither skipped nor written.
   *
   * @param fields    the fields to source and sink
   * @param delimiter the string separating field values on a line
   * @param types     the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "delimiter", "types"})
  public TextDelimited( Fields fields, String delimiter, Class[] types )
    {
    this( fields, null, delimiter, null, types );
    }
311
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression or quote string is given.
   *
   * @param fields    the fields to source and sink
   * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter the string separating field values on a line
   * @param types     the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, null, types );
    }
325
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression or quote string is given.
   *
   * @param fields      the fields to source and sink
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header of field names is written
   * @param delimiter   the string separating field values on a line
   * @param types       the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, null, types );
    }
340
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression is given, and headers are neither skipped nor written.
   *
   * @param fields    the fields to source and sink
   * @param delimiter the string separating field values on a line
   * @param quote     the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types     the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types )
    {
    this( fields, null, delimiter, quote, types );
    }
354
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression is given.
   *
   * @param fields    the fields to source and sink
   * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter the string separating field values on a line
   * @param quote     the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types     the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, quote, types );
    }
369
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression is given.
   *
   * @param fields      the fields to source and sink
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header of field names is written
   * @param delimiter   the string separating field values on a line
   * @param quote       the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types       the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, quote, types );
    }
385
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression is given, and headers are neither skipped nor written.
   *
   * @param fields    the fields to source and sink
   * @param delimiter the string separating field values on a line
   * @param quote     the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types     the type to coerce each field value to, or {@code null} if no types are given
   * @param safe      if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                  {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, null, delimiter, quote, types, safe );
    }
400
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression is given.
   *
   * @param fields    the fields to source and sink
   * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter the string separating field values on a line
   * @param quote     the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types     the type to coerce each field value to, or {@code null} if no types are given
   * @param safe      if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                  {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, quote, types, safe );
    }
416
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression is given, and {@code strict} is passed as {@code true}.
   *
   * @param fields      the fields to source and sink
   * @param hasHeader   if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter   the string separating field values on a line
   * @param quote       the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types       the type to coerce each field value to, or {@code null} if no types are given
   * @param safe        if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                    {@link TapException} is thrown
   * @param charsetName the name of the charset used to encode and decode text
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName );
    }
433
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression is given.
   *
   * @param fields      the fields to source and sink
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header of field names is written
   * @param delimiter   the string separating field values on a line
   * @param quote       the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types       the type to coerce each field value to, or {@code null} if no types are given
   * @param safe        if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                    {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, quote, types, safe );
    }
450
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string or field types are given, and headers are neither skipped nor written.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param delimiter       the string separating field values on a line
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter )
    {
    this( fields, sinkCompression, delimiter, null, null );
    }
463
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string or field types are given.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, null );
    }
477
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string or field types are given.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param skipHeader      if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader     if {@code true}, a header of field names is written
   * @param delimiter       the string separating field values on a line
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, null );
    }
492
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string is given, and headers are neither skipped nor written.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param delimiter       the string separating field values on a line
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types )
    {
    this( fields, sinkCompression, delimiter, null, types );
    }
506
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string is given.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types );
    }
521
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string is given.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param skipHeader      if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader     if {@code true}, a header of field names is written
   * @param delimiter       the string separating field values on a line
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types );
    }
537
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string is given, and headers are neither skipped nor written.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param delimiter       the string separating field values on a line
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, delimiter, null, types, safe );
    }
552
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string is given.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types, safe );
    }
568
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string is given, and {@code strict} is passed as {@code true}.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   * @param charsetName     the name of the charset used to encode and decode text
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe", "charsetName"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe, String charsetName )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, null, types, safe, charsetName );
    }
585
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No quote string is given.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param skipHeader      if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader     if {@code true}, a header of field names is written
   * @param delimiter       the string separating field values on a line
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types, safe );
    }
602
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression or field types are given, and headers are neither skipped nor written.
   *
   * @param fields    the fields to source and sink
   * @param delimiter the string separating field values on a line
   * @param quote     the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"fields", "delimiter", "quote"})
  public TextDelimited( Fields fields, String delimiter, String quote )
    {
    this( fields, null, delimiter, quote );
    }
615
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression or field types are given.
   *
   * @param fields    the fields to source and sink
   * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter the string separating field values on a line
   * @param quote     the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, quote );
    }
629
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No sink compression or field types are given.
   *
   * @param fields      the fields to source and sink
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header of field names is written
   * @param delimiter   the string separating field values on a line
   * @param quote       the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, quote );
    }
644
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No field types are given, headers are neither skipped nor written, and {@code strict} and {@code safe} are
   * passed as {@code true}.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param delimiter       the string separating field values on a line
   * @param quote           the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote )
    {
    this( fields, sinkCompression, false, false, delimiter, true, quote, null, true );
    }
658
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No field types are given, and {@code strict} and {@code safe} are passed as {@code true}.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   * @param quote           the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true );
    }
673
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No field types are given, and {@code strict} and {@code safe} are passed as {@code true}.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   * @param quote           the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param charsetName     the name of the charset used to encode and decode text
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "charsetName"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, String charsetName )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true, charsetName );
    }
689
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * No field types are given, and {@code strict} and {@code safe} are passed as {@code true}.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param skipHeader      if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader     if {@code true}, a header of field names is written
   * @param delimiter       the string separating field values on a line
   * @param quote           the quote string that may surround values containing the delimiter, or {@code null} for none
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, null, true );
    }
705
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Headers are neither skipped nor written, and {@code strict} and {@code safe} are passed as {@code true}.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param delimiter       the string separating field values on a line
   * @param quote           the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types )
    {
    this( fields, sinkCompression, false, false, delimiter, true, quote, types, true );
    }
720
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * {@code strict} and {@code safe} are passed as {@code true}.
   *
   * @param fields          the fields to source and sink
   * @param sinkCompression the compression setting to use when sinking
   * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
   * @param delimiter       the string separating field values on a line
   * @param quote           the quote string that may surround values containing the delimiter, or {@code null} for none
   * @param types           the type to coerce each field value to, or {@code null} if no types are given
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, true );
    }
736
737  /**
738   * Constructor TextDelimited creates a new TextDelimited instance.
739   *
740   * @param fields          of type Fields
741   * @param sinkCompression of type Compress
742   * @param skipHeader      of type boolean
743   * @param writeHeader     of type boolean
744   * @param delimiter       of type String
745   * @param quote           of type String
746   * @param types           of type Class[]
747   */
748  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
749  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
750    {
751    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, true );
752    }
753
754  /**
755   * Constructor TextDelimited creates a new TextDelimited instance.
756   *
757   * @param fields          of type Fields
758   * @param sinkCompression of type Compress
759   * @param delimiter       of type String
760   * @param quote           of type String
761   * @param types           of type Class[]
762   * @param safe            of type boolean
763   */
764  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types", "safe"})
765  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types, boolean safe )
766    {
767    this( fields, sinkCompression, false, false, delimiter, true, quote, types, safe );
768    }
769
770  /**
771   * Constructor TextDelimited creates a new TextDelimited instance.
772   *
773   * @param fields          of type Fields
774   * @param sinkCompression of type Compress
775   * @param hasHeader       of type boolean
776   * @param delimiter       of type String
777   * @param quote           of type String
778   * @param types           of type Class[]
779   * @param safe            of type boolean
780   */
781  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types", "safe"})
782  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
783    {
784    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, safe );
785    }
786
787  /**
788   * Constructor TextDelimited creates a new TextDelimited instance.
789   *
790   * @param fields          of type Fields
791   * @param sinkCompression of type Compress
792   * @param skipHeader      of type boolean
793   * @param writeHeader     of type boolean
794   * @param delimiter       of type String
795   * @param quote           of type String
796   * @param types           of type Class[]
797   * @param safe            of type boolean
798   */
799  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types",
800                          "safe"})
801  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
802    {
803    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, safe );
804    }
805
806  /**
807   * Constructor TextDelimited creates a new TextDelimited instance.
808   *
809   * @param fields          of type Fields
810   * @param sinkCompression of type Compress
811   * @param skipHeader      of type boolean
812   * @param delimiter       of type String
813   * @param strict          of type boolean
814   * @param quote           of type String
815   * @param types           of type Class[]
816   * @param safe            of type boolean
817   */
818  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
819                          "types", "safe"})
820  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
821    {
822    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
823    }
824
825  /**
826   * Constructor TextDelimited creates a new TextDelimited instance.
827   *
828   * @param fields          of type Fields
829   * @param sinkCompression of type Compress
830   * @param skipHeader      of type boolean
831   * @param delimiter       of type String
832   * @param strict          of type boolean
833   * @param quote           of type String
834   * @param types           of type Class[]
835   * @param safe            of type boolean
836   * @param charsetName     of type String
837   */
838  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
839                          "types", "safe", "charsetName"})
840  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
841    {
842    this( fields, sinkCompression, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
843    }
844
845  /**
846   * Constructor TextDelimited creates a new TextDelimited instance.
847   *
848   * @param fields          of type Fields
849   * @param writeHeader     of type boolean
850   * @param delimitedParser of type DelimitedParser
851   */
852  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
853  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
854    {
855    this( fields, null, skipHeader, writeHeader, null, delimitedParser );
856    }
857
858  /**
859   * Constructor TextDelimited creates a new TextDelimited instance.
860   *
861   * @param fields          of type Fields
862   * @param hasHeader       of type boolean
863   * @param delimitedParser of type DelimitedParser
864   */
865  @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
866  public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
867    {
868    this( fields, null, hasHeader, hasHeader, null, delimitedParser );
869    }
870
871  /**
872   * Constructor TextDelimited creates a new TextDelimited instance.
873   *
874   * @param fields          of type Fields
875   * @param writeHeader     of type boolean
876   * @param delimitedParser of type DelimitedParser
877   */
878  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimitedParser"})
879  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
880    {
881    this( fields, sinkCompression, skipHeader, writeHeader, null, delimitedParser );
882    }
883
884  /**
885   * Constructor TextDelimited creates a new TextDelimited instance.
886   *
887   * @param fields          of type Fields
888   * @param sinkCompression of type Compress
889   * @param skipHeader      of type boolean
890   * @param writeHeader     of type boolean
891   * @param charsetName     of type String
892   * @param delimitedParser of type DelimitedParser
893   */
894  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
895  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
896    {
897    super( sinkCompression );
898
899    this.delimitedParser = delimitedParser;
900
901    // normalizes ALL and UNKNOWN
902    setSinkFields( fields );
903    setSourceFields( fields );
904
905    this.skipHeader = skipHeader;
906    this.writeHeader = writeHeader;
907
908    // throws an exception if not found
909    setCharsetName( charsetName );
910    }
911
912  /**
913   * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
914   *
915   * @return a String
916   */
917  @Property(name = "delimiter", visibility = Visibility.PUBLIC)
918  @PropertyDescription("The delimiter used to separate fields.")
919  public String getDelimiter()
920    {
921    return delimitedParser.getDelimiter();
922    }
923
924  /**
925   * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text.
926   *
927   * @return a String
928   */
929  @Property(name = "quote", visibility = Visibility.PUBLIC)
930  @PropertyDescription("The string used for quoting.")
931  public String getQuote()
932    {
933    return delimitedParser.getQuote();
934    }
935
936  @Override
937  public boolean isSymmetrical()
938    {
939    return super.isSymmetrical() && skipHeader == writeHeader;
940    }
941
942  @Override
943  public void setSinkFields( Fields sinkFields )
944    {
945    super.setSourceFields( sinkFields );
946    super.setSinkFields( sinkFields );
947
948    if( delimitedParser != null )
949      delimitedParser.reset( getSourceFields(), getSinkFields() );
950    }
951
952  @Override
953  public void setSourceFields( Fields sourceFields )
954    {
955    super.setSourceFields( sourceFields );
956    super.setSinkFields( sourceFields );
957
958    if( delimitedParser != null )
959      delimitedParser.reset( getSourceFields(), getSinkFields() );
960    }
961
962  @Override
963  public Fields retrieveSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap )
964    {
965    if( !skipHeader || !getSourceFields().isUnknown() )
966      return getSourceFields();
967
968    // no need to open them all
969    if( tap instanceof CompositeTap )
970      tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next();
971
972    // should revert to file:// (Lfs) if tap is Lfs
973    tap = new Hfs( new TextLine( new Fields( "line" ), charsetName ), tap.getFullIdentifier( flowProcess ) );
974
975    setSourceFields( delimitedParser.parseFirstLine( flowProcess, tap ) );
976
977    return getSourceFields();
978    }
979
980  @Override
981  public void presentSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields )
982    {
983    presentSourceFieldsInternal( fields );
984    }
985
986  @Override
987  public void presentSinkFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields )
988    {
989    presentSinkFieldsInternal( fields );
990    }
991
992  @Override
993  public void sourcePrepare( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
994    {
995    super.sourcePrepare( flowProcess, sourceCall );
996
997    sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() );
998    }
999
1000  @Override
1001  public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
1002    {
1003    Object[] context = sourceCall.getContext();
1004
1005    if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) )
1006      return false;
1007
1008    if( skipHeader && ( (LongWritable) context[ 0 ] ).get() == 0 )
1009      {
1010      if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) )
1011        return false;
1012      }
1013
1014    // delegate coercion to delimitedParser for robustness
1015    Object[] split = delimitedParser.parseLine( makeEncodedString( context ) );
1016    Tuple tuple = sourceCall.getIncomingEntry().getTuple();
1017
1018    TupleViews.reset( tuple, split );
1019
1020    return true;
1021    }
1022
1023  @Override
1024  public void sinkPrepare( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1025    {
1026    sinkCall.setContext( new Object[ 3 ] );
1027
1028    sinkCall.getContext()[ 0 ] = new Text();
1029    sinkCall.getContext()[ 1 ] = new StringBuilder( 4 * 1024 );
1030    sinkCall.getContext()[ 2 ] = Charset.forName( charsetName );
1031
1032    if( writeHeader )
1033      writeHeader( sinkCall );
1034    }
1035
1036  protected void writeHeader( SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1037    {
1038    Fields fields = sinkCall.getOutgoingEntry().getFields();
1039
1040    Text text = (Text) sinkCall.getContext()[ 0 ];
1041    StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ];
1042    Charset charset = (Charset) sinkCall.getContext()[ 2 ];
1043
1044    line = (StringBuilder) delimitedParser.joinFirstLine( fields, line );
1045
1046    text.set( line.toString().getBytes( charset ) );
1047
1048    sinkCall.getOutput().collect( null, text );
1049
1050    line.setLength( 0 );
1051    }
1052
1053  @Override
1054  public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1055    {
1056    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
1057
1058    Text text = (Text) sinkCall.getContext()[ 0 ];
1059    StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ];
1060    Charset charset = (Charset) sinkCall.getContext()[ 2 ];
1061
1062    Iterable<String> strings = tupleEntry.asIterableOf( String.class );
1063
1064    line = (StringBuilder) delimitedParser.joinLine( strings, line );
1065
1066    text.set( line.toString().getBytes( charset ) );
1067
1068    sinkCall.getOutput().collect( null, text );
1069
1070    line.setLength( 0 );
1071    }
1072  }
1073