001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.nio.charset.Charset;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.scheme.SinkCall;
029    import cascading.scheme.SourceCall;
030    import cascading.scheme.util.DelimitedParser;
031    import cascading.tap.CompositeTap;
032    import cascading.tap.Tap;
033    import cascading.tap.TapException;
034    import cascading.tap.hadoop.Hfs;
035    import cascading.tuple.Fields;
036    import cascading.tuple.Tuple;
037    import cascading.tuple.TupleEntry;
038    import cascading.tuple.util.TupleViews;
039    import org.apache.hadoop.io.LongWritable;
040    import org.apache.hadoop.io.Text;
041    import org.apache.hadoop.mapred.JobConf;
042    import org.apache.hadoop.mapred.OutputCollector;
043    import org.apache.hadoop.mapred.RecordReader;
044    
045    /**
046     * Class TextDelimited is a sub-class of {@link TextLine}. It provides direct support for delimited text files, like
047     * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values.
048     * <p/>
049     * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
050     * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will
051     * be skipped.
052     * <p/>
053     * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and
054     * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the
 * file and used during planning. The header will be parsed with the same rules as the body of the file.
056     * <p/>
057     * By default headers are not skipped.
058     * <p/>
059     * TextDelimited may also be used to write a "header" in a file. The fields names for the header are taken directly
060     * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
061     * resolved field names will be used, if any.
062     * <p/>
063     * By default headers are not written.
064     * <p/>
 * If {@code hasHeader} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
066     * be set to {@code true}.
067     * <p/>
068     * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
069     * <p/>
070     * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a
071     * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values
072     * for the missing fields.
073     * <p/>
074     * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
075     * If safe is {@code false}, a {@link TapException} will be thrown.
076     * <p/>
077     * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is
 * COMMA delimited but may have COMMAs in a value, the whole value should be surrounded by the quote string, typically
079     * double quotes ({@literal "}).
080     * <p/>
081     * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
082     * <p/>
 * This Scheme may source/sink {@link Fields#ALL}. When it is given on the constructor, the new instance will
 * automatically default to strict == false, as the number of fields parsed is arbitrary or unknown. A type array may
 * not be given either, so all values will be returned as Strings.
086     * <p/>
087     * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
088     * argument.
089     * <p/>
090     * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
091     * {@link cascading.scheme.util.FieldTypeResolver} implementation.
092     * <p/>
093     * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
094     * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
095     * <p/>
 * Further, it can safely be said that corrupted files are not supported, for obvious reasons. Corrupted files may
 * result in exceptions or could trigger edge cases in the underlying Java regular expression engine.
098     * <p/>
 * A large part of Cascading was designed to help users cleanse data. Thus, when faced with this problem, the
 * recommendation is to create Flows that are responsible for cleansing large data-sets.
101     * <p/>
 * DelimitedParser may be sub-classed and extended if necessary.
103     *
104     * @see TextLine
105     */
106    public class TextDelimited extends TextLine
107      {
  /** Name of the charset the class documentation describes as the default text encoding ("UTF-8"). */
  public static final String DEFAULT_CHARSET = "UTF-8";

  /** Field delimitedParser - the parser performing field and line parsing; protected so sub-classes can reach it. */
  protected final DelimitedParser delimitedParser;
  /**
   * Field skipHeader - whether the first line of each input file is skipped as a header.
   * NOTE(review): unlike writeHeader this is not final; presumably reassigned outside the constructors — confirm.
   */
  private boolean skipHeader;
  /** Field writeHeader - whether a header line of field names is written when sinking. */
  private final boolean writeHeader;
115    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using TAB as the default delimiter.
   * <p/>
   * No sink compression, quote string or field types are given, and headers are neither skipped nor written
   * (the defaults).
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   */
  public TextDelimited()
    {
    this( Fields.ALL, null, "\t", null, null );
    }
127    
128      /**
129       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
130       * {@link Fields#ALL} and using TAB as the default delimiter.
131       * <p/>
132       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
133       * with a {@link cascading.pipe.Checkpoint} Tap.
134       *
135       * @param hasHeader of type boolean
136       * @param delimiter of type String
137       */
138      @ConstructorProperties({"hasHeader", "delimiter"})
139      public TextDelimited( boolean hasHeader, String delimiter )
140        {
141        this( Fields.ALL, null, hasHeader, delimiter, null, (Class[]) null );
142        }
143    
144      /**
145       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
146       * {@link Fields#ALL} and using TAB as the default delimiter.
147       * <p/>
148       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
149       * with a {@link cascading.pipe.Checkpoint} Tap.
150       *
151       * @param hasHeader of type boolean
152       * @param delimiter of type String
153       * @param quote     of type String
154       */
155      @ConstructorProperties({"hasHeader", "delimiter", "quote"})
156      public TextDelimited( boolean hasHeader, String delimiter, String quote )
157        {
158        this( Fields.ALL, null, hasHeader, delimiter, quote, (Class[]) null );
159        }
160    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * {@code hasHeader} is used for both {@code skipHeader} and {@code writeHeader}.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimitedParser the {@link DelimitedParser} that performs field and line parsing
   */
  @ConstructorProperties({"hasHeader", "delimitedParser"})
  public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser )
    {
    this( Fields.ALL, null, hasHeader, hasHeader, delimitedParser );
    }
176    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   * <p/>
   * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
   *
   * @param delimitedParser the {@link DelimitedParser} that performs field and line parsing
   */
  @ConstructorProperties({"delimitedParser"})
  public TextDelimited( DelimitedParser delimitedParser )
    {
    this( Fields.ALL, null, true, true, delimitedParser );
    }
193    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
   * <p/>
   * {@code hasHeader} is used for both {@code skipHeader} and {@code writeHeader}.
   * <p/>
   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
   * with a {@link cascading.pipe.Checkpoint} Tap.
   *
   * @param sinkCompression the sink compression setting
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimitedParser the {@link DelimitedParser} that performs field and line parsing
   */
  @ConstructorProperties({"sinkCompression", "hasHeader", "delimitedParser"})
  public TextDelimited( Compress sinkCompression, boolean hasHeader, DelimitedParser delimitedParser )
    {
    this( Fields.ALL, sinkCompression, hasHeader, hasHeader, delimitedParser );
    }
210    
211      /**
212       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
213       * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
214       * <p/>
215       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
216       * with a {@link cascading.pipe.Checkpoint} Tap.
217       * <p/>
218       * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
219       *
220       * @param delimitedParser of type DelimitedParser
221       */
222      @ConstructorProperties({"sinkCompression", "delimitedParser"})
223      public TextDelimited( Compress sinkCompression, DelimitedParser delimitedParser )
224        {
225        this( Fields.ALL, sinkCompression, true, true, delimitedParser );
226        }
227    
228      /**
229       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
230       * {@link Fields#ALL} and using TAB as the default delimiter.
231       * <p/>
232       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
233       * with a {@link cascading.pipe.Checkpoint} Tap.
234       *
235       * @param sinkCompression of type Compress
236       * @param hasHeader       of type boolean
237       * @param delimiter       of type String
238       * @param quote           of type String
239       */
240      @ConstructorProperties({"sinkCompression", "hasHeader", "delimiter", "quote"})
241      public TextDelimited( Compress sinkCompression, boolean hasHeader, String delimiter, String quote )
242        {
243        this( Fields.ALL, sinkCompression, hasHeader, delimiter, quote, (Class[]) null );
244        }
245    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter.
   * <p/>
   * No sink compression, quote string or field types are given, and headers are neither skipped nor written.
   *
   * @param fields the source and sink fields
   */
  @ConstructorProperties({"fields"})
  public TextDelimited( Fields fields )
    {
    this( fields, null, "\t", null, null );
    }
256    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter.
   * <p/>
   * No sink compression, quote string or field types are given, and headers are neither skipped nor written.
   *
   * @param fields    the source and sink fields
   * @param delimiter the string separating values within a line
   */
  @ConstructorProperties({"fields", "delimiter"})
  public TextDelimited( Fields fields, String delimiter )
    {
    this( fields, null, delimiter, null, null );
    }
268    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, with no quote string
   * or field types. {@code hasHeader} is used for both {@code skipHeader} and {@code writeHeader}.
   *
   * @param fields    the source and sink fields
   * @param hasHeader if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter the string separating values within a line
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, null, null );
    }
281    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, with no quote string
   * or field types, and independent control over header skipping and writing.
   *
   * @param fields      the source and sink fields
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header line of field names is written when sinking
   * @param delimiter   the string separating values within a line
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, null, null );
    }
295    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter and field types, with no
   * quote string. Headers are neither skipped nor written.
   *
   * @param fields    the source and sink fields
   * @param delimiter the string separating values within a line
   * @param types     the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "delimiter", "types"})
  public TextDelimited( Fields fields, String delimiter, Class[] types )
    {
    this( fields, null, delimiter, null, types );
    }
308    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter and field types, with no
   * quote string. {@code hasHeader} is used for both {@code skipHeader} and {@code writeHeader}.
   *
   * @param fields    the source and sink fields
   * @param hasHeader if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter the string separating values within a line
   * @param types     the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, null, types );
    }
322    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter and field types, with no
   * quote string, and independent control over header skipping and writing.
   *
   * @param fields      the source and sink fields
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header line of field names is written when sinking
   * @param delimiter   the string separating values within a line
   * @param types       the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, null, types );
    }
337    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, quote string and field
   * types. Headers are neither skipped nor written.
   *
   * @param fields    the source and sink fields
   * @param delimiter the string separating values within a line
   * @param quote     the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param types     the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types )
    {
    this( fields, null, delimiter, quote, types );
    }
351    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, quote string and field
   * types. {@code hasHeader} is used for both {@code skipHeader} and {@code writeHeader}.
   *
   * @param fields    the source and sink fields
   * @param hasHeader if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter the string separating values within a line
   * @param quote     the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param types     the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, quote, types );
    }
366    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, quote string and field
   * types, with independent control over header skipping and writing.
   *
   * @param fields      the source and sink fields
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header line of field names is written when sinking
   * @param delimiter   the string separating values within a line
   * @param quote       the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param types       the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, quote, types );
    }
382    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, quote string, field
   * types and coercion safety. Headers are neither skipped nor written.
   *
   * @param fields    the source and sink fields
   * @param delimiter the string separating values within a line
   * @param quote     the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param types     the types field values are coerced into; may be {@code null}
   * @param safe      if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                  {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, null, delimiter, quote, types, safe );
    }
397    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, quote string, field
   * types and coercion safety. {@code hasHeader} is used for both {@code skipHeader} and {@code writeHeader}.
   *
   * @param fields    the source and sink fields
   * @param hasHeader if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter the string separating values within a line
   * @param quote     the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param types     the types field values are coerced into; may be {@code null}
   * @param safe      if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                  {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, quote, types, safe );
    }
413    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, quote string, field
   * types, coercion safety and charset. {@code hasHeader} is used for both {@code skipHeader} and
   * {@code writeHeader}.
   *
   * @param fields      the source and sink fields
   * @param hasHeader   if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter   the string separating values within a line
   * @param quote       the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param types       the types field values are coerced into; may be {@code null}
   * @param safe        if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                    {@link TapException} is thrown
   * @param charsetName the name of the charset used to encode and decode text
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName );
    }
430    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter, quote string, field
   * types and coercion safety, with independent control over header skipping and writing.
   *
   * @param fields      the source and sink fields
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header line of field names is written when sinking
   * @param delimiter   the string separating values within a line
   * @param quote       the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param types       the types field values are coerced into; may be {@code null}
   * @param safe        if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                    {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, quote, types, safe );
    }
447    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression and delimiter,
   * with no quote string or field types. Headers are neither skipped nor written.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param delimiter       the string separating values within a line
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter )
    {
    this( fields, sinkCompression, delimiter, null, null );
    }
460    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression and delimiter,
   * with no quote string or field types. {@code hasHeader} is used for both {@code skipHeader} and
   * {@code writeHeader}.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter       the string separating values within a line
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, null );
    }
474    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression and delimiter,
   * with no quote string or field types, and independent control over header skipping and writing.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param skipHeader      if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader     if {@code true}, a header line of field names is written when sinking
   * @param delimiter       the string separating values within a line
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, null );
    }
489    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter and
   * field types, with no quote string. Headers are neither skipped nor written.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param delimiter       the string separating values within a line
   * @param types           the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types )
    {
    this( fields, sinkCompression, delimiter, null, types );
    }
503    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter and
   * field types, with no quote string. {@code hasHeader} is used for both {@code skipHeader} and
   * {@code writeHeader}.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter       the string separating values within a line
   * @param types           the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types );
    }
518    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter and
   * field types, with no quote string, and independent control over header skipping and writing.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param skipHeader      if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader     if {@code true}, a header line of field names is written when sinking
   * @param delimiter       the string separating values within a line
   * @param types           the types field values are coerced into; may be {@code null}
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types );
    }
534    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter,
   * field types and coercion safety, with no quote string. Headers are neither skipped nor written.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param delimiter       the string separating values within a line
   * @param types           the types field values are coerced into; may be {@code null}
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, delimiter, null, types, safe );
    }
549    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter,
   * field types and coercion safety, with no quote string. {@code hasHeader} is used for both {@code skipHeader}
   * and {@code writeHeader}.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter       the string separating values within a line
   * @param types           the types field values are coerced into; may be {@code null}
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types, safe );
    }
565    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter,
   * field types, coercion safety and charset, with no quote string. {@code hasHeader} is used for both
   * {@code skipHeader} and {@code writeHeader}.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter       the string separating values within a line
   * @param types           the types field values are coerced into; may be {@code null}
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   * @param charsetName     the name of the charset used to encode and decode text
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe", "charsetName"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe, String charsetName )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, null, types, safe, charsetName );
    }
582    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter,
   * field types and coercion safety, with no quote string, and independent control over header skipping and writing.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param skipHeader      if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader     if {@code true}, a header line of field names is written when sinking
   * @param delimiter       the string separating values within a line
   * @param types           the types field values are coerced into; may be {@code null}
   * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a
   *                        {@link TapException} is thrown
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types, safe );
    }
599    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter and quote string, with
   * no field types. Headers are neither skipped nor written.
   *
   * @param fields    the source and sink fields
   * @param delimiter the string separating values within a line
   * @param quote     the string used to quote values that may contain the delimiter; {@code null} if unquoted
   */
  @ConstructorProperties({"fields", "delimiter", "quote"})
  public TextDelimited( Fields fields, String delimiter, String quote )
    {
    this( fields, null, delimiter, quote );
    }
612    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter and quote string, with
   * no field types. {@code hasHeader} is used for both {@code skipHeader} and {@code writeHeader}.
   *
   * @param fields    the source and sink fields
   * @param hasHeader if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter the string separating values within a line
   * @param quote     the string used to quote values that may contain the delimiter; {@code null} if unquoted
   */
  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote )
    {
    this( fields, null, hasHeader, hasHeader, delimiter, quote );
    }
626    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given delimiter and quote string, with
   * no field types, and independent control over header skipping and writing.
   *
   * @param fields      the source and sink fields
   * @param skipHeader  if {@code true}, the first line of each input file is skipped as a header
   * @param writeHeader if {@code true}, a header line of field names is written when sinking
   * @param delimiter   the string separating values within a line
   * @param quote       the string used to quote values that may contain the delimiter; {@code null} if unquoted
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote )
    {
    this( fields, null, skipHeader, writeHeader, delimiter, quote );
    }
641    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter and
   * quote string, with no field types. Headers are neither skipped nor written.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param delimiter       the string separating values within a line
   * @param quote           the string used to quote values that may contain the delimiter; {@code null} if unquoted
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote )
    {
    this( fields, sinkCompression, false, false, delimiter, true, quote, null, true );
    }
655    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter and
   * quote string, with no field types. {@code hasHeader} is used for both {@code skipHeader} and
   * {@code writeHeader}.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter       the string separating values within a line
   * @param quote           the string used to quote values that may contain the delimiter; {@code null} if unquoted
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true );
    }
670    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance using the given sink compression, delimiter,
   * quote string and charset, with no field types. {@code hasHeader} is used for both {@code skipHeader} and
   * {@code writeHeader}.
   *
   * @param fields          the source and sink fields
   * @param sinkCompression the sink compression setting
   * @param hasHeader       if {@code true}, a header line is both skipped when sourcing and written when sinking
   * @param delimiter       the string separating values within a line
   * @param quote           the string used to quote values that may contain the delimiter; {@code null} if unquoted
   * @param charsetName     the name of the charset used to encode and decode text
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "charsetName"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, String charsetName )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true, charsetName );
    }
686    
687      /**
688       * Constructor TextDelimited creates a new TextDelimited instance.
689       *
690       * @param fields          of type Fields
691       * @param sinkCompression of type Compress
692       * @param skipHeader      of type boolean
693       * @param writeHeader     of type boolean
694       * @param delimiter       of type String
695       * @param quote           of type String
696       */
697      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote"})
698      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote )
699        {
700        this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, null, true );
701        }
702    
703      /**
704       * Constructor TextDelimited creates a new TextDelimited instance.
705       *
706       * @param fields          of type Fields
707       * @param sinkCompression of type Compress
708       * @param delimiter       of type String
709       * @param quote           of type String
710       * @param types           of type Class[]
711       */
712      @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types"})
713      public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types )
714        {
715        this( fields, sinkCompression, false, false, delimiter, true, quote, types, true );
716        }
717    
718      /**
719       * Constructor TextDelimited creates a new TextDelimited instance.
720       *
721       * @param fields          of type Fields
722       * @param sinkCompression of type Compress
723       * @param hasHeader       of type boolean
724       * @param delimiter       of type String
725       * @param quote           of type String
726       * @param types           of type Class[]
727       */
728      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types"})
729      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types )
730        {
731        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, true );
732        }
733    
734      /**
735       * Constructor TextDelimited creates a new TextDelimited instance.
736       *
737       * @param fields          of type Fields
738       * @param sinkCompression of type Compress
739       * @param skipHeader      of type boolean
740       * @param writeHeader     of type boolean
741       * @param delimiter       of type String
742       * @param quote           of type String
743       * @param types           of type Class[]
744       */
745      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
746      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
747        {
748        this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, true );
749        }
750    
751      /**
752       * Constructor TextDelimited creates a new TextDelimited instance.
753       *
754       * @param fields          of type Fields
755       * @param sinkCompression of type Compress
756       * @param delimiter       of type String
757       * @param quote           of type String
758       * @param types           of type Class[]
759       * @param safe            of type boolean
760       */
761      @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types", "safe"})
762      public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types, boolean safe )
763        {
764        this( fields, sinkCompression, false, false, delimiter, true, quote, types, safe );
765        }
766    
767      /**
768       * Constructor TextDelimited creates a new TextDelimited instance.
769       *
770       * @param fields          of type Fields
771       * @param sinkCompression of type Compress
772       * @param hasHeader       of type boolean
773       * @param delimiter       of type String
774       * @param quote           of type String
775       * @param types           of type Class[]
776       * @param safe            of type boolean
777       */
778      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types", "safe"})
779      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
780        {
781        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, safe );
782        }
783    
784      /**
785       * Constructor TextDelimited creates a new TextDelimited instance.
786       *
787       * @param fields          of type Fields
788       * @param sinkCompression of type Compress
789       * @param skipHeader      of type boolean
790       * @param writeHeader     of type boolean
791       * @param delimiter       of type String
792       * @param quote           of type String
793       * @param types           of type Class[]
794       * @param safe            of type boolean
795       */
796      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types",
797                              "safe"})
798      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
799        {
800        this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, safe );
801        }
802    
803      /**
804       * Constructor TextDelimited creates a new TextDelimited instance.
805       *
806       * @param fields          of type Fields
807       * @param sinkCompression of type Compress
808       * @param skipHeader      of type boolean
809       * @param delimiter       of type String
810       * @param strict          of type boolean
811       * @param quote           of type String
812       * @param types           of type Class[]
813       * @param safe            of type boolean
814       */
815      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
816                              "types", "safe"})
817      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
818        {
819        this( fields, sinkCompression, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
820        }
821    
822      /**
823       * Constructor TextDelimited creates a new TextDelimited instance.
824       *
825       * @param fields          of type Fields
826       * @param sinkCompression of type Compress
827       * @param skipHeader      of type boolean
828       * @param delimiter       of type String
829       * @param strict          of type boolean
830       * @param quote           of type String
831       * @param types           of type Class[]
832       * @param safe            of type boolean
833       * @param charsetName     of type String
834       */
835      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
836                              "types", "safe", "charsetName"})
837      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
838        {
839        this( fields, sinkCompression, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
840        }
841    
842      /**
843       * Constructor TextDelimited creates a new TextDelimited instance.
844       *
845       * @param fields          of type Fields
846       * @param writeHeader     of type boolean
847       * @param delimitedParser of type DelimitedParser
848       */
849      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
850      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
851        {
852        this( fields, null, skipHeader, writeHeader, null, delimitedParser );
853        }
854    
855      /**
856       * Constructor TextDelimited creates a new TextDelimited instance.
857       *
858       * @param fields          of type Fields
859       * @param hasHeader       of type boolean
860       * @param delimitedParser of type DelimitedParser
861       */
862      @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
863      public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
864        {
865        this( fields, null, hasHeader, hasHeader, null, delimitedParser );
866        }
867    
868      /**
869       * Constructor TextDelimited creates a new TextDelimited instance.
870       *
871       * @param fields          of type Fields
872       * @param writeHeader     of type boolean
873       * @param delimitedParser of type DelimitedParser
874       */
875      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimitedParser"})
876      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
877        {
878        this( fields, sinkCompression, skipHeader, writeHeader, null, delimitedParser );
879        }
880    
881      /**
882       * Constructor TextDelimited creates a new TextDelimited instance.
883       *
884       * @param fields          of type Fields
885       * @param sinkCompression of type Compress
886       * @param skipHeader      of type boolean
887       * @param writeHeader     of type boolean
888       * @param charsetName     of type String
889       * @param delimitedParser of type DelimitedParser
890       */
891      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
892      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
893        {
894        super( sinkCompression );
895    
896        this.delimitedParser = delimitedParser;
897    
898        // normalizes ALL and UNKNOWN
899        setSinkFields( fields );
900        setSourceFields( fields );
901    
902        this.skipHeader = skipHeader;
903        this.writeHeader = writeHeader;
904    
905        // throws an exception if not found
906        setCharsetName( charsetName );
907        }
908    
909      /**
910       * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
911       *
912       * @return a String
913       */
914      public String getDelimiter()
915        {
916        return delimitedParser.getDelimiter();
917        }
918    
919      /**
920       * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text.
921       *
922       * @return a String
923       */
924      public String getQuote()
925        {
926        return delimitedParser.getQuote();
927        }
928    
929      @Override
930      public boolean isSymmetrical()
931        {
932        return super.isSymmetrical() && skipHeader == writeHeader;
933        }
934    
935      @Override
936      public void setSinkFields( Fields sinkFields )
937        {
938        super.setSourceFields( sinkFields );
939        super.setSinkFields( sinkFields );
940    
941        if( delimitedParser != null )
942          delimitedParser.reset( getSourceFields(), getSinkFields() );
943        }
944    
945      @Override
946      public void setSourceFields( Fields sourceFields )
947        {
948        super.setSourceFields( sourceFields );
949        super.setSinkFields( sourceFields );
950    
951        if( delimitedParser != null )
952          delimitedParser.reset( getSourceFields(), getSinkFields() );
953        }
954    
955      @Override
956      public Fields retrieveSourceFields( FlowProcess<JobConf> flowProcess, Tap tap )
957        {
958        if( !skipHeader || !getSourceFields().isUnknown() )
959          return getSourceFields();
960    
961        // no need to open them all
962        if( tap instanceof CompositeTap )
963          tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next();
964    
965        // should revert to file:// (Lfs) if tap is Lfs
966        tap = new Hfs( new TextLine( new Fields( "line" ), charsetName ), tap.getFullIdentifier( flowProcess.getConfigCopy() ) );
967    
968        setSourceFields( delimitedParser.parseFirstLine( flowProcess, tap ) );
969    
970        return getSourceFields();
971        }
972    
973      @Override
974      public void presentSourceFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
975        {
976        presentSourceFieldsInternal( fields );
977        }
978    
979      @Override
980      public void presentSinkFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
981        {
982        presentSinkFieldsInternal( fields );
983        }
984    
985      @Override
986      public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
987        {
988        super.sourcePrepare( flowProcess, sourceCall );
989    
990        sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() );
991        }
992    
993      @Override
994      public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
995        {
996        Object[] context = sourceCall.getContext();
997    
998        if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) )
999          return false;
1000    
1001        if( skipHeader && ( (LongWritable) context[ 0 ] ).get() == 0 )
1002          {
1003          if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) )
1004            return false;
1005          }
1006    
1007        // delegate coercion to delimitedParser for robustness
1008        Object[] split = delimitedParser.parseLine( makeEncodedString( context ) );
1009        Tuple tuple = sourceCall.getIncomingEntry().getTuple();
1010    
1011        TupleViews.reset( tuple, split );
1012    
1013        return true;
1014        }
1015    
1016      @Override
1017      public void sinkPrepare( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1018        {
1019        sinkCall.setContext( new Object[ 3 ] );
1020    
1021        sinkCall.getContext()[ 0 ] = new Text();
1022        sinkCall.getContext()[ 1 ] = new StringBuilder( 4 * 1024 );
1023        sinkCall.getContext()[ 2 ] = Charset.forName( charsetName );
1024    
1025        if( writeHeader )
1026          writeHeader( sinkCall );
1027        }
1028    
1029      protected void writeHeader( SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1030        {
1031        Fields fields = sinkCall.getOutgoingEntry().getFields();
1032    
1033        Text text = (Text) sinkCall.getContext()[ 0 ];
1034        StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ];
1035        Charset charset = (Charset) sinkCall.getContext()[ 2 ];
1036    
1037        line = (StringBuilder) delimitedParser.joinFirstLine( fields, line );
1038    
1039        text.set( line.toString().getBytes( charset ) );
1040    
1041        sinkCall.getOutput().collect( null, text );
1042    
1043        line.setLength( 0 );
1044        }
1045    
1046      @Override
1047      public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1048        {
1049        TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
1050    
1051        Text text = (Text) sinkCall.getContext()[ 0 ];
1052        StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ];
1053        Charset charset = (Charset) sinkCall.getContext()[ 2 ];
1054    
1055        Iterable<String> strings = tupleEntry.asIterableOf( String.class );
1056    
1057        line = (StringBuilder) delimitedParser.joinLine( strings, line );
1058    
1059        text.set( line.toString().getBytes( charset ) );
1060    
1061        sinkCall.getOutput().collect( null, text );
1062    
1063        line.setLength( 0 );
1064        }
1065      }
1066