001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.nio.charset.Charset; 026 import java.util.Arrays; 027 028 import cascading.flow.FlowProcess; 029 import cascading.scheme.Scheme; 030 import cascading.scheme.SinkCall; 031 import cascading.scheme.SourceCall; 032 import cascading.tap.Tap; 033 import cascading.tuple.Fields; 034 import cascading.tuple.Tuple; 035 import cascading.tuple.TupleEntry; 036 import org.apache.hadoop.fs.Path; 037 import org.apache.hadoop.io.LongWritable; 038 import org.apache.hadoop.io.Text; 039 import org.apache.hadoop.mapred.FileInputFormat; 040 import org.apache.hadoop.mapred.FileOutputFormat; 041 import org.apache.hadoop.mapred.JobConf; 042 import org.apache.hadoop.mapred.OutputCollector; 043 import org.apache.hadoop.mapred.RecordReader; 044 import org.apache.hadoop.mapred.TextInputFormat; 045 import org.apache.hadoop.mapred.TextOutputFormat; 046 047 /** 048 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into 049 * lines. Either line-feed or carriage-return are used to signal end of line. 050 * <p/> 051 * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line". 052 * <p/> 053 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names 054 * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}. 055 * Any available field names can be given if only a subset of the incoming fields should be used. 056 * <p/> 057 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples 058 * will simply be the "line" value using the given field name. 059 * <p/> 060 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before 061 * writing out the line. 062 * <p/> 063 * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor 064 * for the compression value, it will remain disabled. 065 * <p/> 066 * If any of the input files end with ".zip", an error will be thrown. 067 * * <p/> 068 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 069 * argument. 070 */ 071 public class TextLine extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> 072 { 073 public enum Compress 074 { 075 DEFAULT, ENABLE, DISABLE 076 } 077 078 public static final String DEFAULT_CHARSET = "UTF-8"; 079 080 /** Field serialVersionUID */ 081 private static final long serialVersionUID = 1L; 082 /** Field DEFAULT_SOURCE_FIELDS */ 083 public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" ); 084 085 /** Field sinkCompression */ 086 Compress sinkCompression = Compress.DISABLE; 087 088 String charsetName = DEFAULT_CHARSET; 089 090 /** 091 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 092 * "offset" is the byte offset in the input file. 093 */ 094 public TextLine() 095 { 096 super( DEFAULT_SOURCE_FIELDS ); 097 } 098 099 /** 100 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 101 * "offset" is the byte offset in the input file. 102 * 103 * @param numSinkParts of type int 104 */ 105 @ConstructorProperties({"numSinkParts"}) 106 public TextLine( int numSinkParts ) 107 { 108 super( DEFAULT_SOURCE_FIELDS, numSinkParts ); 109 } 110 111 /** 112 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 113 * "offset" is the byte offset in the input file. 114 * 115 * @param sinkCompression of type Compress 116 */ 117 @ConstructorProperties({"sinkCompression"}) 118 public TextLine( Compress sinkCompression ) 119 { 120 super( DEFAULT_SOURCE_FIELDS ); 121 122 setSinkCompression( sinkCompression ); 123 } 124 125 /** 126 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 127 * subsequent tuples. 128 * 129 * @param sourceFields the source fields for this scheme 130 * @param sinkFields the sink fields for this scheme 131 */ 132 @ConstructorProperties({"sourceFields", "sinkFields"}) 133 public TextLine( Fields sourceFields, Fields sinkFields ) 134 { 135 super( sourceFields, sinkFields ); 136 137 verify( sourceFields ); 138 } 139 140 /** 141 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 142 * subsequent tuples. 143 * 144 * @param sourceFields the source fields for this scheme 145 * @param sinkFields the sink fields for this scheme 146 * @param charsetName of type String 147 */ 148 @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"}) 149 public TextLine( Fields sourceFields, Fields sinkFields, String charsetName ) 150 { 151 super( sourceFields, sinkFields ); 152 153 // throws an exception if not found 154 setCharsetName( charsetName ); 155 156 verify( sourceFields ); 157 } 158 159 /** 160 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 161 * subsequent tuples. 162 * 163 * @param sourceFields the source fields for this scheme 164 * @param sinkFields the sink fields for this scheme 165 * @param numSinkParts of type int 166 */ 167 @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"}) 168 public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts ) 169 { 170 super( sourceFields, sinkFields, numSinkParts ); 171 172 verify( sourceFields ); 173 } 174 175 /** 176 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 177 * subsequent tuples. 178 * 179 * @param sourceFields of type Fields 180 * @param sinkFields of type Fields 181 * @param sinkCompression of type Compress 182 */ 183 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"}) 184 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression ) 185 { 186 super( sourceFields, sinkFields ); 187 188 setSinkCompression( sinkCompression ); 189 190 verify( sourceFields ); 191 } 192 193 /** 194 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 195 * subsequent tuples. 196 * 197 * @param sourceFields of type Fields 198 * @param sinkFields of type Fields 199 * @param sinkCompression of type Compress 200 * @param charsetName of type String 201 */ 202 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"}) 203 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName ) 204 { 205 super( sourceFields, sinkFields ); 206 207 setSinkCompression( sinkCompression ); 208 209 // throws an exception if not found 210 setCharsetName( charsetName ); 211 212 verify( sourceFields ); 213 } 214 215 /** 216 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 217 * subsequent tuples. 218 * 219 * @param sourceFields of type Fields 220 * @param sinkFields of type Fields 221 * @param sinkCompression of type Compress 222 * @param numSinkParts of type int 223 */ 224 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"}) 225 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts ) 226 { 227 super( sourceFields, sinkFields, numSinkParts ); 228 229 setSinkCompression( sinkCompression ); 230 231 verify( sourceFields ); 232 } 233 234 /** 235 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 236 * subsequent tuples. 237 * 238 * @param sourceFields of type Fields 239 * @param sinkFields of type Fields 240 * @param sinkCompression of type Compress 241 * @param numSinkParts of type int 242 * @param charsetName of type String 243 */ 244 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"}) 245 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName ) 246 { 247 super( sourceFields, sinkFields, numSinkParts ); 248 249 setSinkCompression( sinkCompression ); 250 251 // throws an exception if not found 252 setCharsetName( charsetName ); 253 254 verify( sourceFields ); 255 } 256 257 /** 258 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 259 * subsequent tuples. 260 * 261 * @param sourceFields the source fields for this scheme 262 */ 263 @ConstructorProperties({"sourceFields"}) 264 public TextLine( Fields sourceFields ) 265 { 266 super( sourceFields ); 267 268 verify( sourceFields ); 269 } 270 271 /** 272 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 273 * subsequent tuples. 274 * 275 * @param sourceFields the source fields for this scheme 276 * @param charsetName of type String 277 */ 278 @ConstructorProperties({"sourceFields", "charsetName"}) 279 public TextLine( Fields sourceFields, String charsetName ) 280 { 281 super( sourceFields ); 282 283 // throws an exception if not found 284 setCharsetName( charsetName ); 285 286 verify( sourceFields ); 287 } 288 289 /** 290 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 291 * subsequent tuples. The resulting data set will have numSinkParts. 292 * 293 * @param sourceFields the source fields for this scheme 294 * @param numSinkParts of type int 295 */ 296 @ConstructorProperties({"sourceFields", "numSinkParts"}) 297 public TextLine( Fields sourceFields, int numSinkParts ) 298 { 299 super( sourceFields, numSinkParts ); 300 301 verify( sourceFields ); 302 } 303 304 protected void setCharsetName( String charsetName ) 305 { 306 if( charsetName != null ) 307 this.charsetName = charsetName; 308 309 Charset.forName( this.charsetName ); 310 } 311 312 protected void verify( Fields sourceFields ) 313 { 314 if( sourceFields.size() < 1 || sourceFields.size() > 2 ) 315 throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" ); 316 } 317 318 /** 319 * Method getSinkCompression returns the sinkCompression of this TextLine object. 320 * 321 * @return the sinkCompression (type Compress) of this TextLine object. 322 */ 323 public Compress getSinkCompression() 324 { 325 return sinkCompression; 326 } 327 328 /** 329 * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled. 330 * 331 * @param sinkCompression the sinkCompression of this TextLine object. 332 */ 333 public void setSinkCompression( Compress sinkCompression ) 334 { 335 if( sinkCompression != null ) // leave disabled if null 336 this.sinkCompression = sinkCompression; 337 } 338 339 @Override 340 public void sourceConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf ) 341 { 342 if( hasZippedFiles( FileInputFormat.getInputPaths( conf ) ) ) 343 throw new IllegalStateException( "cannot read zip files: " + Arrays.toString( FileInputFormat.getInputPaths( conf ) ) ); 344 345 conf.setInputFormat( TextInputFormat.class ); 346 } 347 348 private boolean hasZippedFiles( Path[] paths ) 349 { 350 if( paths == null || paths.length == 0 ) 351 return false; 352 353 boolean isZipped = paths[ 0 ].getName().endsWith( ".zip" ); 354 355 for( int i = 1; i < paths.length; i++ ) 356 { 357 if( isZipped != paths[ i ].getName().endsWith( ".zip" ) ) 358 throw new IllegalStateException( "cannot mix zipped and upzipped files" ); 359 } 360 361 return isZipped; 362 } 363 364 @Override 365 public void presentSourceFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields ) 366 { 367 // do nothing to change TextLine state 368 } 369 370 @Override 371 public void presentSinkFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields ) 372 { 373 // do nothing to change TextLine state 374 } 375 376 @Override 377 public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf ) 378 { 379 if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) ) 380 throw new IllegalStateException( "cannot write zip files: " + FileOutputFormat.getOutputPath( conf ) ); 381 382 if( getSinkCompression() == Compress.DISABLE ) 383 conf.setBoolean( "mapred.output.compress", false ); 384 else if( getSinkCompression() == Compress.ENABLE ) 385 conf.setBoolean( "mapred.output.compress", true ); 386 387 conf.setOutputKeyClass( Text.class ); // be explicit 388 conf.setOutputValueClass( Text.class ); // be explicit 389 conf.setOutputFormat( TextOutputFormat.class ); 390 } 391 392 @Override 393 public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 394 { 395 if( sourceCall.getContext() == null ) 396 sourceCall.setContext( new Object[ 3 ] ); 397 398 sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey(); 399 sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue(); 400 sourceCall.getContext()[ 2 ] = Charset.forName( charsetName ); 401 } 402 403 @Override 404 public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 405 { 406 if( !sourceReadInput( sourceCall ) ) 407 return false; 408 409 sourceHandleInput( sourceCall ); 410 411 return true; 412 } 413 414 private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException 415 { 416 Object[] context = sourceCall.getContext(); 417 418 return sourceCall.getInput().next( context[ 0 ], context[ 1 ] ); 419 } 420 421 protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall ) 422 { 423 TupleEntry result = sourceCall.getIncomingEntry(); 424 425 int index = 0; 426 Object[] context = sourceCall.getContext(); 427 428 // coerce into canonical forms 429 if( getSourceFields().size() == 2 ) 430 result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() ); 431 432 result.setString( index, makeEncodedString( context ) ); 433 } 434 435 protected String makeEncodedString( Object[] context ) 436 { 437 Text text = (Text) context[ 1 ]; 438 return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] ); 439 } 440 441 @Override 442 public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 443 { 444 sourceCall.setContext( null ); 445 } 446 447 @Override 448 public void sinkPrepare( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 449 { 450 sinkCall.setContext( new Object[ 2 ] ); 451 452 sinkCall.getContext()[ 0 ] = new Text(); 453 sinkCall.getContext()[ 1 ] = Charset.forName( charsetName ); 454 } 455 456 @Override 457 public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 458 { 459 Text text = (Text) sinkCall.getContext()[ 0 ]; 460 Charset charset = (Charset) sinkCall.getContext()[ 1 ]; 461 String line = sinkCall.getOutgoingEntry().getTuple().toString(); 462 463 text.set( line.getBytes( charset ) ); 464 465 // it's ok to use NULL here so the collector does not write anything 466 sinkCall.getOutput().collect( null, text ); 467 } 468 }