/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme.local;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Properties;

import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

/**
 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into
 * lines; a line is terminated by a line-feed ({@code '\n'}), a carriage-return ({@code '\r'}), or a
 * carriage-return immediately followed by a line-feed.
 * <p>
 * By default, this scheme returns a {@link cascading.tuple.Tuple} with two fields, "num" and "line", where "num"
 * is the zero-based line number of "line" within the input.
 * <p>
 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names
 * to be used instead of the names "num" and "line". sinkFields is a selector and is by default {@link Fields#ALL}.
 * Any available field names can be given if only a subset of the incoming fields should be used.
 * <p>
 * If the sourceFields passed to a constructor contain only one field, each returned tuple will hold just
 * the "line" value, under the given field name.
 * <p>
 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before
 * writing out the line.
 * <p>
 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
 * argument.
 */
public class TextLine extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter>
  {
  /** Character set used when none is given to a constructor. */
  public static final String DEFAULT_CHARSET = "UTF-8";
  /** Default source fields: an int "num" (line number) and a String "line". */
  public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "num", "line" ).applyTypes( Integer.TYPE, String.class );

  // kept as a name rather than a Charset so the scheme state stays a plain String
  private String charsetName = DEFAULT_CHARSET;

  /**
   * Creates a new TextLine instance that sources "num" and "line" fields, and sinks all incoming fields, where
   * "num" is the line number of the line in the input file.
   */
  public TextLine()
    {
    super( DEFAULT_SOURCE_FIELDS, Fields.ALL );
    }

  /**
   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
   * subsequent tuples.
   *
   * @param sourceFields of Fields, must contain one or two fields
   * @throws IllegalArgumentException if sourceFields does not contain one or two fields
   */
  @ConstructorProperties({"sourceFields"})
  public TextLine( Fields sourceFields )
    {
    super( sourceFields );

    verify( sourceFields );
    }

  /**
   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
   * subsequent tuples.
   *
   * @param sourceFields of Fields, must contain one or two fields
   * @param charsetName  of type String, or {@code null} to keep {@link #DEFAULT_CHARSET}
   * @throws IllegalArgumentException if sourceFields does not contain one or two fields, or if
   *                                  charsetName is not a legal or supported charset name
   */
  @ConstructorProperties({"sourceFields", "charsetName"})
  public TextLine( Fields sourceFields, String charsetName )
    {
    super( sourceFields );

    // throws an exception if not found
    setCharsetName( charsetName );

    verify( sourceFields );
    }

  /**
   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
   * subsequent tuples.
   *
   * @param sourceFields of Fields, must contain one or two fields
   * @param sinkFields   of Fields, selects which incoming fields are written
   * @throws IllegalArgumentException if sourceFields does not contain one or two fields
   */
  @ConstructorProperties({"sourceFields", "sinkFields"})
  public TextLine( Fields sourceFields, Fields sinkFields )
    {
    super( sourceFields, sinkFields );

    verify( sourceFields );
    }

  /**
   * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the
   * subsequent tuples.
   *
   * @param sourceFields of Fields, must contain one or two fields
   * @param sinkFields   of Fields, selects which incoming fields are written
   * @param charsetName  of type String, or {@code null} to keep {@link #DEFAULT_CHARSET}
   * @throws IllegalArgumentException if sourceFields does not contain one or two fields, or if
   *                                  charsetName is not a legal or supported charset name
   */
  @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"})
  public TextLine( Fields sourceFields, Fields sinkFields, String charsetName )
    {
    super( sourceFields, sinkFields );

    // throws an exception if not found
    setCharsetName( charsetName );

    verify( sourceFields );
    }

  /**
   * Sets the charset name, keeping the current value when {@code charsetName} is null, and eagerly validates it.
   *
   * @param charsetName of type String, may be null
   * @throws java.nio.charset.IllegalCharsetNameException if the name is not legal
   * @throws java.nio.charset.UnsupportedCharsetException if the charset is not available in this JVM
   */
  private void setCharsetName( String charsetName )
    {
    if( charsetName != null )
      this.charsetName = charsetName;

    // fail fast at construction time rather than when the first stream is opened
    Charset.forName( this.charsetName );
    }

  /**
   * Returns the name of the character set used to decode input and encode output.
   *
   * @return the charset name
   */
  @Property(name = "charset", visibility = Visibility.PUBLIC)
  @PropertyDescription("character set used.")
  public String getCharsetName()
    {
    return charsetName;
    }

  /**
   * Checks that the given source fields are usable by this scheme. Called from the constructors, so an
   * overriding implementation runs before any subclass fields are initialized.
   *
   * @param sourceFields of Fields
   * @throws IllegalArgumentException if sourceFields does not contain one or two fields
   */
  protected void verify( Fields sourceFields )
    {
    if( sourceFields.size() < 1 || sourceFields.size() > 2 )
      throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" );
    }

  /**
   * Wraps the given stream in a line-numbering reader decoding with {@link #getCharsetName()}.
   *
   * @param inputStream of InputStream
   * @return a new LineNumberReader over inputStream
   * @throws TapException if the charset is not supported
   */
  public LineNumberReader createInput( InputStream inputStream )
    {
    try
      {
      return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new TapException( exception );
      }
    }

  /**
   * Wraps the given stream in a PrintWriter encoding with {@link #getCharsetName()}.
   * <p>
   * Note that {@link PrintWriter} never throws {@link IOException} from its write methods; errors must be
   * detected with {@link PrintWriter#checkError()}.
   *
   * @param outputStream of OutputStream
   * @return a new PrintWriter over outputStream
   * @throws TapException if the charset is not supported
   */
  public PrintWriter createOutput( OutputStream outputStream )
    {
    try
      {
      return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new TapException( exception );
      }
    }

  @Override
  public void presentSourceFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
    {
    // do nothing
    }

  @Override
  public void presentSinkFields( FlowProcess<? extends Properties> process, Tap tap, Fields fields )
    {
    // do nothing
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
    {
    }

  @Override
  public void sinkConfInit( FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf )
    {
    }

  @Override
  public void sourcePrepare( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
    {
    sourceCall.setContext( createInput( sourceCall.getInput() ) );
    }

  /**
   * Reads the next line into the incoming entry.
   *
   * @return false once the input is exhausted, true otherwise
   */
  @Override
  public boolean source( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
    {
    LineNumberReader reader = sourceCall.getContext();

    // the line number is captured before readLine() advances it, so the first line is numbered 0
    int lineNumber = reader.getLineNumber();
    String line = reader.readLine();

    if( line == null )
      return false;

    TupleEntry incomingEntry = sourceCall.getIncomingEntry();

    if( getSourceFields().size() == 1 )
      {
      incomingEntry.setObject( 0, line );
      }
    else
      {
      incomingEntry.setInteger( 0, lineNumber );
      incomingEntry.setString( 1, line );
      }

    return true;
    }

  @Override
  public void sourceCleanup( FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException
    {
    // the underlying stream is not closed here, only the reference is released
    sourceCall.setContext( null );
    }

  @Override
  public void sinkPrepare( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
    {
    sinkCall.setContext( createOutput( sinkCall.getOutput() ) );
    }

  /**
   * Writes the outgoing tuple's {@code toString()} form as one line. println uses the platform line separator.
   * Write failures are not thrown here (PrintWriter swallows them); they are reported by sinkCleanup.
   */
  @Override
  public void sink( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
    {
    sinkCall.getContext().println( sinkCall.getOutgoingEntry().getTuple().toString() );
    }

  /**
   * Flushes the writer and reports any write or flush failure that PrintWriter recorded.
   *
   * @throws IOException if any earlier write, or the final flush, failed
   */
  @Override
  public void sinkCleanup( FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException
    {
    PrintWriter writer = sinkCall.getContext();

    // release the context first so it is cleared even if an error is reported below
    sinkCall.setContext( null );

    if( writer == null )
      return;

    // checkError() flushes the writer and returns true if any prior write or flush hit an IOException,
    // which PrintWriter otherwise swallows silently
    if( writer.checkError() )
      throw new IOException( "failed writing text lines to sink output, charset: " + charsetName );
    }
  }