001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.assembly;
023
024import java.beans.ConstructorProperties;
025import java.util.Comparator;
026import java.util.Map;
027
028import cascading.CascadingException;
029import cascading.flow.FlowProcess;
030import cascading.operation.BaseOperation;
031import cascading.operation.Filter;
032import cascading.operation.FilterCall;
033import cascading.operation.OperationCall;
034import cascading.operation.buffer.FirstNBuffer;
035import cascading.pipe.Each;
036import cascading.pipe.Every;
037import cascading.pipe.GroupBy;
038import cascading.pipe.Pipe;
039import cascading.pipe.SubAssembly;
040import cascading.provider.FactoryLoader;
041import cascading.tuple.Fields;
042import cascading.tuple.Tuple;
043import cascading.tuple.Tuples;
044import cascading.tuple.util.TupleHasher;
045import cascading.util.cache.BaseCacheFactory;
046import cascading.util.cache.CacheEvictionCallback;
047import cascading.util.cache.CascadingCache;
048
049/**
050 * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream.
051 * <p>
052 * Typically finding unique values in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer}
053 * {@link cascading.operation.Buffer} operation.
054 * <p>
055 * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null}
056 * values will be removed from the stream.
057 * <p>
058 * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter}
059 * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network.
060 * <p>
061 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
062 * in a much simpler mechanism.
063 * <p>
064 * Unique uses a {@link cascading.util.cache.CascadingCache} or LRU to do the filtering. To tune the cache, set the
065 * {@code capacity} value to a high enough value to utilize available memory. Or set a default value via the
066 * {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_CAPACITY} property. The current default is {@code 10,000} unique keys.
067 * <p>
068 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
069 * by setting {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_FACTORY} property to the name of a sub-class of
070 * {@link cascading.util.cache.BaseCacheFactory}.
071 * <p>
072 * The {@code capacity} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate
073 * comparison before dropping values from the LRU cache.
074 *
075 * @see cascading.util.cache.LRUHashMapCacheFactory
076 * @see cascading.util.cache.DirectMappedCacheFactory
077 * @see cascading.util.cache.LRUHashMapCache
078 * @see cascading.util.cache.DirectMappedCache
079 */
080public class Unique extends SubAssembly
081  {
082
083  public enum Include
084    {
085      ALL,
086      NO_NULLS
087    }
088
089  public enum Cache
090    {
091      Num_Keys_Flushed,
092      Num_Keys_Hit,
093      Num_Keys_Missed
094    }
095
096  /**
097   * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream.
098   * <p>
099   * Use this class typically in tandem with a {@link cascading.operation.aggregator.First}
100   * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values
101   * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
102   * <p>
103   * The {@code capacity} value is used to maintain a LRU of a constant size. If more than capacity unique values
104   * are seen, the oldest cached values will be removed from the cache.
105   *
106   * @see Unique
107   */
108  public static class FilterPartialDuplicates extends BaseOperation<CascadingCache<Tuple, Object>> implements Filter<CascadingCache<Tuple, Object>>
109    {
110    /** special null value for the caches, since a cache might not permit 'null' as a value */
111    private final static Object NULL_VALUE = new Object();
112
113    private int capacity = 0;
114    private Include include = Include.ALL;
115    private TupleHasher tupleHasher;
116
117    /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */
118    public FilterPartialDuplicates()
119      {
120      }
121
122    /**
123     * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
124     *
125     * @param capacity of type int
126     */
127    @ConstructorProperties({"capacity"})
128    public FilterPartialDuplicates( int capacity )
129      {
130      this.capacity = capacity;
131      }
132
133    /**
134     * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
135     *
136     * @param include  of type Include
137     * @param capacity of type int
138     */
139    @ConstructorProperties({"include", "capacity"})
140    public FilterPartialDuplicates( Include include, int capacity )
141      {
142      this( include, capacity, null );
143      }
144
145    /**
146     * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
147     *
148     * @param capacity    of type int
149     * @param include     of type Include
150     * @param tupleHasher of type TupleHasher
151     */
152    @ConstructorProperties({"include", "capacity", "tupleHasher"})
153    public FilterPartialDuplicates( Include include, int capacity, TupleHasher tupleHasher )
154      {
155      this.capacity = capacity;
156      this.include = include == null ? this.include : include;
157      this.tupleHasher = tupleHasher;
158      }
159
160    @Override
161    public void prepare( final FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall )
162      {
163      CacheEvictionCallback callback = new CacheEvictionCallback()
164        {
165        @Override
166        public void evict( Map.Entry entry )
167          {
168          flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
169          }
170        };
171      FactoryLoader loader = FactoryLoader.getInstance();
172      BaseCacheFactory cacheFactory = loader.loadFactoryFrom( flowProcess, UniqueProps.UNIQUE_CACHE_FACTORY, UniqueProps.DEFAULT_CACHE_FACTORY_CLASS );
173
174      if( cacheFactory == null )
175        throw new CascadingException( "unable to load cache factory, please check your '" + UniqueProps.UNIQUE_CACHE_FACTORY + "' setting." );
176
177      CascadingCache cache = cacheFactory.create( flowProcess );
178      cache.setCacheEvictionCallback( callback );
179      Integer cacheCapacity = capacity;
180
181      if( capacity == 0 )
182        {
183        cacheCapacity = flowProcess.getIntegerProperty( UniqueProps.UNIQUE_CACHE_CAPACITY );
184
185        if( cacheCapacity == null )
186          cacheCapacity = UniqueProps.UNIQUE_DEFAULT_CAPACITY;
187        }
188
189      cache.setCapacity( cacheCapacity.intValue() );
190      cache.initialize();
191
192      operationCall.setContext( cache );
193      }
194
195    @Override
196    public boolean isRemove( FlowProcess flowProcess, FilterCall<CascadingCache<Tuple, Object>> filterCall )
197      {
198      // we assume its more painful to create lots of tuple copies vs comparisons
199      Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() );
200
201      switch( include )
202        {
203        case ALL:
204          break;
205
206        case NO_NULLS:
207          if( Tuples.frequency( args, null ) == args.size() )
208            return true;
209
210          break;
211        }
212
213      if( filterCall.getContext().containsKey( args ) )
214        {
215        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
216        return true;
217        }
218
219      // only do the copy here
220      filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), NULL_VALUE );
221
222      flowProcess.increment( Cache.Num_Keys_Missed, 1 );
223
224      return false;
225      }
226
227    @Override
228    public void cleanup( FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall )
229      {
230      operationCall.setContext( null );
231      }
232
233    @Override
234    public boolean equals( Object object )
235      {
236      if( this == object )
237        return true;
238      if( !( object instanceof FilterPartialDuplicates ) )
239        return false;
240      if( !super.equals( object ) )
241        return false;
242
243      FilterPartialDuplicates that = (FilterPartialDuplicates) object;
244
245      if( capacity != that.capacity )
246        return false;
247
248      return true;
249      }
250
251    @Override
252    public int hashCode()
253      {
254      int result = super.hashCode();
255      result = 31 * result + capacity;
256      return result;
257      }
258    }
259
260  /**
261   * Constructor Unique creates a new Unique instance.
262   *
263   * @param pipe         of type Pipe
264   * @param uniqueFields of type Fields
265   */
266  @ConstructorProperties({"pipe", "uniqueFields"})
267  public Unique( Pipe pipe, Fields uniqueFields )
268    {
269    this( null, pipe, uniqueFields );
270    }
271
272  /**
273   * Constructor Unique creates a new Unique instance.
274   *
275   * @param pipe         of type Pipe
276   * @param uniqueFields of type Fields
277   * @param include      of type Include
278   */
279  @ConstructorProperties({"pipe", "uniqueFields", "include"})
280  public Unique( Pipe pipe, Fields uniqueFields, Include include )
281    {
282    this( null, pipe, uniqueFields, include );
283    }
284
285  /**
286   * Constructor Unique creates a new Unique instance.
287   *
288   * @param pipe         of type Pipe
289   * @param uniqueFields of type Fields
290   * @param capacity     of type int
291   */
292  @ConstructorProperties({"pipe", "uniqueFields", "capacity"})
293  public Unique( Pipe pipe, Fields uniqueFields, int capacity )
294    {
295    this( null, pipe, uniqueFields, capacity );
296    }
297
298  /**
299   * Constructor Unique creates a new Unique instance.
300   *
301   * @param pipe         of type Pipe
302   * @param uniqueFields of type Fields
303   * @param include      of type Include
304   * @param capacity     of type int
305   */
306  @ConstructorProperties({"pipe", "uniqueFields", "include", "capacity"})
307  public Unique( Pipe pipe, Fields uniqueFields, Include include, int capacity )
308    {
309    this( null, pipe, uniqueFields, include, capacity );
310    }
311
312  /**
313   * Constructor Unique creates a new Unique instance.
314   *
315   * @param name         of type String
316   * @param pipe         of type Pipe
317   * @param uniqueFields of type Fields
318   */
319  @ConstructorProperties({"name", "pipe", "uniqueFields"})
320  public Unique( String name, Pipe pipe, Fields uniqueFields )
321    {
322    this( name, pipe, uniqueFields, null );
323    }
324
325  /**
326   * Constructor Unique creates a new Unique instance.
327   *
328   * @param name         of type String
329   * @param pipe         of type Pipe
330   * @param uniqueFields of type Fields
331   * @param include      of type Include
332   */
333  @ConstructorProperties({"name", "pipe", "uniqueFields", "include"})
334  public Unique( String name, Pipe pipe, Fields uniqueFields, Include include )
335    {
336    this( name, pipe, uniqueFields, include, 0 );
337    }
338
339  /**
340   * Constructor Unique creates a new Unique instance.
341   *
342   * @param name         of type String
343   * @param pipe         of type Pipe
344   * @param uniqueFields of type Fields
345   * @param capacity     of type int
346   */
347  @ConstructorProperties({"name", "pipe", "uniqueFields", "capacity"})
348  public Unique( String name, Pipe pipe, Fields uniqueFields, int capacity )
349    {
350    this( name, Pipe.pipes( pipe ), uniqueFields, capacity );
351    }
352
353  /**
354   * Constructor Unique creates a new Unique instance.
355   *
356   * @param name         of type String
357   * @param pipe         of type Pipe
358   * @param uniqueFields of type Fields
359   * @param include      of type Include
360   * @param capacity     of type int
361   */
362  @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "capacity"})
363  public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int capacity )
364    {
365    this( name, Pipe.pipes( pipe ), uniqueFields, include, capacity );
366    }
367
368  /**
369   * Constructor Unique creates a new Unique instance.
370   *
371   * @param pipes        of type Pipe[]
372   * @param uniqueFields of type Fields
373   */
374  @ConstructorProperties({"pipes", "uniqueFields"})
375  public Unique( Pipe[] pipes, Fields uniqueFields )
376    {
377    this( null, pipes, uniqueFields );
378    }
379
380  /**
381   * Constructor Unique creates a new Unique instance.
382   *
383   * @param pipes        of type Pipe[]
384   * @param uniqueFields of type Fields
385   * @param include      of type Include
386   */
387  @ConstructorProperties({"pipes", "uniqueFields", "include"})
388  public Unique( Pipe[] pipes, Fields uniqueFields, Include include )
389    {
390    this( null, pipes, uniqueFields, include );
391    }
392
393  /**
394   * Constructor Unique creates a new Unique instance.
395   *
396   * @param pipes        of type Pipe[]
397   * @param uniqueFields of type Fields
398   * @param capacity     of type int
399   */
400  @ConstructorProperties({"pipes", "uniqueFields", "capacity"})
401  public Unique( Pipe[] pipes, Fields uniqueFields, int capacity )
402    {
403    this( null, pipes, uniqueFields, capacity );
404    }
405
406  /**
407   * Constructor Unique creates a new Unique instance.
408   *
409   * @param pipes        of type Pipe[]
410   * @param uniqueFields of type Fields
411   * @param include      of type Include
412   * @param capacity     of type int
413   */
414  @ConstructorProperties({"pipes", "uniqueFields", "include", "capacity"})
415  public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int capacity )
416    {
417    this( null, pipes, uniqueFields, include, capacity );
418    }
419
420  /**
421   * Constructor Unique creates a new Unique instance.
422   *
423   * @param name         of type String
424   * @param pipes        of type Pipe[]
425   * @param uniqueFields of type Fields
426   */
427  @ConstructorProperties({"name", "pipes", "uniqueFields"})
428  public Unique( String name, Pipe[] pipes, Fields uniqueFields )
429    {
430    this( name, pipes, uniqueFields, null );
431    }
432
433  /**
434   * Constructor Unique creates a new Unique instance.
435   *
436   * @param name         of type String
437   * @param pipes        of type Pipe[]
438   * @param uniqueFields of type Fields
439   * @param include      of type Include
440   */
441  @ConstructorProperties({"name", "pipes", "uniqueFields", "include"})
442  public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include )
443    {
444    this( name, pipes, uniqueFields, include, 0 );
445    }
446
447  /**
448   * Constructor Unique creates a new Unique instance.
449   *
450   * @param name         of type String
451   * @param pipes        of type Pipe[]
452   * @param uniqueFields of type Fields
453   * @param capacity     of type int
454   */
455  @ConstructorProperties({"name", "pipes", "uniqueFields", "capacity"})
456  public Unique( String name, Pipe[] pipes, Fields uniqueFields, int capacity )
457    {
458    this( name, pipes, uniqueFields, null, capacity );
459    }
460
461  /**
462   * Constructor Unique creates a new Unique instance.
463   *
464   * @param name         of type String
465   * @param pipes        of type Pipe[]
466   * @param uniqueFields of type Fields
467   * @param capacity     of type int
468   */
469  @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "capacity"})
470  public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int capacity )
471    {
472    super( pipes );
473
474    if( uniqueFields == null )
475      throw new IllegalArgumentException( "uniqueFields may not be null" );
476
477    Pipe[] filters = new Pipe[ pipes.length ];
478
479    TupleHasher tupleHasher = null;
480    Comparator[] comparators = uniqueFields.getComparators();
481
482    if( !TupleHasher.isNull( comparators ) )
483      tupleHasher = new TupleHasher( null, comparators );
484
485    FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, capacity, tupleHasher );
486
487    for( int i = 0; i < filters.length; i++ )
488      filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates );
489
490    Pipe pipe = new GroupBy( name, filters, uniqueFields );
491    pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS );
492
493    setTails( pipe );
494    }
495  }