001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme.local; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.InputStreamReader; 027 import java.io.LineNumberReader; 028 import java.io.OutputStream; 029 import java.io.OutputStreamWriter; 030 import java.io.PrintWriter; 031 import java.io.UnsupportedEncodingException; 032 import java.nio.charset.Charset; 033 import java.util.Properties; 034 035 import cascading.flow.FlowProcess; 036 import cascading.management.annotation.Property; 037 import cascading.management.annotation.PropertyDescription; 038 import cascading.management.annotation.Visibility; 039 import cascading.scheme.Scheme; 040 import cascading.scheme.SinkCall; 041 import cascading.scheme.SourceCall; 042 import cascading.scheme.util.DelimitedParser; 043 import cascading.tap.CompositeTap; 044 import cascading.tap.Tap; 045 import cascading.tap.TapException; 046 import cascading.tap.local.FileTap; 047 import cascading.tuple.Fields; 048 import cascading.tuple.Tuple; 049 import cascading.tuple.TupleEntry; 050 import cascading.tuple.util.TupleViews; 051 052 /** 053 * Class TextDelimited provides direct support for delimited text files, like 054 * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values. 055 * <p/> 056 * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line 057 * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will 058 * be skipped. 059 * <p/> 060 * It is assumed if sink/source {@code fields} is set to either {@link Fields#ALL} or {@link Fields#UNKNOWN} and 061 * {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of the 062 * file and used during planning. The header will parsed with the same rules as the body of the file. 063 * <p/> 064 * By default headers are not skipped. 065 * <p/> 066 * TextDelimited may also be used to write a "header" in a file. The fields names for the header are taken directly 067 * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the 068 * resolved field names will be used, if any. 069 * <p/> 070 * By default headers are not written. 071 * <p/> 072 * If {@code hasHeaders} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will 073 * be set to {@code true}. 074 * <p/> 075 * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}. 076 * <p/> 077 * Strict meaning if a line of text does not parse into the expected number of fields, this class will throw a 078 * {@link TapException}. If strict is {@code false}, then {@link Tuple} will be returned with {@code null} values 079 * for the missing fields. 080 * <p/> 081 * Safe meaning if a field cannot be coerced into an expected type, a {@code null} will be used for the value. 082 * If safe is {@code false}, a {@link TapException} will be thrown. 083 * <p/> 084 * Also by default, {@code quote} strings are not searched for to improve processing speed. If a file is 085 * COMMA delimited but may have COMMA's in a value, the whole value should be surrounded by the quote string, typically 086 * double quotes ({@literal "}). 087 * <p/> 088 * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type. 089 * <p/> 090 * This Scheme may source/sink {@link Fields#ALL}, when given on the constructor the new instance will automatically 091 * default to strict == false as the number of fields parsed are arbitrary or unknown. A type array may not be given 092 * either, so all values will be returned as Strings. 093 * <p/> 094 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor 095 * argument. 096 * <p/> 097 * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a 098 * {@link cascading.scheme.util.FieldTypeResolver} implementation. 099 * <p/> 100 * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle 101 * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability. 102 * <p/> 103 * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may 104 * result in exceptions or could cause edge cases in the underlying java regular expression engine. 105 * <p/> 106 * A large part of Cascading was designed to help users cleans data. Thus the recommendation is to create Flows that 107 * are responsible for cleansing large data-sets when faced with the problem 108 * <p/> 109 * DelimitedParser maybe sub-classed and extended if necessary. 110 * 111 * @see TextLine 112 */ 113 public class TextDelimited extends Scheme<Properties, InputStream, OutputStream, LineNumberReader, PrintWriter> 114 { 115 public static final String DEFAULT_CHARSET = "UTF-8"; 116 117 private final boolean skipHeader; 118 private final boolean writeHeader; 119 private final DelimitedParser delimitedParser; 120 private String charsetName = DEFAULT_CHARSET; 121 122 /** 123 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 124 * {@link Fields#ALL} and using TAB as the default delimiter. 125 * <p/> 126 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 127 * with a {@link cascading.pipe.Checkpoint} Tap. 128 */ 129 public TextDelimited() 130 { 131 this( Fields.ALL ); 132 } 133 134 /** 135 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 136 * {@link Fields#ALL} and using TAB as the default delimiter. 137 * <p/> 138 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 139 * with a {@link cascading.pipe.Checkpoint} Tap. 140 * 141 * @param hasHeader 142 * @param delimiter 143 */ 144 @ConstructorProperties({"hasHeader", "delimiter"}) 145 public TextDelimited( boolean hasHeader, String delimiter ) 146 { 147 this( Fields.ALL, hasHeader, delimiter, null, (Class[]) null ); 148 } 149 150 /** 151 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 152 * {@link Fields#ALL} and using TAB as the default delimiter. 153 * <p/> 154 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 155 * with a {@link cascading.pipe.Checkpoint} Tap. 156 * 157 * @param hasHeader 158 * @param delimiter 159 * @param quote 160 */ 161 @ConstructorProperties({"hasHeader", "delimiter", "quote"}) 162 public TextDelimited( boolean hasHeader, String delimiter, String quote ) 163 { 164 this( Fields.ALL, hasHeader, delimiter, quote, (Class[]) null ); 165 } 166 167 /** 168 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 169 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 170 * <p/> 171 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 172 * with a {@link cascading.pipe.Checkpoint} Tap. 173 * 174 * @param hasHeader 175 * @param delimitedParser 176 */ 177 @ConstructorProperties({"hasHeader", "delimitedParser"}) 178 public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser ) 179 { 180 this( Fields.ALL, hasHeader, hasHeader, delimitedParser ); 181 } 182 183 /** 184 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 185 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 186 * <p/> 187 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 188 * with a {@link cascading.pipe.Checkpoint} Tap. 189 * <p/> 190 * This constructor will set {@code skipHeader} and {@code writeHeader} values to true. 191 * 192 * @param delimitedParser 193 */ 194 @ConstructorProperties({"delimitedParser"}) 195 public TextDelimited( DelimitedParser delimitedParser ) 196 { 197 this( Fields.ALL, true, true, delimitedParser ); 198 } 199 200 /** 201 * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter. 202 * 203 * @param fields of type Fields 204 */ 205 @ConstructorProperties({"fields"}) 206 public TextDelimited( Fields fields ) 207 { 208 this( fields, "\t", null, null ); 209 } 210 211 /** 212 * Constructor TextDelimited creates a new TextDelimited instance. 213 * 214 * @param fields of type Fields 215 * @param delimiter of type String 216 */ 217 @ConstructorProperties({"fields", "delimiter"}) 218 public TextDelimited( Fields fields, String delimiter ) 219 { 220 this( fields, delimiter, null, null ); 221 } 222 223 /** 224 * Constructor TextDelimited creates a new TextDelimited instance. 225 * 226 * @param fields of type Fields 227 * @param hasHeader of type boolean 228 * @param delimiter of type String 229 */ 230 @ConstructorProperties({"fields", "hasHeader", "delimiter"}) 231 public TextDelimited( Fields fields, boolean hasHeader, String delimiter ) 232 { 233 this( fields, hasHeader, hasHeader, delimiter, null, null ); 234 } 235 236 /** 237 * Constructor TextDelimited creates a new TextDelimited instance. 238 * 239 * @param fields of type Fields 240 * @param skipHeader of type boolean 241 * @param delimiter of type String 242 */ 243 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"}) 244 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter ) 245 { 246 this( fields, skipHeader, writeHeader, delimiter, null, null ); 247 } 248 249 /** 250 * Constructor TextDelimited creates a new TextDelimited instance. 251 * 252 * @param fields of type Fields 253 * @param delimiter of type String 254 * @param types of type Class[] 255 */ 256 @ConstructorProperties({"fields", "delimiter", "types"}) 257 public TextDelimited( Fields fields, String delimiter, Class[] types ) 258 { 259 this( fields, delimiter, null, types ); 260 } 261 262 /** 263 * Constructor TextDelimited creates a new TextDelimited instance. 264 * 265 * @param fields of type Fields 266 * @param hasHeader of type boolean 267 * @param delimiter of type String 268 * @param types of type Class[] 269 */ 270 @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"}) 271 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types ) 272 { 273 this( fields, hasHeader, hasHeader, delimiter, null, types ); 274 } 275 276 /** 277 * Constructor TextDelimited creates a new TextDelimited instance. 278 * 279 * @param fields of type Fields 280 * @param skipHeader of type boolean 281 * @param writeHeader of type boolean 282 * @param delimiter of type String 283 * @param types of type Class[] 284 */ 285 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"}) 286 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types ) 287 { 288 this( fields, skipHeader, writeHeader, delimiter, null, types ); 289 } 290 291 /** 292 * Constructor TextDelimited creates a new TextDelimited instance. 293 * 294 * @param fields of type Fields 295 * @param delimiter of type String 296 * @param quote of type String 297 * @param types of type Class[] 298 */ 299 @ConstructorProperties({"fields", "delimiter", "quote", "types"}) 300 public TextDelimited( Fields fields, String delimiter, String quote, Class[] types ) 301 { 302 this( fields, false, delimiter, quote, types ); 303 } 304 305 /** 306 * Constructor TextDelimited creates a new TextDelimited instance. 307 * 308 * @param fields of type Fields 309 * @param hasHeader of type boolean 310 * @param delimiter of type String 311 * @param quote of type String 312 * @param types of type Class[] 313 */ 314 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"}) 315 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types ) 316 { 317 this( fields, hasHeader, hasHeader, delimiter, quote, types, true ); 318 } 319 320 /** 321 * Constructor TextDelimited creates a new TextDelimited instance. 322 * 323 * @param fields of type Fields 324 * @param skipHeader of type boolean 325 * @param writeHeader of type boolean 326 * @param delimiter of type String 327 * @param quote of type String 328 * @param types of type Class[] 329 */ 330 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"}) 331 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types ) 332 { 333 this( fields, skipHeader, writeHeader, delimiter, quote, types, true ); 334 } 335 336 /** 337 * Constructor TextDelimited creates a new TextDelimited instance. 338 * 339 * @param fields of type Fields 340 * @param delimiter of type String 341 * @param quote of type String 342 * @param types of type Class[] 343 * @param safe of type boolean 344 */ 345 @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"}) 346 public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe ) 347 { 348 this( fields, false, delimiter, quote, types, safe ); 349 } 350 351 /** 352 * Constructor TextDelimited creates a new TextDelimited instance. 353 * 354 * @param fields of type Fields 355 * @param hasHeader of type boolean 356 * @param delimiter of type String 357 * @param quote of type String 358 * @param types of type Class[] 359 * @param safe of type boolean 360 */ 361 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"}) 362 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe ) 363 { 364 this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe ); 365 } 366 367 /** 368 * Constructor TextDelimited creates a new TextDelimited instance. 369 * 370 * @param fields of type Fields 371 * @param hasHeader of type boolean 372 * @param delimiter of type String 373 * @param quote of type String 374 * @param types of type Class[] 375 * @param safe of type boolean 376 * @param charsetName of type String 377 */ 378 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"}) 379 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName ) 380 { 381 this( fields, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName ); 382 } 383 384 /** 385 * Constructor TextDelimited creates a new TextDelimited instance. 386 * 387 * @param fields of type Fields 388 * @param skipHeader of type boolean 389 * @param writeHeader of type boolean 390 * @param delimiter of type String 391 * @param quote of type String 392 * @param types of type Class[] 393 * @param safe of type boolean 394 */ 395 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"}) 396 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe ) 397 { 398 this( fields, skipHeader, writeHeader, delimiter, true, quote, types, safe ); 399 } 400 401 /** 402 * Constructor TextDelimited creates a new TextDelimited instance. 403 * 404 * @param fields of type Fields 405 * @param delimiter of type String 406 * @param quote of type String 407 */ 408 @ConstructorProperties({"fields", "delimiter", "quote"}) 409 public TextDelimited( Fields fields, String delimiter, String quote ) 410 { 411 this( fields, false, delimiter, quote, null, true ); 412 } 413 414 /** 415 * Constructor TextDelimited creates a new TextDelimited instance. 416 * 417 * @param fields of type Fields 418 * @param hasHeader of type boolean 419 * @param delimiter of type String 420 * @param quote of type String 421 */ 422 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"}) 423 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote ) 424 { 425 this( fields, hasHeader, delimiter, quote, null, true ); 426 } 427 428 /** 429 * Constructor TextDelimited creates a new TextDelimited instance. 430 * 431 * @param fields of type Fields 432 * @param hasHeader of type boolean 433 * @param delimiter of type String 434 * @param quote of type String 435 * @param charsetName of type String 436 */ 437 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "charsetName"}) 438 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, String charsetName ) 439 { 440 this( fields, hasHeader, delimiter, quote, null, true, charsetName ); 441 } 442 443 /** 444 * Constructor TextDelimited creates a new TextDelimited instance. 445 * 446 * @param fields of type Fields 447 * @param skipHeader of type boolean 448 * @param writeHeader of type boolean 449 * @param delimiter of type String 450 * @param strict of type boolean 451 * @param quote of type String 452 * @param types of type Class[] 453 * @param safe of type boolean 454 */ 455 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe"}) 456 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe ) 457 { 458 this( fields, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET ); 459 } 460 461 /** 462 * Constructor TextDelimited creates a new TextDelimited instance. 463 * 464 * @param fields of type Fields 465 * @param skipHeader of type boolean 466 * @param writeHeader of type boolean 467 * @param delimiter of type String 468 * @param strict of type boolean 469 * @param quote of type String 470 * @param types of type Class[] 471 * @param safe of type boolean 472 * @param charsetName of type String 473 */ 474 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "strict", "quote", "types", "safe", 475 "charsetName"}) 476 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName ) 477 { 478 this( fields, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) ); 479 } 480 481 /** 482 * Constructor TextDelimited creates a new TextDelimited instance. 483 * 484 * @param fields of type Fields 485 * @param writeHeader of type boolean 486 * @param delimitedParser of type DelimitedParser 487 */ 488 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"}) 489 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser ) 490 { 491 this( fields, skipHeader, writeHeader, null, delimitedParser ); 492 } 493 494 /** 495 * Constructor TextDelimited creates a new TextDelimited instance. 496 * 497 * @param fields of type Fields 498 * @param hasHeader of type boolean 499 * @param delimitedParser of type DelimitedParser 500 */ 501 @ConstructorProperties({"fields", "hasHeader", "delimitedParser"}) 502 public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser ) 503 { 504 this( fields, hasHeader, hasHeader, null, delimitedParser ); 505 } 506 507 /** 508 * Constructor TextDelimited creates a new TextDelimited instance. 509 * 510 * @param fields of type Fields 511 * @param writeHeader of type boolean 512 * @param charsetName of type String 513 * @param delimitedParser of type DelimitedParser 514 */ 515 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "charsetName", "delimitedParser"}) 516 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser ) 517 { 518 super( fields, fields ); 519 520 this.delimitedParser = delimitedParser; 521 522 // normalizes ALL and UNKNOWN 523 // calls reset on delimitedParser 524 setSourceFields( fields ); 525 setSinkFields( fields ); 526 527 this.skipHeader = skipHeader; 528 this.writeHeader = writeHeader; 529 530 if( charsetName != null ) 531 this.charsetName = charsetName; 532 533 // throws an exception if not found 534 Charset.forName( this.charsetName ); 535 } 536 537 @Property(name = "charset", visibility = Visibility.PUBLIC) 538 @PropertyDescription("character set used.") 539 public String getCharsetName() 540 { 541 return charsetName; 542 } 543 544 /** 545 * Method getDelimiter returns the delimiter used to parse fields from the current line of text. 546 * 547 * @return a String 548 */ 549 @Property(name = "delimiter", visibility = Visibility.PUBLIC) 550 @PropertyDescription("The delimiter used to separate fields.") 551 public String getDelimiter() 552 { 553 return delimitedParser.getDelimiter(); 554 } 555 556 /** 557 * Method getQuote returns the quote string, if any, used to encapsulate each field in a line to delimited text. 558 * 559 * @return a String 560 */ 561 @Property(name = "quote", visibility = Visibility.PUBLIC) 562 @PropertyDescription("The string used for quoting.") 563 public String getQuote() 564 { 565 return delimitedParser.getQuote(); 566 } 567 568 public LineNumberReader createInput( InputStream inputStream ) 569 { 570 try 571 { 572 return new LineNumberReader( new InputStreamReader( inputStream, charsetName ) ); 573 } 574 catch( UnsupportedEncodingException exception ) 575 { 576 throw new TapException( exception ); 577 } 578 } 579 580 public PrintWriter createOutput( OutputStream outputStream ) 581 { 582 try 583 { 584 return new PrintWriter( new OutputStreamWriter( outputStream, charsetName ) ); 585 } 586 catch( UnsupportedEncodingException exception ) 587 { 588 throw new TapException( exception ); 589 } 590 } 591 592 @Override 593 public void setSinkFields( Fields sinkFields ) 594 { 595 super.setSourceFields( sinkFields ); 596 super.setSinkFields( sinkFields ); 597 598 if( delimitedParser != null ) 599 delimitedParser.reset( getSourceFields(), getSinkFields() ); 600 } 601 602 @Override 603 public void setSourceFields( Fields sourceFields ) 604 { 605 super.setSourceFields( sourceFields ); 606 super.setSinkFields( sourceFields ); 607 608 if( delimitedParser != null ) 609 delimitedParser.reset( getSourceFields(), getSinkFields() ); 610 } 611 612 @Override 613 public boolean isSymmetrical() 614 { 615 return super.isSymmetrical() && skipHeader == writeHeader; 616 } 617 618 @Override 619 public Fields retrieveSourceFields( FlowProcess<Properties> process, Tap tap ) 620 { 621 if( !skipHeader || !getSourceFields().isUnknown() ) 622 return getSourceFields(); 623 624 // no need to open them all 625 if( tap instanceof CompositeTap ) 626 tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next(); 627 628 tap = new FileTap( new TextLine( new Fields( "line" ), charsetName ), tap.getIdentifier() ); 629 630 setSourceFields( delimitedParser.parseFirstLine( process, tap ) ); 631 632 return getSourceFields(); 633 } 634 635 @Override 636 public void presentSourceFields( FlowProcess<Properties> process, Tap tap, Fields fields ) 637 { 638 // do nothing 639 } 640 641 @Override 642 public void presentSinkFields( FlowProcess<Properties> flowProcess, Tap tap, Fields fields ) 643 { 644 if( writeHeader ) 645 presentSinkFieldsInternal( fields ); 646 } 647 648 @Override 649 public void sourceConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf ) 650 { 651 } 652 653 @Override 654 public void sourcePrepare( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 655 { 656 sourceCall.setContext( createInput( sourceCall.getInput() ) ); 657 658 sourceCall.getIncomingEntry().setTuple( TupleViews.createObjectArray() ); 659 } 660 661 @Override 662 public boolean source( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 663 { 664 String line = sourceCall.getContext().readLine(); 665 666 if( line == null ) 667 return false; 668 669 if( skipHeader && sourceCall.getContext().getLineNumber() == 1 ) // todo: optimize this away 670 line = sourceCall.getContext().readLine(); 671 672 if( line == null ) 673 return false; 674 675 Object[] split = delimitedParser.parseLine( line ); 676 677 // assumption it is better to re-use than to construct new 678 Tuple tuple = sourceCall.getIncomingEntry().getTuple(); 679 680 TupleViews.reset( tuple, split ); 681 682 return true; 683 } 684 685 @Override 686 public void sourceCleanup( FlowProcess<Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall ) throws IOException 687 { 688 sourceCall.setContext( null ); 689 } 690 691 @Override 692 public void sinkConfInit( FlowProcess<Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf ) 693 { 694 } 695 696 @Override 697 public void sinkPrepare( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) 698 { 699 sinkCall.setContext( createOutput( sinkCall.getOutput() ) ); 700 701 if( writeHeader ) 702 { 703 Fields fields = sinkCall.getOutgoingEntry().getFields(); 704 delimitedParser.joinFirstLine( fields, sinkCall.getContext() ); 705 706 sinkCall.getContext().println(); 707 } 708 } 709 710 @Override 711 public void sink( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) throws IOException 712 { 713 TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); 714 715 Iterable<String> strings = tupleEntry.asIterableOf( String.class ); 716 717 delimitedParser.joinLine( strings, sinkCall.getContext() ); 718 719 sinkCall.getContext().println(); 720 } 721 722 @Override 723 public void sinkCleanup( FlowProcess<Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall ) 724 { 725 sinkCall.getContext().flush(); 726 sinkCall.setContext( null ); 727 } 728 }