001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap.partition;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.LinkedHashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import cascading.flow.Flow;
034import cascading.flow.FlowProcess;
035import cascading.operation.Filter;
036import cascading.scheme.Scheme;
037import cascading.scheme.SinkCall;
038import cascading.scheme.SourceCall;
039import cascading.tap.SinkMode;
040import cascading.tap.Tap;
041import cascading.tap.TapException;
042import cascading.tap.type.FileType;
043import cascading.tuple.Fields;
044import cascading.tuple.Tuple;
045import cascading.tuple.TupleEntry;
046import cascading.tuple.TupleEntryCollector;
047import cascading.tuple.TupleEntryIterableChainIterator;
048import cascading.tuple.TupleEntryIterator;
049import cascading.tuple.TupleEntrySchemeCollector;
050import cascading.tuple.TupleEntrySchemeIterator;
051import cascading.tuple.util.TupleViews;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
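/**
 * Class BasePartitionTap is the abstract base for taps that read from and write to the child partitions of a
 * parent {@link Tap}, using a {@link Partition} to convert between partition paths and partition field values.
 * <p>
 * Resource operations (create, delete, commit, rollback, etc) are delegated to the parent Tap.
 */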
058public abstract class BasePartitionTap<Config, Input, Output> extends Tap<Config, Input, Output> implements FileType<Config>
059  {
060  /** Field LOG */
061  private static final Logger LOG = LoggerFactory.getLogger( BasePartitionTap.class );
062  /** Field OPEN_FILES_THRESHOLD_DEFAULT */
063  protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300;
064
065  private class PartitionIterator extends TupleEntryIterableChainIterator
066    {
067    public PartitionIterator( final FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
068      {
069      super( getSourceFields() );
070
071      List<Iterator<Tuple>> iterators = new ArrayList<>();
072
073      if( input != null )
074        {
075        String identifier = parent.getFullIdentifier( flowProcess );
076        iterators.add( createPartitionEntryIterator( flowProcess, input, identifier, getCurrentIdentifier( flowProcess ) ) );
077        }
078      else
079        {
080        String[] childIdentifiers = getChildPartitionIdentifiers( flowProcess, false );
081
082        for( String childIdentifier : childIdentifiers )
083          iterators.add( createPartitionEntryIterator( flowProcess, null, parent.getIdentifier(), childIdentifier ) );
084        }
085
086      reset( iterators );
087      }
088
089    private PartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Config> flowProcess, Input input, String parentIdentifier, String childIdentifier ) throws IOException
090      {
091      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, childIdentifier, input );
092
093      return new PartitionTupleEntryIterator( getSourceFields(), partition, parentIdentifier, childIdentifier, schemeIterator );
094      }
095    }
096
097  public class PartitionCollector extends TupleEntryCollector
098    {
099    private final FlowProcess<? extends Config> flowProcess;
100    private final Config conf;
101    private final Fields parentFields;
102    private final Fields partitionFields;
103    private TupleEntry partitionEntry;
104    private final Tuple partitionTuple;
105    private final Tuple parentTuple;
106
107    public PartitionCollector( FlowProcess<? extends Config> flowProcess )
108      {
109      super( Fields.asDeclaration( getSinkFields() ) );
110      this.flowProcess = flowProcess;
111      this.conf = flowProcess.getConfigCopy();
112      this.parentFields = parent.getSinkFields();
113      this.partitionFields = ( (PartitionScheme) getScheme() ).partitionFields;
114      this.partitionEntry = new TupleEntry( this.partitionFields );
115
116      this.partitionTuple = TupleViews.createNarrow( getSinkFields().getPos( this.partitionFields ) );
117      this.parentTuple = TupleViews.createNarrow( getSinkFields().getPos( this.parentFields ) );
118
119      this.partitionEntry.setTuple( partitionTuple );
120      }
121
122    TupleEntryCollector getCollector( String path )
123      {
124      TupleEntryCollector collector = collectors.get( path );
125
126      if( collector != null )
127        return collector;
128
129      try
130        {
131        if( LOG.isDebugEnabled() )
132          LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );
133
134        collector = createTupleEntrySchemeCollector( flowProcess, parent, path, openedCollectors );
135
136        openedCollectors++;
137        flowProcess.increment( Counters.Paths_Opened, 1 );
138        }
139      catch( IOException exception )
140        {
141        throw new TapException( "unable to open partition path: " + path, exception );
142        }
143
144      if( collectors.size() > openWritesThreshold )
145        purgeCollectors();
146
147      collectors.put( path, collector );
148
149      if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
150        LOG.info( "caching {} open Taps", collectors.size() );
151
152      return collector;
153      }
154
155    private void purgeCollectors()
156      {
157      int numToClose = Math.max( 1, (int) ( openWritesThreshold * .10 ) );
158
159      if( LOG.isInfoEnabled() )
160        LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );
161
162      Set<String> removeKeys = new HashSet<>();
163      Set<String> keys = collectors.keySet();
164
165      for( String key : keys )
166        {
167        if( numToClose-- == 0 )
168          break;
169
170        removeKeys.add( key );
171        }
172
173      for( String removeKey : removeKeys )
174        {
175        closeCollector( removeKey );
176        collectors.remove( removeKey );
177        }
178
179      flowProcess.increment( Counters.Path_Purges, 1 );
180      }
181
182    @Override
183    public void close()
184      {
185      super.close();
186
187      try
188        {
189        for( String path : new ArrayList<>( collectors.keySet() ) )
190          closeCollector( path );
191        }
192      finally
193        {
194        collectors.clear();
195        }
196      }
197
198    public void closeCollector( String path )
199      {
200      TupleEntryCollector collector = collectors.get( path );
201      if( collector == null )
202        return;
203      try
204        {
205        collector.close();
206
207        flowProcess.increment( Counters.Paths_Closed, 1 );
208        }
209      catch( Exception exception )
210        {
211        LOG.error( "exception while closing TupleEntryCollector {}", path, exception );
212
213        boolean failOnError = false;
214        Object failProperty = flowProcess.getProperty( PartitionTapProps.FAIL_ON_CLOSE );
215
216        if( failProperty != null )
217          failOnError = Boolean.parseBoolean( failProperty.toString() );
218
219        if( failOnError )
220          throw new TapException( exception );
221        }
222      }
223
224    protected void collect( TupleEntry tupleEntry ) throws IOException
225      {
226      // reset select views
227      TupleViews.reset( partitionTuple, tupleEntry.getTuple() ); // partitionTuple is inside partitionEntry
228      TupleViews.reset( parentTuple, tupleEntry.getTuple() );
229
230      String path = partition.toPartition( partitionEntry );
231
232      getCollector( path ).add( parentTuple );
233      }
234    }
235
236  /** Field parent */
237  protected Tap parent;
238  /** Field partition */
239  protected Partition partition;
240  /** Field sourcePartitionFilters */
241  protected final List<PartitionTapFilter> sourcePartitionFilters = new ArrayList<>();
242  /** Field keepParentOnDelete */
243  protected boolean keepParentOnDelete = false;
244  /** Field openTapsThreshold */
245  protected int openWritesThreshold = OPEN_WRITES_THRESHOLD_DEFAULT;
246
247  /** Field openedCollectors */
248  private long openedCollectors = 0;
249  /** Field collectors */
250  private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<>( 1000, .75f, true );
251
252  protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap parent, String path, long sequence ) throws IOException;
253
254  protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap parent, String path, Input input ) throws IOException;
255
256  public enum Counters
257    {
258      Paths_Opened, Paths_Closed, Path_Purges
259    }
260
261  protected BasePartitionTap( Tap parent, Partition partition, int openWritesThreshold )
262    {
263    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), parent.getSinkMode() );
264    this.parent = parent;
265    this.partition = partition;
266    this.openWritesThreshold = openWritesThreshold;
267    }
268
269  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode )
270    {
271    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
272    this.parent = parent;
273    this.partition = partition;
274    }
275
276  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
277    {
278    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
279    this.parent = parent;
280    this.partition = partition;
281    this.keepParentOnDelete = keepParentOnDelete;
282    this.openWritesThreshold = openWritesThreshold;
283    }
284
285  /**
286   * Method getParent returns the parent Tap of this PartitionTap object.
287   *
288   * @return the parent (type Tap) of this PartitionTap object.
289   */
290  public Tap getParent()
291    {
292    return parent;
293    }
294
295  /**
296   * Method getPartition returns the {@link Partition} instance used by this PartitionTap
297   *
298   * @return the partition instance
299   */
300  public Partition getPartition()
301    {
302    return partition;
303    }
304
305  /**
306   * Method getChildPartitionIdentifiers returns an array of all identifiers for all available partitions.
307   * <p>
308   * This method is used internally to set all incoming paths, override to limit applicable partitions.
309   * <p>
310   * Note the returns array may be large.
311   *
312   * @param flowProcess    of type FlowProcess
313   * @param fullyQualified of type boolean
314   * @return a String[] of partition identifiers
315   * @throws IOException
316   */
317  public String[] getChildPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, boolean fullyQualified ) throws IOException
318    {
319    String[] childIdentifiers = ( castFileType() ).getChildIdentifiers(
320      flowProcess.getConfig(),
321      partition.getPathDepth(),
322      fullyQualified
323    );
324
325    if( sourcePartitionFilters.isEmpty() )
326      return childIdentifiers;
327
328    return getFilteredPartitionIdentifiers( flowProcess, childIdentifiers );
329    }
330
331  protected String[] getFilteredPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, String[] childIdentifiers )
332    {
333    Fields partitionFields = partition.getPartitionFields();
334    TupleEntry partitionEntry = new TupleEntry( partitionFields, Tuple.size( partitionFields.size() ) );
335
336    List<String> filteredIdentifiers = new ArrayList<>( childIdentifiers.length );
337
338    for( PartitionTapFilter filter : sourcePartitionFilters )
339      filter.prepare( flowProcess );
340
341    for( String childIdentifier : childIdentifiers )
342      {
343      partition.toTuple( childIdentifier.substring( parent.getFullIdentifier( flowProcess ).length() + 1 ), partitionEntry );
344
345      boolean isRemove = false;
346      for( PartitionTapFilter filter : sourcePartitionFilters )
347        {
348        if( filter.isRemove( flowProcess, partitionEntry ) )
349          {
350          isRemove = true;
351          break;
352          }
353        }
354
355      if( !isRemove )
356        filteredIdentifiers.add( childIdentifier );
357      }
358
359    for( PartitionTapFilter filter : sourcePartitionFilters )
360      filter.cleanup( flowProcess );
361
362    return filteredIdentifiers.toArray( new String[ filteredIdentifiers.size() ] );
363    }
364
365  /**
366   * Add a {@link Filter} with its associated argument selector when using this PartitionTap as a source. On read, each
367   * child identifier is converted to a {@link Tuple} using the provided {@link Partition}. Each {@link Filter} will be
368   * applied to the {@link Tuple} so that the input paths can be filtered to only accept those required for the
369   * {@link Flow}.
370   *
371   * @param argumentSelector field selector that selects Filter arguments from the input Tuple
372   * @param filter           Filter to be applied to each input Tuple
373   */
374  public void addSourcePartitionFilter( Fields argumentSelector, Filter filter )
375    {
376    Fields argumentFields;
377
378    if( argumentSelector.isAll() )
379      argumentFields = partition.getPartitionFields();
380    else
381      argumentFields = partition.getPartitionFields().select( argumentSelector );
382
383    sourcePartitionFilters.add( new PartitionTapFilter( argumentFields, filter ) );
384    }
385
386  @Override
387  public String getIdentifier()
388    {
389    return parent.getIdentifier();
390    }
391
392  protected abstract String getCurrentIdentifier( FlowProcess<? extends Config> flowProcess );
393
394  /**
395   * Method getOpenWritesThreshold returns the openTapsThreshold of this PartitionTap object.
396   *
397   * @return the openTapsThreshold (type int) of this PartitionTap object.
398   */
399  public int getOpenWritesThreshold()
400    {
401    return openWritesThreshold;
402    }
403
404  @Override
405  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
406    {
407    return new PartitionCollector( flowProcess );
408    }
409
410  @Override
411  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
412    {
413    return new PartitionIterator( flowProcess, input );
414    }
415
416  @Override
417  public boolean createResource( Config conf ) throws IOException
418    {
419    return parent.createResource( conf );
420    }
421
422  @Override
423  public boolean deleteResource( Config conf ) throws IOException
424    {
425    return keepParentOnDelete || parent.deleteResource( conf );
426    }
427
428  @Override
429  public boolean prepareResourceForRead( Config conf ) throws IOException
430    {
431    return parent.prepareResourceForRead( conf );
432    }
433
434  @Override
435  public boolean prepareResourceForWrite( Config conf ) throws IOException
436    {
437    return parent.prepareResourceForWrite( conf );
438    }
439
440  @Override
441  public boolean commitResource( Config conf ) throws IOException
442    {
443    return parent.commitResource( conf );
444    }
445
446  @Override
447  public boolean rollbackResource( Config conf ) throws IOException
448    {
449    return parent.rollbackResource( conf );
450    }
451
452  @Override
453  public boolean resourceExists( Config conf ) throws IOException
454    {
455    return parent.resourceExists( conf );
456    }
457
458  @Override
459  public long getModifiedTime( Config conf ) throws IOException
460    {
461    return parent.getModifiedTime( conf );
462    }
463
464  @Override
465  public boolean isDirectory( FlowProcess<? extends Config> flowProcess ) throws IOException
466    {
467    return castFileType().isDirectory( flowProcess );
468    }
469
470  @Override
471  public boolean isDirectory( Config conf ) throws IOException
472    {
473    return castFileType().isDirectory( conf );
474    }
475
476  @Override
477  public String[] getChildIdentifiers( FlowProcess<? extends Config> flowProcess ) throws IOException
478    {
479    return castFileType().getChildIdentifiers( flowProcess );
480    }
481
482  @Override
483  public String[] getChildIdentifiers( Config conf ) throws IOException
484    {
485    return castFileType().getChildIdentifiers( conf );
486    }
487
488  @Override
489  public String[] getChildIdentifiers( FlowProcess<? extends Config> flowProcess, int depth, boolean fullyQualified ) throws IOException
490    {
491    return castFileType().getChildIdentifiers( flowProcess, depth, fullyQualified );
492    }
493
494  @Override
495  public String[] getChildIdentifiers( Config conf, int depth, boolean fullyQualified ) throws IOException
496    {
497    return castFileType().getChildIdentifiers( conf, depth, fullyQualified );
498    }
499
500  @Override
501  public long getSize( FlowProcess<? extends Config> flowProcess ) throws IOException
502    {
503    return castFileType().getSize( flowProcess );
504    }
505
506  @Override
507  public long getSize( Config conf ) throws IOException
508    {
509    return castFileType().getSize( conf );
510    }
511
512  protected FileType<Config> castFileType()
513    {
514    if( parent instanceof FileType )
515      return (FileType<Config>) parent;
516
517    throw new UnsupportedOperationException( "parent is not an implementation of " + FileType.class.getName() + ", is type: " + parent.getClass().getName() );
518    }
519
520  @Override
521  public boolean equals( Object object )
522    {
523    if( this == object )
524      return true;
525    if( object == null || getClass() != object.getClass() )
526      return false;
527    if( !super.equals( object ) )
528      return false;
529
530    BasePartitionTap that = (BasePartitionTap) object;
531
532    if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
533      return false;
534    if( partition != null ? !partition.equals( that.partition ) : that.partition != null )
535      return false;
536    if( partition != null ? !sourcePartitionFilters.equals( that.sourcePartitionFilters ) : that.sourcePartitionFilters != null )
537      return false;
538
539    return true;
540    }
541
542  @Override
543  public int hashCode()
544    {
545    int result = super.hashCode();
546    result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
547    result = 31 * result + ( partition != null ? partition.hashCode() : 0 );
548    result = 31 * result + ( sourcePartitionFilters != null ? sourcePartitionFilters.hashCode() : 0 );
549    return result;
550    }
551
552  @Override
553  public String toString()
554    {
555    return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + partition + "\"]" + "[\"" + sourcePartitionFilters + "\"]";
556    }
557
558  public static class PartitionScheme<Config, Input, Output> extends Scheme<Config, Input, Output, Void, Void>
559    {
560    private final Scheme scheme;
561    private final Fields partitionFields;
562
563    public PartitionScheme( Scheme scheme )
564      {
565      this.scheme = scheme;
566      this.partitionFields = null;
567      }
568
569    public PartitionScheme( Scheme scheme, Fields partitionFields )
570      {
571      this.scheme = scheme;
572
573      if( partitionFields == null || partitionFields.isAll() )
574        this.partitionFields = null;
575      else if( partitionFields.isDefined() )
576        this.partitionFields = partitionFields;
577      else
578        throw new IllegalArgumentException( "partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose() );
579      }
580
581    public Fields getSinkFields()
582      {
583      if( partitionFields == null || scheme.getSinkFields().isAll() )
584        return scheme.getSinkFields();
585
586      return Fields.merge( scheme.getSinkFields(), partitionFields );
587      }
588
589    public void setSinkFields( Fields sinkFields )
590      {
591      scheme.setSinkFields( sinkFields );
592      }
593
594    @Override
595    public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap )
596      {
597      return scheme.retrieveSourceFields( flowProcess, tap );
598      }
599
600    @Override
601    public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap )
602      {
603      return scheme.retrieveSinkFields( flowProcess, tap );
604      }
605
606    public Fields getSourceFields()
607      {
608      if( partitionFields == null || scheme.getSourceFields().isUnknown() )
609        return scheme.getSourceFields();
610
611      return Fields.merge( scheme.getSourceFields(), partitionFields );
612      }
613
614    public void setSourceFields( Fields sourceFields )
615      {
616      scheme.setSourceFields( sourceFields );
617      }
618
619    public int getNumSinkParts()
620      {
621      return scheme.getNumSinkParts();
622      }
623
624    public void setNumSinkParts( int numSinkParts )
625      {
626      scheme.setNumSinkParts( numSinkParts );
627      }
628
629    @Override
630    public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
631      {
632      scheme.sourceConfInit( flowProcess, tap, conf );
633      }
634
635    @Override
636    public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
637      {
638      scheme.sourcePrepare( flowProcess, sourceCall );
639      }
640
641    @Override
642    public boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
643      {
644      throw new UnsupportedOperationException( "should never be called" );
645      }
646
647    @Override
648    public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
649      {
650      scheme.sourceCleanup( flowProcess, sourceCall );
651      }
652
653    @Override
654    public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
655      {
656      scheme.sinkConfInit( flowProcess, tap, conf );
657      }
658
659    @Override
660    public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
661      {
662      scheme.sinkPrepare( flowProcess, sinkCall );
663      }
664
665    @Override
666    public void sink( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
667      {
668      throw new UnsupportedOperationException( "should never be called" );
669      }
670
671    @Override
672    public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
673      {
674      scheme.sinkCleanup( flowProcess, sinkCall );
675      }
676    }
677  }