001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Comparator;
025    import java.util.Map;
026    
027    import cascading.CascadingException;
028    import cascading.flow.FlowProcess;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.Filter;
031    import cascading.operation.FilterCall;
032    import cascading.operation.OperationCall;
033    import cascading.operation.buffer.FirstNBuffer;
034    import cascading.pipe.Each;
035    import cascading.pipe.Every;
036    import cascading.pipe.GroupBy;
037    import cascading.pipe.Pipe;
038    import cascading.pipe.SubAssembly;
039    import cascading.provider.FactoryLoader;
040    import cascading.tuple.Fields;
041    import cascading.tuple.Tuple;
042    import cascading.tuple.Tuples;
043    import cascading.tuple.util.TupleHasher;
044    import cascading.util.cache.BaseCacheFactory;
045    import cascading.util.cache.CacheEvictionCallback;
046    import cascading.util.cache.CascadingCache;
047    
048    /**
049     * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream.
050     * <p/>
051     * Typically, finding unique values in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer}
052     * {@link cascading.operation.Buffer} operation.
053     * <p/>
054     * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null}
055     * values will be removed from the stream.
056     * <p/>
057     * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter}
058     * to remove as many observed duplicates as possible before the GroupBy operator, reducing IO over the network.
059     * <p/>
060     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
061     * in a much simpler mechanism.
062     * <p/>
063     * Unique uses a {@link cascading.util.cache.CascadingCache} or LRU to do the filtering. To tune the cache, set the
064     * {@code capacity} value to a high enough value to utilize available memory. Or set a default value via the
065     * {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_CAPACITY} property. The current default is {@code 10,000} unique keys.
066     * <p/>
067     * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
068     * by setting {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_FACTORY} property to the name of a sub-class of
069     * {@link cascading.util.cache.BaseCacheFactory}.
070     * <p/>
071     * The {@code capacity} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate
072     * comparison before dropping values from the LRU cache.
073     *
074     * @see cascading.util.cache.LRUHashMapCacheFactory
075     * @see cascading.util.cache.DirectMappedCacheFactory
076     * @see cascading.util.cache.LRUHashMapCache
077     * @see cascading.util.cache.DirectMappedCache
078     */
079    public class Unique extends SubAssembly
080      {
081    
082      public enum Include
083        {
084          ALL,
085          NO_NULLS
086        }
087    
088      public enum Cache
089        {
090          Num_Keys_Flushed,
091          Num_Keys_Hit,
092          Num_Keys_Missed
093        }
094    
095      /**
096       * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream.
097       * <p/>
098       * Use this class typically in tandem with a {@link cascading.operation.aggregator.First}
099       * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values
100       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
101       * <p/>
102       * The {@code capacity} value is used to maintain a LRU of a constant size. If more than capacity unique values
103       * are seen, the oldest cached values will be removed from the cache.
104       *
105       * @see Unique
106       */
107      public static class FilterPartialDuplicates extends BaseOperation<CascadingCache<Tuple, Object>> implements Filter<CascadingCache<Tuple, Object>>
108        {
109        /** special null value for the caches, since a cache might not permit 'null' as a value */
110        private final static Object NULL_VALUE = new Object();
111    
112        private int capacity = 0;
113        private Include include = Include.ALL;
114        private TupleHasher tupleHasher;
115    
116        /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */
117        public FilterPartialDuplicates()
118          {
119          }
120    
121        /**
122         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
123         *
124         * @param capacity of type int
125         */
126        @ConstructorProperties({"capacity"})
127        public FilterPartialDuplicates( int capacity )
128          {
129          this.capacity = capacity;
130          }
131    
132        /**
133         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
134         *
135         * @param include  of type Include
136         * @param capacity of type int
137         */
138        @ConstructorProperties({"include", "capacity"})
139        public FilterPartialDuplicates( Include include, int capacity )
140          {
141          this( include, capacity, null );
142          }
143    
144        /**
145         * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance.
146         *
147         * @param capacity    of type int
148         * @param include     of type Include
149         * @param tupleHasher of type TupleHasher
150         */
151        @ConstructorProperties({"include", "capacity", "tupleHasher"})
152        public FilterPartialDuplicates( Include include, int capacity, TupleHasher tupleHasher )
153          {
154          this.capacity = capacity;
155          this.include = include == null ? this.include : include;
156          this.tupleHasher = tupleHasher;
157          }
158    
159        @Override
160        public void prepare( final FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall )
161          {
162          CacheEvictionCallback callback = new CacheEvictionCallback()
163          {
164          @Override
165          public void evict( Map.Entry entry )
166            {
167            flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
168            }
169          };
170          FactoryLoader loader = FactoryLoader.getInstance();
171          BaseCacheFactory cacheFactory = loader.loadFactoryFrom( flowProcess, UniqueProps.UNIQUE_CACHE_FACTORY, UniqueProps.DEFAULT_CACHE_FACTORY_CLASS );
172          if( cacheFactory == null )
173            throw new CascadingException( "unable to load cache factory, please check your '" + UniqueProps.UNIQUE_CACHE_FACTORY + "' setting." );
174    
175          CascadingCache cache = cacheFactory.create( flowProcess );
176          cache.setCacheEvictionCallback( callback );
177          Integer cacheCapacity = capacity;
178          if( capacity == 0 )
179            {
180            cacheCapacity = flowProcess.getIntegerProperty( UniqueProps.UNIQUE_CACHE_CAPACITY );
181            if( cacheCapacity == null )
182              cacheCapacity = UniqueProps.UNIQUE_DEFAULT_CAPACITY;
183            }
184          cache.setCapacity( cacheCapacity.intValue() );
185          cache.initialize();
186    
187          operationCall.setContext( cache );
188          }
189    
190        @Override
191        public boolean isRemove( FlowProcess flowProcess, FilterCall<CascadingCache<Tuple, Object>> filterCall )
192          {
193          // we assume its more painful to create lots of tuple copies vs comparisons
194          Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() );
195    
196          switch( include )
197            {
198            case ALL:
199              break;
200    
201            case NO_NULLS:
202              if( Tuples.frequency( args, null ) == args.size() )
203                return true;
204    
205              break;
206            }
207    
208          if( filterCall.getContext().containsKey( args ) )
209            {
210            flowProcess.increment( Cache.Num_Keys_Hit, 1 );
211            return true;
212            }
213    
214          // only do the copy here
215          filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), NULL_VALUE );
216    
217          flowProcess.increment( Cache.Num_Keys_Missed, 1 );
218    
219          return false;
220          }
221    
222        @Override
223        public void cleanup( FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall )
224          {
225          operationCall.setContext( null );
226          }
227    
228        @Override
229        public boolean equals( Object object )
230          {
231          if( this == object )
232            return true;
233          if( !( object instanceof FilterPartialDuplicates ) )
234            return false;
235          if( !super.equals( object ) )
236            return false;
237    
238          FilterPartialDuplicates that = (FilterPartialDuplicates) object;
239    
240          if( capacity != that.capacity )
241            return false;
242    
243          return true;
244          }
245    
246        @Override
247        public int hashCode()
248          {
249          int result = super.hashCode();
250          result = 31 * result + capacity;
251          return result;
252          }
253        }
254    
255      /**
256       * Constructor Unique creates a new Unique instance.
257       *
258       * @param pipe         of type Pipe
259       * @param uniqueFields of type Fields
260       */
261      @ConstructorProperties({"pipe", "uniqueFields"})
262      public Unique( Pipe pipe, Fields uniqueFields )
263        {
264        this( null, pipe, uniqueFields );
265        }
266    
267      /**
268       * Constructor Unique creates a new Unique instance.
269       *
270       * @param pipe         of type Pipe
271       * @param uniqueFields of type Fields
272       * @param include      of type Include
273       */
274      @ConstructorProperties({"pipe", "uniqueFields", "include"})
275      public Unique( Pipe pipe, Fields uniqueFields, Include include )
276        {
277        this( null, pipe, uniqueFields, include );
278        }
279    
280      /**
281       * Constructor Unique creates a new Unique instance.
282       *
283       * @param pipe         of type Pipe
284       * @param uniqueFields of type Fields
285       * @param capacity     of type int
286       */
287      @ConstructorProperties({"pipe", "uniqueFields", "capacity"})
288      public Unique( Pipe pipe, Fields uniqueFields, int capacity )
289        {
290        this( null, pipe, uniqueFields, capacity );
291        }
292    
293      /**
294       * Constructor Unique creates a new Unique instance.
295       *
296       * @param pipe         of type Pipe
297       * @param uniqueFields of type Fields
298       * @param include      of type Include
299       * @param capacity     of type int
300       */
301      @ConstructorProperties({"pipe", "uniqueFields", "include", "capacity"})
302      public Unique( Pipe pipe, Fields uniqueFields, Include include, int capacity )
303        {
304        this( null, pipe, uniqueFields, include, capacity );
305        }
306    
307      /**
308       * Constructor Unique creates a new Unique instance.
309       *
310       * @param name         of type String
311       * @param pipe         of type Pipe
312       * @param uniqueFields of type Fields
313       */
314      @ConstructorProperties({"name", "pipe", "uniqueFields"})
315      public Unique( String name, Pipe pipe, Fields uniqueFields )
316        {
317        this( name, pipe, uniqueFields, null );
318        }
319    
320      /**
321       * Constructor Unique creates a new Unique instance.
322       *
323       * @param name         of type String
324       * @param pipe         of type Pipe
325       * @param uniqueFields of type Fields
326       * @param include      of type Include
327       */
328      @ConstructorProperties({"name", "pipe", "uniqueFields", "include"})
329      public Unique( String name, Pipe pipe, Fields uniqueFields, Include include )
330        {
331        this( name, pipe, uniqueFields, include, 0 );
332        }
333    
334      /**
335       * Constructor Unique creates a new Unique instance.
336       *
337       * @param name         of type String
338       * @param pipe         of type Pipe
339       * @param uniqueFields of type Fields
340       * @param capacity     of type int
341       */
342      @ConstructorProperties({"name", "pipe", "uniqueFields", "capacity"})
343      public Unique( String name, Pipe pipe, Fields uniqueFields, int capacity )
344        {
345        this( name, Pipe.pipes( pipe ), uniqueFields, capacity );
346        }
347    
348      /**
349       * Constructor Unique creates a new Unique instance.
350       *
351       * @param name         of type String
352       * @param pipe         of type Pipe
353       * @param uniqueFields of type Fields
354       * @param include      of type Include
355       * @param capacity     of type int
356       */
357      @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "capacity"})
358      public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int capacity )
359        {
360        this( name, Pipe.pipes( pipe ), uniqueFields, include, capacity );
361        }
362    
363      /**
364       * Constructor Unique creates a new Unique instance.
365       *
366       * @param pipes        of type Pipe[]
367       * @param uniqueFields of type Fields
368       */
369      @ConstructorProperties({"pipes", "uniqueFields"})
370      public Unique( Pipe[] pipes, Fields uniqueFields )
371        {
372        this( null, pipes, uniqueFields );
373        }
374    
375      /**
376       * Constructor Unique creates a new Unique instance.
377       *
378       * @param pipes        of type Pipe[]
379       * @param uniqueFields of type Fields
380       * @param include      of type Include
381       */
382      @ConstructorProperties({"pipes", "uniqueFields", "include"})
383      public Unique( Pipe[] pipes, Fields uniqueFields, Include include )
384        {
385        this( null, pipes, uniqueFields, include );
386        }
387    
388      /**
389       * Constructor Unique creates a new Unique instance.
390       *
391       * @param pipes        of type Pipe[]
392       * @param uniqueFields of type Fields
393       * @param capacity     of type int
394       */
395      @ConstructorProperties({"pipes", "uniqueFields", "capacity"})
396      public Unique( Pipe[] pipes, Fields uniqueFields, int capacity )
397        {
398        this( null, pipes, uniqueFields, capacity );
399        }
400    
401      /**
402       * Constructor Unique creates a new Unique instance.
403       *
404       * @param pipes        of type Pipe[]
405       * @param uniqueFields of type Fields
406       * @param include      of type Include
407       * @param capacity     of type int
408       */
409      @ConstructorProperties({"pipes", "uniqueFields", "include", "capacity"})
410      public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int capacity )
411        {
412        this( null, pipes, uniqueFields, include, capacity );
413        }
414    
415      /**
416       * Constructor Unique creates a new Unique instance.
417       *
418       * @param name         of type String
419       * @param pipes        of type Pipe[]
420       * @param uniqueFields of type Fields
421       */
422      @ConstructorProperties({"name", "pipes", "uniqueFields"})
423      public Unique( String name, Pipe[] pipes, Fields uniqueFields )
424        {
425        this( name, pipes, uniqueFields, null );
426        }
427    
428      /**
429       * Constructor Unique creates a new Unique instance.
430       *
431       * @param name         of type String
432       * @param pipes        of type Pipe[]
433       * @param uniqueFields of type Fields
434       * @param include      of type Include
435       */
436      @ConstructorProperties({"name", "pipes", "uniqueFields", "include"})
437      public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include )
438        {
439        this( name, pipes, uniqueFields, include, 0 );
440        }
441    
442      /**
443       * Constructor Unique creates a new Unique instance.
444       *
445       * @param name         of type String
446       * @param pipes        of type Pipe[]
447       * @param uniqueFields of type Fields
448       * @param capacity     of type int
449       */
450      @ConstructorProperties({"name", "pipes", "uniqueFields", "capacity"})
451      public Unique( String name, Pipe[] pipes, Fields uniqueFields, int capacity )
452        {
453        this( name, pipes, uniqueFields, null, capacity );
454        }
455    
456      /**
457       * Constructor Unique creates a new Unique instance.
458       *
459       * @param name         of type String
460       * @param pipes        of type Pipe[]
461       * @param uniqueFields of type Fields
462       * @param capacity     of type int
463       */
464      @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "capacity"})
465      public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int capacity )
466        {
467        super( pipes );
468    
469        if( uniqueFields == null )
470          throw new IllegalArgumentException( "uniqueFields may not be null" );
471    
472        Pipe[] filters = new Pipe[ pipes.length ];
473    
474        TupleHasher tupleHasher = null;
475        Comparator[] comparators = uniqueFields.getComparators();
476    
477        if( !TupleHasher.isNull( comparators ) )
478          tupleHasher = new TupleHasher( null, comparators );
479    
480        FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, capacity, tupleHasher );
481    
482        for( int i = 0; i < filters.length; i++ )
483          filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates );
484    
485        Pipe pipe = new GroupBy( name, filters, uniqueFields );
486        pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), Fields.RESULTS );
487    
488        setTails( pipe );
489        }
490      }