001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.nio.charset.Charset;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.management.annotation.Property;
029    import cascading.management.annotation.PropertyDescription;
030    import cascading.management.annotation.Visibility;
031    import cascading.scheme.SinkCall;
032    import cascading.scheme.SourceCall;
033    import cascading.scheme.util.DelimitedParser;
034    import cascading.tap.CompositeTap;
035    import cascading.tap.Tap;
036    import cascading.tap.TapException;
037    import cascading.tap.hadoop.Hfs;
038    import cascading.tuple.Fields;
039    import cascading.tuple.Tuple;
040    import cascading.tuple.TupleEntry;
041    import cascading.tuple.util.TupleViews;
042    import org.apache.hadoop.io.LongWritable;
043    import org.apache.hadoop.io.Text;
044    import org.apache.hadoop.mapred.JobConf;
045    import org.apache.hadoop.mapred.OutputCollector;
046    import org.apache.hadoop.mapred.RecordReader;
047    
048    /**
049     * Class TextDelimited is a sub-class of {@link TextLine}. It provides direct support for delimited text files, like
050     * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values.
051     * <p/>
052     * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
053     * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will
054     * be skipped.
055     * <p/>
056     * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and
057     * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the
058     * file and used during planning. The header will be parsed with the same rules as the body of the file.
059     * <p/>
060     * By default headers are not skipped.
061     * <p/>
062     * TextDelimited may also be used to write a "header" in a file. The fields names for the header are taken directly
063     * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
064     * resolved field names will be used, if any.
065     * <p/>
066     * By default headers are not written.
067     * <p/>
068     * If {@code hasHeader} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
069     * be set to {@code true}.
070     * <p/>
071     * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
072     * <p/>
073     * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a
074     * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values
075     * for the missing fields.
076     * <p/>
077     * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
078     * If safe is {@code false}, a {@link TapException} will be thrown.
079     * <p/>
080     * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is
081     * COMMA delimited but may have COMMA's in a value, the whole value should be surrounded by the quote string, typically
082     * double quotes ({@literal "}).
083     * <p/>
084     * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
085     * <p/>
086     * This Scheme may source/sink {@link Fields#ALL}, when given on the constructor the new instance will automatically
087     * default to strict == false as the number of fields parsed are arbitrary or unknown. A type array may not be given
088     * either, so all values will be returned as Strings.
089     * <p/>
090     * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
091     * argument.
092     * <p/>
093     * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
094     * {@link cascading.scheme.util.FieldTypeResolver} implementation.
095     * <p/>
096     * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
097     * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
098     * <p/>
099     * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may
100     * result in exceptions or could cause edge cases in the underlying java regular expression engine.
101     * <p/>
102     * A large part of Cascading was designed to help users cleanse data. Thus the recommendation is to create Flows that
103     * are responsible for cleansing large data-sets when faced with the problem.
104     * <p/>
105     * DelimitedParser may be sub-classed and extended if necessary.
106     *
107     * @see TextLine
108     */
109    public class TextDelimited extends TextLine
110      {
111      public static final String DEFAULT_CHARSET = "UTF-8"; // charset name; the class docs name UTF-8 as the default encoding
112    
113      /** Field delimitedParser, the {@link DelimitedParser} instance held by this scheme */
114      protected final DelimitedParser delimitedParser;
115      /** Field skipHeader, whether the first line of each input file is skipped as a header (see class docs); not final */
116      private boolean skipHeader;
117      private final boolean writeHeader; // whether a header line of field names is written when sinking (see class docs)
118    
119      /**
120       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
121       * {@link Fields#ALL} and using TAB as the delimiter. No sink compression, quote string or types are set.
122       * <p/>
123       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
124       * with a {@link cascading.pipe.Checkpoint} Tap.
125       */
126      public TextDelimited()
127        {
128        this( Fields.ALL, null, "\t", null, null ); // fields, sinkCompression, delimiter, quote, types
129        }
130    
131      /**
132       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
133       * {@link Fields#ALL} and using the given delimiter. No sink compression, quote string or types are set.
134       * <p/>
135       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
136       * with a {@link cascading.pipe.Checkpoint} Tap.
137       *
138       * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled (see class docs)
139       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
140       */
141      @ConstructorProperties({"hasHeader", "delimiter"})
142      public TextDelimited( boolean hasHeader, String delimiter )
143        {
144        this( Fields.ALL, null, hasHeader, delimiter, null, (Class[]) null ); // cast picks the Class[] types overload over the String charsetName one
145        }
146    
147      /**
148       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
149       * {@link Fields#ALL} and using the given delimiter and quote string. No sink compression or types are set.
150       * <p/>
151       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
152       * with a {@link cascading.pipe.Checkpoint} Tap.
153       *
154       * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled (see class docs)
155       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
156       * @param quote     the string surrounding values that may contain the delimiter
157       */
158      @ConstructorProperties({"hasHeader", "delimiter", "quote"})
159      public TextDelimited( boolean hasHeader, String delimiter, String quote )
160        {
161        this( Fields.ALL, null, hasHeader, delimiter, quote, (Class[]) null ); // cast picks the Class[] types overload over the String charsetName one
162        }
163    
164      /**
165       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
166       * {@link Fields#ALL} and using the given delimitedParser instance for parsing. No sink compression is set.
167       * <p/>
168       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
169       * with a {@link cascading.pipe.Checkpoint} Tap.
170       *
171       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
172       * @param delimitedParser the {@link DelimitedParser} instance used for parsing
173       */
174      @ConstructorProperties({"hasHeader", "delimitedParser"})
175      public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser )
176        {
177        this( Fields.ALL, null, hasHeader, hasHeader, delimitedParser ); // hasHeader supplies both header flags
178        }
179    
180      /**
181       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
182       * {@link Fields#ALL} and using the given delimitedParser instance for parsing. No sink compression is set.
183       * <p/>
184       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
185       * with a {@link cascading.pipe.Checkpoint} Tap.
186       * <p/>
187       * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
188       *
189       * @param delimitedParser the {@link DelimitedParser} instance used for parsing
190       */
191      @ConstructorProperties({"delimitedParser"})
192      public TextDelimited( DelimitedParser delimitedParser )
193        {
194        this( Fields.ALL, null, true, true, delimitedParser ); // true, true: skipHeader and writeHeader
195        }
196    
197      /**
198       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
199       * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
200       * <p/>
201       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
202       * with a {@link cascading.pipe.Checkpoint} Tap.
203       *
204       * @param sinkCompression the sink compression setting (type {@code Compress})
205       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
206       * @param delimitedParser the {@link DelimitedParser} instance used for parsing
207       */
208      @ConstructorProperties({"sinkCompression", "hasHeader", "delimitedParser"})
209      public TextDelimited( Compress sinkCompression, boolean hasHeader, DelimitedParser delimitedParser )
210        {
211        this( Fields.ALL, sinkCompression, hasHeader, hasHeader, delimitedParser ); // hasHeader supplies both header flags
212        }
213    
214      /**
215       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
216       * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
217       * <p/>
218       * Use this constructor if the source and sink fields will be resolved during planning, e.g. with a {@link cascading.pipe.Checkpoint} Tap.
219       * <p/>
220       * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
221       *
222       * @param sinkCompression the sink compression setting (type {@code Compress})
223       * @param delimitedParser the {@link DelimitedParser} instance used for parsing
224       */
225      @ConstructorProperties({"sinkCompression", "delimitedParser"})
226      public TextDelimited( Compress sinkCompression, DelimitedParser delimitedParser )
227        {
228        this( Fields.ALL, sinkCompression, true, true, delimitedParser ); // true, true: skipHeader and writeHeader
229        }
230    
231      /**
232       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
233       * {@link Fields#ALL} and using the given delimiter and quote string. No types are set.
234       * <p/>
235       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
236       * with a {@link cascading.pipe.Checkpoint} Tap.
237       *
238       * @param sinkCompression the sink compression setting (type {@code Compress})
239       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled (see class docs)
240       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
241       * @param quote           the string surrounding values that may contain the delimiter
242       */
243      @ConstructorProperties({"sinkCompression", "hasHeader", "delimiter", "quote"})
244      public TextDelimited( Compress sinkCompression, boolean hasHeader, String delimiter, String quote )
245        {
246        this( Fields.ALL, sinkCompression, hasHeader, delimiter, quote, (Class[]) null ); // cast picks the Class[] types overload over the String charsetName one
247        }
248    
249      /**
250       * Constructor TextDelimited creates a new TextDelimited instance with TAB as the delimiter and nothing else set.
251       *
252       * @param fields the declared source/sink fields
253       */
254      @ConstructorProperties({"fields"})
255      public TextDelimited( Fields fields )
256        {
257        this( fields, null, "\t", null, null ); // sinkCompression, quote and types are all null
258        }
259    
260      /**
261       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression, quote string or types.
262       *
263       * @param fields    the declared source/sink fields
264       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
265       */
266      @ConstructorProperties({"fields", "delimiter"})
267      public TextDelimited( Fields fields, String delimiter )
268        {
269        this( fields, null, delimiter, null, null ); // sinkCompression, quote and types are all null
270        }
271    
272      /**
273       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression, quote string or types.
274       *
275       * @param fields    the declared source/sink fields
276       * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
277       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
278       */
279      @ConstructorProperties({"fields", "hasHeader", "delimiter"})
280      public TextDelimited( Fields fields, boolean hasHeader, String delimiter )
281        {
282        this( fields, null, hasHeader, hasHeader, delimiter, null, null ); // hasHeader supplies both header flags
283        }
284    
285      /**
286       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression, quote string or types.
287       *
288       * @param fields      the declared source/sink fields
289       * @param skipHeader  whether the first line of each input file is skipped as a header
290       * @param writeHeader whether a header line of field names is written when sinking
291       * @param delimiter   the string separating values on a line, e.g. TAB or COMMA
292       */
293      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"})
294      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter )
295        {
296        this( fields, null, skipHeader, writeHeader, delimiter, null, null ); // the nulls: sinkCompression, quote, types
297        }
298    
299      /**
300       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression or quote string.
301       *
302       * @param fields    the declared source/sink fields
303       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
304       * @param types     the types field values are coerced into
305       */
306      @ConstructorProperties({"fields", "delimiter", "types"})
307      public TextDelimited( Fields fields, String delimiter, Class[] types )
308        {
309        this( fields, null, delimiter, null, types ); // the nulls: sinkCompression and quote
310        }
311    
312      /**
313       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression or quote string.
314       *
315       * @param fields    the declared source/sink fields
316       * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
317       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
318       * @param types     the types field values are coerced into
319       */
320      @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"})
321      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types )
322        {
323        this( fields, null, hasHeader, hasHeader, delimiter, null, types ); // hasHeader supplies both header flags
324        }
325    
326      /**
327       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression or quote string.
328       *
329       * @param fields      the declared source/sink fields
330       * @param skipHeader  whether the first line of each input file is skipped as a header
331       * @param writeHeader whether a header line of field names is written when sinking
332       * @param delimiter   the string separating values on a line, e.g. TAB or COMMA
333       * @param types       the types field values are coerced into
334       */
335      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"})
336      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
337        {
338        this( fields, null, skipHeader, writeHeader, delimiter, null, types ); // the nulls: sinkCompression and quote
339        }
340    
341      /**
342       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
343       *
344       * @param fields    the declared source/sink fields
345       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
346       * @param quote     the string surrounding values that may contain the delimiter
347       * @param types     the types field values are coerced into
348       */
349      @ConstructorProperties({"fields", "delimiter", "quote", "types"})
350      public TextDelimited( Fields fields, String delimiter, String quote, Class[] types )
351        {
352        this( fields, null, delimiter, quote, types ); // null: sinkCompression
353        }
354    
355      /**
356       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
357       *
358       * @param fields    the declared source/sink fields
359       * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
360       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
361       * @param quote     the string surrounding values that may contain the delimiter
362       * @param types     the types field values are coerced into
363       */
364      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"})
365      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types )
366        {
367        this( fields, null, hasHeader, hasHeader, delimiter, quote, types ); // hasHeader supplies both header flags
368        }
369    
370      /**
371       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
372       *
373       * @param fields      the declared source/sink fields
374       * @param skipHeader  whether the first line of each input file is skipped as a header
375       * @param writeHeader whether a header line of field names is written when sinking
376       * @param delimiter   the string separating values on a line, e.g. TAB or COMMA
377       * @param quote       the string surrounding values that may contain the delimiter
378       * @param types       the types field values are coerced into
379       */
380      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
381      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
382        {
383        this( fields, null, skipHeader, writeHeader, delimiter, quote, types ); // null: sinkCompression
384        }
385    
386      /**
387       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
388       *
389       * @param fields    the declared source/sink fields
390       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
391       * @param quote     the string surrounding values that may contain the delimiter
392       * @param types     the types field values are coerced into
393       * @param safe      if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
394       */
395      @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"})
396      public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe )
397        {
398        this( fields, null, delimiter, quote, types, safe ); // null: sinkCompression
399        }
400    
401      /**
402       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
403       *
404       * @param fields    the declared source/sink fields
405       * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
406       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
407       * @param quote     the string surrounding values that may contain the delimiter
408       * @param types     the types field values are coerced into
409       * @param safe      if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
410       */
411      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"})
412      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
413        {
414        this( fields, null, hasHeader, hasHeader, delimiter, quote, types, safe ); // hasHeader supplies both header flags
415        }
416    
417      /**
418       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
419       *
420       * @param fields      the declared source/sink fields
421       * @param hasHeader   if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
422       * @param delimiter   the string separating values on a line, e.g. TAB or COMMA
423       * @param quote       the string surrounding values that may contain the delimiter
424       * @param types       the types field values are coerced into
425       * @param safe        if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
426       * @param charsetName name of the charset used to encode/decode text
427       */
428      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"})
429      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName )
430        {
431        this( fields, null, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName ); // literal true is presumably the strict flag - TODO confirm
432        }
433    
434      /**
435       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
436       *
437       * @param fields      the declared source/sink fields
438       * @param skipHeader  whether the first line of each input file is skipped as a header
439       * @param writeHeader whether a header line of field names is written when sinking
440       * @param delimiter   the string separating values on a line, e.g. TAB or COMMA
441       * @param quote       the string surrounding values that may contain the delimiter
442       * @param types       the types field values are coerced into
443       * @param safe        if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
444       */
445      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"})
446      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
447        {
448        this( fields, null, skipHeader, writeHeader, delimiter, quote, types, safe ); // null: sinkCompression
449        }
450    
451      /**
452       * Constructor TextDelimited creates a new TextDelimited instance with no quote string or types.
453       *
454       * @param fields          the declared source/sink fields
455       * @param sinkCompression the sink compression setting (type {@code Compress})
456       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
457       */
458      @ConstructorProperties({"fields", "sinkCompression", "delimiter"})
459      public TextDelimited( Fields fields, Compress sinkCompression, String delimiter )
460        {
461        this( fields, sinkCompression, delimiter, null, null ); // the nulls: quote and types
462        }
463    
464      /**
465       * Constructor TextDelimited creates a new TextDelimited instance with no quote string or types.
466       *
467       * @param fields          the declared source/sink fields
468       * @param sinkCompression the sink compression setting (type {@code Compress})
469       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
470       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
471       */
472      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter"})
473      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter )
474        {
475        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, null ); // hasHeader supplies both header flags
476        }
477    
478      /**
479       * Constructor TextDelimited creates a new TextDelimited instance with no quote string or types.
480       *
481       * @param fields          the declared source/sink fields
482       * @param sinkCompression the sink compression setting (type {@code Compress})
483       * @param skipHeader      whether the first line of each input file is skipped as a header
484       * @param writeHeader     whether a header line of field names is written when sinking
485       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
486       */
487      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter"})
488      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter )
489        {
490        this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, null ); // the nulls: quote and types
491        }
492    
493      /**
494       * Constructor TextDelimited creates a new TextDelimited instance with no quote string.
495       *
496       * @param fields          the declared source/sink fields
497       * @param sinkCompression the sink compression setting (type {@code Compress})
498       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
499       * @param types           the types field values are coerced into
500       */
501      @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types"})
502      public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types )
503        {
504        this( fields, sinkCompression, delimiter, null, types ); // null: quote
505        }
506    
507      /**
508       * Constructor TextDelimited creates a new TextDelimited instance with no quote string.
509       *
510       * @param fields          the declared source/sink fields
511       * @param sinkCompression the sink compression setting (type {@code Compress})
512       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
513       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
514       * @param types           the types field values are coerced into
515       */
516      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types"})
517      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types )
518        {
519        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types ); // hasHeader supplies both header flags
520        }
521    
522      /**
523       * Constructor TextDelimited creates a new TextDelimited instance with no quote string.
524       *
525       * @param fields          the declared source/sink fields
526       * @param sinkCompression the sink compression setting (type {@code Compress})
527       * @param skipHeader      whether the first line of each input file is skipped as a header
528       * @param writeHeader     whether a header line of field names is written when sinking
529       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
530       * @param types           the types field values are coerced into
531       */
532      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types"})
533      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
534        {
535        this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types ); // null: quote
536        }
537    
538      /**
539       * Constructor TextDelimited creates a new TextDelimited instance with no quote string.
540       *
541       * @param fields          the declared source/sink fields
542       * @param sinkCompression the sink compression setting (type {@code Compress})
543       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
544       * @param types           the types field values are coerced into
545       * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
546       */
547      @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types", "safe"})
548      public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types, boolean safe )
549        {
550        this( fields, sinkCompression, delimiter, null, types, safe ); // null: quote
551        }
552    
553      /**
554       * Constructor TextDelimited creates a new TextDelimited instance with no quote string.
555       *
556       * @param fields          the declared source/sink fields
557       * @param sinkCompression the sink compression setting (type {@code Compress})
558       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
559       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
560       * @param types           the types field values are coerced into
561       * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
562       */
563      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe"})
564      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe )
565        {
566        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types, safe ); // hasHeader supplies both header flags
567        }
568    
569      /**
570       * Constructor TextDelimited creates a new TextDelimited instance with no quote string.
571       *
572       * @param fields          the declared source/sink fields
573       * @param sinkCompression the sink compression setting (type {@code Compress})
574       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
575       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
576       * @param types           the types field values are coerced into
577       * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
578       * @param charsetName     name of the charset used to encode/decode text
579       */
580      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe", "charsetName"})
581      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe, String charsetName )
582        {
583        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, null, types, safe, charsetName ); // true is presumably strict, null the quote - TODO confirm
584        }
585    
586      /**
587       * Constructor TextDelimited creates a new TextDelimited instance with no quote string.
588       *
589       * @param fields          the declared source/sink fields
590       * @param sinkCompression the sink compression setting (type {@code Compress})
591       * @param skipHeader      whether the first line of each input file is skipped as a header
592       * @param writeHeader     whether a header line of field names is written when sinking
593       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
594       * @param types           the types field values are coerced into
595       * @param safe            if {@code true}, values that cannot be coerced become {@code null}; otherwise a {@link TapException} is thrown
596       */
597      @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types", "safe"})
598      public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types, boolean safe )
599        {
600        this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types, safe ); // null: quote
601        }
602    
603      /**
604       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
605       *
606       * @param fields    the declared source/sink fields
607       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
608       * @param quote     the string surrounding values that may contain the delimiter
609       */
610      @ConstructorProperties({"fields", "delimiter", "quote"})
611      public TextDelimited( Fields fields, String delimiter, String quote )
612        {
613        this( fields, null, delimiter, quote ); // null: sinkCompression
614        }
615    
616      /**
617       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
618       *
619       * @param fields    the declared source/sink fields
620       * @param hasHeader if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
621       * @param delimiter the string separating values on a line, e.g. TAB or COMMA
622       * @param quote     the string surrounding values that may contain the delimiter
623       */
624      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"})
625      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote )
626        {
627        this( fields, null, hasHeader, hasHeader, delimiter, quote ); // hasHeader supplies both header flags
628        }
629    
630      /**
631       * Constructor TextDelimited creates a new TextDelimited instance with no sink compression.
632       *
633       * @param fields      the declared source/sink fields
634       * @param skipHeader  whether the first line of each input file is skipped as a header
635       * @param writeHeader whether a header line of field names is written when sinking
636       * @param delimiter   the string separating values on a line, e.g. TAB or COMMA
637       * @param quote       the string surrounding values that may contain the delimiter
638       */
639      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote"})
640      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote )
641        {
642        this( fields, null, skipHeader, writeHeader, delimiter, quote ); // null: sinkCompression
643        }
644    
645      /**
646       * Constructor TextDelimited creates a new TextDelimited instance that neither skips nor writes a header.
647       *
648       * @param fields          the declared source/sink fields
649       * @param sinkCompression the sink compression setting (type {@code Compress})
650       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
651       * @param quote           the string surrounding values that may contain the delimiter
652       */
653      @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote"})
654      public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote )
655        {
656        this( fields, sinkCompression, false, false, delimiter, true, quote, null, true ); // false, false: header flags; presumably strict=true, types=null, safe=true - TODO confirm
657        }
658    
659      /**
660       * Constructor TextDelimited creates a new TextDelimited instance with no types.
661       *
662       * @param fields          the declared source/sink fields
663       * @param sinkCompression the sink compression setting (type {@code Compress})
664       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
665       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
666       * @param quote           the string surrounding values that may contain the delimiter
667       */
668      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote"})
669      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote )
670        {
671        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true ); // presumably strict=true, types=null, safe=true - TODO confirm
672        }
673    
674      /**
675       * Constructor TextDelimited creates a new TextDelimited instance with no types.
676       *
677       * @param fields          the declared source/sink fields
678       * @param sinkCompression the sink compression setting (type {@code Compress})
679       * @param hasHeader       if {@code true}, both {@code skipHeader} and {@code writeHeader} are enabled
680       * @param delimiter       the string separating values on a line, e.g. TAB or COMMA
681       * @param quote           the string surrounding values that may contain the delimiter
682       * @param charsetName     name of the charset used to encode/decode text
683       */
684      @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "charsetName"})
685      public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, String charsetName )
686        {
687        this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true, charsetName ); // presumably strict=true, types=null, safe=true - TODO confirm
688        }
689    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the nine argument constructor passing {@code true} for strict and safe, and {@code null}
   * for types.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimiter       of type String
   * @param quote           of type String
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, null, true );
    }
705    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the nine argument constructor passing {@code false} for both skipHeader and writeHeader,
   * and {@code true} for strict and safe.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param delimiter       of type String
   * @param quote           of type String
   * @param types           of type Class[]
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types )
    {
    this( fields, sinkCompression, false, false, delimiter, true, quote, types, true );
    }
720    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the nine argument constructor using {@code hasHeader} for both skipHeader and writeHeader,
   * and {@code true} for strict and safe.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param hasHeader       of type boolean, applied to both skipHeader and writeHeader
   * @param delimiter       of type String
   * @param quote           of type String
   * @param types           of type Class[]
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, true );
    }
736    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the nine argument constructor passing {@code true} for strict and safe.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimiter       of type String
   * @param quote           of type String
   * @param types           of type Class[]
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, true );
    }
753    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the nine argument constructor passing {@code false} for both skipHeader and writeHeader,
   * and {@code true} for strict.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param delimiter       of type String
   * @param quote           of type String
   * @param types           of type Class[]
   * @param safe            of type boolean
   */
  @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, false, false, delimiter, true, quote, types, safe );
    }
769    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the nine argument constructor using {@code hasHeader} for both skipHeader and writeHeader,
   * and {@code true} for strict.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param hasHeader       of type boolean, applied to both skipHeader and writeHeader
   * @param delimiter       of type String
   * @param quote           of type String
   * @param types           of type Class[]
   * @param safe            of type boolean
   */
  @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, safe );
    }
786    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the nine argument constructor passing {@code true} for strict.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimiter       of type String
   * @param quote           of type String
   * @param types           of type Class[]
   * @param safe            of type boolean
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types",
                          "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, safe );
    }
805    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the ten argument constructor passing {@code DEFAULT_CHARSET} as the charset name.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimiter       of type String
   * @param strict          of type boolean
   * @param quote           of type String
   * @param types           of type Class[]
   * @param safe            of type boolean
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
                          "types", "safe"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
    }
824    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Builds a new {@link DelimitedParser} from the given delimiter, quote, types, strict and safe values and
   * delegates to the constructor accepting a DelimitedParser.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimiter       of type String
   * @param strict          of type boolean
   * @param quote           of type String
   * @param types           of type Class[]
   * @param safe            of type boolean
   * @param charsetName     of type String
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote",
                          "types", "safe", "charsetName"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
    }
844    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the six argument DelimitedParser constructor passing {@code null} for both sinkCompression
   * and charsetName.
   *
   * @param fields          of type Fields
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
    {
    this( fields, null, skipHeader, writeHeader, null, delimitedParser );
    }
857    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the six argument DelimitedParser constructor using {@code hasHeader} for both skipHeader and
   * writeHeader, and passing {@code null} for both sinkCompression and charsetName.
   *
   * @param fields          of type Fields
   * @param hasHeader       of type boolean, applied to both skipHeader and writeHeader
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
  public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
    {
    this( fields, null, hasHeader, hasHeader, null, delimitedParser );
    }
870    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * Delegates to the six argument DelimitedParser constructor passing {@code null} for charsetName.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean
   * @param writeHeader     of type boolean
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimitedParser"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
    {
    this( fields, sinkCompression, skipHeader, writeHeader, null, delimitedParser );
    }
883    
  /**
   * Constructor TextDelimited creates a new TextDelimited instance.
   * <p/>
   * This is the constructor the other DelimitedParser based constructors in this class delegate to. It hands
   * {@code sinkCompression} to the super class, stores the parser, applies {@code fields} as both the sink and
   * the source fields, records the header flags and finally applies the charset name.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean, when true {@code source} skips the record whose key is zero
   * @param writeHeader     of type boolean, when true {@code sinkPrepare} emits a header line
   * @param charsetName     of type String
   * @param delimitedParser of type DelimitedParser
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
    {
    super( sinkCompression );

    // must be assigned before the field setters below, they reset the parser when it is not null
    this.delimitedParser = delimitedParser;

    // normalizes ALL and UNKNOWN
    setSinkFields( fields );
    setSourceFields( fields );

    this.skipHeader = skipHeader;
    this.writeHeader = writeHeader;

    // throws an exception if not found
    setCharsetName( charsetName );
    }
911    
912      /**
913       * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
914       *
915       * @return a String
916       */
917      @Property(name = "delimiter", visibility = Visibility.PUBLIC)
918      @PropertyDescription("The delimiter used to separate fields.")
919      public String getDelimiter()
920        {
921        return delimitedParser.getDelimiter();
922        }
923    
924      /**
925       * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text.
926       *
927       * @return a String
928       */
929      @Property(name = "quote", visibility = Visibility.PUBLIC)
930      @PropertyDescription("The string used for quoting.")
931      public String getQuote()
932        {
933        return delimitedParser.getQuote();
934        }
935    
936      @Override
937      public boolean isSymmetrical()
938        {
939        return super.isSymmetrical() && skipHeader == writeHeader;
940        }
941    
942      @Override
943      public void setSinkFields( Fields sinkFields )
944        {
945        super.setSourceFields( sinkFields );
946        super.setSinkFields( sinkFields );
947    
948        if( delimitedParser != null )
949          delimitedParser.reset( getSourceFields(), getSinkFields() );
950        }
951    
952      @Override
953      public void setSourceFields( Fields sourceFields )
954        {
955        super.setSourceFields( sourceFields );
956        super.setSinkFields( sourceFields );
957    
958        if( delimitedParser != null )
959          delimitedParser.reset( getSourceFields(), getSinkFields() );
960        }
961    
962      @Override
963      public Fields retrieveSourceFields( FlowProcess<JobConf> flowProcess, Tap tap )
964        {
965        if( !skipHeader || !getSourceFields().isUnknown() )
966          return getSourceFields();
967    
968        // no need to open them all
969        if( tap instanceof CompositeTap )
970          tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next();
971    
972        // should revert to file:// (Lfs) if tap is Lfs
973        tap = new Hfs( new TextLine( new Fields( "line" ), charsetName ), tap.getFullIdentifier( flowProcess.getConfigCopy() ) );
974    
975        setSourceFields( delimitedParser.parseFirstLine( flowProcess, tap ) );
976    
977        return getSourceFields();
978        }
979    
  /**
   * Method presentSourceFields forwards the given fields to {@code presentSourceFieldsInternal}. The
   * flowProcess and tap arguments are not used by this implementation.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  @Override
  public void presentSourceFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
    {
    presentSourceFieldsInternal( fields );
    }
985    
  /**
   * Method presentSinkFields forwards the given fields to {@code presentSinkFieldsInternal}. The
   * flowProcess and tap arguments are not used by this implementation.
   *
   * @param flowProcess of type FlowProcess
   * @param tap         of type Tap
   * @param fields      of type Fields
   */
  @Override
  public void presentSinkFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields )
    {
    presentSinkFieldsInternal( fields );
    }
991    
992      @Override
993      public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
994        {
995        super.sourcePrepare( flowProcess, sourceCall );
996    
997        sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() );
998        }
999    
1000      @Override
1001      public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
1002        {
1003        Object[] context = sourceCall.getContext();
1004    
1005        if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) )
1006          return false;
1007    
1008        if( skipHeader && ( (LongWritable) context[ 0 ] ).get() == 0 )
1009          {
1010          if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) )
1011            return false;
1012          }
1013    
1014        // delegate coercion to delimitedParser for robustness
1015        Object[] split = delimitedParser.parseLine( makeEncodedString( context ) );
1016        Tuple tuple = sourceCall.getIncomingEntry().getTuple();
1017    
1018        TupleViews.reset( tuple, split );
1019    
1020        return true;
1021        }
1022    
1023      @Override
1024      public void sinkPrepare( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1025        {
1026        sinkCall.setContext( new Object[ 3 ] );
1027    
1028        sinkCall.getContext()[ 0 ] = new Text();
1029        sinkCall.getContext()[ 1 ] = new StringBuilder( 4 * 1024 );
1030        sinkCall.getContext()[ 2 ] = Charset.forName( charsetName );
1031    
1032        if( writeHeader )
1033          writeHeader( sinkCall );
1034        }
1035    
1036      protected void writeHeader( SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1037        {
1038        Fields fields = sinkCall.getOutgoingEntry().getFields();
1039    
1040        Text text = (Text) sinkCall.getContext()[ 0 ];
1041        StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ];
1042        Charset charset = (Charset) sinkCall.getContext()[ 2 ];
1043    
1044        line = (StringBuilder) delimitedParser.joinFirstLine( fields, line );
1045    
1046        text.set( line.toString().getBytes( charset ) );
1047    
1048        sinkCall.getOutput().collect( null, text );
1049    
1050        line.setLength( 0 );
1051        }
1052    
1053      @Override
1054      public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException
1055        {
1056        TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
1057    
1058        Text text = (Text) sinkCall.getContext()[ 0 ];
1059        StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ];
1060        Charset charset = (Charset) sinkCall.getContext()[ 2 ];
1061    
1062        Iterable<String> strings = tupleEntry.asIterableOf( String.class );
1063    
1064        line = (StringBuilder) delimitedParser.joinLine( strings, line );
1065    
1066        text.set( line.toString().getBytes( charset ) );
1067    
1068        sinkCall.getOutput().collect( null, text );
1069    
1070        line.setLength( 0 );
1071        }
1072      }
1073