/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.scheme.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.regex.Pattern;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TextInputFormat;
050import org.apache.hadoop.mapred.TextOutputFormat; 051 052import static cascading.flow.hadoop.util.HadoopUtil.asJobConfInstance; 053 054/** 055 * A TextLine is a type of {@link cascading.scheme.Scheme} for plain text files. Files are broken into 056 * lines. Either line-feed or carriage-return are used to signal end of line. 057 * <p/> 058 * By default, this scheme returns a {@link Tuple} with two fields, "offset" and "line". 059 * <p/> 060 * Many of the constructors take both "sourceFields" and "sinkFields". sourceFields denote the field names 061 * to be used instead of the names "offset" and "line". sinkFields is a selector and is by default {@link Fields#ALL}. 062 * Any available field names can be given if only a subset of the incoming fields should be used. 063 * <p/> 064 * If a {@link Fields} instance is passed on the constructor as sourceFields having only one field, the return tuples 065 * will simply be the "line" value using the given field name. 066 * <p/> 067 * Note that TextLine will concatenate all the Tuple values for the selected fields with a TAB delimiter before 068 * writing out the line. 069 * <p/> 070 * Note sink compression is {@link Compress#DISABLE} by default. If {@code null} is passed to the constructor 071 * for the compression value, it will remain disabled. 072 * <p/> 073 * If any of the input files end with ".zip", an error will be thrown. 074 * * <p/> 075 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 076 * argument. 
077 */ 078public class TextLine extends Scheme<Configuration, RecordReader, OutputCollector, Object[], Object[]> 079 { 080 public enum Compress 081 { 082 DEFAULT, ENABLE, DISABLE 083 } 084 085 public static final String DEFAULT_CHARSET = "UTF-8"; 086 087 /** Field serialVersionUID */ 088 private static final long serialVersionUID = 1L; 089 /** Field DEFAULT_SOURCE_FIELDS */ 090 public static final Fields DEFAULT_SOURCE_FIELDS = new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ); 091 092 /** Field zipPattern */ 093 private static final Pattern zipPattern = Pattern.compile( "\\.[zZ][iI][pP]([ ,]|$)" ); 094 095 /** Field sinkCompression */ 096 Compress sinkCompression = Compress.DISABLE; 097 098 /** Field charsetName */ 099 String charsetName = DEFAULT_CHARSET; 100 101 /** 102 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 103 * "offset" is the byte offset in the input file. 104 */ 105 public TextLine() 106 { 107 super( DEFAULT_SOURCE_FIELDS ); 108 } 109 110 /** 111 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 112 * "offset" is the byte offset in the input file. 113 * 114 * @param numSinkParts of type int 115 */ 116 @ConstructorProperties({"numSinkParts"}) 117 public TextLine( int numSinkParts ) 118 { 119 super( DEFAULT_SOURCE_FIELDS, numSinkParts ); 120 } 121 122 /** 123 * Creates a new TextLine instance that sources "offset" and "line" fields, and sinks all incoming fields, where 124 * "offset" is the byte offset in the input file. 125 * 126 * @param sinkCompression of type Compress 127 */ 128 @ConstructorProperties({"sinkCompression"}) 129 public TextLine( Compress sinkCompression ) 130 { 131 super( DEFAULT_SOURCE_FIELDS ); 132 133 setSinkCompression( sinkCompression ); 134 } 135 136 /** 137 * Creates a new TextLine instance. 
If sourceFields has one field, only the text line will be returned in the 138 * subsequent tuples. 139 * 140 * @param sourceFields the source fields for this scheme 141 * @param sinkFields the sink fields for this scheme 142 */ 143 @ConstructorProperties({"sourceFields", "sinkFields"}) 144 public TextLine( Fields sourceFields, Fields sinkFields ) 145 { 146 super( sourceFields, sinkFields ); 147 148 verify( sourceFields ); 149 } 150 151 /** 152 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 153 * subsequent tuples. 154 * 155 * @param sourceFields the source fields for this scheme 156 * @param sinkFields the sink fields for this scheme 157 * @param charsetName of type String 158 */ 159 @ConstructorProperties({"sourceFields", "sinkFields", "charsetName"}) 160 public TextLine( Fields sourceFields, Fields sinkFields, String charsetName ) 161 { 162 super( sourceFields, sinkFields ); 163 164 // throws an exception if not found 165 setCharsetName( charsetName ); 166 167 verify( sourceFields ); 168 } 169 170 /** 171 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 172 * subsequent tuples. 173 * 174 * @param sourceFields the source fields for this scheme 175 * @param sinkFields the sink fields for this scheme 176 * @param numSinkParts of type int 177 */ 178 @ConstructorProperties({"sourceFields", "sinkFields", "numSinkParts"}) 179 public TextLine( Fields sourceFields, Fields sinkFields, int numSinkParts ) 180 { 181 super( sourceFields, sinkFields, numSinkParts ); 182 183 verify( sourceFields ); 184 } 185 186 /** 187 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 188 * subsequent tuples. 
189 * 190 * @param sourceFields of type Fields 191 * @param sinkFields of type Fields 192 * @param sinkCompression of type Compress 193 */ 194 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression"}) 195 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression ) 196 { 197 super( sourceFields, sinkFields ); 198 199 setSinkCompression( sinkCompression ); 200 201 verify( sourceFields ); 202 } 203 204 /** 205 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 206 * subsequent tuples. 207 * 208 * @param sourceFields of type Fields 209 * @param sinkFields of type Fields 210 * @param sinkCompression of type Compress 211 * @param charsetName of type String 212 */ 213 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "charsetName"}) 214 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, String charsetName ) 215 { 216 super( sourceFields, sinkFields ); 217 218 setSinkCompression( sinkCompression ); 219 220 // throws an exception if not found 221 setCharsetName( charsetName ); 222 223 verify( sourceFields ); 224 } 225 226 /** 227 * Constructor TextLine creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 228 * subsequent tuples. 229 * 230 * @param sourceFields of type Fields 231 * @param sinkFields of type Fields 232 * @param sinkCompression of type Compress 233 * @param numSinkParts of type int 234 */ 235 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts"}) 236 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts ) 237 { 238 super( sourceFields, sinkFields, numSinkParts ); 239 240 setSinkCompression( sinkCompression ); 241 242 verify( sourceFields ); 243 } 244 245 /** 246 * Constructor TextLine creates a new TextLine instance. 
If sourceFields has one field, only the text line will be returned in the 247 * subsequent tuples. 248 * 249 * @param sourceFields of type Fields 250 * @param sinkFields of type Fields 251 * @param sinkCompression of type Compress 252 * @param numSinkParts of type int 253 * @param charsetName of type String 254 */ 255 @ConstructorProperties({"sourceFields", "sinkFields", "sinkCompression", "numSinkParts", "charsetName"}) 256 public TextLine( Fields sourceFields, Fields sinkFields, Compress sinkCompression, int numSinkParts, String charsetName ) 257 { 258 super( sourceFields, sinkFields, numSinkParts ); 259 260 setSinkCompression( sinkCompression ); 261 262 // throws an exception if not found 263 setCharsetName( charsetName ); 264 265 verify( sourceFields ); 266 } 267 268 /** 269 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 270 * subsequent tuples. 271 * 272 * @param sourceFields the source fields for this scheme 273 */ 274 @ConstructorProperties({"sourceFields"}) 275 public TextLine( Fields sourceFields ) 276 { 277 super( sourceFields ); 278 279 verify( sourceFields ); 280 } 281 282 /** 283 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 284 * subsequent tuples. 285 * 286 * @param sourceFields the source fields for this scheme 287 * @param charsetName of type String 288 */ 289 @ConstructorProperties({"sourceFields", "charsetName"}) 290 public TextLine( Fields sourceFields, String charsetName ) 291 { 292 super( sourceFields ); 293 294 // throws an exception if not found 295 setCharsetName( charsetName ); 296 297 verify( sourceFields ); 298 } 299 300 /** 301 * Creates a new TextLine instance. If sourceFields has one field, only the text line will be returned in the 302 * subsequent tuples. The resulting data set will have numSinkParts. 
303 * 304 * @param sourceFields the source fields for this scheme 305 * @param numSinkParts of type int 306 */ 307 @ConstructorProperties({"sourceFields", "numSinkParts"}) 308 public TextLine( Fields sourceFields, int numSinkParts ) 309 { 310 super( sourceFields, numSinkParts ); 311 312 verify( sourceFields ); 313 } 314 315 protected void setCharsetName( String charsetName ) 316 { 317 if( charsetName != null ) 318 this.charsetName = charsetName; 319 320 Charset.forName( this.charsetName ); 321 } 322 323 @Property(name = "charset", visibility = Visibility.PUBLIC) 324 @PropertyDescription(value = "character set used in this scheme.") 325 public String getCharsetName() 326 { 327 return charsetName; 328 } 329 330 protected void verify( Fields sourceFields ) 331 { 332 if( sourceFields.size() < 1 || sourceFields.size() > 2 ) 333 throw new IllegalArgumentException( "this scheme requires either one or two source fields, given [" + sourceFields + "]" ); 334 } 335 336 /** 337 * Method getSinkCompression returns the sinkCompression of this TextLine object. 338 * 339 * @return the sinkCompression (type Compress) of this TextLine object. 340 */ 341 @Property(name = "sinkCompression", visibility = Visibility.PUBLIC) 342 @PropertyDescription(value = "The compression of the scheme when used in a sink.") 343 public Compress getSinkCompression() 344 { 345 return sinkCompression; 346 } 347 348 /** 349 * Method setSinkCompression sets the sinkCompression of this TextLine object. If null, compression will remain disabled. 350 * 351 * @param sinkCompression the sinkCompression of this TextLine object. 352 */ 353 public void setSinkCompression( Compress sinkCompression ) 354 { 355 if( sinkCompression != null ) // leave disabled if null 356 this.sinkCompression = sinkCompression; 357 } 358 359 @Override 360 public void sourceConfInit( FlowProcess<? 
extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) 361 { 362 JobConf jobConf = asJobConfInstance( conf ); 363 String paths = jobConf.get( "mapred.input.dir", "" ); 364 365 if( hasZippedFiles( paths ) ) 366 throw new IllegalStateException( "cannot read zip files: " + paths ); 367 368 conf.setBoolean( "mapred.mapper.new-api", false ); 369 conf.setClass( "mapred.input.format.class", TextInputFormat.class, InputFormat.class ); 370 } 371 372 private boolean hasZippedFiles( String paths ) 373 { 374 if( paths == null || paths.length() == 0 ) 375 return false; 376 377 return zipPattern.matcher( paths ).find(); 378 } 379 380 @Override 381 public void presentSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields ) 382 { 383 // do nothing to change TextLine state 384 } 385 386 @Override 387 public void presentSinkFields( FlowProcess<? extends Configuration> flowProcess, Tap tap, Fields fields ) 388 { 389 // do nothing to change TextLine state 390 } 391 392 @Override 393 public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) 394 { 395 if( tap.getFullIdentifier( conf ).endsWith( ".zip" ) ) 396 throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) ); 397 398 conf.setBoolean( "mapred.mapper.new-api", false ); 399 400 if( getSinkCompression() == Compress.DISABLE ) 401 conf.setBoolean( "mapred.output.compress", false ); 402 else if( getSinkCompression() == Compress.ENABLE ) 403 conf.setBoolean( "mapred.output.compress", true ); 404 405 conf.setClass( "mapred.output.key.class", Text.class, Object.class ); 406 conf.setClass( "mapred.output.value.class", Text.class, Object.class ); 407 conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class ); 408 } 409 410 @Override 411 public void sourcePrepare( FlowProcess<? 
extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 412 { 413 if( sourceCall.getContext() == null ) 414 sourceCall.setContext( new Object[ 3 ] ); 415 416 sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey(); 417 sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue(); 418 sourceCall.getContext()[ 2 ] = Charset.forName( charsetName ); 419 } 420 421 @Override 422 public boolean source( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 423 { 424 if( !sourceReadInput( sourceCall ) ) 425 return false; 426 427 sourceHandleInput( sourceCall ); 428 429 return true; 430 } 431 432 private boolean sourceReadInput( SourceCall<Object[], RecordReader> sourceCall ) throws IOException 433 { 434 Object[] context = sourceCall.getContext(); 435 436 return sourceCall.getInput().next( context[ 0 ], context[ 1 ] ); 437 } 438 439 protected void sourceHandleInput( SourceCall<Object[], RecordReader> sourceCall ) 440 { 441 TupleEntry result = sourceCall.getIncomingEntry(); 442 443 int index = 0; 444 Object[] context = sourceCall.getContext(); 445 446 // coerce into canonical forms 447 if( getSourceFields().size() == 2 ) 448 result.setLong( index++, ( (LongWritable) context[ 0 ] ).get() ); 449 450 result.setString( index, makeEncodedString( context ) ); 451 } 452 453 protected String makeEncodedString( Object[] context ) 454 { 455 Text text = (Text) context[ 1 ]; 456 return new String( text.getBytes(), 0, text.getLength(), (Charset) context[ 2 ] ); 457 } 458 459 @Override 460 public void sourceCleanup( FlowProcess<? extends Configuration> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 461 { 462 sourceCall.setContext( null ); 463 } 464 465 @Override 466 public void sinkPrepare( FlowProcess<? 
extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 467 { 468 sinkCall.setContext( new Object[ 2 ] ); 469 470 sinkCall.getContext()[ 0 ] = new Text(); 471 sinkCall.getContext()[ 1 ] = Charset.forName( charsetName ); 472 } 473 474 @Override 475 public void sink( FlowProcess<? extends Configuration> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 476 { 477 Text text = (Text) sinkCall.getContext()[ 0 ]; 478 Charset charset = (Charset) sinkCall.getContext()[ 1 ]; 479 String line = sinkCall.getOutgoingEntry().getTuple().toString(); 480 481 text.set( line.getBytes( charset ) ); 482 483 // it's ok to use NULL here so the collector does not write anything 484 sinkCall.getOutput().collect( null, text ); 485 } 486 }