001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 import java.nio.charset.Charset; 026 027 import cascading.flow.FlowProcess; 028 import cascading.scheme.SinkCall; 029 import cascading.scheme.SourceCall; 030 import cascading.scheme.util.DelimitedParser; 031 import cascading.tap.CompositeTap; 032 import cascading.tap.Tap; 033 import cascading.tap.TapException; 034 import cascading.tap.hadoop.Hfs; 035 import cascading.tuple.Fields; 036 import cascading.tuple.Tuple; 037 import cascading.tuple.TupleEntry; 038 import cascading.tuple.util.TupleViews; 039 import org.apache.hadoop.io.LongWritable; 040 import org.apache.hadoop.io.Text; 041 import org.apache.hadoop.mapred.JobConf; 042 import org.apache.hadoop.mapred.OutputCollector; 043 import org.apache.hadoop.mapred.RecordReader; 044 045 /** 046 * Class TextDelimited is a sub-class of {@link TextLine}. It provides direct support for delimited text files, like 047 * TAB (\t) or COMMA (,) delimited files. It also optionally allows for quoted values. 
* <p/>
 * TextDelimited may also be used to skip the "header" in a file, where the header is defined as the very first line
 * in every input file. That is, if the byte offset of the current line from the input is zero (0), that line will
 * be skipped.
 * <p/>
 * It is assumed that if the sink/source {@code fields} are set to either {@link Fields#ALL} or {@link Fields#UNKNOWN}
 * and {@code skipHeader} or {@code hasHeader} is {@code true}, the field names will be retrieved from the header of
 * the file and used during planning. The header will be parsed with the same rules as the body of the file.
 * <p/>
 * By default headers are not skipped.
 * <p/>
 * TextDelimited may also be used to write a "header" in a file. The field names for the header are taken directly
 * from the declared fields. Or if the declared fields are {@link Fields#ALL} or {@link Fields#UNKNOWN}, the
 * resolved field names will be used, if any.
 * <p/>
 * By default headers are not written.
 * <p/>
 * If {@code hasHeader} is set to {@code true} on a constructor, both {@code skipHeader} and {@code writeHeader} will
 * be set to {@code true}.
 * <p/>
 * By default this {@link cascading.scheme.Scheme} is both {@code strict} and {@code safe}.
 * <p/>
 * Strict means that if a line of text does not parse into the expected number of fields, this class will throw a
 * {@link TapException}. If strict is {@code false}, then a {@link Tuple} will be returned with {@code null} values
 * for the missing fields.
 * <p/>
 * Safe means that if a field cannot be coerced into an expected type, a {@code null} will be used for the value.
 * If safe is {@code false}, a {@link TapException} will be thrown.
 * <p/>
 * Also by default, {@code quote} strings are not searched for to improve processing speed.
* If a file is COMMA delimited but may have COMMAs in a value, the whole value should be surrounded by the quote
 * string, typically double quotes ({@literal "}).
 * <p/>
 * Note all empty fields in a line will be returned as {@code null} unless coerced into a new type.
 * <p/>
 * This Scheme may source/sink {@link Fields#ALL}; when it is given on the constructor, the new instance will
 * automatically default to strict == false, as the number of fields parsed is arbitrary or unknown. A type array may
 * not be given either, so all values will be returned as Strings.
 * <p/>
 * By default, all text is encoded/decoded as UTF-8. This can be changed via the {@code charsetName} constructor
 * argument.
 * <p/>
 * To override field and line parsing behaviors, sub-class {@link DelimitedParser} or provide a
 * {@link cascading.scheme.util.FieldTypeResolver} implementation.
 * <p/>
 * Note that there should be no expectation that TextDelimited, or specifically {@link DelimitedParser}, can handle
 * all delimited and quoted combinations reliably. Attempting to do so would impair its performance and maintainability.
 * <p/>
 * Further, it can be safely said any corrupted files will not be supported for obvious reasons. Corrupted files may
 * result in exceptions or could cause edge cases in the underlying Java regular expression engine.
 * <p/>
 * A large part of Cascading was designed to help users clean data. Thus the recommendation is to create Flows that
 * are responsible for cleansing large data-sets when faced with the problem.
 * <p/>
 * DelimitedParser may be sub-classed and extended if necessary.
103 * 104 * @see TextLine 105 */ 106 public class TextDelimited extends TextLine 107 { 108 public static final String DEFAULT_CHARSET = "UTF-8"; 109 110 /** Field delimitedParser */ 111 protected final DelimitedParser delimitedParser; 112 /** Field skipHeader */ 113 private boolean skipHeader; 114 private final boolean writeHeader; 115 116 /** 117 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 118 * {@link Fields#ALL} and using TAB as the default delimiter. 119 * <p/> 120 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 121 * with a {@link cascading.pipe.Checkpoint} Tap. 122 */ 123 public TextDelimited() 124 { 125 this( Fields.ALL, null, "\t", null, null ); 126 } 127 128 /** 129 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 130 * {@link Fields#ALL} and using TAB as the default delimiter. 131 * <p/> 132 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 133 * with a {@link cascading.pipe.Checkpoint} Tap. 134 * 135 * @param hasHeader of type boolean 136 * @param delimiter of type String 137 */ 138 @ConstructorProperties({"hasHeader", "delimiter"}) 139 public TextDelimited( boolean hasHeader, String delimiter ) 140 { 141 this( Fields.ALL, null, hasHeader, delimiter, null, (Class[]) null ); 142 } 143 144 /** 145 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 146 * {@link Fields#ALL} and using TAB as the default delimiter. 147 * <p/> 148 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 149 * with a {@link cascading.pipe.Checkpoint} Tap. 
150 * 151 * @param hasHeader of type boolean 152 * @param delimiter of type String 153 * @param quote of type String 154 */ 155 @ConstructorProperties({"hasHeader", "delimiter", "quote"}) 156 public TextDelimited( boolean hasHeader, String delimiter, String quote ) 157 { 158 this( Fields.ALL, null, hasHeader, delimiter, quote, (Class[]) null ); 159 } 160 161 /** 162 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 163 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 164 * <p/> 165 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 166 * with a {@link cascading.pipe.Checkpoint} Tap. 167 * 168 * @param hasHeader of type boolean 169 * @param delimitedParser of type DelimitedParser 170 */ 171 @ConstructorProperties({"hasHeader", "delimitedParser"}) 172 public TextDelimited( boolean hasHeader, DelimitedParser delimitedParser ) 173 { 174 this( Fields.ALL, null, hasHeader, hasHeader, delimitedParser ); 175 } 176 177 /** 178 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 179 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 180 * <p/> 181 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 182 * with a {@link cascading.pipe.Checkpoint} Tap. 183 * <p/> 184 * This constructor will set {@code skipHeader} and {@code writeHeader} values to true. 185 * 186 * @param delimitedParser of type DelimitedParser 187 */ 188 @ConstructorProperties({"delimitedParser"}) 189 public TextDelimited( DelimitedParser delimitedParser ) 190 { 191 this( Fields.ALL, null, true, true, delimitedParser ); 192 } 193 194 /** 195 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 196 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 
197 * <p/> 198 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 199 * with a {@link cascading.pipe.Checkpoint} Tap. 200 * 201 * @param sinkCompression of type Compress 202 * @param hasHeader of type boolean 203 * @param delimitedParser of type DelimitedParser 204 */ 205 @ConstructorProperties({"sinkCompression", "hasHeader", "delimitedParser"}) 206 public TextDelimited( Compress sinkCompression, boolean hasHeader, DelimitedParser delimitedParser ) 207 { 208 this( Fields.ALL, sinkCompression, hasHeader, hasHeader, delimitedParser ); 209 } 210 211 /** 212 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 213 * {@link Fields#ALL} and using the given delimitedParser instance for parsing. 214 * <p/> 215 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 216 * with a {@link cascading.pipe.Checkpoint} Tap. 217 * <p/> 218 * This constructor will set {@code skipHeader} and {@code writeHeader} values to true. 219 * 220 * @param delimitedParser of type DelimitedParser 221 */ 222 @ConstructorProperties({"sinkCompression", "delimitedParser"}) 223 public TextDelimited( Compress sinkCompression, DelimitedParser delimitedParser ) 224 { 225 this( Fields.ALL, sinkCompression, true, true, delimitedParser ); 226 } 227 228 /** 229 * Constructor TextDelimited creates a new TextDelimited instance sourcing {@link Fields#UNKNOWN}, sinking 230 * {@link Fields#ALL} and using TAB as the default delimiter. 231 * <p/> 232 * Use this constructor if the source and sink fields will be resolved during planning, for example, when using 233 * with a {@link cascading.pipe.Checkpoint} Tap. 
234 * 235 * @param sinkCompression of type Compress 236 * @param hasHeader of type boolean 237 * @param delimiter of type String 238 * @param quote of type String 239 */ 240 @ConstructorProperties({"sinkCompression", "hasHeader", "delimiter", "quote"}) 241 public TextDelimited( Compress sinkCompression, boolean hasHeader, String delimiter, String quote ) 242 { 243 this( Fields.ALL, sinkCompression, hasHeader, delimiter, quote, (Class[]) null ); 244 } 245 246 /** 247 * Constructor TextDelimited creates a new TextDelimited instance with TAB as the default delimiter. 248 * 249 * @param fields of type Fields 250 */ 251 @ConstructorProperties({"fields"}) 252 public TextDelimited( Fields fields ) 253 { 254 this( fields, null, "\t", null, null ); 255 } 256 257 /** 258 * Constructor TextDelimited creates a new TextDelimited instance. 259 * 260 * @param fields of type Fields 261 * @param delimiter of type String 262 */ 263 @ConstructorProperties({"fields", "delimiter"}) 264 public TextDelimited( Fields fields, String delimiter ) 265 { 266 this( fields, null, delimiter, null, null ); 267 } 268 269 /** 270 * Constructor TextDelimited creates a new TextDelimited instance. 271 * 272 * @param fields of type Fields 273 * @param hasHeader of type boolean 274 * @param delimiter of type String 275 */ 276 @ConstructorProperties({"fields", "hasHeader", "delimiter"}) 277 public TextDelimited( Fields fields, boolean hasHeader, String delimiter ) 278 { 279 this( fields, null, hasHeader, hasHeader, delimiter, null, null ); 280 } 281 282 /** 283 * Constructor TextDelimited creates a new TextDelimited instance. 
284 * 285 * @param fields of type Fields 286 * @param skipHeader of type boolean 287 * @param writeHeader of type boolean 288 * @param delimiter of type String 289 */ 290 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter"}) 291 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter ) 292 { 293 this( fields, null, skipHeader, writeHeader, delimiter, null, null ); 294 } 295 296 /** 297 * Constructor TextDelimited creates a new TextDelimited instance. 298 * 299 * @param fields of type Fields 300 * @param delimiter of type String 301 * @param types of type Class[] 302 */ 303 @ConstructorProperties({"fields", "delimiter", "types"}) 304 public TextDelimited( Fields fields, String delimiter, Class[] types ) 305 { 306 this( fields, null, delimiter, null, types ); 307 } 308 309 /** 310 * Constructor TextDelimited creates a new TextDelimited instance. 311 * 312 * @param fields of type Fields 313 * @param hasHeader of type boolean 314 * @param delimiter of type String 315 * @param types of type Class[] 316 */ 317 @ConstructorProperties({"fields", "hasHeader", "delimiter", "types"}) 318 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, Class[] types ) 319 { 320 this( fields, null, hasHeader, hasHeader, delimiter, null, types ); 321 } 322 323 /** 324 * Constructor TextDelimited creates a new TextDelimited instance. 325 * 326 * @param fields of type Fields 327 * @param skipHeader of type boolean 328 * @param writeHeader of type boolean 329 * @param delimiter of type String 330 * @param types of type Class[] 331 */ 332 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "types"}) 333 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types ) 334 { 335 this( fields, null, skipHeader, writeHeader, delimiter, null, types ); 336 } 337 338 /** 339 * Constructor TextDelimited creates a new TextDelimited instance. 
340 * 341 * @param fields of type Fields 342 * @param delimiter of type String 343 * @param quote of type String 344 * @param types of type Class[] 345 */ 346 @ConstructorProperties({"fields", "delimiter", "quote", "types"}) 347 public TextDelimited( Fields fields, String delimiter, String quote, Class[] types ) 348 { 349 this( fields, null, delimiter, quote, types ); 350 } 351 352 /** 353 * Constructor TextDelimited creates a new TextDelimited instance. 354 * 355 * @param fields of type Fields 356 * @param hasHeader of type boolean 357 * @param delimiter of type String 358 * @param quote of type String 359 * @param types of type Class[] 360 */ 361 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types"}) 362 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types ) 363 { 364 this( fields, null, hasHeader, hasHeader, delimiter, quote, types ); 365 } 366 367 /** 368 * Constructor TextDelimited creates a new TextDelimited instance. 369 * 370 * @param fields of type Fields 371 * @param skipHeader of type boolean 372 * @param writeHeader of type boolean 373 * @param delimiter of type String 374 * @param quote of type String 375 * @param types of type Class[] 376 */ 377 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types"}) 378 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types ) 379 { 380 this( fields, null, skipHeader, writeHeader, delimiter, quote, types ); 381 } 382 383 /** 384 * Constructor TextDelimited creates a new TextDelimited instance. 
385 * 386 * @param fields of type Fields 387 * @param delimiter of type String 388 * @param quote of type String 389 * @param types of type Class[] 390 * @param safe of type boolean 391 */ 392 @ConstructorProperties({"fields", "delimiter", "quote", "types", "safe"}) 393 public TextDelimited( Fields fields, String delimiter, String quote, Class[] types, boolean safe ) 394 { 395 this( fields, null, delimiter, quote, types, safe ); 396 } 397 398 /** 399 * Constructor TextDelimited creates a new TextDelimited instance. 400 * 401 * @param fields of type Fields 402 * @param hasHeader of type boolean 403 * @param delimiter of type String 404 * @param quote of type String 405 * @param types of type Class[] 406 * @param safe of type boolean 407 */ 408 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe"}) 409 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe ) 410 { 411 this( fields, null, hasHeader, hasHeader, delimiter, quote, types, safe ); 412 } 413 414 /** 415 * Constructor TextDelimited creates a new TextDelimited instance. 416 * 417 * @param fields of type Fields 418 * @param hasHeader of type boolean 419 * @param delimiter of type String 420 * @param quote of type String 421 * @param types of type Class[] 422 * @param safe of type boolean 423 * @param charsetName of type String 424 */ 425 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote", "types", "safe", "charsetName"}) 426 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe, String charsetName ) 427 { 428 this( fields, null, hasHeader, hasHeader, delimiter, true, quote, types, safe, charsetName ); 429 } 430 431 /** 432 * Constructor TextDelimited creates a new TextDelimited instance. 
433 * 434 * @param fields of type Fields 435 * @param skipHeader of type boolean 436 * @param writeHeader of type boolean 437 * @param delimiter of type String 438 * @param quote of type String 439 * @param types of type Class[] 440 * @param safe of type boolean 441 */ 442 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote", "types", "safe"}) 443 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe ) 444 { 445 this( fields, null, skipHeader, writeHeader, delimiter, quote, types, safe ); 446 } 447 448 /** 449 * Constructor TextDelimited creates a new TextDelimited instance. 450 * 451 * @param fields of type Fields 452 * @param sinkCompression of type Compress 453 * @param delimiter of type String 454 */ 455 @ConstructorProperties({"fields", "sinkCompression", "delimiter"}) 456 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter ) 457 { 458 this( fields, sinkCompression, delimiter, null, null ); 459 } 460 461 /** 462 * Constructor TextDelimited creates a new TextDelimited instance. 463 * 464 * @param fields of type Fields 465 * @param sinkCompression of type Compress 466 * @param hasHeader of type boolean 467 * @param delimiter of type String 468 */ 469 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter"}) 470 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter ) 471 { 472 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, null ); 473 } 474 475 /** 476 * Constructor TextDelimited creates a new TextDelimited instance. 
477 * 478 * @param fields of type Fields 479 * @param sinkCompression of type Compress 480 * @param skipHeader of type boolean 481 * @param writeHeader of type boolean 482 * @param delimiter of type String 483 */ 484 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter"}) 485 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter ) 486 { 487 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, null ); 488 } 489 490 /** 491 * Constructor TextDelimited creates a new TextDelimited instance. 492 * 493 * @param fields of type Fields 494 * @param sinkCompression of type Compress 495 * @param delimiter of type String 496 * @param types of type Class[] 497 */ 498 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types"}) 499 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types ) 500 { 501 this( fields, sinkCompression, delimiter, null, types ); 502 } 503 504 /** 505 * Constructor TextDelimited creates a new TextDelimited instance. 506 * 507 * @param fields of type Fields 508 * @param sinkCompression of type Compress 509 * @param hasHeader of type boolean 510 * @param delimiter of type String 511 * @param types of type Class[] 512 */ 513 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types"}) 514 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types ) 515 { 516 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types ); 517 } 518 519 /** 520 * Constructor TextDelimited creates a new TextDelimited instance. 
521 * 522 * @param fields of type Fields 523 * @param sinkCompression of type Compress 524 * @param skipHeader of type boolean 525 * @param writeHeader of type boolean 526 * @param delimiter of type String 527 * @param types of type Class[] 528 */ 529 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types"}) 530 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types ) 531 { 532 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types ); 533 } 534 535 /** 536 * Constructor TextDelimited creates a new TextDelimited instance. 537 * 538 * @param fields of type Fields 539 * @param sinkCompression of type Compress 540 * @param delimiter of type String 541 * @param types of type Class[] 542 * @param safe of type boolean 543 */ 544 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "types", "safe"}) 545 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, Class[] types, boolean safe ) 546 { 547 this( fields, sinkCompression, delimiter, null, types, safe ); 548 } 549 550 /** 551 * Constructor TextDelimited creates a new TextDelimited instance. 552 * 553 * @param fields of type Fields 554 * @param sinkCompression of type Compress 555 * @param hasHeader of type boolean 556 * @param delimiter of type String 557 * @param types of type Class[] 558 * @param safe of type boolean 559 */ 560 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe"}) 561 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe ) 562 { 563 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, null, types, safe ); 564 } 565 566 /** 567 * Constructor TextDelimited creates a new TextDelimited instance. 
568 * 569 * @param fields of type Fields 570 * @param sinkCompression of type Compress 571 * @param hasHeader of type boolean 572 * @param delimiter of type String 573 * @param types of type Class[] 574 * @param safe of type boolean 575 * @param charsetName of type String 576 */ 577 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "types", "safe", "charsetName"}) 578 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, Class[] types, boolean safe, String charsetName ) 579 { 580 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, null, types, safe, charsetName ); 581 } 582 583 /** 584 * Constructor TextDelimited creates a new TextDelimited instance. 585 * 586 * @param fields of type Fields 587 * @param sinkCompression of type Compress 588 * @param skipHeader of type boolean 589 * @param writeHeader of type boolean 590 * @param delimiter of type String 591 * @param types of type Class[] 592 * @param safe of type boolean 593 */ 594 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "types", "safe"}) 595 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, Class[] types, boolean safe ) 596 { 597 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, null, types, safe ); 598 } 599 600 /** 601 * Constructor TextDelimited creates a new TextDelimited instance. 602 * 603 * @param fields of type Fields 604 * @param delimiter of type String 605 * @param quote of type String 606 */ 607 @ConstructorProperties({"fields", "delimiter", "quote"}) 608 public TextDelimited( Fields fields, String delimiter, String quote ) 609 { 610 this( fields, null, delimiter, quote ); 611 } 612 613 /** 614 * Constructor TextDelimited creates a new TextDelimited instance. 
615 * 616 * @param fields of type Fields 617 * @param hasHeader of type boolean 618 * @param delimiter of type String 619 * @param quote of type String 620 */ 621 @ConstructorProperties({"fields", "hasHeader", "delimiter", "quote"}) 622 public TextDelimited( Fields fields, boolean hasHeader, String delimiter, String quote ) 623 { 624 this( fields, null, hasHeader, hasHeader, delimiter, quote ); 625 } 626 627 /** 628 * Constructor TextDelimited creates a new TextDelimited instance. 629 * 630 * @param fields of type Fields 631 * @param skipHeader of type boolean 632 * @param writeHeader of type boolean 633 * @param delimiter of type String 634 * @param quote of type String 635 */ 636 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimiter", "quote"}) 637 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote ) 638 { 639 this( fields, null, skipHeader, writeHeader, delimiter, quote ); 640 } 641 642 /** 643 * Constructor TextDelimited creates a new TextDelimited instance. 644 * 645 * @param fields of type Fields 646 * @param sinkCompression of type Compress 647 * @param delimiter of type String 648 * @param quote of type String 649 */ 650 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote"}) 651 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote ) 652 { 653 this( fields, sinkCompression, false, false, delimiter, true, quote, null, true ); 654 } 655 656 /** 657 * Constructor TextDelimited creates a new TextDelimited instance. 
658 * 659 * @param fields of type Fields 660 * @param sinkCompression of type Compress 661 * @param hasHeader of type boolean 662 * @param delimiter of type String 663 * @param quote of type String 664 */ 665 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote"}) 666 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote ) 667 { 668 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true ); 669 } 670 671 /** 672 * Constructor TextDelimited creates a new TextDelimited instance. 673 * 674 * @param fields of type Fields 675 * @param sinkCompression of type Compress 676 * @param hasHeader of type boolean 677 * @param delimiter of type String 678 * @param quote of type String 679 * @param charsetName of type String 680 */ 681 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "charsetName"}) 682 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, String charsetName ) 683 { 684 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, null, true, charsetName ); 685 } 686 687 /** 688 * Constructor TextDelimited creates a new TextDelimited instance. 689 * 690 * @param fields of type Fields 691 * @param sinkCompression of type Compress 692 * @param skipHeader of type boolean 693 * @param writeHeader of type boolean 694 * @param delimiter of type String 695 * @param quote of type String 696 */ 697 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote"}) 698 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote ) 699 { 700 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, null, true ); 701 } 702 703 /** 704 * Constructor TextDelimited creates a new TextDelimited instance. 
705 * 706 * @param fields of type Fields 707 * @param sinkCompression of type Compress 708 * @param delimiter of type String 709 * @param quote of type String 710 * @param types of type Class[] 711 */ 712 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types"}) 713 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types ) 714 { 715 this( fields, sinkCompression, false, false, delimiter, true, quote, types, true ); 716 } 717 718 /** 719 * Constructor TextDelimited creates a new TextDelimited instance. 720 * 721 * @param fields of type Fields 722 * @param sinkCompression of type Compress 723 * @param hasHeader of type boolean 724 * @param delimiter of type String 725 * @param quote of type String 726 * @param types of type Class[] 727 */ 728 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types"}) 729 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types ) 730 { 731 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, true ); 732 } 733 734 /** 735 * Constructor TextDelimited creates a new TextDelimited instance. 
736 * 737 * @param fields of type Fields 738 * @param sinkCompression of type Compress 739 * @param skipHeader of type boolean 740 * @param writeHeader of type boolean 741 * @param delimiter of type String 742 * @param quote of type String 743 * @param types of type Class[] 744 */ 745 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types"}) 746 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types ) 747 { 748 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, true ); 749 } 750 751 /** 752 * Constructor TextDelimited creates a new TextDelimited instance. 753 * 754 * @param fields of type Fields 755 * @param sinkCompression of type Compress 756 * @param delimiter of type String 757 * @param quote of type String 758 * @param types of type Class[] 759 * @param safe of type boolean 760 */ 761 @ConstructorProperties({"fields", "sinkCompression", "delimiter", "quote", "types", "safe"}) 762 public TextDelimited( Fields fields, Compress sinkCompression, String delimiter, String quote, Class[] types, boolean safe ) 763 { 764 this( fields, sinkCompression, false, false, delimiter, true, quote, types, safe ); 765 } 766 767 /** 768 * Constructor TextDelimited creates a new TextDelimited instance. 
769 * 770 * @param fields of type Fields 771 * @param sinkCompression of type Compress 772 * @param hasHeader of type boolean 773 * @param delimiter of type String 774 * @param quote of type String 775 * @param types of type Class[] 776 * @param safe of type boolean 777 */ 778 @ConstructorProperties({"fields", "sinkCompression", "hasHeader", "delimiter", "quote", "types", "safe"}) 779 public TextDelimited( Fields fields, Compress sinkCompression, boolean hasHeader, String delimiter, String quote, Class[] types, boolean safe ) 780 { 781 this( fields, sinkCompression, hasHeader, hasHeader, delimiter, true, quote, types, safe ); 782 } 783 784 /** 785 * Constructor TextDelimited creates a new TextDelimited instance. 786 * 787 * @param fields of type Fields 788 * @param sinkCompression of type Compress 789 * @param skipHeader of type boolean 790 * @param writeHeader of type boolean 791 * @param delimiter of type String 792 * @param quote of type String 793 * @param types of type Class[] 794 * @param safe of type boolean 795 */ 796 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "quote", "types", 797 "safe"}) 798 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, boolean safe ) 799 { 800 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, true, quote, types, safe ); 801 } 802 803 /** 804 * Constructor TextDelimited creates a new TextDelimited instance. 
805 * 806 * @param fields of type Fields 807 * @param sinkCompression of type Compress 808 * @param skipHeader of type boolean 809 * @param delimiter of type String 810 * @param strict of type boolean 811 * @param quote of type String 812 * @param types of type Class[] 813 * @param safe of type boolean 814 */ 815 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote", 816 "types", "safe"}) 817 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe ) 818 { 819 this( fields, sinkCompression, skipHeader, writeHeader, delimiter, strict, quote, types, safe, DEFAULT_CHARSET ); 820 } 821 822 /** 823 * Constructor TextDelimited creates a new TextDelimited instance. 824 * 825 * @param fields of type Fields 826 * @param sinkCompression of type Compress 827 * @param skipHeader of type boolean 828 * @param delimiter of type String 829 * @param strict of type boolean 830 * @param quote of type String 831 * @param types of type Class[] 832 * @param safe of type boolean 833 * @param charsetName of type String 834 */ 835 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimiter", "strict", "quote", 836 "types", "safe", "charsetName"}) 837 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String delimiter, boolean strict, String quote, Class[] types, boolean safe, String charsetName ) 838 { 839 this( fields, sinkCompression, skipHeader, writeHeader, charsetName, new DelimitedParser( delimiter, quote, types, strict, safe ) ); 840 } 841 842 /** 843 * Constructor TextDelimited creates a new TextDelimited instance. 
844 * 845 * @param fields of type Fields 846 * @param writeHeader of type boolean 847 * @param delimitedParser of type DelimitedParser 848 */ 849 @ConstructorProperties({"fields", "skipHeader", "writeHeader", "delimitedParser"}) 850 public TextDelimited( Fields fields, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser ) 851 { 852 this( fields, null, skipHeader, writeHeader, null, delimitedParser ); 853 } 854 855 /** 856 * Constructor TextDelimited creates a new TextDelimited instance. 857 * 858 * @param fields of type Fields 859 * @param hasHeader of type boolean 860 * @param delimitedParser of type DelimitedParser 861 */ 862 @ConstructorProperties({"fields", "hasHeader", "delimitedParser"}) 863 public TextDelimited( Fields fields, boolean hasHeader, DelimitedParser delimitedParser ) 864 { 865 this( fields, null, hasHeader, hasHeader, null, delimitedParser ); 866 } 867 868 /** 869 * Constructor TextDelimited creates a new TextDelimited instance. 870 * 871 * @param fields of type Fields 872 * @param writeHeader of type boolean 873 * @param delimitedParser of type DelimitedParser 874 */ 875 @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "delimitedParser"}) 876 public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, DelimitedParser delimitedParser ) 877 { 878 this( fields, sinkCompression, skipHeader, writeHeader, null, delimitedParser ); 879 } 880 881 /** 882 * Constructor TextDelimited creates a new TextDelimited instance. 
* <p/>
   * This constructor stores the parser, applies {@code fields} as both the source and the sink fields,
   * records the header flags and finally sets the charset name.
   *
   * @param fields          of type Fields
   * @param sinkCompression of type Compress
   * @param skipHeader      of type boolean, when true the first line of every input file is skipped
   * @param writeHeader     of type boolean, when true a header line is written before the first tuple
   * @param charsetName     of type String, the name of the character set used to read and write lines
   * @param delimitedParser of type DelimitedParser, used to parse and join lines
   */
  @ConstructorProperties({"fields", "sinkCompression", "skipHeader", "writeHeader", "charsetName", "delimitedParser"})
  public TextDelimited( Fields fields, Compress sinkCompression, boolean skipHeader, boolean writeHeader, String charsetName, DelimitedParser delimitedParser )
  {
    super( sinkCompression );

    // must be assigned before the field setters below: the overridden setters reset the parser
    // only when it is non-null, so assigning it later would leave the parser unaware of the fields
    this.delimitedParser = delimitedParser;

    // normalizes ALL and UNKNOWN
    setSinkFields( fields );
    setSourceFields( fields );

    this.skipHeader = skipHeader;
    this.writeHeader = writeHeader;

    // throws an exception if not found
    setCharsetName( charsetName );
  }

  /**
   * Method getDelimiter returns the delimiter used to parse fields from the current line of text.
   *
   * @return a String, as reported by the configured DelimitedParser
   */
  public String getDelimiter()
  {
    return delimitedParser.getDelimiter();
  }

  /**
   * Method getQuote returns the quote string, if any, used to encapsulate each field in a line of delimited text.
921 * 922 * @return a String 923 */ 924 public String getQuote() 925 { 926 return delimitedParser.getQuote(); 927 } 928 929 @Override 930 public boolean isSymmetrical() 931 { 932 return super.isSymmetrical() && skipHeader == writeHeader; 933 } 934 935 @Override 936 public void setSinkFields( Fields sinkFields ) 937 { 938 super.setSourceFields( sinkFields ); 939 super.setSinkFields( sinkFields ); 940 941 if( delimitedParser != null ) 942 delimitedParser.reset( getSourceFields(), getSinkFields() ); 943 } 944 945 @Override 946 public void setSourceFields( Fields sourceFields ) 947 { 948 super.setSourceFields( sourceFields ); 949 super.setSinkFields( sourceFields ); 950 951 if( delimitedParser != null ) 952 delimitedParser.reset( getSourceFields(), getSinkFields() ); 953 } 954 955 @Override 956 public Fields retrieveSourceFields( FlowProcess<JobConf> flowProcess, Tap tap ) 957 { 958 if( !skipHeader || !getSourceFields().isUnknown() ) 959 return getSourceFields(); 960 961 // no need to open them all 962 if( tap instanceof CompositeTap ) 963 tap = (Tap) ( (CompositeTap) tap ).getChildTaps().next(); 964 965 // should revert to file:// (Lfs) if tap is Lfs 966 tap = new Hfs( new TextLine( new Fields( "line" ), charsetName ), tap.getFullIdentifier( flowProcess.getConfigCopy() ) ); 967 968 setSourceFields( delimitedParser.parseFirstLine( flowProcess, tap ) ); 969 970 return getSourceFields(); 971 } 972 973 @Override 974 public void presentSourceFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields ) 975 { 976 presentSourceFieldsInternal( fields ); 977 } 978 979 @Override 980 public void presentSinkFields( FlowProcess<JobConf> flowProcess, Tap tap, Fields fields ) 981 { 982 presentSinkFieldsInternal( fields ); 983 } 984 985 @Override 986 public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) 987 { 988 super.sourcePrepare( flowProcess, sourceCall ); 989 990 sourceCall.getIncomingEntry().setTuple( 
TupleViews.createObjectArray() ); 991 } 992 993 @Override 994 public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException 995 { 996 Object[] context = sourceCall.getContext(); 997 998 if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) ) 999 return false; 1000 1001 if( skipHeader && ( (LongWritable) context[ 0 ] ).get() == 0 ) 1002 { 1003 if( !sourceCall.getInput().next( context[ 0 ], context[ 1 ] ) ) 1004 return false; 1005 } 1006 1007 // delegate coercion to delimitedParser for robustness 1008 Object[] split = delimitedParser.parseLine( makeEncodedString( context ) ); 1009 Tuple tuple = sourceCall.getIncomingEntry().getTuple(); 1010 1011 TupleViews.reset( tuple, split ); 1012 1013 return true; 1014 } 1015 1016 @Override 1017 public void sinkPrepare( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 1018 { 1019 sinkCall.setContext( new Object[ 3 ] ); 1020 1021 sinkCall.getContext()[ 0 ] = new Text(); 1022 sinkCall.getContext()[ 1 ] = new StringBuilder( 4 * 1024 ); 1023 sinkCall.getContext()[ 2 ] = Charset.forName( charsetName ); 1024 1025 if( writeHeader ) 1026 writeHeader( sinkCall ); 1027 } 1028 1029 protected void writeHeader( SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 1030 { 1031 Fields fields = sinkCall.getOutgoingEntry().getFields(); 1032 1033 Text text = (Text) sinkCall.getContext()[ 0 ]; 1034 StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ]; 1035 Charset charset = (Charset) sinkCall.getContext()[ 2 ]; 1036 1037 line = (StringBuilder) delimitedParser.joinFirstLine( fields, line ); 1038 1039 text.set( line.toString().getBytes( charset ) ); 1040 1041 sinkCall.getOutput().collect( null, text ); 1042 1043 line.setLength( 0 ); 1044 } 1045 1046 @Override 1047 public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall ) throws IOException 1048 { 1049 TupleEntry 
tupleEntry = sinkCall.getOutgoingEntry(); 1050 1051 Text text = (Text) sinkCall.getContext()[ 0 ]; 1052 StringBuilder line = (StringBuilder) sinkCall.getContext()[ 1 ]; 1053 Charset charset = (Charset) sinkCall.getContext()[ 2 ]; 1054 1055 Iterable<String> strings = tupleEntry.asIterableOf( String.class ); 1056 1057 line = (StringBuilder) delimitedParser.joinLine( strings, line ); 1058 1059 text.set( line.toString().getBytes( charset ) ); 1060 1061 sinkCall.getOutput().collect( null, text ); 1062 1063 line.setLength( 0 ); 1064 } 1065 } 1066