001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.scheme.util; 022 023 import java.io.IOException; 024 import java.io.Serializable; 025 import java.lang.reflect.Type; 026 import java.util.ArrayList; 027 import java.util.Arrays; 028 import java.util.List; 029 import java.util.regex.Pattern; 030 031 import cascading.flow.FlowProcess; 032 import cascading.tap.Tap; 033 import cascading.tap.TapException; 034 import cascading.tuple.Fields; 035 import cascading.tuple.Tuple; 036 import cascading.tuple.TupleEntry; 037 import cascading.tuple.TupleEntryIterator; 038 import cascading.tuple.coerce.Coercions; 039 import cascading.tuple.type.CoercibleType; 040 import cascading.util.Util; 041 import org.slf4j.Logger; 042 import org.slf4j.LoggerFactory; 043 044 /** 045 * Class DelimitedParser is a base class for parsing text delimited files. 046 * <p/> 047 * It maybe sub-classed to change its behavior. 048 * <p/> 049 * The interface {@link FieldTypeResolver} maybe used to clean and prepare field names 050 * for data columns, and to infer type information from column names. 051 */ 052 public class DelimitedParser implements Serializable 053 { 054 /** Field LOG */ 055 private static final Logger LOG = LoggerFactory.getLogger( DelimitedParser.class ); 056 057 /** Field SPECIAL_REGEX_CHARS */ 058 static final String SPECIAL_REGEX_CHARS = "([\\]\\[|.*<>\\\\$^?()=!+])"; 059 /** Field QUOTED_REGEX_FORMAT */ 060 static final String QUOTED_REGEX_FORMAT = "%2$s(?=(?:[^%1$s]*%1$s[^%1$s]*[^%1$s%2$s]*%1$s)*(?![^%1$s]*%1$s))"; 061 /** Field CLEAN_REGEX_FORMAT */ 062 static final String CLEAN_REGEX_FORMAT = "^(?:%1$s)(.*)(?:%1$s)$"; 063 /** Field ESCAPE_REGEX_FORMAT */ 064 static final String ESCAPE_REGEX_FORMAT = "(%1$s%1$s)"; 065 066 /** Field sourceFields */ 067 protected Fields sourceFields; 068 069 /** Field splitPattern */ 070 protected Pattern splitPattern; 071 /** Field cleanPattern */ 072 protected Pattern cleanPattern; 073 /** Field escapePattern */ 074 protected Pattern escapePattern; 075 /** Field delimiter * */ 076 protected String delimiter; 077 /** Field quote */ 078 protected String quote; 079 /** Field strict */ 080 protected boolean strict = true; // need to cache value across resets 081 /** Field enforceStrict */ 082 protected boolean enforceStrict = true; 083 /** Field numValues */ 084 protected int numValues; 085 /** Field types */ 086 protected Type[] types; 087 /** Fields coercibles */ 088 protected CoercibleType[] coercibles; 089 /** Field safe */ 090 protected boolean safe = true; 091 /** fieldTypeResolver */ 092 protected FieldTypeResolver fieldTypeResolver; 093 094 public DelimitedParser( String delimiter, String quote, Class[] types ) 095 { 096 reset( delimiter, quote, types, strict, safe, null, null, null ); 097 } 098 099 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe ) 100 { 101 reset( delimiter, quote, types, strict, safe, null, null, null ); 102 } 103 104 public DelimitedParser( String delimiter, String quote, FieldTypeResolver fieldTypeResolver ) 105 { 106 reset( delimiter, quote, null, strict, safe, null, null, fieldTypeResolver ); 107 } 108 109 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, FieldTypeResolver fieldTypeResolver ) 110 { 111 reset( delimiter, quote, types, strict, safe, null, null, fieldTypeResolver ); 112 } 113 114 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields ) 115 { 116 reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, null ); 117 } 118 119 public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver ) 120 { 121 reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver ); 122 } 123 124 public void reset( Fields sourceFields, Fields sinkFields ) 125 { 126 reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver ); 127 } 128 129 public void reset( String delimiter, String quote, Type[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver ) 130 { 131 if( delimiter == null || delimiter.isEmpty() ) 132 throw new IllegalArgumentException( "delimiter may not be null or empty" ); 133 134 if( delimiter.equals( quote ) ) 135 throw new IllegalArgumentException( "delimiter and quote character may not be the same value, got: '" + delimiter + "'" ); 136 137 this.delimiter = delimiter; 138 this.strict = strict; 139 this.safe = safe; 140 this.fieldTypeResolver = fieldTypeResolver; 141 142 if( quote != null && !quote.isEmpty() ) // if empty, leave null 143 this.quote = quote; 144 145 if( types != null && types.length == 0 ) 146 this.types = null; 147 148 if( types != null ) 149 this.types = Arrays.copyOf( types, types.length ); 150 151 if( sourceFields == null || sinkFields == null ) 152 return; 153 154 if( types == null && sourceFields.hasTypes() ) 155 this.types = sourceFields.getTypes(); // gets a copy 156 157 this.sourceFields = sourceFields; 158 this.numValues = Math.max( sourceFields.size(), sinkFields.size() ); // if asymmetrical, one is zero 159 160 this.enforceStrict = this.strict; 161 162 if( sourceFields.isUnknown() ) 163 this.enforceStrict = false; 164 165 if( !sinkFields.isAll() && numValues == 0 ) 166 throw new IllegalArgumentException( "may not be zero declared fields, found: " + sinkFields.printVerbose() ); 167 168 splitPattern = createSplitPatternFor( this.delimiter, this.quote ); 169 cleanPattern = createCleanPatternFor( this.quote ); 170 escapePattern = createEscapePatternFor( this.quote ); 171 172 if( this.types != null && sinkFields.isAll() ) 173 throw new IllegalArgumentException( "when using Fields.ALL, field types may not be used" ); 174 175 if( this.types != null && this.types.length != sinkFields.size() ) 176 throw new IllegalArgumentException( "num of types must equal number of fields: " + sinkFields.printVerbose() + ", found: " + this.types.length ); 177 178 coercibles = Coercions.coercibleArray( this.numValues, this.types ); 179 } 180 181 public String getDelimiter() 182 { 183 return delimiter; 184 } 185 186 public String getQuote() 187 { 188 return quote; 189 } 190 191 /** 192 * Method createEscapePatternFor creates a regex {@link java.util.regex.Pattern} cleaning quote escapes from a String. 193 * <p/> 194 * If {@code quote} is null or empty, a null value will be returned; 195 * 196 * @param quote of type String 197 * @return Pattern 198 */ 199 public Pattern createEscapePatternFor( String quote ) 200 { 201 if( quote == null || quote.isEmpty() ) 202 return null; 203 204 return Pattern.compile( String.format( ESCAPE_REGEX_FORMAT, quote ) ); 205 } 206 207 /** 208 * Method createCleanPatternFor creates a regex {@link java.util.regex.Pattern} for removing quote characters from a String. 209 * <p/> 210 * If {@code quote} is null or empty, a null value will be returned; 211 * 212 * @param quote of type String 213 * @return Pattern 214 */ 215 public Pattern createCleanPatternFor( String quote ) 216 { 217 if( quote == null || quote.isEmpty() ) 218 return null; 219 220 return Pattern.compile( String.format( CLEAN_REGEX_FORMAT, quote ) ); 221 } 222 223 /** 224 * Method createSplitPatternFor creates a regex {@link java.util.regex.Pattern} for splitting a line of text into its component 225 * parts using the given delimiter and quote Strings. {@code quote} may be null. 226 * 227 * @param delimiter of type String 228 * @param quote of type String 229 * @return Pattern 230 */ 231 public Pattern createSplitPatternFor( String delimiter, String quote ) 232 { 233 String escapedDelimiter = delimiter.replaceAll( SPECIAL_REGEX_CHARS, "\\\\$1" ); 234 235 if( quote == null || quote.isEmpty() ) 236 return Pattern.compile( escapedDelimiter ); 237 else 238 return Pattern.compile( String.format( QUOTED_REGEX_FORMAT, quote, escapedDelimiter ) ); 239 } 240 241 /** 242 * Method createSplit will split the given {@code value} with the given {@code splitPattern}. 243 * 244 * @param value of type String 245 * @param splitPattern of type Pattern 246 * @param numValues of type int 247 * @return String[] 248 */ 249 public String[] createSplit( String value, Pattern splitPattern, int numValues ) 250 { 251 return splitPattern.split( value, numValues ); 252 } 253 254 /** 255 * Method cleanSplit will return a quote free array of String values, the given {@code split} array 256 * will be updated in place. 257 * <p/> 258 * If {@code cleanPattern} is null, quote cleaning will not be performed, but all empty String values 259 * will be replaces with a {@code null} value. 260 * 261 * @param split of type Object[] 262 * @param cleanPattern of type Pattern 263 * @param escapePattern of type Pattern 264 * @param quote of type String 265 * @return Object[] as a convenience 266 */ 267 public Object[] cleanSplit( Object[] split, Pattern cleanPattern, Pattern escapePattern, String quote ) 268 { 269 if( cleanPattern != null ) 270 { 271 for( int i = 0; i < split.length; i++ ) 272 { 273 split[ i ] = cleanPattern.matcher( (String) split[ i ] ).replaceAll( "$1" ); 274 split[ i ] = escapePattern.matcher( (String) split[ i ] ).replaceAll( quote ); 275 } 276 } 277 278 for( int i = 0; i < split.length; i++ ) 279 { 280 if( ( (String) split[ i ] ).isEmpty() ) 281 split[ i ] = null; 282 } 283 284 return split; 285 } 286 287 public Fields parseFirstLine( FlowProcess flowProcess, Tap tap ) 288 { 289 Fields sourceFields; 290 TupleEntryIterator iterator = null; 291 292 try 293 { 294 if( !tap.resourceExists( flowProcess.getConfigCopy() ) ) 295 throw new TapException( "unable to read fields from tap: " + tap + ", does not exist" ); 296 297 iterator = tap.openForRead( flowProcess ); 298 299 TupleEntry entry = iterator.hasNext() ? iterator.next() : null; 300 301 if( entry == null ) 302 throw new TapException( "unable to read fields from tap: " + tap + ", is empty" ); 303 304 Object[] result = onlyParseLine( entry.getTuple().getString( 0 ) ); // don't coerce if type info is avail 305 306 result = cleanParsedLine( result ); 307 308 Type[] inferred = inferTypes( result ); // infer type from field name, after removing quotes/escapes 309 310 result = cleanFields( result ); // clean field names to remove any meta-data or manage case 311 312 sourceFields = new Fields( Arrays.copyOf( result, result.length, Comparable[].class ) ); 313 314 if( inferred != null ) 315 sourceFields = sourceFields.applyTypes( inferred ); 316 } 317 catch( IOException exception ) 318 { 319 throw new TapException( "unable to read fields from tap: " + tap, exception ); 320 } 321 finally 322 { 323 if( iterator != null ) 324 { 325 try 326 { 327 iterator.close(); 328 } 329 catch( IOException exception ) 330 { 331 // do nothing 332 } 333 } 334 } 335 336 return sourceFields; 337 } 338 339 public Object[] parseLine( String line ) 340 { 341 Object[] split = onlyParseLine( line ); 342 343 split = cleanParsedLine( split ); 344 345 return coerceParsedLine( line, split ); 346 } 347 348 protected Object[] cleanParsedLine( Object[] split ) 349 { 350 return cleanSplit( split, cleanPattern, escapePattern, quote ); 351 } 352 353 protected Object[] coerceParsedLine( String line, Object[] split ) 354 { 355 if( types != null ) // forced null in ctor 356 { 357 Object[] result = new Object[ split.length ]; 358 359 for( int i = 0; i < split.length; i++ ) 360 { 361 try 362 { 363 result[ i ] = coercibles[ i ].canonical( split[ i ] ); 364 } 365 catch( Exception exception ) 366 { 367 result[ i ] = null; 368 369 if( !safe ) 370 throw new TapException( getSafeMessage( split[ i ], i ), exception, new Tuple( line ) ); // trap actual line data 371 372 if( LOG.isDebugEnabled() ) 373 LOG.debug( getSafeMessage( split[ i ], i ), exception ); 374 } 375 } 376 377 split = result; 378 } 379 380 return split; 381 } 382 383 private String getSafeMessage( Object object, int i ) 384 { 385 try 386 { 387 return "field " + sourceFields.get( i ) + " cannot be coerced from : " + object + " to: " + Util.getTypeName( types[ i ] ); 388 } 389 catch( Throwable throwable ) 390 { 391 // you may get an exception while composing the message (e.g. ArrayIndexOutOfBoundsException) 392 // use a generic string 393 return "field pos " + i + " cannot be coerced from: " + object + ", pos has no corresponding field name or coercion type"; 394 } 395 } 396 397 protected Object[] onlyParseLine( String line ) 398 { 399 Object[] split = createSplit( line, splitPattern, numValues == 0 ? 0 : -1 ); 400 401 if( numValues != 0 && split.length != numValues ) 402 { 403 if( enforceStrict ) 404 throw new TapException( getParseMessage( split ), new Tuple( line ) ); // trap actual line data 405 406 if( LOG.isDebugEnabled() ) 407 LOG.debug( getParseMessage( split ) ); 408 409 Object[] array = new Object[ numValues ]; 410 Arrays.fill( array, "" ); 411 System.arraycopy( split, 0, array, 0, Math.min( numValues, split.length ) ); 412 413 split = array; 414 } 415 416 return split; 417 } 418 419 private String getParseMessage( Object[] split ) 420 { 421 return "did not parse correct number of values from input data, expected: " + numValues + ", got: " + split.length + ":" + Util.join( ",", (String[]) split ); 422 } 423 424 public Appendable joinFirstLine( Iterable iterable, Appendable buffer ) 425 { 426 iterable = prepareFields( iterable ); 427 428 return joinLine( iterable, buffer ); 429 } 430 431 public Appendable joinLine( Iterable iterable, Appendable buffer ) 432 { 433 try 434 { 435 if( quote != null ) 436 return joinWithQuote( iterable, buffer ); 437 438 return joinNoQuote( iterable, buffer ); 439 } 440 catch( IOException exception ) 441 { 442 throw new TapException( "unable to append data", exception ); 443 } 444 } 445 446 protected Appendable joinWithQuote( Iterable tuple, Appendable buffer ) throws IOException 447 { 448 int count = 0; 449 450 for( Object value : tuple ) 451 { 452 if( count != 0 ) 453 buffer.append( delimiter ); 454 455 if( value != null ) 456 { 457 String valueString = value.toString(); 458 459 if( valueString.contains( quote ) ) 460 valueString = valueString.replaceAll( quote, quote + quote ); 461 462 if( valueString.contains( delimiter ) ) 463 valueString = quote + valueString + quote; 464 465 buffer.append( valueString ); 466 } 467 468 count++; 469 } 470 471 return buffer; 472 } 473 474 protected Appendable joinNoQuote( Iterable tuple, Appendable buffer ) throws IOException 475 { 476 int count = 0; 477 478 for( Object value : tuple ) 479 { 480 if( count != 0 ) 481 buffer.append( delimiter ); 482 483 if( value != null ) 484 buffer.append( value.toString() ); 485 486 count++; 487 } 488 489 return buffer; 490 } 491 492 protected Type[] inferTypes( Object[] result ) 493 { 494 if( fieldTypeResolver == null ) 495 return null; 496 497 Type[] inferred = new Type[ result.length ]; 498 499 for( int i = 0; i < result.length; i++ ) 500 { 501 String field = (String) result[ i ]; 502 503 inferred[ i ] = fieldTypeResolver.inferTypeFrom( i, field ); 504 } 505 506 return inferred; 507 } 508 509 protected Iterable prepareFields( Iterable fields ) 510 { 511 if( fieldTypeResolver == null ) 512 return fields; 513 514 List result = new ArrayList(); 515 516 for( Object field : fields ) 517 { 518 int index = result.size(); 519 Type type = types != null ? types[ index ] : null; 520 String value = fieldTypeResolver.prepareField( index, (String) field, type ); 521 522 if( value != null && !value.isEmpty() ) 523 field = value; 524 525 result.add( field ); 526 } 527 528 return result; 529 } 530 531 protected Object[] cleanFields( Object[] result ) 532 { 533 if( fieldTypeResolver == null ) 534 return result; 535 536 for( int i = 0; i < result.length; i++ ) 537 { 538 Type type = types != null ? types[ i ] : null; 539 String value = fieldTypeResolver.cleanField( i, (String) result[ i ], type ); 540 541 if( value != null && !value.isEmpty() ) 542 result[ i ] = value; 543 } 544 545 return result; 546 } 547 }