/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe.assembly;

import java.beans.ConstructorProperties;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyConfigured;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.operation.Aggregator;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.provider.FactoryLoader;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.util.TupleHasher;
import cascading.tuple.util.TupleViews;
import cascading.util.cache.BaseCacheFactory;
import cascading.util.cache.CacheEvictionCallback;
import cascading.util.cache.CascadingCache;

/**
 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
 * <p>
 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
 * completed Reduce side. Summing is associative and commutative.
 * <p>
 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting
 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over
 * two, and a hack)
 * <p>
 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized,
 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of
 * memory and a little or no IO.
 * <p>
 * Further, Combiners are limited to only associative/commutative operations.
 * <p>
 * Additionally the Cascading planner can move the Map side optimization
 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which
 * is over HDFS).
 * <p>
 * The second role of the AggregateBy class is to allow for composition of AggregateBy
 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
 * in parallel on the same grouping keys.
 * <p>
 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
 * class allowing them all to share the same LRU value map for more efficiency.
 * <p>
 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
 * for secondary sorting (see {@link GroupBy} {@code sortFields}). This feature is used by {@link FirstBy} to
 * control which Tuple is seen first for a grouping.
 * <p>
 * To tune the LRU, set the {@code capacity} value to a high enough value to utilize available memory. Or set a
 * default value via the {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} property. The current default
 * ({@link cascading.util.cache.BaseCacheFactory#DEFAULT_CAPACITY})
 * is {@code 10,000} unique keys.
 * <p>
 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
 * by setting {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CACHE_FACTORY} property to the name of a sub-class of
 * {@link cascading.util.cache.BaseCacheFactory}.
 * <p>
 * Note using an AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
 * <p>
 * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally.
 * <p>
 * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache.
 *
 * @see SumBy
 * @see CountBy
 * @see Unique
 * @see cascading.util.cache.LRUHashMapCacheFactory
 * @see cascading.util.cache.DirectMappedCacheFactory
 * @see cascading.util.cache.LRUHashMapCache
 * @see cascading.util.cache.DirectMappedCache
 */
public class AggregateBy extends SubAssembly
  {
  /** Capacity value meaning "not set here"; the cache capacity is then resolved from properties in {@link CompositeFunction#prepare}. */
  public static final int USE_DEFAULT_THRESHOLD = 0;

  private String name;
  private int capacity;
  private Fields groupingFields;
  private Fields[] argumentFields;
  private Functor[] functors;
  private Aggregator[] aggregators;
  private transient GroupBy groupBy;

  /** Counters incremented by {@link CompositeFunction} as grouping keys hit, miss, or are evicted from the cache. */
  public enum Cache
    {
      Num_Keys_Flushed,
      Num_Keys_Hit,
      Num_Keys_Missed
    }

  /**
   * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
   * <p>
   * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
   */
  public interface Functor extends Serializable
    {
    /**
     * Method getDeclaredFields returns the declaredFields of this Functor object.
     *
     * @return the declaredFields (type Fields) of this Functor object.
     */
    Fields getDeclaredFields();

    /**
     * Method aggregate operates on the given args in tandem (optionally) with the given context values.
     * <p>
     * The context argument is the result of the previous call to this method. Use it to store values between aggregate
     * calls (the current count, or sum of the args).
     * <p>
     * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
     * invocations context will be the value returned on the previous invocation.
     *
     * @param flowProcess of type FlowProcess
     * @param args        of type TupleEntry
     * @param context     of type Tuple
     * @return Tuple
     */
    Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );

    /**
     * Method complete allows the final aggregate computation to be performed before the return value is collected.
     * <p>
     * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
     * <p>
     * It is safe to return the context object as the result value.
     *
     * @param flowProcess of type FlowProcess
     * @param context     of type Tuple
     * @return Tuple
     */
    Tuple complete( FlowProcess flowProcess, Tuple context );
    }

  /**
   * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
   *
   * @see Functor
   */
  public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
    {
    private int capacity = 0;
    private final Fields groupingFields;
    private final Fields[] argumentFields;
    private final Fields[] functorFields;
    private final Functor[] functors;
    private final TupleHasher tupleHasher;

    /** Per-operation state created in {@link #prepare} and discarded in {@link #cleanup}. */
    public static class Context
      {
      /** Cache keyed by grouping tuple; each value holds one context slot per Functor. */
      CascadingCache<Tuple, Tuple[]> lru;
      /** One argument entry per Functor, re-pointed at the incoming tuple on every {@code operate} call. */
      TupleEntry[] arguments;
      /** Reusable output view over the grouping key followed by each Functor's completed values. */
      Tuple result;
      }

    /**
     * Constructor CompositeFunction creates a new CompositeFunction instance.
     *
     * @param groupingFields of type Fields
     * @param argumentFields of type Fields
     * @param functor        of type Functor
     * @param capacity       of type int
     */
    public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int capacity )
      {
      this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, capacity );
      }

    /**
     * Constructor CompositeFunction creates a new CompositeFunction instance.
     *
     * @param groupingFields of type Fields
     * @param argumentFields of type Fields[]
     * @param functors       of type Functor[]
     * @param capacity       of type int
     */
    public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int capacity )
      {
      super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
      this.groupingFields = groupingFields;
      this.argumentFields = argumentFields;
      this.functors = functors;
      this.capacity = capacity;

      this.functorFields = new Fields[ functors.length ];

      for( int i = 0; i < functors.length; i++ )
        this.functorFields[ i ] = functors[ i ].getDeclaredFields();

      // only install a hasher when at least one merged comparator is present
      Comparator[] hashers = TupleHasher.merge( functorFields );

      if( !TupleHasher.isNull( hashers ) )
        this.tupleHasher = new TupleHasher( null, hashers );
      else
        this.tupleHasher = null;
      }

    /**
     * Builds the declared fields of this function: the grouping fields followed by the declared
     * fields of every Functor, in order.
     *
     * @param groupingFields of type Fields
     * @param functors       of type Functor[]
     * @return the appended Fields
     */
    private static Fields getFields( Fields groupingFields, Functor[] functors )
      {
      Fields fields = groupingFields;

      for( Functor functor : functors )
        fields = fields.append( functor.getDeclaredFields() );

      return fields;
      }

    /**
     * Creates the {@link Context} for this operation: one argument entry per Functor, the composite
     * result view, and the cache obtained from the configured {@link BaseCacheFactory}.
     * <p>
     * When the capacity given at construction equals {@link AggregateBy#USE_DEFAULT_THRESHOLD}, the capacity is read
     * from the {@link AggregateByProps#AGGREGATE_BY_CAPACITY} property, falling back to
     * {@link AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} when that property is not set.
     *
     * @param flowProcess   of type FlowProcess
     * @param operationCall of type OperationCall
     * @throws CascadingException if no cache factory could be loaded
     */
    @Override
    public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
      {
      Fields[] fields = new Fields[ functors.length + 1 ];

      fields[ 0 ] = groupingFields;

      for( int i = 0; i < functors.length; i++ )
        fields[ i + 1 ] = functors[ i ].getDeclaredFields();

      final Context context = new Context();

      context.arguments = new TupleEntry[ functors.length ];

      // the resolved incoming fields are the same for every functor, read them once
      Fields resolvedArgumentFields = operationCall.getArgumentFields();

      for( int i = 0; i < context.arguments.length; i++ )
        {
        int[] pos;

        if( argumentFields[ i ].isAll() )
          pos = resolvedArgumentFields.getPos();
        else
          pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL

        Tuple narrow = TupleViews.createNarrow( pos );

        Fields currentFields;

        if( this.argumentFields[ i ].isSubstitution() )
          currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
        else
          currentFields = Fields.asDeclaration( this.argumentFields[ i ] );

        context.arguments[ i ] = new TupleEntry( currentFields, narrow );
        }

      context.result = TupleViews.createComposite( fields );

      // entries pushed out of the cache are completed and emitted immediately
      class AggregateByEviction implements CacheEvictionCallback<Tuple, Tuple[]>
        {
        @Override
        public void evict( Map.Entry<Tuple, Tuple[]> entry )
          {
          completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, entry );
          flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
          }
        }

      FactoryLoader loader = FactoryLoader.getInstance();

      BaseCacheFactory<Tuple, Tuple[], ?> factory = loader.loadFactoryFrom( flowProcess, AggregateByProps.AGGREGATE_BY_CACHE_FACTORY, AggregateByProps.DEFAULT_CACHE_FACTORY_CLASS );

      if( factory == null )
        throw new CascadingException( "unable to load cache factory, please check your '" + AggregateByProps.AGGREGATE_BY_CACHE_FACTORY + "' setting." );

      CascadingCache<Tuple, Tuple[]> cache = factory.create( flowProcess );

      cache.setCacheEvictionCallback( new AggregateByEviction() );

      Integer cacheCapacity = capacity;

      if( capacity == AggregateBy.USE_DEFAULT_THRESHOLD )
        {
        cacheCapacity = flowProcess.getIntegerProperty( AggregateByProps.AGGREGATE_BY_CAPACITY );

        if( cacheCapacity == null )
          cacheCapacity = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY;
        }

      cache.setCapacity( cacheCapacity.intValue() );
      cache.initialize();

      context.lru = cache;

      operationCall.setContext( context );
      }

    /**
     * Folds the current argument tuple into the cached per-Functor contexts for its grouping key.
     * <p>
     * On a cache miss an empty slot array is stored under the key, so each Functor sees a {@code null} context on
     * its first call for that key. {@link Cache#Num_Keys_Missed} or {@link Cache#Num_Keys_Hit} is incremented
     * accordingly. Nothing is emitted here; output happens on cache eviction or in {@link #flush}.
     *
     * @param flowProcess  of type FlowProcess
     * @param functionCall of type FunctionCall
     */
    @Override
    public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
      {
      TupleEntry arguments = functionCall.getArguments();
      Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) );

      Context context = functionCall.getContext();
      Tuple[] functorContext = context.lru.get( key );

      if( functorContext == null )
        {
        functorContext = new Tuple[ functors.length ];
        context.lru.put( key, functorContext );
        flowProcess.increment( Cache.Num_Keys_Missed, 1 );
        }
      else
        {
        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
        }

      for( int i = 0; i < functors.length; i++ )
        {
        // re-point this functor's argument view at the current incoming tuple
        TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
        functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
        }
      }

    /**
     * Completes and emits every entry still held in the cache, then clears the cache.
     *
     * @param flowProcess   of type FlowProcess
     * @param operationCall of type OperationCall
     */
    @Override
    public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
      {
      // need to drain context
      TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();

      Tuple result = operationCall.getContext().result;
      Map<Tuple, Tuple[]> cache = operationCall.getContext().lru;

      for( Map.Entry<Tuple, Tuple[]> entry : cache.entrySet() )
        completeFunctors( flowProcess, collector, result, entry );

      cache.clear();
      }

    /**
     * Drops the {@link Context} held by the given operation call.
     *
     * @param flowProcess   of type FlowProcess
     * @param operationCall of type OperationCall
     */
    @Override
    public void cleanup( FlowProcess flowProcess, OperationCall<Context> operationCall )
      {
      operationCall.setContext( null );
      }

    /**
     * Calls {@link Functor#complete} for every Functor on the given cache entry and emits one output tuple made of
     * the entry key followed by each completed value.
     *
     * @param flowProcess     of type FlowProcess
     * @param outputCollector the collector receiving the result
     * @param result          the reusable composite result view
     * @param entry           the cache entry (grouping key to per-Functor contexts) to complete
     */
    private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
      {
      Tuple[] results = new Tuple[ functors.length + 1 ];

      results[ 0 ] = entry.getKey();

      Tuple[] values = entry.getValue();

      for( int i = 0; i < functors.length; i++ )
        results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );

      TupleViews.reset( result, results );

      outputCollector.add( result );
      }

    @Override
    public boolean equals( Object object )
      {
      if( this == object )
        return true;
      if( !( object instanceof CompositeFunction ) )
        return false;
      if( !super.equals( object ) )
        return false;

      CompositeFunction that = (CompositeFunction) object;

      if( !Arrays.equals( argumentFields, that.argumentFields ) )
        return false;
      if( !Arrays.equals( functorFields, that.functorFields ) )
        return false;
      if( !Arrays.equals( functors, that.functors ) )
        return false;
      if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
        return false;

      return true;
      }

    @Override
    public int hashCode()
      {
      int result = super.hashCode();
      result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
      result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
      result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
      result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
      return result;
      }
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   * <p>
   * Only the name and capacity are stored; a calling constructor is expected to invoke one of the
   * {@code initialize} methods.
   *
   * @param name     of type String
   * @param capacity of type int
   */
  protected AggregateBy( String name, int capacity )
    {
    this.name = name;
    this.capacity = capacity;
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   * <p>
   * No pipes are wired by this constructor; it only records the single argument selector, Functor and Aggregator so
   * the instance can be handed to a parent AggregateBy.
   *
   * @param argumentFields of type Fields
   * @param functor        of type Functor
   * @param aggregator     of type Aggregator
   */
  protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
    {
    this.argumentFields = Fields.fields( argumentFields );
    this.functors = new Functor[]{functor};
    this.aggregators = new Aggregator[]{aggregator};
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties({"pipe", "groupingFields", "assemblies"})
  public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, USE_DEFAULT_THRESHOLD, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param capacity       of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties({"pipe", "groupingFields", "capacity", "assemblies"})
  public AggregateBy( Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param capacity       of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties({"name", "pipe", "groupingFields", "capacity", "assemblies"})
  public AggregateBy( String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
    {
    this( name, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"})
  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
    {
    this( name, pipes, groupingFields, USE_DEFAULT_THRESHOLD, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   * <p>
   * The argument fields, Functors and Aggregators of every given assembly are concatenated, in order, and wired
   * behind a single grouping.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param capacity       of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties({"name", "pipes", "groupingFields", "capacity", "assemblies"})
  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies )
    {
    this( name, capacity );

    List<Fields> arguments = new ArrayList<Fields>();
    List<Functor> functors = new ArrayList<Functor>();
    List<Aggregator> aggregators = new ArrayList<Aggregator>();

    for( AggregateBy assembly : assemblies )
      {
      Collections.addAll( arguments, assembly.getArgumentFields() );
      Collections.addAll( functors, assembly.getFunctors() );
      Collections.addAll( aggregators, assembly.getAggregators() );
      }

    initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance with a single Functor and Aggregator.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param argumentFields of type Fields
   * @param functor        of type Functor
   * @param aggregator     of type Aggregator
   * @param capacity       of type int
   */
  protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int capacity )
    {
    this( name, capacity );
    initialize( groupingFields, pipes, argumentFields, functor, aggregator );
    }

  /**
   * Method initialize wires this assembly for a single Functor and Aggregator by delegating to the array form.
   *
   * @param groupingFields of type Fields
   * @param pipes          of type Pipe[]
   * @param argumentFields of type Fields
   * @param functor        of type Functor
   * @param aggregator     of type Aggregator
   */
  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
    {
    initialize( groupingFields, pipes, Fields.fields( argumentFields ),
      new Functor[]{functor},
      new Aggregator[]{aggregator} );
    }

  /**
   * Method initialize builds the internal pipe assembly: an {@link Each} applying one shared
   * {@link CompositeFunction} to every incoming pipe, a single {@link GroupBy} over the results, and one
   * {@link Every} per Aggregator chained after the GroupBy.
   * <p>
   * {@link #verify()} is called after the fields are assigned and before any pipe is created.
   *
   * @param groupingFields of type Fields
   * @param pipes          of type Pipe[]
   * @param argumentFields of type Fields[]
   * @param functors       of type Functor[]
   * @param aggregators    of type Aggregator[]
   */
  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
    {
    setPrevious( pipes );

    this.groupingFields = groupingFields;
    this.argumentFields = argumentFields;
    this.functors = functors;
    this.aggregators = aggregators;

    verify();

    Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
    Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );

    if( argumentSelector.equals( Fields.NONE ) )
      argumentSelector = Fields.ALL;

    Pipe[] functions = new Pipe[ pipes.length ];

    CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, capacity );

    for( int i = 0; i < functions.length; i++ )
      functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );

    // sort fields are only handed to the GroupBy when they actually carry comparators
    groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );

    Pipe pipe = groupBy;

    // aggregator i consumes the fields declared by functor i
    for( int i = 0; i < aggregators.length; i++ )
      pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );

    setTails( pipe );
    }

  /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
  protected void verify()
    {

    }

  /**
   * Method getGroupingFields returns the Fields this instance will be grouping against.
   *
   * @return the current grouping fields
   */
  public Fields getGroupingFields()
    {
    return groupingFields;
    }

  /**
   * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
   * field declaration of the given Aggregator operations.
   * <p>
   * Note the actual Fields values are returned, not planner resolved Fields.
   *
   * @return an array of Fields
   */
  public Fields[] getFieldDeclarations()
    {
    Fields[] fields = new Fields[ this.aggregators.length ];

    for( int i = 0; i < aggregators.length; i++ )
      fields[ i ] = aggregators[ i ].getFieldDeclaration();

    return fields;
    }

  /**
   * Method getArgumentFields returns the argument selectors held by this instance, one per Functor.
   *
   * @return the internal Fields array (not a copy)
   */
  protected Fields[] getArgumentFields()
    {
    return argumentFields;
    }

  /**
   * Method getFunctors returns the Functor instances held by this instance.
   *
   * @return the internal Functor array (not a copy)
   */
  protected Functor[] getFunctors()
    {
    return functors;
    }

  /**
   * Method getAggregators returns the Aggregator instances held by this instance.
   *
   * @return the internal Aggregator array (not a copy)
   */
  protected Aggregator[] getAggregators()
    {
    return aggregators;
    }

  /**
   * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
   * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
   *
   * @return GroupBy type
   */
  public GroupBy getGroupBy()
    {
    return groupBy;
    }

  /**
   * Method getCapacity returns the capacity given at construction; {@link #USE_DEFAULT_THRESHOLD} means the
   * effective value is resolved from properties at runtime.
   *
   * @return the configured capacity
   */
  @Property(name = "capacity", visibility = Visibility.PUBLIC)
  @PropertyDescription("Capacity of the aggregation cache.")
  @PropertyConfigured(value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000")
  public int getCapacity()
    {
    return capacity;
    }
  }