001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.scheme.local;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.InputStreamReader;
027import java.io.LineNumberReader;
028import java.io.OutputStream;
029import java.io.OutputStreamWriter;
030import java.io.PrintWriter;
031import java.io.UnsupportedEncodingException;
032import java.nio.charset.Charset;
033import java.util.Properties;
034
035import cascading.flow.FlowProcess;
036import cascading.management.annotation.Property;
037import cascading.management.annotation.PropertyDescription;
038import cascading.management.annotation.Visibility;
039import cascading.scheme.Scheme;
040import cascading.scheme.SinkCall;
041import cascading.scheme.SourceCall;
042import cascading.tap.Tap;
043import cascading.tap.TapException;
044import cascading.tuple.Fields;
045import cascading.tuple.TupleEntry;
046
047/**
048 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
049 * lines. Either line-feed or carriage-return are used to signal end of line.
050 * <p/>
051 * By default, this scheme returns a {@link cascading.tuple.Tuple} with two fields, "num" and "line". Where "num"
052 * is the line number for "line".
053 * <p/>
054 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
055 * to be used instead of the names "num" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
056 * Any available field names can be given if only a subset of the incoming fields should be used.
057 * <p/>
058 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples
059 * will simply be the "line" value using the given field name.
060 * <p/>
061 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
062 * writing out the line.
063 * <p/>
064 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
065 * argument.
066 */
067public class TextLine extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
068  {
069  public static final String DEFAULT_CHARSET = "UTF-8";
070  public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "num", "line" ).applyTypes( Integer.TYPE, String.class );
071
072  private String charsetName = DEFAULT_CHARSET;
073
074  /**
075   * Creates a new TextLine instance that sources "num" and "line" fields, and sinks all incoming fields, where
076   * "num" is the line number of the line in the input file.
077   */
078  public TextLine()
079    {
080    super( DEFAULT_SOURCE_FIELDS, Fields.ALL );
081    }
082
083  /**
084   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
085   * subsequent tuples.
086   *
087   * @param sourceFields of Fields
088   */
089  @ConstructorProperties({"sourceFields"})
090  public TextLine( Fields sourceFields )
091    {
092    super( sourceFields );
093
094    verify( sourceFields );
095    }
096
097  /**
098   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
099   * subsequent tuples.
100   *
101   * @param sourceFields of Fields
102   * @param charsetName  of type String
103   */
104  @ConstructorProperties({"sourceFields", "charsetName"})
105  public TextLine( Fields sourceFields, String charsetName )
106    {
107    super( sourceFields );
108
109    // throws an exception if not found
110    setCharsetName( charsetName );
111
112    verify( sourceFields );
113    }
114
115  /**
116   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
117   * subsequent tuples.
118   *
119   * @param sourceFields of Fields
120   * @param sinkFields   of Fields
121   */
122  @ConstructorProperties({"sourceFields", "sinkFields"})
123  public TextLine( Fields sourceFields, Fields sinkFields )
124    {
125    super( sourceFields, sinkFields );
126
127    verify( sourceFields );
128    }
129
130  /**
131   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
132   * subsequent tuples.
133   *
134   * @param sourceFields of Fields
135   * @param sinkFields   of Fields
136   * @param charsetName  of type String
137   */
138  @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
139  public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
140    {
141    super( sourceFields, sinkFields );
142
143    // throws an exception if not found
144    setCharsetName( charsetName );
145
146    verify( sourceFields );
147    }
148
149  private void setCharsetName( String charsetName )
150    {
151    if( charsetName != null )
152      this.charsetName = charsetName;
153
154    Charset.forName( this.charsetName );
155    }
156
157  @Property(name = "charset", visibility = Visibility.PUBLIC)
158  @PropertyDescription("character set used.")
159  public String getCharsetName()
160    {
161    return charsetName;
162    }
163
164  protected void verify( Fields sourceFields )
165    {
166    if( sourceFields.size() < 1 || sourceFields.size() > 2 )
167      throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
168    }
169
170  public LineNumberReader createInput( InputStream inputStream )
171    {
172    try
173      {
174      return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
175      }
176    catch( UnsupportedEncodingException exception )
177      {
178      throw new TapException( exception );
179      }
180    }
181
182  public PrintWriter createOutput( OutputStream outputStream )
183    {
184    try
185      {
186      return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
187      }
188    catch( UnsupportedEncodingException exception )
189      {
190      throw new TapException( exception );
191      }
192    }
193
194  @Override
195  public void presentSourceFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
196    {
197    // do nothing
198    }
199
200  @Override
201  public void presentSinkFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
202    {
203    // do nothing
204    }
205
206  @Override
207  public void sourceConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
208    {
209    }
210
211  @Override
212  public void sinkConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
213    {
214    }
215
216  @Override
217  public void sourcePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
218    {
219    sourceCall.setContext( createInput( sourceCall.getInput() ) );
220    }
221
222  @Override
223  public boolean source( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
224    {
225    // first line is 0, this matches offset being zero, so when throwing out the first line for comments
226    int lineNumber = sourceCall.getContext().getLineNumber();
227    String line = sourceCall.getContext().readLine();
228
229    if( line == null )
230      return false;
231
232    TupleEntry incomingEntry = sourceCall.getIncomingEntry();
233
234    if( getSourceFields().size() == 1 )
235      {
236      incomingEntry.setObject( 0, line );
237      }
238    else
239      {
240      incomingEntry.setInteger( 0, lineNumber );
241      incomingEntry.setString( 1, line );
242      }
243
244    return true;
245    }
246
247  @Override
248  public void sourceCleanup( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
249    {
250    sourceCall.setContext( null );
251    }
252
253  @Override
254  public void sinkPrepare( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
255    {
256    sinkCall.setContext( createOutput( sinkCall.getOutput() ) );
257    }
258
259  @Override
260  public void sink( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
261    {
262    sinkCall.getContext().println( sinkCall.getOutgoingEntry().getTuple().toString() );
263    }
264
265  @Override
266  public void sinkCleanup( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
267    {
268    sinkCall.getContext().flush();
269    sinkCall.setContext( null );
270    }
271  }