001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.scheme.util;
023
024import java.io.IOException;
025import java.io.Serializable;
026import java.lang.reflect.Type;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.regex.Pattern;
031
032import cascading.flow.FlowProcess;
033import cascading.tap.Tap;
034import cascading.tap.TapException;
035import cascading.tuple.Fields;
036import cascading.tuple.Tuple;
037import cascading.tuple.TupleEntry;
038import cascading.tuple.TupleEntryIterator;
039import cascading.tuple.coerce.Coercions;
040import cascading.tuple.type.CoercibleType;
041import cascading.util.Util;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
/**
 * Class DelimitedParser is a base class for parsing text delimited files.
 * <p>
 * It may be sub-classed to change its behavior.
 * <p>
 * The interface {@link FieldTypeResolver} may be used to clean and prepare field names
 * for data columns, and to infer type information from column names.
 */
053public class DelimitedParser implements Serializable
054  {
  /** Field LOG is the logger used to report parse and coercion problems. */
  private static final Logger LOG = LoggerFactory.getLogger( DelimitedParser.class );

  /** Field SPECIAL_REGEX_CHARS is a regex capturing the characters that are backslash escaped in a delimiter before it is compiled into a split pattern. */
  static final String SPECIAL_REGEX_CHARS = "([\\]\\[|.*<>\\\\$^?()=!+])";
  /** Field QUOTED_REGEX_FORMAT is the split regex format used when a quote is set, {@code %1$s} is the quote and {@code %2$s} the escaped delimiter. */
  static final String QUOTED_REGEX_FORMAT = "%2$s(?=(?:[^%1$s]*%1$s[^%1$s]*[^%1$s%2$s]*%1$s)*(?![^%1$s]*%1$s))";
  /** Field CLEAN_REGEX_FORMAT is the regex format capturing the text between a leading and a trailing quote, {@code %1$s} is the quote. */
  static final String CLEAN_REGEX_FORMAT = "^(?:%1$s)(.*)(?:%1$s)$";
  /** Field ESCAPE_REGEX_FORMAT is the regex format matching a doubled quote, {@code %1$s} is the quote. */
  static final String ESCAPE_REGEX_FORMAT = "(%1$s%1$s)";

  /** Field sourceFields holds the source fields given to the last reset that supplied both source and sink fields. */
  protected Fields sourceFields;

  /** Field splitPattern is the compiled pattern used to split a line into values. */
  protected Pattern splitPattern;
  /** Field cleanPattern is the compiled pattern used to strip enclosing quotes, null when no quote is set. */
  protected Pattern cleanPattern;
  /** Field escapePattern is the compiled pattern matching doubled quotes, null when no quote is set. */
  protected Pattern escapePattern;
  /** Field delimiter is the String separating values in a line. */
  protected String delimiter;
  /** Field quote is the String used to quote values, null when quoting is not used. */
  protected String quote;
  /** Field strict is the strict value as requested by the caller, see enforceStrict for the effective value. */
  protected boolean strict = true; // need to cache value across resets
  /** Field enforceStrict is the effective strict value, it is forced to false when the source fields are unknown. */
  protected boolean enforceStrict = true;
  /** Field numValues is the number of values expected per line, zero when not known. */
  protected int numValues;
  /** Field types holds the types parsed values are coerced to, null when values are not coerced. */
  protected Type[] types;
  /** Field coercibles holds the coercible types created from numValues and types during reset. */
  protected CoercibleType[] coercibles;
  /** Field safe decides if a value that fails coercion becomes null (true) or causes a TapException (false). */
  protected boolean safe = true;
  /** Field fieldTypeResolver is the optional resolver used to infer types from, and to clean or prepare, field names. */
  protected FieldTypeResolver fieldTypeResolver;
094
095  public DelimitedParser( String delimiter, String quote, Class[] types )
096    {
097    reset( delimiter, quote, types, strict, safe, null, null, null );
098    }
099
100  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe )
101    {
102    reset( delimiter, quote, types, strict, safe, null, null, null );
103    }
104
105  public DelimitedParser( String delimiter, String quote, FieldTypeResolver fieldTypeResolver )
106    {
107    reset( delimiter, quote, null, strict, safe, null, null, fieldTypeResolver );
108    }
109
110  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, FieldTypeResolver fieldTypeResolver )
111    {
112    reset( delimiter, quote, types, strict, safe, null, null, fieldTypeResolver );
113    }
114
115  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields )
116    {
117    reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, null );
118    }
119
120  public DelimitedParser( String delimiter, String quote, Class[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver )
121    {
122    reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver );
123    }
124
125  public void reset( Fields sourceFields, Fields sinkFields )
126    {
127    reset( delimiter, quote, types, strict, safe, sourceFields, sinkFields, fieldTypeResolver );
128    }
129
130  public void reset( String delimiter, String quote, Type[] types, boolean strict, boolean safe, Fields sourceFields, Fields sinkFields, FieldTypeResolver fieldTypeResolver )
131    {
132    if( delimiter == null || delimiter.isEmpty() )
133      throw new IllegalArgumentException( "delimiter may not be null or empty" );
134
135    if( delimiter.equals( quote ) )
136      throw new IllegalArgumentException( "delimiter and quote character may not be the same value, got: '" + delimiter + "'" );
137
138    this.delimiter = delimiter;
139    this.strict = strict;
140    this.safe = safe;
141    this.fieldTypeResolver = fieldTypeResolver;
142
143    if( quote != null && !quote.isEmpty() ) // if empty, leave null
144      this.quote = quote;
145
146    if( types != null && types.length == 0 )
147      this.types = null;
148
149    if( types != null )
150      this.types = Arrays.copyOf( types, types.length );
151
152    if( sourceFields == null || sinkFields == null )
153      return;
154
155    if( types == null && sourceFields.hasTypes() )
156      this.types = sourceFields.getTypes(); // gets a copy
157
158    this.sourceFields = sourceFields;
159    this.numValues = Math.max( sourceFields.size(), sinkFields.size() ); // if asymmetrical, one is zero
160
161    this.enforceStrict = this.strict;
162
163    if( sourceFields.isUnknown() )
164      this.enforceStrict = false;
165
166    if( !sinkFields.isAll() && numValues == 0 )
167      throw new IllegalArgumentException( "may not be zero declared fields, found: " + sinkFields.printVerbose() );
168
169    splitPattern = createSplitPatternFor( this.delimiter, this.quote );
170    cleanPattern = createCleanPatternFor( this.quote );
171    escapePattern = createEscapePatternFor( this.quote );
172
173    if( this.types != null && sinkFields.isAll() )
174      throw new IllegalArgumentException( "when using Fields.ALL, field types may not be used" );
175
176    if( this.types != null && this.types.length != sinkFields.size() )
177      throw new IllegalArgumentException( "num of types must equal number of fields: " + sinkFields.printVerbose() + ", found: " + this.types.length );
178
179    coercibles = Coercions.coercibleArray( this.numValues, this.types );
180    }
181
182  public String getDelimiter()
183    {
184    return delimiter;
185    }
186
187  public String getQuote()
188    {
189    return quote;
190    }
191
192  /**
193   * Method createEscapePatternFor creates a regex {@link java.util.regex.Pattern} cleaning quote escapes from a String.
194   * <p>
195   * If {@code quote} is null or empty, a null value will be returned;
196   *
197   * @param quote of type String
198   * @return Pattern
199   */
200  public Pattern createEscapePatternFor( String quote )
201    {
202    if( quote == null || quote.isEmpty() )
203      return null;
204
205    return Pattern.compile( String.format( ESCAPE_REGEX_FORMAT, quote ) );
206    }
207
208  /**
209   * Method createCleanPatternFor creates a regex {@link java.util.regex.Pattern} for removing quote characters from a String.
210   * <p>
211   * If {@code quote} is null or empty, a null value will be returned;
212   *
213   * @param quote of type String
214   * @return Pattern
215   */
216  public Pattern createCleanPatternFor( String quote )
217    {
218    if( quote == null || quote.isEmpty() )
219      return null;
220
221    return Pattern.compile( String.format( CLEAN_REGEX_FORMAT, quote ) );
222    }
223
224  /**
225   * Method createSplitPatternFor creates a regex {@link java.util.regex.Pattern} for splitting a line of text into its component
226   * parts using the given delimiter and quote Strings. {@code quote} may be null.
227   *
228   * @param delimiter of type String
229   * @param quote     of type String
230   * @return Pattern
231   */
232  public Pattern createSplitPatternFor( String delimiter, String quote )
233    {
234    String escapedDelimiter = delimiter.replaceAll( SPECIAL_REGEX_CHARS, "\\\\$1" );
235
236    if( quote == null || quote.isEmpty() )
237      return Pattern.compile( escapedDelimiter );
238    else
239      return Pattern.compile( String.format( QUOTED_REGEX_FORMAT, quote, escapedDelimiter ) );
240    }
241
242  /**
243   * Method createSplit will split the given {@code value} with the given {@code splitPattern}.
244   *
245   * @param value        of type String
246   * @param splitPattern of type Pattern
247   * @param numValues    of type int
248   * @return String[]
249   */
250  public String[] createSplit( String value, Pattern splitPattern, int numValues )
251    {
252    return splitPattern.split( value, numValues );
253    }
254
255  /**
256   * Method cleanSplit will return a quote free array of String values, the given {@code split} array
257   * will be updated in place.
258   * <p>
259   * If {@code cleanPattern} is null, quote cleaning will not be performed, but all empty String values
260   * will be replaces with a {@code null} value.
261   *
262   * @param split         of type Object[]
263   * @param cleanPattern  of type Pattern
264   * @param escapePattern of type Pattern
265   * @param quote         of type String
266   * @return Object[] as a convenience
267   */
268  public Object[] cleanSplit( Object[] split, Pattern cleanPattern, Pattern escapePattern, String quote )
269    {
270    if( cleanPattern != null )
271      {
272      for( int i = 0; i < split.length; i++ )
273        {
274        split[ i ] = cleanPattern.matcher( (String) split[ i ] ).replaceAll( "$1" );
275        split[ i ] = escapePattern.matcher( (String) split[ i ] ).replaceAll( quote );
276        }
277      }
278
279    for( int i = 0; i < split.length; i++ )
280      {
281      if( ( (String) split[ i ] ).isEmpty() )
282        split[ i ] = null;
283      }
284
285    return split;
286    }
287
288  public Fields parseFirstLine( FlowProcess flowProcess, Tap tap )
289    {
290    Fields sourceFields;
291    TupleEntryIterator iterator = null;
292
293    try
294      {
295      if( !tap.resourceExists( flowProcess ) )
296        throw new TapException( "unable to read fields from tap: " + tap + ", does not exist" );
297
298      iterator = tap.openForRead( flowProcess );
299
300      TupleEntry entry = iterator.hasNext() ? iterator.next() : null;
301
302      if( entry == null )
303        throw new TapException( "unable to read fields from tap: " + tap + ", is empty" );
304
305      Object[] result = onlyParseLine( entry.getTuple().getString( 0 ) ); // don't coerce if type info is avail
306
307      result = cleanParsedLine( result );
308
309      Type[] inferred = inferTypes( result ); // infer type from field name, after removing quotes/escapes
310
311      result = cleanFields( result ); // clean field names to remove any meta-data or manage case
312
313      sourceFields = new Fields( Arrays.copyOf( result, result.length, Comparable[].class ) );
314
315      if( inferred != null )
316        sourceFields = sourceFields.applyTypes( inferred );
317      }
318    catch( IOException exception )
319      {
320      throw new TapException( "unable to read fields from tap: " + tap, exception );
321      }
322    finally
323      {
324      if( iterator != null )
325        {
326        try
327          {
328          iterator.close();
329          }
330        catch( IOException exception )
331          {
332          // do nothing
333          }
334        }
335      }
336
337    return sourceFields;
338    }
339
340  public Object[] parseLine( String line )
341    {
342    Object[] split = onlyParseLine( line );
343
344    split = cleanParsedLine( split );
345
346    return coerceParsedLine( line, split );
347    }
348
349  protected Object[] cleanParsedLine( Object[] split )
350    {
351    return cleanSplit( split, cleanPattern, escapePattern, quote );
352    }
353
354  protected Object[] coerceParsedLine( String line, Object[] split )
355    {
356    if( types != null ) // forced null in ctor
357      {
358      Object[] result = new Object[ split.length ];
359
360      for( int i = 0; i < split.length; i++ )
361        {
362        try
363          {
364          result[ i ] = coercibles[ i ].canonical( split[ i ] );
365          }
366        catch( Exception exception )
367          {
368          result[ i ] = null;
369
370          if( !safe )
371            throw new TapException( getSafeMessage( split[ i ], i ), exception, new Tuple( line ) ); // trap actual line data
372
373          if( LOG.isDebugEnabled() )
374            LOG.debug( getSafeMessage( split[ i ], i ), exception );
375          }
376        }
377
378      split = result;
379      }
380
381    return split;
382    }
383
384  private String getSafeMessage( Object object, int i )
385    {
386    try
387      {
388      return "field " + sourceFields.get( i ) + " cannot be coerced from : " + object + " to: " + Util.getTypeName( types[ i ] );
389      }
390    catch( Throwable throwable )
391      {
392      // you may get an exception while composing the message (e.g. ArrayIndexOutOfBoundsException)
393      // use a generic string
394      return "field pos " + i + " cannot be coerced from: " + object + ", pos has no corresponding field name or coercion type";
395      }
396    }
397
398  protected Object[] onlyParseLine( String line )
399    {
400    Object[] split = createSplit( line, splitPattern, numValues == 0 ? 0 : -1 );
401
402    if( numValues != 0 && split.length != numValues )
403      {
404      if( enforceStrict )
405        throw new TapException( getParseMessage( split ), new Tuple( line ) ); // trap actual line data
406
407      if( LOG.isDebugEnabled() )
408        LOG.debug( getParseMessage( split ) );
409
410      Object[] array = new Object[ numValues ];
411      Arrays.fill( array, "" );
412      System.arraycopy( split, 0, array, 0, Math.min( numValues, split.length ) );
413
414      split = array;
415      }
416
417    return split;
418    }
419
420  private String getParseMessage( Object[] split )
421    {
422    return "did not parse correct number of values from input data, expected: " + numValues + ", got: " + split.length + ":" + Util.join( ",", (String[]) split );
423    }
424
425  public Appendable joinFirstLine( Iterable iterable, Appendable buffer )
426    {
427    iterable = prepareFields( iterable );
428
429    return joinLine( iterable, buffer );
430    }
431
432  public Appendable joinLine( Iterable iterable, Appendable buffer )
433    {
434    try
435      {
436      if( quote != null )
437        return joinWithQuote( iterable, buffer );
438
439      return joinNoQuote( iterable, buffer );
440      }
441    catch( IOException exception )
442      {
443      throw new TapException( "unable to append data", exception );
444      }
445    }
446
447  protected Appendable joinWithQuote( Iterable tuple, Appendable buffer ) throws IOException
448    {
449    int count = 0;
450
451    for( Object value : tuple )
452      {
453      if( count != 0 )
454        buffer.append( delimiter );
455
456      if( value != null )
457        {
458        String valueString = value.toString();
459
460        if( valueString.contains( quote ) )
461          valueString = valueString.replaceAll( quote, quote + quote );
462
463        if( valueString.contains( delimiter ) )
464          valueString = quote + valueString + quote;
465
466        buffer.append( valueString );
467        }
468
469      count++;
470      }
471
472    return buffer;
473    }
474
475  protected Appendable joinNoQuote( Iterable tuple, Appendable buffer ) throws IOException
476    {
477    int count = 0;
478
479    for( Object value : tuple )
480      {
481      if( count != 0 )
482        buffer.append( delimiter );
483
484      if( value != null )
485        buffer.append( value.toString() );
486
487      count++;
488      }
489
490    return buffer;
491    }
492
493  protected Type[] inferTypes( Object[] result )
494    {
495    if( fieldTypeResolver == null )
496      return null;
497
498    Type[] inferred = new Type[ result.length ];
499
500    for( int i = 0; i < result.length; i++ )
501      {
502      String field = (String) result[ i ];
503
504      inferred[ i ] = fieldTypeResolver.inferTypeFrom( i, field );
505      }
506
507    return inferred;
508    }
509
510  protected Iterable prepareFields( Iterable fields )
511    {
512    if( fieldTypeResolver == null )
513      return fields;
514
515    List result = new ArrayList();
516
517    for( Object field : fields )
518      {
519      int index = result.size();
520      Type type = types != null ? types[ index ] : null;
521      String value = fieldTypeResolver.prepareField( index, (String) field, type );
522
523      if( value != null && !value.isEmpty() )
524        field = value;
525
526      result.add( field );
527      }
528
529    return result;
530    }
531
532  protected Object[] cleanFields( Object[] result )
533    {
534    if( fieldTypeResolver == null )
535      return result;
536
537    for( int i = 0; i < result.length; i++ )
538      {
539      Type type = types != null ? types[ i ] : null;
540      String value = fieldTypeResolver.cleanField( i, (String) result[ i ], type );
541
542      if( value != null && !value.isEmpty() )
543        result[ i ] = value;
544      }
545
546    return result;
547    }
548  }