001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme.local; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.InputStreamReader; 027 import java.io.LineNumberReader; 028 import java.io.OutputStream; 029 import java.io.OutputStreamWriter; 030 import java.io.PrintWriter; 031 import java.io.UnsupportedEncodingException; 032 import java.nio.charset.Charset; 033 import java.util.Properties; 034 035 import cascading.flow.FlowProcess; 036 import cascading.management.annotation.Property; 037 import cascading.management.annotation.PropertyDescription; 038 import cascading.management.annotation.Visibility; 039 import cascading.scheme.Scheme; 040 import cascading.scheme.SinkCall; 041 import cascading.scheme.SourceCall; 042 import cascading.tap.Tap; 043 import cascading.tap.TapException; 044 import cascading.tuple.Fields; 045 import cascading.tuple.TupleEntry; 046 047 /** 048 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into 049 * lines. Either line-feed or carriage-return are used to signal end of line. 050 * <p/> 051 * By default, this scheme returns a {@link cascading.tuple.Tuple} with two fields, "num" and "line". Where "num" 052 * is the line number for "line". 053 * <p/> 054 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names 055 * to be used instead of the names "num" and "line". sinkFields is a selector and is by default {@link Fields#ALL}. 056 * Any available field names can be given if only a subset of the incoming fields should be used. 057 * <p/> 058 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples 059 * will simply be the "line" value using the given field name. 060 * <p/> 061 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before 062 * writing out the line. 063 * <p/> 064 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 065 * argument. 066 */ 067 public class TextLine extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter> 068 { 069 public static final String DEFAULT_CHARSET = "UTF-8"; 070 071 private String charsetName = DEFAULT_CHARSET; 072 073 /** 074 * Creates a new TextLine instance that sources "num" and "line" fields, and sinks all incoming fields, where 075 * "num" is the line number of the line in the input file. 076 */ 077 public TextLine() 078 { 079 super( new Fields( "num", "line" ), Fields.ALL ); 080 } 081 082 /** 083 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 084 * subsequent tuples. 085 * 086 * @param sourceFields of Fields 087 */ 088 @ConstructorProperties({"sourceFields"}) 089 public TextLine( Fields sourceFields ) 090 { 091 super( sourceFields ); 092 093 verify( sourceFields ); 094 } 095 096 /** 097 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 098 * subsequent tuples. 099 * 100 * @param sourceFields of Fields 101 * @param charsetName of type String 102 */ 103 @ConstructorProperties({"sourceFields", "charsetName"}) 104 public TextLine( Fields sourceFields, String charsetName ) 105 { 106 super( sourceFields ); 107 108 // throws an exception if not found 109 setCharsetName( charsetName ); 110 111 verify( sourceFields ); 112 } 113 114 /** 115 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 116 * subsequent tuples. 117 * 118 * @param sourceFields of Fields 119 * @param sinkFields of Fields 120 */ 121 @ConstructorProperties({"sourceFields", "sinkFields"}) 122 public TextLine( Fields sourceFields, Fields sinkFields ) 123 { 124 super( sourceFields, sinkFields ); 125 126 verify( sourceFields ); 127 } 128 129 /** 130 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 131 * subsequent tuples. 132 * 133 * @param sourceFields of Fields 134 * @param sinkFields of Fields 135 * @param charsetName of type String 136 */ 137 @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"}) 138 public TextLine( Fields sourceFields, Fields sinkFields, String charsetName ) 139 { 140 super( sourceFields, sinkFields ); 141 142 // throws an exception if not found 143 setCharsetName( charsetName ); 144 145 verify( sourceFields ); 146 } 147 148 private void setCharsetName( String charsetName ) 149 { 150 if( charsetName != null ) 151 this.charsetName = charsetName; 152 153 Charset.forName( this.charsetName ); 154 } 155 156 @Property(name = "charset", visibility = Visibility.PUBLIC) 157 @PropertyDescription("character set used.") 158 public String getCharsetName() 159 { 160 return charsetName; 161 } 162 163 protected void verify( Fields sourceFields ) 164 { 165 if( sourceFields.size() < 1 || sourceFields.size() > 2 ) 166 throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" ); 167 } 168 169 public LineNumberReader createInput( InputStream inputStream ) 170 { 171 try 172 { 173 return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) ); 174 } 175 catch( UnsupportedEncodingException exception ) 176 { 177 throw new TapException( exception ); 178 } 179 } 180 181 public PrintWriter createOutput( OutputStream outputStream ) 182 { 183 try 184 { 185 return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) ); 186 } 187 catch( UnsupportedEncodingException exception ) 188 { 189 throw new TapException( exception ); 190 } 191 } 192 193 @Override 194 public void presentSourceFields( FlowProcess<Properties> process, Tap tap, Fields fields ) 195 { 196 // do nothing 197 } 198 199 @Override 200 public void presentSinkFields( FlowProcess<Properties> process, Tap tap, Fields fields ) 201 { 202 // do nothing 203 } 204 205 @Override 206 public void sourceConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf ) 207 { 208 } 209 210 @Override 211 public void sinkConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf ) 212 { 213 } 214 215 @Override 216 public void sourcePrepare( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 217 { 218 sourceCall.setContext( createInput( sourceCall.getInput() ) ); 219 } 220 221 @Override 222 public boolean source( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 223 { 224 // first line is 0, this matches offset being zero, so when throwing out the first line for comments 225 int lineNumber = sourceCall.getContext().getLineNumber(); 226 String line = sourceCall.getContext().readLine(); 227 228 if( line == null ) 229 return false; 230 231 TupleEntry incomingEntry = sourceCall.getIncomingEntry(); 232 233 if( getSourceFields().size() == 1 ) 234 { 235 incomingEntry.setObject( 0, line ); 236 } 237 else 238 { 239 incomingEntry.setInteger( 0, lineNumber ); 240 incomingEntry.setString( 1, line ); 241 } 242 243 return true; 244 } 245 246 @Override 247 public void sourceCleanup( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 248 { 249 sourceCall.setContext( null ); 250 } 251 252 @Override 253 public void sinkPrepare( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException 254 { 255 sinkCall.setContext( createOutput( sinkCall.getOutput() ) ); 256 } 257 258 @Override 259 public void sink( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException 260 { 261 sinkCall.getContext().println( sinkCall.getOutgoingEntry().getTuple().toString() ); 262 } 263 264 @Override 265 public void sinkCleanup( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException 266 { 267 sinkCall.getContext().flush(); 268 sinkCall.setContext( null ); 269 } 270 }