001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.local;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.InputStreamReader;
027    import java.io.LineNumberReader;
028    import java.io.OutputStream;
029    import java.io.OutputStreamWriter;
030    import java.io.PrintWriter;
031    import java.io.UnsupportedEncodingException;
032    import java.nio.charset.Charset;
033    import java.util.Properties;
034    
035    import cascading.flow.FlowProcess;
036    import cascading.management.annotation.Property;
037    import cascading.management.annotation.PropertyDescription;
038    import cascading.management.annotation.Visibility;
039    import cascading.scheme.Scheme;
040    import cascading.scheme.SinkCall;
041    import cascading.scheme.SourceCall;
042    import cascading.tap.Tap;
043    import cascading.tap.TapException;
044    import cascading.tuple.Fields;
045    import cascading.tuple.TupleEntry;
046    
047    /**
048     * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
049     * lines. Either line-feed or carriage-return are used to signal end of line.
050     * <p/>
051     * By default, this scheme returns a {@link cascading.tuple.Tuple} with two fields, "num" and "line". Where "num"
052     * is the line number for "line".
053     * <p/>
054     * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
055     * to be used instead of the names "num" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
056     * Any available field names can be given if only a subset of the incoming fields should be used.
057     * <p/>
058     * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples
059     * will simply be the "line" value using the given field name.
060     * <p/>
061     * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
062     * writing out the line.
063     * <p/>
064     * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
065     * argument.
066     */
067    public class TextLine extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
068      {
069      public static final String DEFAULT_CHARSET = "UTF-8";
070    
071      private String charsetName = DEFAULT_CHARSET;
072    
073      /**
074       * Creates a new TextLine instance that sources "num" and "line" fields, and sinks all incoming fields, where
075       * "num" is the line number of the line in the input file.
076       */
077      public TextLine()
078        {
079        super( new Fields( "num", "line" ), Fields.ALL );
080        }
081    
082      /**
083       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
084       * subsequent tuples.
085       *
086       * @param sourceFields of Fields
087       */
088      @ConstructorProperties({"sourceFields"})
089      public TextLine( Fields sourceFields )
090        {
091        super( sourceFields );
092    
093        verify( sourceFields );
094        }
095    
096      /**
097       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
098       * subsequent tuples.
099       *
100       * @param sourceFields of Fields
101       * @param charsetName  of type String
102       */
103      @ConstructorProperties({"sourceFields", "charsetName"})
104      public TextLine( Fields sourceFields, String charsetName )
105        {
106        super( sourceFields );
107    
108        // throws an exception if not found
109        setCharsetName( charsetName );
110    
111        verify( sourceFields );
112        }
113    
114      /**
115       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
116       * subsequent tuples.
117       *
118       * @param sourceFields of Fields
119       * @param sinkFields   of Fields
120       */
121      @ConstructorProperties({"sourceFields", "sinkFields"})
122      public TextLine( Fields sourceFields, Fields sinkFields )
123        {
124        super( sourceFields, sinkFields );
125    
126        verify( sourceFields );
127        }
128    
129      /**
130       * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
131       * subsequent tuples.
132       *
133       * @param sourceFields of Fields
134       * @param sinkFields   of Fields
135       * @param charsetName  of type String
136       */
137      @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
138      public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
139        {
140        super( sourceFields, sinkFields );
141    
142        // throws an exception if not found
143        setCharsetName( charsetName );
144    
145        verify( sourceFields );
146        }
147    
148      private void setCharsetName( String charsetName )
149        {
150        if( charsetName != null )
151          this.charsetName = charsetName;
152    
153        Charset.forName( this.charsetName );
154        }
155    
156      @Property(name = "charset", visibility = Visibility.PUBLIC)
157      @PropertyDescription("character set used.")
158      public String getCharsetName()
159        {
160        return charsetName;
161        }
162    
163      protected void verify( Fields sourceFields )
164        {
165        if( sourceFields.size() < 1 || sourceFields.size() > 2 )
166          throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
167        }
168    
169      public LineNumberReader createInput( InputStream inputStream )
170        {
171        try
172          {
173          return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
174          }
175        catch( UnsupportedEncodingException exception )
176          {
177          throw new TapException( exception );
178          }
179        }
180    
181      public PrintWriter createOutput( OutputStream outputStream )
182        {
183        try
184          {
185          return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
186          }
187        catch( UnsupportedEncodingException exception )
188          {
189          throw new TapException( exception );
190          }
191        }
192    
193      @Override
194      public void presentSourceFields( FlowProcess<Properties> process, Tap tap, Fields fields )
195        {
196        // do nothing
197        }
198    
199      @Override
200      public void presentSinkFields( FlowProcess<Properties> process, Tap tap, Fields fields )
201        {
202        // do nothing
203        }
204    
205      @Override
206      public void sourceConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
207        {
208        }
209    
210      @Override
211      public void sinkConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
212        {
213        }
214    
215      @Override
216      public void sourcePrepare( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
217        {
218        sourceCall.setContext( createInput( sourceCall.getInput() ) );
219        }
220    
221      @Override
222      public boolean source( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
223        {
224        // first line is 0, this matches offset being zero, so when throwing out the first line for comments
225        int lineNumber = sourceCall.getContext().getLineNumber();
226        String line = sourceCall.getContext().readLine();
227    
228        if( line == null )
229          return false;
230    
231        TupleEntry incomingEntry = sourceCall.getIncomingEntry();
232    
233        if( getSourceFields().size() == 1 )
234          {
235          incomingEntry.setObject( 0, line );
236          }
237        else
238          {
239          incomingEntry.setInteger( 0, lineNumber );
240          incomingEntry.setString( 1, line );
241          }
242    
243        return true;
244        }
245    
246      @Override
247      public void sourceCleanup( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
248        {
249        sourceCall.setContext( null );
250        }
251    
252      @Override
253      public void sinkPrepare( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
254        {
255        sinkCall.setContext( createOutput( sinkCall.getOutput() ) );
256        }
257    
258      @Override
259      public void sink( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
260        {
261        sinkCall.getContext().println( sinkCall.getOutgoingEntry().getTuple().toString() );
262        }
263    
264      @Override
265      public void sinkCleanup( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
266        {
267        sinkCall.getContext().flush();
268        sinkCall.setContext( null );
269        }
270      }