001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe.assembly; 022 023import java.beans.ConstructorProperties; 024import java.io.Serializable; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.Comparator; 029import java.util.List; 030import java.util.Map; 031 032import cascading.CascadingException; 033import cascading.flow.FlowProcess; 034import cascading.management.annotation.Property; 035import cascading.management.annotation.PropertyConfigured; 036import cascading.management.annotation.PropertyDescription; 037import cascading.management.annotation.Visibility; 038import cascading.operation.Aggregator; 039import cascading.operation.BaseOperation; 040import cascading.operation.Function; 041import cascading.operation.FunctionCall; 042import cascading.operation.OperationCall; 043import cascading.pipe.Each; 044import cascading.pipe.Every; 045import cascading.pipe.GroupBy; 046import cascading.pipe.Pipe; 047import cascading.pipe.SubAssembly; 048import cascading.provider.FactoryLoader; 049import cascading.tuple.Fields; 050import cascading.tuple.Tuple; 051import cascading.tuple.TupleEntry; 052import cascading.tuple.TupleEntryCollector; 053import cascading.tuple.util.TupleHasher; 054import cascading.tuple.util.TupleViews; 055import cascading.util.cache.BaseCacheFactory; 056import cascading.util.cache.CacheEvictionCallback; 057import cascading.util.cache.CascadingCache; 058 059/** 060 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations. 061 * <p/> 062 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the 063 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and 064 * completed Reduce side. Summing is associative and commutative. 065 * <p/> 066 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting 067 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be 068 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over 069 * two, and a hack) 070 * <p/> 071 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized, 072 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of 073 * memory and a little or no IO. 074 * <p/> 075 * Further, Combiners are limited to only associative/commutative operations. 076 * <p/> 077 * Additionally the Cascading planner can move the Map side optimization 078 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which 079 * is over HDFS). 080 * <p/> 081 * The second role of the AggregateBy class is to allow for composition of AggregateBy 082 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed 083 * in parallel on the same grouping keys. 084 * </p> 085 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special 086 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction} 087 * class allowing them all to share the same LRU value map for more efficiency. 088 * <p/> 089 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to 090 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used 091 * to for secondary sorting (see {@link GroupBy} {@code sortFields}. This feature is used by {@link FirstBy} to 092 * control which Tuple is seen first for a grouping. 093 * <p/> 094 * To tune the LRU, set the {@code capacity} value to a high enough value to utilize available memory. Or set a 095 * default value via the {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} property. The current default 096 * ({@link cascading.util.cache.BaseCacheFactory#DEFAULT_CAPACITY}) 097 * is {@code 10, 000} unique keys. 098 * <p/> 099 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed 100 * by setting {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CACHE_FACTORY} property to the name of a sub-class of 101 * {@link cascading.util.cache.BaseCacheFactory}. 102 * <p/> 103 * Note using a AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}. 104 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy. 105 * <p/> 106 * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally. 107 * <p/> 108 * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache. 109 * 110 * @see SumBy 111 * @see CountBy 112 * @see Unique 113 * @see cascading.util.cache.LRUHashMapCacheFactory 114 * @see cascading.util.cache.DirectMappedCacheFactory 115 * @see cascading.util.cache.LRUHashMapCache 116 * @see cascading.util.cache.DirectMappedCache 117 */ 118public class AggregateBy extends SubAssembly 119 { 120 public static final int USE_DEFAULT_THRESHOLD = 0; 121 122 private String name; 123 private int capacity; 124 private Fields groupingFields; 125 private Fields[] argumentFields; 126 private Functor[] functors; 127 private Aggregator[] aggregators; 128 private transient GroupBy groupBy; 129 130 public enum Cache 131 { 132 Num_Keys_Flushed, 133 Num_Keys_Hit, 134 Num_Keys_Missed 135 } 136 137 /** 138 * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class. 139 * <p/> 140 * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs. 141 */ 142 public interface Functor extends Serializable 143 { 144 /** 145 * Method getDeclaredFields returns the declaredFields of this Functor object. 146 * 147 * @return the declaredFields (type Fields) of this Functor object. 148 */ 149 Fields getDeclaredFields(); 150 151 /** 152 * Method aggregate operates on the given args in tandem (optionally) with the given context values. 153 * <p/> 154 * The context argument is the result of the previous call to this method. Use it to store values between aggregate 155 * calls (the current count, or sum of the args). 156 * <p/> 157 * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent 158 * invocations context will be the value returned on the previous invocation. 159 * 160 * @param flowProcess of type FlowProcess 161 * @param args of type TupleEntry 162 * @param context of type Tuple @return Tuple 163 */ 164 Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ); 165 166 /** 167 * Method complete allows the final aggregate computation to be performed before the return value is collected. 168 * <p/> 169 * The number of values in the returned {@link Tuple} instance must match the number of declaredFields. 170 * <p/> 171 * It is safe to return the context object as the result value. 172 * 173 * @param flowProcess of type FlowProcess 174 * @param context of type Tuple @return Tuple 175 */ 176 Tuple complete( FlowProcess flowProcess, Tuple context ); 177 } 178 179 /** 180 * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}. 181 * 182 * @see Functor 183 */ 184 public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context> 185 { 186 private int capacity = 0; 187 private final Fields groupingFields; 188 private final Fields[] argumentFields; 189 private final Fields[] functorFields; 190 private final Functor[] functors; 191 private final TupleHasher tupleHasher; 192 193 public static class Context 194 { 195 CascadingCache<Tuple, Tuple[]> lru; 196 TupleEntry[] arguments; 197 Tuple result; 198 } 199 200 /** 201 * Constructor CompositeFunction creates a new CompositeFunction instance. 202 * 203 * @param groupingFields of type Fields 204 * @param argumentFields of type Fields 205 * @param functor of type Functor 206 * @param capacity of type int 207 */ 208 public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int capacity ) 209 { 210 this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, capacity ); 211 } 212 213 /** 214 * Constructor CompositeFunction creates a new CompositeFunction instance. 215 * 216 * @param groupingFields of type Fields 217 * @param argumentFields of type Fields[] 218 * @param functors of type Functor[] 219 * @param capacity of type int 220 */ 221 public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int capacity ) 222 { 223 super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information 224 this.groupingFields = groupingFields; 225 this.argumentFields = argumentFields; 226 this.functors = functors; 227 this.capacity = capacity; 228 229 this.functorFields = new Fields[ functors.length ]; 230 231 for( int i = 0; i < functors.length; i++ ) 232 this.functorFields[ i ] = functors[ i ].getDeclaredFields(); 233 234 Comparator[] hashers = TupleHasher.merge( functorFields ); 235 if( !TupleHasher.isNull( hashers ) ) 236 this.tupleHasher = new TupleHasher( null, hashers ); 237 else 238 this.tupleHasher = null; 239 } 240 241 private static Fields getFields( Fields groupingFields, Functor[] functors ) 242 { 243 Fields fields = groupingFields; 244 245 for( Functor functor : functors ) 246 fields = fields.append( functor.getDeclaredFields() ); 247 248 return fields; 249 } 250 251 @Override 252 public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall ) 253 { 254 Fields[] fields = new Fields[ functors.length + 1 ]; 255 256 fields[ 0 ] = groupingFields; 257 258 for( int i = 0; i < functors.length; i++ ) 259 fields[ i + 1 ] = functors[ i ].getDeclaredFields(); 260 261 final Context context = new Context(); 262 263 context.arguments = new TupleEntry[ functors.length ]; 264 265 for( int i = 0; i < context.arguments.length; i++ ) 266 { 267 Fields resolvedArgumentFields = operationCall.getArgumentFields(); 268 269 int[] pos; 270 271 if( argumentFields[ i ].isAll() ) 272 pos = resolvedArgumentFields.getPos(); 273 else 274 pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL 275 276 Tuple narrow = TupleViews.createNarrow( pos ); 277 278 Fields currentFields; 279 280 if( this.argumentFields[ i ].isSubstitution() ) 281 currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator 282 else 283 currentFields = Fields.asDeclaration( this.argumentFields[ i ] ); 284 285 context.arguments[ i ] = new TupleEntry( currentFields, narrow ); 286 } 287 288 context.result = TupleViews.createComposite( fields ); 289 290 class AggregateByEviction implements CacheEvictionCallback<Tuple, Tuple[]> 291 { 292 @Override 293 public void evict( Map.Entry<Tuple, Tuple[]> entry ) 294 { 295 completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, entry ); 296 flowProcess.increment( Cache.Num_Keys_Flushed, 1 ); 297 } 298 } 299 300 FactoryLoader loader = FactoryLoader.getInstance(); 301 302 BaseCacheFactory<Tuple, Tuple[], ?> factory = loader.loadFactoryFrom( flowProcess, AggregateByProps.AGGREGATE_BY_CACHE_FACTORY, AggregateByProps.DEFAULT_CACHE_FACTORY_CLASS ); 303 304 if( factory == null ) 305 throw new CascadingException( "unable to load cache factory, please check your '" + AggregateByProps.AGGREGATE_BY_CACHE_FACTORY + "' setting." ); 306 307 CascadingCache<Tuple, Tuple[]> cache = factory.create( flowProcess ); 308 309 cache.setCacheEvictionCallback( new AggregateByEviction() ); 310 311 Integer cacheCapacity = capacity; 312 if( capacity == 0 ) 313 { 314 cacheCapacity = flowProcess.getIntegerProperty( AggregateByProps.AGGREGATE_BY_CAPACITY ); 315 316 if( cacheCapacity == null ) 317 cacheCapacity = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY; 318 } 319 cache.setCapacity( cacheCapacity.intValue() ); 320 cache.initialize(); 321 322 context.lru = cache; 323 324 operationCall.setContext( context ); 325 } 326 327 @Override 328 public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall ) 329 { 330 TupleEntry arguments = functionCall.getArguments(); 331 Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) ); 332 333 Context context = functionCall.getContext(); 334 Tuple[] functorContext = context.lru.get( key ); 335 336 if( functorContext == null ) 337 { 338 functorContext = new Tuple[ functors.length ]; 339 context.lru.put( key, functorContext ); 340 flowProcess.increment( Cache.Num_Keys_Missed, 1 ); 341 } 342 else 343 { 344 flowProcess.increment( Cache.Num_Keys_Hit, 1 ); 345 } 346 347 for( int i = 0; i < functors.length; i++ ) 348 { 349 TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() ); 350 functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] ); 351 } 352 } 353 354 @Override 355 public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall ) 356 { 357 // need to drain context 358 TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector(); 359 360 Tuple result = operationCall.getContext().result; 361 Map<Tuple, Tuple[]> context = operationCall.getContext().lru; 362 363 for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() ) 364 completeFunctors( flowProcess, collector, result, entry ); 365 366 context.clear(); 367 } 368 369 @Override 370 public void cleanup( FlowProcess flowProcess, OperationCall<Context> operationCall ) 371 { 372 operationCall.setContext( null ); 373 } 374 375 private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry ) 376 { 377 Tuple[] results = new Tuple[ functors.length + 1 ]; 378 379 results[ 0 ] = entry.getKey(); 380 381 Tuple[] values = entry.getValue(); 382 383 for( int i = 0; i < functors.length; i++ ) 384 results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] ); 385 386 TupleViews.reset( result, results ); 387 388 outputCollector.add( result ); 389 } 390 391 @Override 392 public boolean equals( Object object ) 393 { 394 if( this == object ) 395 return true; 396 if( !( object instanceof CompositeFunction ) ) 397 return false; 398 if( !super.equals( object ) ) 399 return false; 400 401 CompositeFunction that = (CompositeFunction) object; 402 403 if( !Arrays.equals( argumentFields, that.argumentFields ) ) 404 return false; 405 if( !Arrays.equals( functorFields, that.functorFields ) ) 406 return false; 407 if( !Arrays.equals( functors, that.functors ) ) 408 return false; 409 if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null ) 410 return false; 411 412 return true; 413 } 414 415 @Override 416 public int hashCode() 417 { 418 int result = super.hashCode(); 419 result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 ); 420 result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 ); 421 result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 ); 422 result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 ); 423 return result; 424 } 425 } 426 427 /** 428 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 429 * 430 * @param name of type String 431 * @param capacity of type int 432 */ 433 protected AggregateBy( String name, int capacity ) 434 { 435 this.name = name; 436 this.capacity = capacity; 437 } 438 439 /** 440 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 441 * 442 * @param argumentFields of type Fields 443 * @param functor of type Functor 444 * @param aggregator of type Aggregator 445 */ 446 protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator ) 447 { 448 this.argumentFields = Fields.fields( argumentFields ); 449 this.functors = new Functor[]{functor}; 450 this.aggregators = new Aggregator[]{aggregator}; 451 } 452 453 /** 454 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 455 * 456 * @param pipe of type Pipe 457 * @param groupingFields of type Fields 458 * @param assemblies of type CompositeAggregator... 459 */ 460 @ConstructorProperties({"pipe", "groupingFields", "assemblies"}) 461 public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies ) 462 { 463 this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies ); 464 } 465 466 /** 467 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 468 * 469 * @param pipe of type Pipe 470 * @param groupingFields of type Fields 471 * @param capacity of type int 472 * @param assemblies of type CompositeAggregator... 473 */ 474 @ConstructorProperties({"pipe", "groupingFields", "capacity", "assemblies"}) 475 public AggregateBy( Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies ) 476 { 477 this( null, Pipe.pipes( pipe ), groupingFields, capacity, assemblies ); 478 } 479 480 /** 481 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 482 * 483 * @param pipe of type Pipe 484 * @param groupingFields of type Fields 485 * @param capacity of type int 486 * @param assemblies of type CompositeAggregator... 487 */ 488 @ConstructorProperties({"name", "pipe", "groupingFields", "capacity", "assemblies"}) 489 public AggregateBy( String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies ) 490 { 491 this( name, Pipe.pipes( pipe ), groupingFields, capacity, assemblies ); 492 } 493 494 /** 495 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 496 * 497 * @param name of type String 498 * @param pipes of type Pipe[] 499 * @param groupingFields of type Fields 500 * @param assemblies of type CompositeAggregator... 501 */ 502 @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"}) 503 public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies ) 504 { 505 this( name, pipes, groupingFields, 0, assemblies ); 506 } 507 508 /** 509 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 510 * 511 * @param name of type String 512 * @param pipes of type Pipe[] 513 * @param groupingFields of type Fields 514 * @param capacity of type int 515 * @param assemblies of type CompositeAggregator... 516 */ 517 @ConstructorProperties({"name", "pipes", "groupingFields", "capacity", "assemblies"}) 518 public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies ) 519 { 520 this( name, capacity ); 521 522 List<Fields> arguments = new ArrayList<Fields>(); 523 List<Functor> functors = new ArrayList<Functor>(); 524 List<Aggregator> aggregators = new ArrayList<Aggregator>(); 525 526 for( int i = 0; i < assemblies.length; i++ ) 527 { 528 AggregateBy assembly = assemblies[ i ]; 529 530 Collections.addAll( arguments, assembly.getArgumentFields() ); 531 Collections.addAll( functors, assembly.getFunctors() ); 532 Collections.addAll( aggregators, assembly.getAggregators() ); 533 } 534 535 initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) ); 536 } 537 538 protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int capacity ) 539 { 540 this( name, capacity ); 541 initialize( groupingFields, pipes, argumentFields, functor, aggregator ); 542 } 543 544 protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator ) 545 { 546 initialize( groupingFields, pipes, Fields.fields( argumentFields ), 547 new Functor[]{functor}, 548 new Aggregator[]{aggregator} ); 549 } 550 551 protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators ) 552 { 553 setPrevious( pipes ); 554 555 this.groupingFields = groupingFields; 556 this.argumentFields = argumentFields; 557 this.functors = functors; 558 this.aggregators = aggregators; 559 560 verify(); 561 562 Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields ); 563 Fields argumentSelector = Fields.merge( this.groupingFields, sortFields ); 564 565 if( argumentSelector.equals( Fields.NONE ) ) 566 argumentSelector = Fields.ALL; 567 568 Pipe[] functions = new Pipe[ pipes.length ]; 569 570 CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, capacity ); 571 572 for( int i = 0; i < functions.length; i++ ) 573 functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS ); 574 575 groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null ); 576 577 Pipe pipe = groupBy; 578 579 for( int i = 0; i < aggregators.length; i++ ) 580 pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL ); 581 582 setTails( pipe ); 583 } 584 585 /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */ 586 protected void verify() 587 { 588 589 } 590 591 /** 592 * Method getGroupingFields returns the Fields this instances will be grouping against. 593 * 594 * @return the current grouping fields 595 */ 596 public Fields getGroupingFields() 597 { 598 return groupingFields; 599 } 600 601 /** 602 * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the 603 * field declaration of the given Aggregator operations. 604 * <p/> 605 * Note the actual Fields values are returned, not planner resolved Fields. 606 * 607 * @return and array of Fields 608 */ 609 public Fields[] getFieldDeclarations() 610 { 611 Fields[] fields = new Fields[ this.aggregators.length ]; 612 613 for( int i = 0; i < aggregators.length; i++ ) 614 fields[ i ] = aggregators[ i ].getFieldDeclaration(); 615 616 return fields; 617 } 618 619 protected Fields[] getArgumentFields() 620 { 621 return argumentFields; 622 } 623 624 protected Functor[] getFunctors() 625 { 626 return functors; 627 } 628 629 protected Aggregator[] getAggregators() 630 { 631 return aggregators; 632 } 633 634 /** 635 * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties 636 * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}. 637 * 638 * @return GroupBy type 639 */ 640 public GroupBy getGroupBy() 641 { 642 return groupBy; 643 } 644 645 @Property(name = "capacity", visibility = Visibility.PUBLIC) 646 @PropertyDescription("Capacity of the aggregation cache.") 647 @PropertyConfigured(value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000") 648 public int getCapacity() 649 { 650 return capacity; 651 } 652 }