001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.local;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.InputStreamReader;
027    import java.io.LineNumberReader;
028    import java.io.OutputStream;
029    import java.io.OutputStreamWriter;
030    import java.io.PrintWriter;
031    import java.io.UnsupportedEncodingException;
032    import java.nio.charset.Charset;
033    import java.util.Properties;
034    
035    import cascading.flow.FlowProcess;
036    import cascading.management.annotation.Property;
037    import cascading.management.annotation.PropertyDescription;
038    import cascading.management.annotation.Visibility;
039    import cascading.scheme.Scheme;
040    import cascading.scheme.SinkCall;
041    import cascading.scheme.SourceCall;
042    import cascading.scheme.util.DelimitedParser;
043    import cascading.tap.CompositeTap;
044    import cascading.tap.Tap;
045    import cascading.tap.TapException;
046    import cascading.tap.local.FileTap;
047    import cascading.tuple.Fields;
048    import cascading.tuple.Tuple;
049    import cascading.tuple.TupleEntry;
050    import cascading.tuple.util.TupleViews;
051    
052    /**
053     * Class TextDelimited provides direct support for delimited text files, like
054     * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values.
055     * <p/>
056     * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
057     * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will
058     * be skipped.
059     * <p/>
060     * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and
061     * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the
062     * file and used during planning. The header will parsed with the same rules as the body of the file.
063     * <p/>
064     * By default headers are not skipped.
065     * <p/>
066     * TextDelimited may also be used to write a "header" in a file. The fields names for the header are taken directly
067     * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
068     * resolved field names will be used, if any.
069     * <p/>
070     * By default headers are not written.
071     * <p/>
072     * If {@code hasHeaders} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
073     * be set to {@code true}.
074     * <p/>
075     * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
076     * <p/>
077     * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a
078     * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values
079     * for the missing fields.
080     * <p/>
081     * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
082     * If safe is {@code false}, a {@link TapException} will be thrown.
083     * <p/>
084     * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is
085     * COMMA delimited but may have COMMA's in a value, the whole value should be surrounded by the quote string, typically
086     * double quotes ({@literal "}).
087     * <p/>
088     * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
089     * <p/>
090     * This Scheme may source/sink {@link Fields#ALL}, when given on the constructor the new instance will automatically
091     * default to strict == false as the number of fields parsed are arbitrary or unknown. A type array may not be given
092     * either, so all values will be returned as Strings.
093     * <p/>
094     * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
095     * argument.
096     * <p/>
097     * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
098     * {@link cascading.scheme.util.FieldTypeResolver} implementation.
099     * <p/>
100     * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
101     * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
102     * <p/>
103     * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may
104     * result in exceptions or could cause edge cases in the underlying java regular expression engine.
105     * <p/>
106     * A large part of Cascading was designed to help users cleans data. Thus the recommendation is to create Flows that
107     * are responsible for cleansing large data-sets when faced with the problem
108     * <p/>
109     * DelimitedParser maybe sub-classed and extended if necessary.
110     *
111     * @see TextLine
112     */
113    public class TextDelimited extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
114      {
115      public static final String DEFAULT_CHARSET = "UTF-8";
116    
117      private final boolean skipHeader;
118      private final boolean writeHeader;
119      private final DelimitedParser delimitedParser;
120      private String charsetName = DEFAULT_CHARSET;
121    
122      /**
123       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
124       * {@link Fields#ALL} and using TAB as the default delimiter.
125       * <p/>
126       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
127       * with a {@link cascading.pipe.Checkpoint} Tap.
128       */
129      public TextDelimited()
130        {
131        this( Fields.ALL );
132        }
133    
134      /**
135       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
136       * {@link Fields#ALL} and using TAB as the default delimiter.
137       * <p/>
138       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
139       * with a {@link cascading.pipe.Checkpoint} Tap.
140       *
141       * @param hasHeader
142       * @param delimiter
143       */
144      @ConstructorProperties({"hasHeader", "delimiter"})
145      public TextDelimited( boolean hasHeader, String delimiter )
146        {
147        this( Fields.ALL, hasHeader, delimiter, null, (Class[]) null );
148        }
149    
150      /**
151       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
152       * {@link Fields#ALL} and using TAB as the default delimiter.
153       * <p/>
154       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
155       * with a {@link cascading.pipe.Checkpoint} Tap.
156       *
157       * @param hasHeader
158       * @param delimiter
159       * @param quote
160       */
161      @ConstructorProperties({"hasHeader", "delimiter", "quote"})
162      public TextDelimited( boolean hasHeader, String delimiter, String quote )
163        {
164        this( Fields.ALL, hasHeader, delimiter, quote, (Class[]) null );
165        }
166    
167      /**
168       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
169       * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
170       * <p/>
171       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
172       * with a {@link cascading.pipe.Checkpoint} Tap.
173       *
174       * @param hasHeader
175       * @param delimitedParser
176       */
177      @ConstructorProperties({"hasHeader", "delimitedParser"})
178      public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser )
179        {
180        this( Fields.ALL, hasHeader, hasHeader, delimitedParser );
181        }
182    
183      /**
184       * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking
185       * {@link Fields#ALL} and using the given delimitedParser instance for parsing.
186       * <p/>
187       * Use this constructor if the source and sink fields will be resolved during planning, for example, when using
188       * with a {@link cascading.pipe.Checkpoint} Tap.
189       * <p/>
190       * This constructor will set {@code skipHeader} and {@code writeHeader} values to true.
191       *
192       * @param delimitedParser
193       */
194      @ConstructorProperties({"delimitedParser"})
195      public TextDelimited( DelimitedParser delimitedParser )
196        {
197        this( Fields.ALL, true, true, delimitedParser );
198        }
199    
200      /**
201       * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter.
202       *
203       * @param fields of type Fields
204       */
205      @ConstructorProperties({"fields"})
206      public TextDelimited( Fields fields )
207        {
208        this( fields, "\t", null, null );
209        }
210    
211      /**
212       * Constructor TextDelimited creates a new TextDelimited instance.
213       *
214       * @param fields    of type Fields
215       * @param delimiter of type String
216       */
217      @ConstructorProperties({"fields", "delimiter"})
218      public TextDelimited( Fields fields, String delimiter )
219        {
220        this( fields, delimiter, null, null );
221        }
222    
223      /**
224       * Constructor TextDelimited creates a new TextDelimited instance.
225       *
226       * @param fields    of type Fields
227       * @param hasHeader of type boolean
228       * @param delimiter of type String
229       */
230      @ConstructorProperties({"fields", "hasHeader", "delimiter"})
231      public TextDelimited( Fields fields, boolean hasHeader, String delimiter )
232        {
233        this( fields, hasHeader, hasHeader, delimiter, null, null );
234        }
235    
236      /**
237       * Constructor TextDelimited creates a new TextDelimited instance.
238       *
239       * @param fields     of type Fields
240       * @param skipHeader of type boolean
241       * @param delimiter  of type String
242       */
243      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"})
244      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter )
245        {
246        this( fields, skipHeader, writeHeader, delimiter, null, null );
247        }
248    
249      /**
250       * Constructor TextDelimited creates a new TextDelimited instance.
251       *
252       * @param fields    of type Fields
253       * @param delimiter of type String
254       * @param types     of type Class[]
255       */
256      @ConstructorProperties({"fields", "delimiter", "types"})
257      public TextDelimited( Fields fields, String delimiter, Class[] types )
258        {
259        this( fields, delimiter, null, types );
260        }
261    
262      /**
263       * Constructor TextDelimited creates a new TextDelimited instance.
264       *
265       * @param fields    of type Fields
266       * @param hasHeader of type boolean
267       * @param delimiter of type String
268       * @param types     of type Class[]
269       */
270      @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"})
271      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types )
272        {
273        this( fields, hasHeader, hasHeader, delimiter, null, types );
274        }
275    
276      /**
277       * Constructor TextDelimited creates a new TextDelimited instance.
278       *
279       * @param fields      of type Fields
280       * @param skipHeader  of type boolean
281       * @param writeHeader of type boolean
282       * @param delimiter   of type String
283       * @param types       of type Class[]
284       */
285      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"})
286      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types )
287        {
288        this( fields, skipHeader, writeHeader, delimiter, null, types );
289        }
290    
291      /**
292       * Constructor TextDelimited creates a new TextDelimited instance.
293       *
294       * @param fields    of type Fields
295       * @param delimiter of type String
296       * @param quote     of type String
297       * @param types     of type Class[]
298       */
299      @ConstructorProperties({"fields", "delimiter", "quote", "types"})
300      public TextDelimited( Fields fields, String delimiter, String quote, Class[] types )
301        {
302        this( fields, false, delimiter, quote, types );
303        }
304    
305      /**
306       * Constructor TextDelimited creates a new TextDelimited instance.
307       *
308       * @param fields    of type Fields
309       * @param hasHeader of type boolean
310       * @param delimiter of type String
311       * @param quote     of type String
312       * @param types     of type Class[]
313       */
314      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"})
315      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types )
316        {
317        this( fields, hasHeader, hasHeader, delimiter, quote, types, true );
318        }
319    
320      /**
321       * Constructor TextDelimited creates a new TextDelimited instance.
322       *
323       * @param fields      of type Fields
324       * @param skipHeader  of type boolean
325       * @param writeHeader of type boolean
326       * @param delimiter   of type String
327       * @param quote       of type String
328       * @param types       of type Class[]
329       */
330      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"})
331      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types )
332        {
333        this( fields, skipHeader, writeHeader, delimiter, quote, types, true );
334        }
335    
336      /**
337       * Constructor TextDelimited creates a new TextDelimited instance.
338       *
339       * @param fields    of type Fields
340       * @param delimiter of type String
341       * @param quote     of type String
342       * @param types     of type Class[]
343       * @param safe      of type boolean
344       */
345      @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"})
346      public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe )
347        {
348        this( fields, false, delimiter, quote, types, safe );
349        }
350    
351      /**
352       * Constructor TextDelimited creates a new TextDelimited instance.
353       *
354       * @param fields    of type Fields
355       * @param hasHeader of type boolean
356       * @param delimiter of type String
357       * @param quote     of type String
358       * @param types     of type Class[]
359       * @param safe      of type boolean
360       */
361      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"})
362      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe )
363        {
364        this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe );
365        }
366    
367      /**
368       * Constructor TextDelimited creates a new TextDelimited instance.
369       *
370       * @param fields      of type Fields
371       * @param hasHeader   of type boolean
372       * @param delimiter   of type String
373       * @param quote       of type String
374       * @param types       of type Class[]
375       * @param safe        of type boolean
376       * @param charsetName of type String
377       */
378      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"})
379      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName )
380        {
381        this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName );
382        }
383    
384      /**
385       * Constructor TextDelimited creates a new TextDelimited instance.
386       *
387       * @param fields      of type Fields
388       * @param skipHeader  of type boolean
389       * @param writeHeader of type boolean
390       * @param delimiter   of type String
391       * @param quote       of type String
392       * @param types       of type Class[]
393       * @param safe        of type boolean
394       */
395      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"})
396      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe )
397        {
398        this( fields, skipHeader, writeHeader, delimiter, true, quote, types, safe );
399        }
400    
401      /**
402       * Constructor TextDelimited creates a new TextDelimited instance.
403       *
404       * @param fields    of type Fields
405       * @param delimiter of type String
406       * @param quote     of type String
407       */
408      @ConstructorProperties({"fields", "delimiter", "quote"})
409      public TextDelimited( Fields fields, String delimiter, String quote )
410        {
411        this( fields, false, delimiter, quote, null, true );
412        }
413    
414      /**
415       * Constructor TextDelimited creates a new TextDelimited instance.
416       *
417       * @param fields    of type Fields
418       * @param hasHeader of type boolean
419       * @param delimiter of type String
420       * @param quote     of type String
421       */
422      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"})
423      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote )
424        {
425        this( fields, hasHeader, delimiter, quote, null, true );
426        }
427    
428      /**
429       * Constructor TextDelimited creates a new TextDelimited instance.
430       *
431       * @param fields      of type Fields
432       * @param hasHeader   of type boolean
433       * @param delimiter   of type String
434       * @param quote       of type String
435       * @param charsetName of type String
436       */
437      @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "charsetName"})
438      public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, String charsetName )
439        {
440        this( fields, hasHeader, delimiter, quote, null, true, charsetName );
441        }
442    
443      /**
444       * Constructor TextDelimited creates a new TextDelimited instance.
445       *
446       * @param fields      of type Fields
447       * @param skipHeader  of type boolean
448       * @param writeHeader of type boolean
449       * @param delimiter   of type String
450       * @param strict      of type boolean
451       * @param quote       of type String
452       * @param types       of type Class[]
453       * @param safe        of type boolean
454       */
455      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe"})
456      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe )
457        {
458        this( fields, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET );
459        }
460    
461      /**
462       * Constructor TextDelimited creates a new TextDelimited instance.
463       *
464       * @param fields      of type Fields
465       * @param skipHeader  of type boolean
466       * @param writeHeader of type boolean
467       * @param delimiter   of type String
468       * @param strict      of type boolean
469       * @param quote       of type String
470       * @param types       of type Class[]
471       * @param safe        of type boolean
472       * @param charsetName of type String
473       */
474      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe",
475                              "charsetName"})
476      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName )
477        {
478        this( fields, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) );
479        }
480    
481      /**
482       * Constructor TextDelimited creates a new TextDelimited instance.
483       *
484       * @param fields          of type Fields
485       * @param writeHeader     of type boolean
486       * @param delimitedParser of type DelimitedParser
487       */
488      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"})
489      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser )
490        {
491        this( fields, skipHeader, writeHeader, null, delimitedParser );
492        }
493    
494      /**
495       * Constructor TextDelimited creates a new TextDelimited instance.
496       *
497       * @param fields          of type Fields
498       * @param hasHeader       of type boolean
499       * @param delimitedParser of type DelimitedParser
500       */
501      @ConstructorProperties({"fields", "hasHeader", "delimitedParser"})
502      public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser )
503        {
504        this( fields, hasHeader, hasHeader, null, delimitedParser );
505        }
506    
507      /**
508       * Constructor TextDelimited creates a new TextDelimited instance.
509       *
510       * @param fields          of type Fields
511       * @param writeHeader     of type boolean
512       * @param charsetName     of type String
513       * @param delimitedParser of type DelimitedParser
514       */
515      @ConstructorProperties({"fields", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
516      public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
517        {
518        super( fields, fields );
519    
520        this.delimitedParser = delimitedParser;
521    
522        // normalizes ALL and UNKNOWN
523        // calls reset on delimitedParser
524        setSourceFields( fields );
525        setSinkFields( fields );
526    
527        this.skipHeader = skipHeader;
528        this.writeHeader = writeHeader;
529    
530        if( charsetName != null )
531          this.charsetName = charsetName;
532    
533        // throws an exception if not found
534        Charset.forName( this.charsetName );
535        }
536    
537      @Property(name = "charset", visibility = Visibility.PUBLIC)
538      @PropertyDescription("character set used.")
539      public String getCharsetName()
540        {
541        return charsetName;
542        }
543    
544      /**
545       * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
546       *
547       * @return a String
548       */
549      @Property(name = "delimiter", visibility = Visibility.PUBLIC)
550      @PropertyDescription("The delimiter used to separate fields.")
551      public String getDelimiter()
552        {
553        return delimitedParser.getDelimiter();
554        }
555    
556      /**
557       * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text.
558       *
559       * @return a String
560       */
561      @Property(name = "quote", visibility = Visibility.PUBLIC)
562      @PropertyDescription("The string used for quoting.")
563      public String getQuote()
564        {
565        return delimitedParser.getQuote();
566        }
567    
568      public LineNumberReader createInput( InputStream inputStream )
569        {
570        try
571          {
572          return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
573          }
574        catch( UnsupportedEncodingException exception )
575          {
576          throw new TapException( exception );
577          }
578        }
579    
580      public PrintWriter createOutput( OutputStream outputStream )
581        {
582        try
583          {
584          return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
585          }
586        catch( UnsupportedEncodingException exception )
587          {
588          throw new TapException( exception );
589          }
590        }
591    
592      @Override
593      public void setSinkFields( Fields sinkFields )
594        {
595        super.setSourceFields( sinkFields );
596        super.setSinkFields( sinkFields );
597    
598        if( delimitedParser != null )
599          delimitedParser.reset( getSourceFields(), getSinkFields() );
600        }
601    
602      @Override
603      public void setSourceFields( Fields sourceFields )
604        {
605        super.setSourceFields( sourceFields );
606        super.setSinkFields( sourceFields );
607    
608        if( delimitedParser != null )
609          delimitedParser.reset( getSourceFields(), getSinkFields() );
610        }
611    
612      @Override
613      public boolean isSymmetrical()
614        {
615        return super.isSymmetrical() && skipHeader == writeHeader;
616        }
617    
618      @Override
619      public Fields retrieveSourceFields( FlowProcess<Properties> process, Tap tap )
620        {
621        if( !skipHeader || !getSourceFields().isUnknown() )
622          return getSourceFields();
623    
624        // no need to open them all
625        if( tap instanceof CompositeTap )
626          tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next();
627    
628        tap = new FileTap( new TextLine( new Fields( "line" ), charsetName ), tap.getIdentifier() );
629    
630        setSourceFields( delimitedParser.parseFirstLine( process, tap ) );
631    
632        return getSourceFields();
633        }
634    
635      @Override
636      public void presentSourceFields( FlowProcess<Properties> process, Tap tap, Fields fields )
637        {
638        // do nothing
639        }
640    
641      @Override
642      public void presentSinkFields( FlowProcess<Properties> flowProcess, Tap tap, Fields fields )
643        {
644        if( writeHeader )
645          presentSinkFieldsInternal( fields );
646        }
647    
648      @Override
649      public void sourceConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
650        {
651        }
652    
653      @Override
654      public void sourcePrepare( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
655        {
656        sourceCall.setContext( createInput( sourceCall.getInput() ) );
657    
658        sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() );
659        }
660    
661      @Override
662      public boolean source( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
663        {
664        String line = sourceCall.getContext().readLine();
665    
666        if( line == null )
667          return false;
668    
669        if( skipHeader && sourceCall.getContext().getLineNumber() == 1 ) // todo: optimize this away
670          line = sourceCall.getContext().readLine();
671    
672        if( line == null )
673          return false;
674    
675        Object[] split = delimitedParser.parseLine( line );
676    
677        // assumption it is better to re-use than to construct new
678        Tuple tuple = sourceCall.getIncomingEntry().getTuple();
679    
680        TupleViews.reset( tuple, split );
681    
682        return true;
683        }
684    
685      @Override
686      public void sourceCleanup( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
687        {
688        sourceCall.setContext( null );
689        }
690    
691      @Override
692      public void sinkConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
693        {
694        }
695    
696      @Override
697      public void sinkPrepare( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
698        {
699        sinkCall.setContext( createOutput( sinkCall.getOutput() ) );
700    
701        if( writeHeader )
702          {
703          Fields fields = sinkCall.getOutgoingEntry().getFields();
704          delimitedParser.joinFirstLine( fields, sinkCall.getContext() );
705    
706          sinkCall.getContext().println();
707          }
708        }
709    
710      @Override
711      public void sink( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
712        {
713        TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
714    
715        Iterable<String> strings = tupleEntry.asIterableOf( String.class );
716    
717        delimitedParser.joinLine( strings, sinkCall.getContext() );
718    
719        sinkCall.getContext().println();
720        }
721    
722      @Override
723      public void sinkCleanup( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall )
724        {
725        sinkCall.getContext().flush();
726        sinkCall.setContext( null );
727        }
728      }