001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.pipe.assembly; 023 024import java.beans.ConstructorProperties; 025import java.util.Comparator; 026import java.util.Map; 027 028import cascading.CascadingException; 029import cascading.flow.FlowProcess; 030import cascading.operation.BaseOperation; 031import cascading.operation.Filter; 032import cascading.operation.FilterCall; 033import cascading.operation.OperationCall; 034import cascading.operation.buffer.FirstNBuffer; 035import cascading.pipe.Each; 036import cascading.pipe.Every; 037import cascading.pipe.GroupBy; 038import cascading.pipe.Pipe; 039import cascading.pipe.SubAssembly; 040import cascading.provider.FactoryLoader; 041import cascading.tuple.Fields; 042import cascading.tuple.Tuple; 043import cascading.tuple.Tuples; 044import cascading.tuple.util.TupleHasher; 045import cascading.util.cache.BaseCacheFactory; 046import cascading.util.cache.CacheEvictionCallback; 047import cascading.util.cache.CascadingCache; 048 049/** 050 * Class Unique {@link SubAssembly} is used to filter all duplicates out of a tuple stream. 
 * <p>
 * Typically finding unique values in a tuple stream relies on a {@link GroupBy} and a {@link FirstNBuffer}
 * {@link cascading.operation.Buffer} operation.
 * <p>
 * If the {@code include} value is set to {@link Include#NO_NULLS}, any tuple consisting of only {@code null}
 * values will be removed from the stream.
 * <p>
 * This SubAssembly uses the {@link FilterPartialDuplicates} {@link cascading.operation.Filter}
 * to remove as many observed duplicates before the GroupBy operator to reduce IO over the network.
 * <p>
 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
 * in a much simpler mechanism.
 * <p>
 * Unique uses a {@link cascading.util.cache.CascadingCache}, an LRU cache, to do the filtering. To tune the cache, set the
 * {@code capacity} value to a high enough value to utilize available memory. Or set a default value via the
 * {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_CAPACITY} property. The current default is {@code 10,000} unique keys.
 * <p>
 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
 * by setting {@link cascading.pipe.assembly.UniqueProps#UNIQUE_CACHE_FACTORY} property to the name of a sub-class of
 * {@link cascading.util.cache.BaseCacheFactory}.
 * <p>
 * The {@code capacity} value tells the underlying FilterPartialDuplicates how many values to cache for duplicate
 * comparison before dropping values from the LRU cache.
074 * 075 * @see cascading.util.cache.LRUHashMapCacheFactory 076 * @see cascading.util.cache.DirectMappedCacheFactory 077 * @see cascading.util.cache.LRUHashMapCache 078 * @see cascading.util.cache.DirectMappedCache 079 */ 080public class Unique extends SubAssembly 081 { 082 083 public enum Include 084 { 085 ALL, 086 NO_NULLS 087 } 088 089 public enum Cache 090 { 091 Num_Keys_Flushed, 092 Num_Keys_Hit, 093 Num_Keys_Missed 094 } 095 096 /** 097 * Class FilterPartialDuplicates is a {@link cascading.operation.Filter} that is used to remove observed duplicates from the tuple stream. 098 * <p> 099 * Use this class typically in tandem with a {@link cascading.operation.aggregator.First} 100 * {@link cascading.operation.Aggregator} in order to improve de-duping performance by removing as many values 101 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 102 * <p> 103 * The {@code capacity} value is used to maintain a LRU of a constant size. If more than capacity unique values 104 * are seen, the oldest cached values will be removed from the cache. 105 * 106 * @see Unique 107 */ 108 public static class FilterPartialDuplicates extends BaseOperation<CascadingCache<Tuple, Object>> implements Filter<CascadingCache<Tuple, Object>> 109 { 110 /** special null value for the caches, since a cache might not permit 'null' as a value */ 111 private final static Object NULL_VALUE = new Object(); 112 113 private int capacity = 0; 114 private Include include = Include.ALL; 115 private TupleHasher tupleHasher; 116 117 /** Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. */ 118 public FilterPartialDuplicates() 119 { 120 } 121 122 /** 123 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 
124 * 125 * @param capacity of type int 126 */ 127 @ConstructorProperties({"capacity"}) 128 public FilterPartialDuplicates( int capacity ) 129 { 130 this.capacity = capacity; 131 } 132 133 /** 134 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 135 * 136 * @param include of type Include 137 * @param capacity of type int 138 */ 139 @ConstructorProperties({"include", "capacity"}) 140 public FilterPartialDuplicates( Include include, int capacity ) 141 { 142 this( include, capacity, null ); 143 } 144 145 /** 146 * Constructor FilterPartialDuplicates creates a new FilterPartialDuplicates instance. 147 * 148 * @param capacity of type int 149 * @param include of type Include 150 * @param tupleHasher of type TupleHasher 151 */ 152 @ConstructorProperties({"include", "capacity", "tupleHasher"}) 153 public FilterPartialDuplicates( Include include, int capacity, TupleHasher tupleHasher ) 154 { 155 this.capacity = capacity; 156 this.include = include == null ? this.include : include; 157 this.tupleHasher = tupleHasher; 158 } 159 160 @Override 161 public void prepare( final FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall ) 162 { 163 CacheEvictionCallback callback = new CacheEvictionCallback() 164 { 165 @Override 166 public void evict( Map.Entry entry ) 167 { 168 flowProcess.increment( Cache.Num_Keys_Flushed, 1 ); 169 } 170 }; 171 FactoryLoader loader = FactoryLoader.getInstance(); 172 BaseCacheFactory cacheFactory = loader.loadFactoryFrom( flowProcess, UniqueProps.UNIQUE_CACHE_FACTORY, UniqueProps.DEFAULT_CACHE_FACTORY_CLASS ); 173 174 if( cacheFactory == null ) 175 throw new CascadingException( "unable to load cache factory, please check your '" + UniqueProps.UNIQUE_CACHE_FACTORY + "' setting." 
); 176 177 CascadingCache cache = cacheFactory.create( flowProcess ); 178 cache.setCacheEvictionCallback( callback ); 179 Integer cacheCapacity = capacity; 180 181 if( capacity == 0 ) 182 { 183 cacheCapacity = flowProcess.getIntegerProperty( UniqueProps.UNIQUE_CACHE_CAPACITY ); 184 185 if( cacheCapacity == null ) 186 cacheCapacity = UniqueProps.UNIQUE_DEFAULT_CAPACITY; 187 } 188 189 cache.setCapacity( cacheCapacity.intValue() ); 190 cache.initialize(); 191 192 operationCall.setContext( cache ); 193 } 194 195 @Override 196 public boolean isRemove( FlowProcess flowProcess, FilterCall<CascadingCache<Tuple, Object>> filterCall ) 197 { 198 // we assume its more painful to create lots of tuple copies vs comparisons 199 Tuple args = TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTuple() ); 200 201 switch( include ) 202 { 203 case ALL: 204 break; 205 206 case NO_NULLS: 207 if( Tuples.frequency( args, null ) == args.size() ) 208 return true; 209 210 break; 211 } 212 213 if( filterCall.getContext().containsKey( args ) ) 214 { 215 flowProcess.increment( Cache.Num_Keys_Hit, 1 ); 216 return true; 217 } 218 219 // only do the copy here 220 filterCall.getContext().put( TupleHasher.wrapTuple( tupleHasher, filterCall.getArguments().getTupleCopy() ), NULL_VALUE ); 221 222 flowProcess.increment( Cache.Num_Keys_Missed, 1 ); 223 224 return false; 225 } 226 227 @Override 228 public void cleanup( FlowProcess flowProcess, OperationCall<CascadingCache<Tuple, Object>> operationCall ) 229 { 230 operationCall.setContext( null ); 231 } 232 233 @Override 234 public boolean equals( Object object ) 235 { 236 if( this == object ) 237 return true; 238 if( !( object instanceof FilterPartialDuplicates ) ) 239 return false; 240 if( !super.equals( object ) ) 241 return false; 242 243 FilterPartialDuplicates that = (FilterPartialDuplicates) object; 244 245 if( capacity != that.capacity ) 246 return false; 247 248 return true; 249 } 250 251 @Override 252 public int hashCode() 253 { 254 
int result = super.hashCode(); 255 result = 31 * result + capacity; 256 return result; 257 } 258 } 259 260 /** 261 * Constructor Unique creates a new Unique instance. 262 * 263 * @param pipe of type Pipe 264 * @param uniqueFields of type Fields 265 */ 266 @ConstructorProperties({"pipe", "uniqueFields"}) 267 public Unique( Pipe pipe, Fields uniqueFields ) 268 { 269 this( null, pipe, uniqueFields ); 270 } 271 272 /** 273 * Constructor Unique creates a new Unique instance. 274 * 275 * @param pipe of type Pipe 276 * @param uniqueFields of type Fields 277 * @param include of type Include 278 */ 279 @ConstructorProperties({"pipe", "uniqueFields", "include"}) 280 public Unique( Pipe pipe, Fields uniqueFields, Include include ) 281 { 282 this( null, pipe, uniqueFields, include ); 283 } 284 285 /** 286 * Constructor Unique creates a new Unique instance. 287 * 288 * @param pipe of type Pipe 289 * @param uniqueFields of type Fields 290 * @param capacity of type int 291 */ 292 @ConstructorProperties({"pipe", "uniqueFields", "capacity"}) 293 public Unique( Pipe pipe, Fields uniqueFields, int capacity ) 294 { 295 this( null, pipe, uniqueFields, capacity ); 296 } 297 298 /** 299 * Constructor Unique creates a new Unique instance. 300 * 301 * @param pipe of type Pipe 302 * @param uniqueFields of type Fields 303 * @param include of type Include 304 * @param capacity of type int 305 */ 306 @ConstructorProperties({"pipe", "uniqueFields", "include", "capacity"}) 307 public Unique( Pipe pipe, Fields uniqueFields, Include include, int capacity ) 308 { 309 this( null, pipe, uniqueFields, include, capacity ); 310 } 311 312 /** 313 * Constructor Unique creates a new Unique instance. 
314 * 315 * @param name of type String 316 * @param pipe of type Pipe 317 * @param uniqueFields of type Fields 318 */ 319 @ConstructorProperties({"name", "pipe", "uniqueFields"}) 320 public Unique( String name, Pipe pipe, Fields uniqueFields ) 321 { 322 this( name, pipe, uniqueFields, null ); 323 } 324 325 /** 326 * Constructor Unique creates a new Unique instance. 327 * 328 * @param name of type String 329 * @param pipe of type Pipe 330 * @param uniqueFields of type Fields 331 * @param include of type Include 332 */ 333 @ConstructorProperties({"name", "pipe", "uniqueFields", "include"}) 334 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include ) 335 { 336 this( name, pipe, uniqueFields, include, 0 ); 337 } 338 339 /** 340 * Constructor Unique creates a new Unique instance. 341 * 342 * @param name of type String 343 * @param pipe of type Pipe 344 * @param uniqueFields of type Fields 345 * @param capacity of type int 346 */ 347 @ConstructorProperties({"name", "pipe", "uniqueFields", "capacity"}) 348 public Unique( String name, Pipe pipe, Fields uniqueFields, int capacity ) 349 { 350 this( name, Pipe.pipes( pipe ), uniqueFields, capacity ); 351 } 352 353 /** 354 * Constructor Unique creates a new Unique instance. 355 * 356 * @param name of type String 357 * @param pipe of type Pipe 358 * @param uniqueFields of type Fields 359 * @param include of type Include 360 * @param capacity of type int 361 */ 362 @ConstructorProperties({"name", "pipe", "uniqueFields", "include", "capacity"}) 363 public Unique( String name, Pipe pipe, Fields uniqueFields, Include include, int capacity ) 364 { 365 this( name, Pipe.pipes( pipe ), uniqueFields, include, capacity ); 366 } 367 368 /** 369 * Constructor Unique creates a new Unique instance. 
370 * 371 * @param pipes of type Pipe[] 372 * @param uniqueFields of type Fields 373 */ 374 @ConstructorProperties({"pipes", "uniqueFields"}) 375 public Unique( Pipe[] pipes, Fields uniqueFields ) 376 { 377 this( null, pipes, uniqueFields ); 378 } 379 380 /** 381 * Constructor Unique creates a new Unique instance. 382 * 383 * @param pipes of type Pipe[] 384 * @param uniqueFields of type Fields 385 * @param include of type Include 386 */ 387 @ConstructorProperties({"pipes", "uniqueFields", "include"}) 388 public Unique( Pipe[] pipes, Fields uniqueFields, Include include ) 389 { 390 this( null, pipes, uniqueFields, include ); 391 } 392 393 /** 394 * Constructor Unique creates a new Unique instance. 395 * 396 * @param pipes of type Pipe[] 397 * @param uniqueFields of type Fields 398 * @param capacity of type int 399 */ 400 @ConstructorProperties({"pipes", "uniqueFields", "capacity"}) 401 public Unique( Pipe[] pipes, Fields uniqueFields, int capacity ) 402 { 403 this( null, pipes, uniqueFields, capacity ); 404 } 405 406 /** 407 * Constructor Unique creates a new Unique instance. 408 * 409 * @param pipes of type Pipe[] 410 * @param uniqueFields of type Fields 411 * @param include of type Include 412 * @param capacity of type int 413 */ 414 @ConstructorProperties({"pipes", "uniqueFields", "include", "capacity"}) 415 public Unique( Pipe[] pipes, Fields uniqueFields, Include include, int capacity ) 416 { 417 this( null, pipes, uniqueFields, include, capacity ); 418 } 419 420 /** 421 * Constructor Unique creates a new Unique instance. 422 * 423 * @param name of type String 424 * @param pipes of type Pipe[] 425 * @param uniqueFields of type Fields 426 */ 427 @ConstructorProperties({"name", "pipes", "uniqueFields"}) 428 public Unique( String name, Pipe[] pipes, Fields uniqueFields ) 429 { 430 this( name, pipes, uniqueFields, null ); 431 } 432 433 /** 434 * Constructor Unique creates a new Unique instance. 
435 * 436 * @param name of type String 437 * @param pipes of type Pipe[] 438 * @param uniqueFields of type Fields 439 * @param include of type Include 440 */ 441 @ConstructorProperties({"name", "pipes", "uniqueFields", "include"}) 442 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include ) 443 { 444 this( name, pipes, uniqueFields, include, 0 ); 445 } 446 447 /** 448 * Constructor Unique creates a new Unique instance. 449 * 450 * @param name of type String 451 * @param pipes of type Pipe[] 452 * @param uniqueFields of type Fields 453 * @param capacity of type int 454 */ 455 @ConstructorProperties({"name", "pipes", "uniqueFields", "capacity"}) 456 public Unique( String name, Pipe[] pipes, Fields uniqueFields, int capacity ) 457 { 458 this( name, pipes, uniqueFields, null, capacity ); 459 } 460 461 /** 462 * Constructor Unique creates a new Unique instance. 463 * 464 * @param name of type String 465 * @param pipes of type Pipe[] 466 * @param uniqueFields of type Fields 467 * @param capacity of type int 468 */ 469 @ConstructorProperties({"name", "pipes", "uniqueFields", "include", "capacity"}) 470 public Unique( String name, Pipe[] pipes, Fields uniqueFields, Include include, int capacity ) 471 { 472 super( pipes ); 473 474 if( uniqueFields == null ) 475 throw new IllegalArgumentException( "uniqueFields may not be null" ); 476 477 Pipe[] filters = new Pipe[ pipes.length ]; 478 479 TupleHasher tupleHasher = null; 480 Comparator[] comparators = uniqueFields.getComparators(); 481 482 if( !TupleHasher.isNull( comparators ) ) 483 tupleHasher = new TupleHasher( null, comparators ); 484 485 FilterPartialDuplicates partialDuplicates = new FilterPartialDuplicates( include, capacity, tupleHasher ); 486 487 for( int i = 0; i < filters.length; i++ ) 488 filters[ i ] = new Each( pipes[ i ], uniqueFields, partialDuplicates ); 489 490 Pipe pipe = new GroupBy( name, filters, uniqueFields ); 491 pipe = new Every( pipe, Fields.ALL, new FirstNBuffer(), 
Fields.RESULTS ); 492 493 setTails( pipe ); 494 } 495 }