001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.scheme.util; 023 024import java.io.IOException; 025import java.io.Serializable; 026import java.lang.reflect.Type; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.List; 030import java.util.regex.Pattern; 031 032import cascading.flow.FlowProcess; 033import cascading.tap.Tap; 034import cascading.tap.TapException; 035import cascading.tuple.Fields; 036import cascading.tuple.Tuple; 037import cascading.tuple.TupleEntry; 038import cascading.tuple.TupleEntryIterator; 039import cascading.tuple.coerce.Coercions; 040import cascading.tuple.type.CoercibleType; 041import cascading.util.Util; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Class DelimitedParser is a base class for parsing text delimited files. 047 * <p> 048 * It maybe sub-classed to change its behavior. 049 * <p> 050 * The interface {@link FieldTypeResolver} maybe used to clean and prepare field names 051 * for data columns, and to infer type information from column names. 052 */ 053public class DelimitedParser implements Serializable 054 { 055 /** Field LOG */ 056 private static final Logger LOG = LoggerFactory.getLogger( DelimitedParser.class ); 057 058 /** Field SPECIAL_REGEX_CHARS */ 059 static final String SPECIAL_REGEX_CHARS = "([\\]\\[|.*<>\\\\$^?()=!+])"; 060 /** Field QUOTED_REGEX_FORMAT */ 061 static final String QUOTED_REGEX_FORMAT = "%2$s(?=(?:[^%1$s]*%1$s[^%1$s]*[^%1$s%2$s]*%1$s)*(?![^%1$s]*%1$s))"; 062 /** Field CLEAN_REGEX_FORMAT */ 063 static final String CLEAN_REGEX_FORMAT = "^(?:%1$s)(.*)(?:%1$s)$"; 064 /** Field ESCAPE_REGEX_FORMAT */ 065 static final String ESCAPE_REGEX_FORMAT = "(%1$s%1$s)"; 066 067 /** Field sourceFields */ 068 protected Fields sourceFields; 069 070 /** Field splitPattern */ 071 protected Pattern splitPattern; 072 /** Field cleanPattern */ 073 protected Pattern cleanPattern; 074 /** Field escapePattern */ 075 protected Pattern escapePattern; 076 /** Field delimiter * */ 077 protected String delimiter; 078 /** Field quote */ 079 protected String quote; 080 /** Field strict */ 081 protected boolean strict = true; // need to cache value across resets 082 /** Field enforceStrict */ 083 protected boolean enforceStrict = true; 084 /** Field numValues */ 085 protected int numValues; 086 /** Field types */ 087 protected Type[] types; 088 /** Fields coercibles */ 089 protected CoercibleType[] coercibles; 090 /** Field safe */ 091 protected boolean safe = true; 092 /** fieldTypeResolver */ 093 protected FieldTypeResolver fieldTypeResolver; 094 095 public DelimitedParser( String delimiter, String quote, Class[] types ) 096 { 097 reset( delimiter, quote, types, strict, safe, null, null, null ); 098 } 099 100 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe ) 101 { 102 reset( delimiter, quote, types, strict, safe, null, null, null ); 103 } 104 105 public DelimitedParser( String delimiter, String quote, FieldTypeResolver fieldTypeResolver ) 106 { 107 reset( delimiter, quote, null, strict, safe, null, null, fieldTypeResolver ); 108 } 109 110 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, FieldTypeResolver fieldTypeResolver ) 111 { 112 reset( delimiter, quote, types, strict, safe, null, null, fieldTypeResolver ); 113 } 114 115 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields ) 116 { 117 reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, null ); 118 } 119 120 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver ) 121 { 122 reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver ); 123 } 124 125 public void reset( Fields sourceFields, Fields sinkFields ) 126 { 127 reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver ); 128 } 129 130 public void reset( String delimiter, String quote, Type[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver ) 131 { 132 if( delimiter == null || delimiter.isEmpty() ) 133 throw new IllegalArgumentException( "delimiter may not be null or empty" ); 134 135 if( delimiter.equals( quote ) ) 136 throw new IllegalArgumentException( "delimiter and quote character may not be the same value, got: '" + delimiter + "'" ); 137 138 this.delimiter = delimiter; 139 this.strict = strict; 140 this.safe = safe; 141 this.fieldTypeResolver = fieldTypeResolver; 142 143 if( quote != null && !quote.isEmpty() ) // if empty, leave null 144 this.quote = quote; 145 146 if( types != null && types.length == 0 ) 147 this.types = null; 148 149 if( types != null ) 150 this.types = Arrays.copyOf( types, types.length ); 151 152 if( sourceFields == null || sinkFields == null ) 153 return; 154 155 if( types == null && sourceFields.hasTypes() ) 156 this.types = sourceFields.getTypes(); // gets a copy 157 158 this.sourceFields = sourceFields; 159 this.numValues = Math.max( sourceFields.size(), sinkFields.size() ); // if asymmetrical, one is zero 160 161 this.enforceStrict = this.strict; 162 163 if( sourceFields.isUnknown() ) 164 this.enforceStrict = false; 165 166 if( !sinkFields.isAll() && numValues == 0 ) 167 throw new IllegalArgumentException( "may not be zero declared fields, found: " + sinkFields.printVerbose() ); 168 169 splitPattern = createSplitPatternFor( this.delimiter, this.quote ); 170 cleanPattern = createCleanPatternFor( this.quote ); 171 escapePattern = createEscapePatternFor( this.quote ); 172 173 if( this.types != null && sinkFields.isAll() ) 174 throw new IllegalArgumentException( "when using Fields.ALL, field types may not be used" ); 175 176 if( this.types != null && this.types.length != sinkFields.size() ) 177 throw new IllegalArgumentException( "num of types must equal number of fields: " + sinkFields.printVerbose() + ", found: " + this.types.length ); 178 179 coercibles = Coercions.coercibleArray( this.numValues, this.types ); 180 } 181 182 public String getDelimiter() 183 { 184 return delimiter; 185 } 186 187 public String getQuote() 188 { 189 return quote; 190 } 191 192 /** 193 * Method createEscapePatternFor creates a regex {@link java.util.regex.Pattern} cleaning quote escapes from a String. 194 * <p> 195 * If {@code quote} is null or empty, a null value will be returned; 196 * 197 * @param quote of type String 198 * @return Pattern 199 */ 200 public Pattern createEscapePatternFor( String quote ) 201 { 202 if( quote == null || quote.isEmpty() ) 203 return null; 204 205 return Pattern.compile( String.format( ESCAPE_REGEX_FORMAT, quote ) ); 206 } 207 208 /** 209 * Method createCleanPatternFor creates a regex {@link java.util.regex.Pattern} for removing quote characters from a String. 210 * <p> 211 * If {@code quote} is null or empty, a null value will be returned; 212 * 213 * @param quote of type String 214 * @return Pattern 215 */ 216 public Pattern createCleanPatternFor( String quote ) 217 { 218 if( quote == null || quote.isEmpty() ) 219 return null; 220 221 return Pattern.compile( String.format( CLEAN_REGEX_FORMAT, quote ) ); 222 } 223 224 /** 225 * Method createSplitPatternFor creates a regex {@link java.util.regex.Pattern} for splitting a line of text into its component 226 * parts using the given delimiter and quote Strings. {@code quote} may be null. 227 * 228 * @param delimiter of type String 229 * @param quote of type String 230 * @return Pattern 231 */ 232 public Pattern createSplitPatternFor( String delimiter, String quote ) 233 { 234 String escapedDelimiter = delimiter.replaceAll( SPECIAL_REGEX_CHARS, "\\\\$1" ); 235 236 if( quote == null || quote.isEmpty() ) 237 return Pattern.compile( escapedDelimiter ); 238 else 239 return Pattern.compile( String.format( QUOTED_REGEX_FORMAT, quote, escapedDelimiter ) ); 240 } 241 242 /** 243 * Method createSplit will split the given {@code value} with the given {@code splitPattern}. 244 * 245 * @param value of type String 246 * @param splitPattern of type Pattern 247 * @param numValues of type int 248 * @return String[] 249 */ 250 public String[] createSplit( String value, Pattern splitPattern, int numValues ) 251 { 252 return splitPattern.split( value, numValues ); 253 } 254 255 /** 256 * Method cleanSplit will return a quote free array of String values, the given {@code split} array 257 * will be updated in place. 258 * <p> 259 * If {@code cleanPattern} is null, quote cleaning will not be performed, but all empty String values 260 * will be replaces with a {@code null} value. 261 * 262 * @param split of type Object[] 263 * @param cleanPattern of type Pattern 264 * @param escapePattern of type Pattern 265 * @param quote of type String 266 * @return Object[] as a convenience 267 */ 268 public Object[] cleanSplit( Object[] split, Pattern cleanPattern, Pattern escapePattern, String quote ) 269 { 270 if( cleanPattern != null ) 271 { 272 for( int i = 0; i < split.length; i++ ) 273 { 274 split[ i ] = cleanPattern.matcher( (String) split[ i ] ).replaceAll( "$1" ); 275 split[ i ] = escapePattern.matcher( (String) split[ i ] ).replaceAll( quote ); 276 } 277 } 278 279 for( int i = 0; i < split.length; i++ ) 280 { 281 if( ( (String) split[ i ] ).isEmpty() ) 282 split[ i ] = null; 283 } 284 285 return split; 286 } 287 288 public Fields parseFirstLine( FlowProcess flowProcess, Tap tap ) 289 { 290 Fields sourceFields; 291 TupleEntryIterator iterator = null; 292 293 try 294 { 295 if( !tap.resourceExists( flowProcess ) ) 296 throw new TapException( "unable to read fields from tap: " + tap + ", does not exist" ); 297 298 iterator = tap.openForRead( flowProcess ); 299 300 TupleEntry entry = iterator.hasNext() ? iterator.next() : null; 301 302 if( entry == null ) 303 throw new TapException( "unable to read fields from tap: " + tap + ", is empty" ); 304 305 Object[] result = onlyParseLine( entry.getTuple().getString( 0 ) ); // don't coerce if type info is avail 306 307 result = cleanParsedLine( result ); 308 309 Type[] inferred = inferTypes( result ); // infer type from field name, after removing quotes/escapes 310 311 result = cleanFields( result ); // clean field names to remove any meta-data or manage case 312 313 sourceFields = new Fields( Arrays.copyOf( result, result.length, Comparable[].class ) ); 314 315 if( inferred != null ) 316 sourceFields = sourceFields.applyTypes( inferred ); 317 } 318 catch( IOException exception ) 319 { 320 throw new TapException( "unable to read fields from tap: " + tap, exception ); 321 } 322 finally 323 { 324 if( iterator != null ) 325 { 326 try 327 { 328 iterator.close(); 329 } 330 catch( IOException exception ) 331 { 332 // do nothing 333 } 334 } 335 } 336 337 return sourceFields; 338 } 339 340 public Object[] parseLine( String line ) 341 { 342 Object[] split = onlyParseLine( line ); 343 344 split = cleanParsedLine( split ); 345 346 return coerceParsedLine( line, split ); 347 } 348 349 protected Object[] cleanParsedLine( Object[] split ) 350 { 351 return cleanSplit( split, cleanPattern, escapePattern, quote ); 352 } 353 354 protected Object[] coerceParsedLine( String line, Object[] split ) 355 { 356 if( types != null ) // forced null in ctor 357 { 358 Object[] result = new Object[ split.length ]; 359 360 for( int i = 0; i < split.length; i++ ) 361 { 362 try 363 { 364 result[ i ] = coercibles[ i ].canonical( split[ i ] ); 365 } 366 catch( Exception exception ) 367 { 368 result[ i ] = null; 369 370 if( !safe ) 371 throw new TapException( getSafeMessage( split[ i ], i ), exception, new Tuple( line ) ); // trap actual line data 372 373 if( LOG.isDebugEnabled() ) 374 LOG.debug( getSafeMessage( split[ i ], i ), exception ); 375 } 376 } 377 378 split = result; 379 } 380 381 return split; 382 } 383 384 private String getSafeMessage( Object object, int i ) 385 { 386 try 387 { 388 return "field " + sourceFields.get( i ) + " cannot be coerced from : " + object + " to: " + Util.getTypeName( types[ i ] ); 389 } 390 catch( Throwable throwable ) 391 { 392 // you may get an exception while composing the message (e.g. ArrayIndexOutOfBoundsException) 393 // use a generic string 394 return "field pos " + i + " cannot be coerced from: " + object + ", pos has no corresponding field name or coercion type"; 395 } 396 } 397 398 protected Object[] onlyParseLine( String line ) 399 { 400 Object[] split = createSplit( line, splitPattern, numValues == 0 ? 0 : -1 ); 401 402 if( numValues != 0 && split.length != numValues ) 403 { 404 if( enforceStrict ) 405 throw new TapException( getParseMessage( split ), new Tuple( line ) ); // trap actual line data 406 407 if( LOG.isDebugEnabled() ) 408 LOG.debug( getParseMessage( split ) ); 409 410 Object[] array = new Object[ numValues ]; 411 Arrays.fill( array, "" ); 412 System.arraycopy( split, 0, array, 0, Math.min( numValues, split.length ) ); 413 414 split = array; 415 } 416 417 return split; 418 } 419 420 private String getParseMessage( Object[] split ) 421 { 422 return "did not parse correct number of values from input data, expected: " + numValues + ", got: " + split.length + ":" + Util.join( ",", (String[]) split ); 423 } 424 425 public Appendable joinFirstLine( Iterable iterable, Appendable buffer ) 426 { 427 iterable = prepareFields( iterable ); 428 429 return joinLine( iterable, buffer ); 430 } 431 432 public Appendable joinLine( Iterable iterable, Appendable buffer ) 433 { 434 try 435 { 436 if( quote != null ) 437 return joinWithQuote( iterable, buffer ); 438 439 return joinNoQuote( iterable, buffer ); 440 } 441 catch( IOException exception ) 442 { 443 throw new TapException( "unable to append data", exception ); 444 } 445 } 446 447 protected Appendable joinWithQuote( Iterable tuple, Appendable buffer ) throws IOException 448 { 449 int count = 0; 450 451 for( Object value : tuple ) 452 { 453 if( count != 0 ) 454 buffer.append( delimiter ); 455 456 if( value != null ) 457 { 458 String valueString = value.toString(); 459 460 if( valueString.contains( quote ) ) 461 valueString = valueString.replaceAll( quote, quote + quote ); 462 463 if( valueString.contains( delimiter ) ) 464 valueString = quote + valueString + quote; 465 466 buffer.append( valueString ); 467 } 468 469 count++; 470 } 471 472 return buffer; 473 } 474 475 protected Appendable joinNoQuote( Iterable tuple, Appendable buffer ) throws IOException 476 { 477 int count = 0; 478 479 for( Object value : tuple ) 480 { 481 if( count != 0 ) 482 buffer.append( delimiter ); 483 484 if( value != null ) 485 buffer.append( value.toString() ); 486 487 count++; 488 } 489 490 return buffer; 491 } 492 493 protected Type[] inferTypes( Object[] result ) 494 { 495 if( fieldTypeResolver == null ) 496 return null; 497 498 Type[] inferred = new Type[ result.length ]; 499 500 for( int i = 0; i < result.length; i++ ) 501 { 502 String field = (String) result[ i ]; 503 504 inferred[ i ] = fieldTypeResolver.inferTypeFrom( i, field ); 505 } 506 507 return inferred; 508 } 509 510 protected Iterable prepareFields( Iterable fields ) 511 { 512 if( fieldTypeResolver == null ) 513 return fields; 514 515 List result = new ArrayList(); 516 517 for( Object field : fields ) 518 { 519 int index = result.size(); 520 Type type = types != null ? types[ index ] : null; 521 String value = fieldTypeResolver.prepareField( index, (String) field, type ); 522 523 if( value != null && !value.isEmpty() ) 524 field = value; 525 526 result.add( field ); 527 } 528 529 return result; 530 } 531 532 protected Object[] cleanFields( Object[] result ) 533 { 534 if( fieldTypeResolver == null ) 535 return result; 536 537 for( int i = 0; i < result.length; i++ ) 538 { 539 Type type = types != null ? types[ i ] : null; 540 String value = fieldTypeResolver.cleanField( i, (String) result[ i ], type ); 541 542 if( value != null && !value.isEmpty() ) 543 result[ i ] = value; 544 } 545 546 return result; 547 } 548 }