001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.scheme.local;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.InputStreamReader;
027import java.io.LineNumberReader;
028import java.io.OutputStream;
029import java.io.OutputStreamWriter;
030import java.io.PrintWriter;
031import java.io.UnsupportedEncodingException;
032import java.nio.charset.Charset;
033import java.util.Properties;
034
035import cascading.flow.FlowProcess;
036import cascading.management.annotation.Property;
037import cascading.management.annotation.PropertyDescription;
038import cascading.management.annotation.Visibility;
039import cascading.scheme.Scheme;
040import cascading.scheme.SinkCall;
041import cascading.scheme.SourceCall;
042import cascading.scheme.util.DelimitedParser;
043import cascading.tap.CompositeTap;
044import cascading.tap.Tap;
045import cascading.tap.TapException;
046import cascading.tap.local.FileTap;
047import cascading.tuple.Fields;
048import cascading.tuple.Tuple;
049import cascading.tuple.TupleEntry;
050import cascading.tuple.util.TupleViews;
051
052/**
053 * Class TextDelimited provides direct support for delimited text files, like
054 * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values.
055 * <p/>
056 * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
057 * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will
058 * be skipped.
059 * <p/>
060 * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and
061 * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the
062 * file and used during planning. The header will parsed with the same rules as the body of the file.
063 * <p/>
064 * By default headers are not skipped.
065 * <p/>
066 * TextDelimited may also be used to write a "header" in a file. The fields names for the header are taken directly
067 * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
068 * resolved field names will be used, if any.
069 * <p/>
070 * By default headers are not written.
071 * <p/>
072 * If {@code hasHeaders} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
073 * be set to {@code true}.
074 * <p/>
075 * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
076 * <p/>
077 * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a
078 * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values
079 * for the missing fields.
080 * <p/>
081 * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
082 * If safe is {@code false}, a {@link TapException} will be thrown.
083 * <p/>
084 * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is
085 * COMMA delimited but may have COMMA's in a value, the whole value should be surrounded by the quote string, typically
086 * double quotes ({@literal "}).
087 * <p/>
088 * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
089 * <p/>
090 * This Scheme may source/sink {@link Fields#ALL}, when given on the constructor the new instance will automatically
091 * default to strict == false as the number of fields parsed are arbitrary or unknown. A type array may not be given
092 * either, so all values will be returned as Strings.
093 * <p/>
094 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
095 * argument.
096 * <p/>
097 * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
098 * {@link cascading.scheme.util.FieldTypeResolver} implementation.
099 * <p/>
100 * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
101 * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
102 * <p/>
103 * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may
104 * result in exceptions or could cause edge cases in the underlying java regular expression engine.
105 * <p/>
106 * A large part of Cascading was designed to help users cleans data. Thus the recommendation is to create Flows that
107 * are responsible for cleansing large data-sets when faced with the problem
108 * <p/>
109 * DelimitedParser maybe sub-classed and extended if necessary.
110 *
111 * @see TextLine
112 */
113public class TextDelimited extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
114  {
115  public static final String DEFAULT_CHARSET = "UTF-8";
116
117  private final boolean skipHeader;
118  private final boolean writeHeader;
119  private final DelimitedParser delimitedParser;
120  private String charsetName = DEFAULT_CHARSET;
121
122  /**
123   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
124   * {@link Fields#ALL} and using TAB as the default delimiter.
125   * <p/>
126   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
127   * with a {@link cascading.pipe.Checkpoint} Tap.
128   */
129  public TextDelimited()
130    {
131    this( Fields.ALL );
132    }
133
134  /**
135   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
136   * {@link Fields#ALL} and using TAB as the default delimiter.
137   * <p/>
138   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
139   * with a {@link cascading.pipe.Checkpoint} Tap.
140   *
141   * @param hasHeader
142   * @param delimiter
143   */
144  @ConstructorProperties({"hasHeader", "delimiter"})
145  public TextDelimited( boolean hasHeader, String delimiter )
146    {
147    this( Fields.ALL, hasHeader, delimiter, null, (Class[]) null );
148    }
149
150  /**
151   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
152   * {@link Fields#ALL} and using TAB as the default delimiter.
153   * <p/>
154   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
155   * with a {@link cascading.pipe.Checkpoint} Tap.
156   *
157   * @param hasHeader
158   * @param delimiter
159   * @param quote
160   */
161  @ConstructorProperties({"hasHeader", "delimiter", "quote"})
162  public TextDelimited( boolean hasHeader, String delimiter, String quote )
163    {
164    this( Fields.ALL, hasHeader, delimiter, quote, (Class[]) null );
165    }
166
167  /**
168   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
169   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
170   * <p/>
171   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
172   * with a {@link cascading.pipe.Checkpoint} Tap.
173   *
174   * @param hasHeader
175   * @param delimitedParser
176   */
177  @ConstructorProperties({"hasHeader", "delimitedParser"})
178  public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser )
179    {
180    this( Fields.ALL, hasHeader, hasHeader, delimitedParser );
181    }
182
183  /**
184   * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
185   * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
186   * <p/>
187   * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
188   * with a {@link cascading.pipe.Checkpoint} Tap.
189   * <p/>
190   * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
191   *
192   * @param delimitedParser
193   */
194  @ConstructorProperties({"delimitedParser"})
195  public TextDelimited( DelimitedParser delimitedParser )
196    {
197    this( Fields.ALL, true, true, delimitedParser );
198    }
199
200  /**
201   * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter.
202   *
203   * @param fields of type Fields
204   */
205  @ConstructorProperties({"fields"})
206  public TextDelimited( Fields fields )
207    {
208    this( fields, "\t", null, null );
209    }
210
211  /**
212   * Constructor TextDelimited creates a new TextDelimited instance.
213   *
214   * @param fields    of type Fields
215   * @param delimiter of type String
216   */
217  @ConstructorProperties({"fields", "delimiter"})
218  public TextDelimited( Fields fields, String delimiter )
219    {
220    this( fields, delimiter, null, null );
221    }
222
223  /**
224   * Constructor TextDelimited creates a new TextDelimited instance.
225   *
226   * @param fields    of type Fields
227   * @param hasHeader of type boolean
228   * @param delimiter of type String
229   */
230  @ConstructorProperties({"fields", "hasHeader", "delimiter"})
231  public TextDelimited( Fields fields, boolean hasHeader, String delimiter )
232    {
233    this( fields, hasHeader, hasHeader, delimiter, null, null );
234    }
235
236  /**
237   * Constructor TextDelimited creates a new TextDelimited instance.
238   *
239   * @param fields     of type Fields
240   * @param skipHeader of type boolean
241   * @param delimiter  of type String
242   */
243  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"})
244  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter )
245    {
246    this( fields, skipHeader, writeHeader, delimiter, null, null );
247    }
248
249  /**
250   * Constructor TextDelimited creates a new TextDelimited instance.
251   *
252   * @param fields    of type Fields
253   * @param delimiter of type String
254   * @param types     of type Class[]
255   */
256  @ConstructorProperties({"fields", "delimiter", "types"})
257  public TextDelimited( Fields fields, String delimiter, Class[] types )
258    {
259    this( fields, delimiter, null, types );
260    }
261
262  /**
263   * Constructor TextDelimited creates a new TextDelimited instance.
264   *
265   * @param fields    of type Fields
266   * @param hasHeader of type boolean
267   * @param delimiter of type String
268   * @param types     of type Class[]
269   */
270  @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"})
271  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types )
272    {
273    this( fields, hasHeader, hasHeader, delimiter, null, types );
274    }
275
276  /**
277   * Constructor TextDelimited creates a new TextDelimited instance.
278   *
279   * @param fields      of type Fields
280   * @param skipHeader  of type boolean
281   * @param writeHeader of type boolean
282   * @param delimiter   of type String
283   * @param types       of type Class[]
284   */
285  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"})
286  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
287    {
288    this( fields, skipHeader, writeHeader, delimiter, null, types );
289    }
290
291  /**
292   * Constructor TextDelimited creates a new TextDelimited instance.
293   *
294   * @param fields    of type Fields
295   * @param delimiter of type String
296   * @param quote     of type String
297   * @param types     of type Class[]
298   */
299  @ConstructorProperties({"fields", "delimiter", "quote", "types"})
300  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types )
301    {
302    this( fields, false, delimiter, quote, types );
303    }
304
305  /**
306   * Constructor TextDelimited creates a new TextDelimited instance.
307   *
308   * @param fields    of type Fields
309   * @param hasHeader of type boolean
310   * @param delimiter of type String
311   * @param quote     of type String
312   * @param types     of type Class[]
313   */
314  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"})
315  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types )
316    {
317    this( fields, hasHeader, hasHeader, delimiter, quote, types, true );
318    }
319
320  /**
321   * Constructor TextDelimited creates a new TextDelimited instance.
322   *
323   * @param fields      of type Fields
324   * @param skipHeader  of type boolean
325   * @param writeHeader of type boolean
326   * @param delimiter   of type String
327   * @param quote       of type String
328   * @param types       of type Class[]
329   */
330  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
331  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
332    {
333    this( fields, skipHeader, writeHeader, delimiter, quote, types, true );
334    }
335
336  /**
337   * Constructor TextDelimited creates a new TextDelimited instance.
338   *
339   * @param fields    of type Fields
340   * @param delimiter of type String
341   * @param quote     of type String
342   * @param types     of type Class[]
343   * @param safe      of type boolean
344   */
345  @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"})
346  public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe )
347    {
348    this( fields, false, delimiter, quote, types, safe );
349    }
350
351  /**
352   * Constructor TextDelimited creates a new TextDelimited instance.
353   *
354   * @param fields    of type Fields
355   * @param hasHeader of type boolean
356   * @param delimiter of type String
357   * @param quote     of type String
358   * @param types     of type Class[]
359   * @param safe      of type boolean
360   */
361  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"})
362  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
363    {
364    this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe );
365    }
366
367  /**
368   * Constructor TextDelimited creates a new TextDelimited instance.
369   *
370   * @param fields      of type Fields
371   * @param hasHeader   of type boolean
372   * @param delimiter   of type String
373   * @param quote       of type String
374   * @param types       of type Class[]
375   * @param safe        of type boolean
376   * @param charsetName of type String
377   */
378  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"})
379  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName )
380    {
381    this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName );
382    }
383
384  /**
385   * Constructor TextDelimited creates a new TextDelimited instance.
386   *
387   * @param fields      of type Fields
388   * @param skipHeader  of type boolean
389   * @param writeHeader of type boolean
390   * @param delimiter   of type String
391   * @param quote       of type String
392   * @param types       of type Class[]
393   * @param safe        of type boolean
394   */
395  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"})
396  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
397    {
398    this( fields, skipHeader, writeHeader, delimiter, true, quote, types, safe );
399    }
400
401  /**
402   * Constructor TextDelimited creates a new TextDelimited instance.
403   *
404   * @param fields    of type Fields
405   * @param delimiter of type String
406   * @param quote     of type String
407   */
408  @ConstructorProperties({"fields", "delimiter", "quote"})
409  public TextDelimited( Fields fields, String delimiter, String quote )
410    {
411    this( fields, false, delimiter, quote, null, true );
412    }
413
414  /**
415   * Constructor TextDelimited creates a new TextDelimited instance.
416   *
417   * @param fields    of type Fields
418   * @param hasHeader of type boolean
419   * @param delimiter of type String
420   * @param quote     of type String
421   */
422  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"})
423  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote )
424    {
425    this( fields, hasHeader, delimiter, quote, null, true );
426    }
427
428  /**
429   * Constructor TextDelimited creates a new TextDelimited instance.
430   *
431   * @param fields      of type Fields
432   * @param hasHeader   of type boolean
433   * @param delimiter   of type String
434   * @param quote       of type String
435   * @param charsetName of type String
436   */
437  @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "charsetName"})
438  public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, String charsetName )
439    {
440    this( fields, hasHeader, delimiter, quote, null, true, charsetName );
441    }
442
443  /**
444   * Constructor TextDelimited creates a new TextDelimited instance.
445   *
446   * @param fields      of type Fields
447   * @param skipHeader  of type boolean
448   * @param writeHeader of type boolean
449   * @param delimiter   of type String
450   * @param strict      of type boolean
451   * @param quote       of type String
452   * @param types       of type Class[]
453   * @param safe        of type boolean
454   */
455  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe"})
456  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
457    {
458    this( fields, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
459    }
460
461  /**
462   * Constructor TextDelimited creates a new TextDelimited instance.
463   *
464   * @param fields      of type Fields
465   * @param skipHeader  of type boolean
466   * @param writeHeader of type boolean
467   * @param delimiter   of type String
468   * @param strict      of type boolean
469   * @param quote       of type String
470   * @param types       of type Class[]
471   * @param safe        of type boolean
472   * @param charsetName of type String
473   */
474  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe",
475                          "charsetName"})
476  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
477    {
478    this( fields, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
479    }
480
481  /**
482   * Constructor TextDelimited creates a new TextDelimited instance.
483   *
484   * @param fields          of type Fields
485   * @param writeHeader     of type boolean
486   * @param delimitedParser of type DelimitedParser
487   */
488  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
489  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
490    {
491    this( fields, skipHeader, writeHeader, null, delimitedParser );
492    }
493
494  /**
495   * Constructor TextDelimited creates a new TextDelimited instance.
496   *
497   * @param fields          of type Fields
498   * @param hasHeader       of type boolean
499   * @param delimitedParser of type DelimitedParser
500   */
501  @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
502  public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
503    {
504    this( fields, hasHeader, hasHeader, null, delimitedParser );
505    }
506
507  /**
508   * Constructor TextDelimited creates a new TextDelimited instance.
509   *
510   * @param fields          of type Fields
511   * @param writeHeader     of type boolean
512   * @param charsetName     of type String
513   * @param delimitedParser of type DelimitedParser
514   */
515  @ConstructorProperties({"fields", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
516  public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
517    {
518    super( fields, fields );
519
520    this.delimitedParser = delimitedParser;
521
522    // normalizes ALL and UNKNOWN
523    // calls reset on delimitedParser
524    setSourceFields( fields );
525    setSinkFields( fields );
526
527    this.skipHeader = skipHeader;
528    this.writeHeader = writeHeader;
529
530    if( charsetName != null )
531      this.charsetName = charsetName;
532
533    // throws an exception if not found
534    Charset.forName( this.charsetName );
535    }
536
537  @Property(name = "charset", visibility = Visibility.PUBLIC)
538  @PropertyDescription("character set used.")
539  public String getCharsetName()
540    {
541    return charsetName;
542    }
543
544  /**
545   * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
546   *
547   * @return a String
548   */
549  @Property(name = "delimiter", visibility = Visibility.PUBLIC)
550  @PropertyDescription("The delimiter used to separate fields.")
551  public String getDelimiter()
552    {
553    return delimitedParser.getDelimiter();
554    }
555
556  /**
557   * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text.
558   *
559   * @return a String
560   */
561  @Property(name = "quote", visibility = Visibility.PUBLIC)
562  @PropertyDescription("The string used for quoting.")
563  public String getQuote()
564    {
565    return delimitedParser.getQuote();
566    }
567
568  public LineNumberReader createInput( InputStream inputStream )
569    {
570    try
571      {
572      return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
573      }
574    catch( UnsupportedEncodingException exception )
575      {
576      throw new TapException( exception );
577      }
578    }
579
580  public PrintWriter createOutput( OutputStream outputStream )
581    {
582    try
583      {
584      return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
585      }
586    catch( UnsupportedEncodingException exception )
587      {
588      throw new TapException( exception );
589      }
590    }
591
592  @Override
593  public void setSinkFields( Fields sinkFields )
594    {
595    super.setSourceFields( sinkFields );
596    super.setSinkFields( sinkFields );
597
598    if( delimitedParser != null )
599      delimitedParser.reset( getSourceFields(), getSinkFields() );
600    }
601
602  @Override
603  public void setSourceFields( Fields sourceFields )
604    {
605    super.setSourceFields( sourceFields );
606    super.setSinkFields( sourceFields );
607
608    if( delimitedParser != null )
609      delimitedParser.reset( getSourceFields(), getSinkFields() );
610    }
611
612  @Override
613  public boolean isSymmetrical()
614    {
615    return super.isSymmetrical() && skipHeader == writeHeader;
616    }
617
618  @Override
619  public Fields retrieveSourceFields( FlowProcess<? extends Properties> process, Tap tap )
620    {
621    if( !skipHeader || !getSourceFields().isUnknown() )
622      return getSourceFields();
623
624    // no need to open them all
625    if( tap instanceof CompositeTap )
626      tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next();
627
628    tap = new FileTap( new TextLine( new Fields( "line" ), charsetName ), tap.getIdentifier() );
629
630    setSourceFields( delimitedParser.parseFirstLine( process, tap ) );
631
632    return getSourceFields();
633    }
634
635  @Override
636  public void presentSourceFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
637    {
638    // do nothing
639    }
640
641  @Override
642  public void presentSinkFields( FlowProcess<? extends Properties> flowProcess, Tap tap, Fields fields )
643    {
644    if( writeHeader )
645      presentSinkFieldsInternal( fields );
646    }
647
648  @Override
649  public void sourceConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
650    {
651    }
652
653  @Override
654  public void sourcePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
655    {
656    sourceCall.setContext( createInput( sourceCall.getInput() ) );
657
658    sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() );
659    }
660
661  @Override
662  public boolean source( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
663    {
664    String line = sourceCall.getContext().readLine();
665
666    if( line == null )
667      return false;
668
669    if( skipHeader && sourceCall.getContext().getLineNumber() == 1 ) // todo: optimize this away
670      line = sourceCall.getContext().readLine();
671
672    if( line == null )
673      return false;
674
675    Object[] split = delimitedParser.parseLine( line );
676
677    // assumption it is better to re-use than to construct new
678    Tuple tuple = sourceCall.getIncomingEntry().getTuple();
679
680    TupleViews.reset( tuple, split );
681
682    return true;
683    }
684
685  @Override
686  public void sourceCleanup( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
687    {
688    sourceCall.setContext( null );
689    }
690
691  @Override
692  public void sinkConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
693    {
694    }
695
696  @Override
697  public void sinkPrepare( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
698    {
699    sinkCall.setContext( createOutput( sinkCall.getOutput() ) );
700
701    if( writeHeader )
702      {
703      Fields fields = sinkCall.getOutgoingEntry().getFields();
704      delimitedParser.joinFirstLine( fields, sinkCall.getContext() );
705
706      sinkCall.getContext().println();
707      }
708    }
709
710  @Override
711  public void sink( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
712    {
713    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
714
715    Iterable<String> strings = tupleEntry.asIterableOf( String.class );
716
717    delimitedParser.joinLine( strings, sinkCall.getContext() );
718
719    sinkCall.getContext().println();
720    }
721
722  @Override
723  public void sinkCleanup( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
724    {
725    sinkCall.getContext().flush();
726    sinkCall.setContext( null );
727    }
728  }