001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.io.Serializable; 025 import java.util.ArrayList; 026 import java.util.Arrays; 027 import java.util.Collections; 028 import java.util.Comparator; 029 import java.util.List; 030 import java.util.Map; 031 032 import cascading.CascadingException; 033 import cascading.flow.FlowProcess; 034 import cascading.management.annotation.Property; 035 import cascading.management.annotation.PropertyConfigured; 036 import cascading.management.annotation.PropertyDescription; 037 import cascading.management.annotation.Visibility; 038 import cascading.operation.Aggregator; 039 import cascading.operation.BaseOperation; 040 import cascading.operation.Function; 041 import cascading.operation.FunctionCall; 042 import cascading.operation.OperationCall; 043 import cascading.pipe.Each; 044 import cascading.pipe.Every; 045 import cascading.pipe.GroupBy; 046 import cascading.pipe.Pipe; 047 import cascading.pipe.SubAssembly; 048 import cascading.provider.FactoryLoader; 049 import cascading.tuple.Fields; 050 import cascading.tuple.Tuple; 051 import cascading.tuple.TupleEntry; 052 import cascading.tuple.TupleEntryCollector; 053 import cascading.tuple.util.TupleHasher; 054 import cascading.tuple.util.TupleViews; 055 import cascading.util.cache.BaseCacheFactory; 056 import cascading.util.cache.CacheEvictionCallback; 057 import cascading.util.cache.CascadingCache; 058 059 /** 060 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations. 061 * <p/> 062 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the 063 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and 064 * completed Reduce side. Summing is associative and commutative. 065 * <p/> 066 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting 067 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be 068 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over 069 * two, and a hack) 070 * <p/> 071 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized, 072 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of 073 * memory and a little or no IO. 074 * <p/> 075 * Further, Combiners are limited to only associative/commutative operations. 076 * <p/> 077 * Additionally the Cascading planner can move the Map side optimization 078 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which 079 * is over HDFS). 080 * <p/> 081 * The second role of the AggregateBy class is to allow for composition of AggregateBy 082 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed 083 * in parallel on the same grouping keys. 084 * </p> 085 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special 086 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction} 087 * class allowing them all to share the same LRU value map for more efficiency. 088 * <p/> 089 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to 090 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used 091 * to for secondary sorting (see {@link GroupBy} {@code sortFields}. This feature is used by {@link FirstBy} to 092 * control which Tuple is seen first for a grouping. 093 * <p/> 094 * To tune the LRU, set the {@code capacity} value to a high enough value to utilize available memory. Or set a 095 * default value via the {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} property. The current default 096 * ({@link cascading.util.cache.BaseCacheFactory#DEFAULT_CAPACITY}) 097 * is {@code 10, 000} unique keys. 098 * <p/> 099 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed 100 * by setting {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CACHE_FACTORY} property to the name of a sub-class of 101 * {@link cascading.util.cache.BaseCacheFactory}. 102 * <p/> 103 * Note using a AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}. 104 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy. 105 * <p/> 106 * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally. 107 * <p/> 108 * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache. 109 * 110 * @see SumBy 111 * @see CountBy 112 * @see Unique 113 * @see cascading.util.cache.LRUHashMapCacheFactory 114 * @see cascading.util.cache.DirectMappedCacheFactory 115 * @see cascading.util.cache.LRUHashMapCache 116 * @see cascading.util.cache.DirectMappedCache 117 */ 118 public class AggregateBy extends SubAssembly 119 { 120 @Deprecated 121 /**deprecated, use AGGREGATE_BY_CAPACITY instead.*/ 122 public static final String AGGREGATE_BY_THRESHOLD = "cascading.aggregateby.threshold"; 123 124 public static final int USE_DEFAULT_THRESHOLD = 0; 125 126 @Deprecated 127 public static final int DEFAULT_THRESHOLD = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY; 128 129 private String name; 130 private int capacity; 131 private Fields groupingFields; 132 private Fields[] argumentFields; 133 private Functor[] functors; 134 private Aggregator[] aggregators; 135 private transient GroupBy groupBy; 136 137 public enum Cache 138 { 139 Num_Keys_Flushed, 140 Num_Keys_Hit, 141 Num_Keys_Missed 142 } 143 144 @Deprecated 145 public enum Flush 146 { 147 Num_Keys_Flushed 148 } 149 150 /** 151 * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class. 152 * <p/> 153 * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs. 154 */ 155 public interface Functor extends Serializable 156 { 157 /** 158 * Method getDeclaredFields returns the declaredFields of this Functor object. 159 * 160 * @return the declaredFields (type Fields) of this Functor object. 161 */ 162 Fields getDeclaredFields(); 163 164 /** 165 * Method aggregate operates on the given args in tandem (optionally) with the given context values. 166 * <p/> 167 * The context argument is the result of the previous call to this method. Use it to store values between aggregate 168 * calls (the current count, or sum of the args). 169 * <p/> 170 * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent 171 * invocations context will be the value returned on the previous invocation. 172 * 173 * @param flowProcess of type FlowProcess 174 * @param args of type TupleEntry 175 * @param context of type Tuple @return Tuple 176 */ 177 Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ); 178 179 /** 180 * Method complete allows the final aggregate computation to be performed before the return value is collected. 181 * <p/> 182 * The number of values in the returned {@link Tuple} instance must match the number of declaredFields. 183 * <p/> 184 * It is safe to return the context object as the result value. 185 * 186 * @param flowProcess of type FlowProcess 187 * @param context of type Tuple @return Tuple 188 */ 189 Tuple complete( FlowProcess flowProcess, Tuple context ); 190 } 191 192 /** 193 * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}. 194 * 195 * @see Functor 196 */ 197 public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context> 198 { 199 private int capacity = 0; 200 private final Fields groupingFields; 201 private final Fields[] argumentFields; 202 private final Fields[] functorFields; 203 private final Functor[] functors; 204 private final TupleHasher tupleHasher; 205 206 public static class Context 207 { 208 CascadingCache<Tuple, Tuple[]> lru; 209 TupleEntry[] arguments; 210 Tuple result; 211 } 212 213 /** 214 * Constructor CompositeFunction creates a new CompositeFunction instance. 215 * 216 * @param groupingFields of type Fields 217 * @param argumentFields of type Fields 218 * @param functor of type Functor 219 * @param capacity of type int 220 */ 221 public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int capacity ) 222 { 223 this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, capacity ); 224 } 225 226 /** 227 * Constructor CompositeFunction creates a new CompositeFunction instance. 228 * 229 * @param groupingFields of type Fields 230 * @param argumentFields of type Fields[] 231 * @param functors of type Functor[] 232 * @param capacity of type int 233 */ 234 public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int capacity ) 235 { 236 super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information 237 this.groupingFields = groupingFields; 238 this.argumentFields = argumentFields; 239 this.functors = functors; 240 this.capacity = capacity; 241 242 this.functorFields = new Fields[ functors.length ]; 243 244 for( int i = 0; i < functors.length; i++ ) 245 this.functorFields[ i ] = functors[ i ].getDeclaredFields(); 246 247 Comparator[] hashers = TupleHasher.merge( functorFields ); 248 if( !TupleHasher.isNull( hashers ) ) 249 this.tupleHasher = new TupleHasher( null, hashers ); 250 else 251 this.tupleHasher = null; 252 } 253 254 private static Fields getFields( Fields groupingFields, Functor[] functors ) 255 { 256 Fields fields = groupingFields; 257 258 for( Functor functor : functors ) 259 fields = fields.append( functor.getDeclaredFields() ); 260 261 return fields; 262 } 263 264 @Override 265 public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall ) 266 { 267 Fields[] fields = new Fields[ functors.length + 1 ]; 268 269 fields[ 0 ] = groupingFields; 270 271 for( int i = 0; i < functors.length; i++ ) 272 fields[ i + 1 ] = functors[ i ].getDeclaredFields(); 273 274 final Context context = new Context(); 275 276 context.arguments = new TupleEntry[ functors.length ]; 277 278 for( int i = 0; i < context.arguments.length; i++ ) 279 { 280 Fields resolvedArgumentFields = operationCall.getArgumentFields(); 281 282 int[] pos; 283 284 if( argumentFields[ i ].isAll() ) 285 pos = resolvedArgumentFields.getPos(); 286 else 287 pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL 288 289 Tuple narrow = TupleViews.createNarrow( pos ); 290 291 Fields currentFields; 292 293 if( this.argumentFields[ i ].isSubstitution() ) 294 currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator 295 else 296 currentFields = Fields.asDeclaration( this.argumentFields[ i ] ); 297 298 context.arguments[ i ] = new TupleEntry( currentFields, narrow ); 299 } 300 301 context.result = TupleViews.createComposite( fields ); 302 303 class AggregateByEviction implements CacheEvictionCallback<Tuple, Tuple[]> 304 { 305 @Override 306 public void evict( Map.Entry<Tuple, Tuple[]> entry ) 307 { 308 completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, entry ); 309 flowProcess.increment( Cache.Num_Keys_Flushed, 1 ); 310 } 311 } 312 313 FactoryLoader loader = FactoryLoader.getInstance(); 314 315 BaseCacheFactory<Tuple, Tuple[], ?> factory = loader.loadFactoryFrom( flowProcess, AggregateByProps.AGGREGATE_BY_CACHE_FACTORY, AggregateByProps.DEFAULT_CACHE_FACTORY_CLASS ); 316 317 if( factory == null ) 318 throw new CascadingException( "unable to load cache factory, please check your '" + AggregateByProps.AGGREGATE_BY_CACHE_FACTORY + "' setting." ); 319 320 CascadingCache<Tuple, Tuple[]> cache = factory.create( flowProcess ); 321 322 cache.setCacheEvictionCallback( new AggregateByEviction() ); 323 324 Integer cacheCapacity = capacity; 325 if( capacity == 0 ) 326 { 327 cacheCapacity = flowProcess.getIntegerProperty( AggregateByProps.AGGREGATE_BY_CAPACITY ); 328 // backward compatibility 329 if( cacheCapacity == null ) 330 cacheCapacity = flowProcess.getIntegerProperty( AGGREGATE_BY_THRESHOLD ); 331 332 if( cacheCapacity == null ) 333 cacheCapacity = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY; 334 } 335 cache.setCapacity( cacheCapacity.intValue() ); 336 cache.initialize(); 337 338 context.lru = cache; 339 340 operationCall.setContext( context ); 341 } 342 343 @Override 344 public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall ) 345 { 346 TupleEntry arguments = functionCall.getArguments(); 347 Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) ); 348 349 Context context = functionCall.getContext(); 350 Tuple[] functorContext = context.lru.get( key ); 351 352 if( functorContext == null ) 353 { 354 functorContext = new Tuple[ functors.length ]; 355 context.lru.put( key, functorContext ); 356 flowProcess.increment( Cache.Num_Keys_Missed, 1 ); 357 } 358 else 359 { 360 flowProcess.increment( Cache.Num_Keys_Hit, 1 ); 361 } 362 363 for( int i = 0; i < functors.length; i++ ) 364 { 365 TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() ); 366 functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] ); 367 } 368 } 369 370 @Override 371 public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall ) 372 { 373 // need to drain context 374 TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector(); 375 376 Tuple result = operationCall.getContext().result; 377 Map<Tuple, Tuple[]> context = operationCall.getContext().lru; 378 379 for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() ) 380 completeFunctors( flowProcess, collector, result, entry ); 381 382 operationCall.setContext( null ); 383 } 384 385 private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry ) 386 { 387 Tuple[] results = new Tuple[ functors.length + 1 ]; 388 389 results[ 0 ] = entry.getKey(); 390 391 Tuple[] values = entry.getValue(); 392 393 for( int i = 0; i < functors.length; i++ ) 394 results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] ); 395 396 TupleViews.reset( result, results ); 397 398 outputCollector.add( result ); 399 } 400 401 @Override 402 public boolean equals( Object object ) 403 { 404 if( this == object ) 405 return true; 406 if( !( object instanceof CompositeFunction ) ) 407 return false; 408 if( !super.equals( object ) ) 409 return false; 410 411 CompositeFunction that = (CompositeFunction) object; 412 413 if( !Arrays.equals( argumentFields, that.argumentFields ) ) 414 return false; 415 if( !Arrays.equals( functorFields, that.functorFields ) ) 416 return false; 417 if( !Arrays.equals( functors, that.functors ) ) 418 return false; 419 if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null ) 420 return false; 421 422 return true; 423 } 424 425 @Override 426 public int hashCode() 427 { 428 int result = super.hashCode(); 429 result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 ); 430 result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 ); 431 result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 ); 432 result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 ); 433 return result; 434 } 435 } 436 437 /** 438 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 439 * 440 * @param name of type String 441 * @param capacity of type int 442 */ 443 protected AggregateBy( String name, int capacity ) 444 { 445 this.name = name; 446 this.capacity = capacity; 447 } 448 449 /** 450 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 451 * 452 * @param argumentFields of type Fields 453 * @param functor of type Functor 454 * @param aggregator of type Aggregator 455 */ 456 protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator ) 457 { 458 this.argumentFields = Fields.fields( argumentFields ); 459 this.functors = new Functor[]{functor}; 460 this.aggregators = new Aggregator[]{aggregator}; 461 } 462 463 /** 464 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 465 * 466 * @param pipe of type Pipe 467 * @param groupingFields of type Fields 468 * @param assemblies of type CompositeAggregator... 469 */ 470 @ConstructorProperties( {"pipe", "groupingFields", "assemblies"} ) 471 public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies ) 472 { 473 this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies ); 474 } 475 476 /** 477 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 478 * 479 * @param pipe of type Pipe 480 * @param groupingFields of type Fields 481 * @param capacity of type int 482 * @param assemblies of type CompositeAggregator... 483 */ 484 @ConstructorProperties( {"pipe", "groupingFields", "capacity", "assemblies"} ) 485 public AggregateBy( Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies ) 486 { 487 this( null, Pipe.pipes( pipe ), groupingFields, capacity, assemblies ); 488 } 489 490 /** 491 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 492 * 493 * @param pipe of type Pipe 494 * @param groupingFields of type Fields 495 * @param capacity of type int 496 * @param assemblies of type CompositeAggregator... 497 */ 498 @ConstructorProperties( {"name", "pipe", "groupingFields", "capacity", "assemblies"} ) 499 public AggregateBy( String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies ) 500 { 501 this( name, Pipe.pipes( pipe ), groupingFields, capacity, assemblies ); 502 } 503 504 /** 505 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 506 * 507 * @param name of type String 508 * @param pipes of type Pipe[] 509 * @param groupingFields of type Fields 510 * @param assemblies of type CompositeAggregator... 511 */ 512 @ConstructorProperties( {"name", "pipes", "groupingFields", "assemblies"} ) 513 public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies ) 514 { 515 this( name, pipes, groupingFields, 0, assemblies ); 516 } 517 518 /** 519 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 520 * 521 * @param name of type String 522 * @param pipes of type Pipe[] 523 * @param groupingFields of type Fields 524 * @param capacity of type int 525 * @param assemblies of type CompositeAggregator... 526 */ 527 @ConstructorProperties( {"name", "pipes", "groupingFields", "capacity", "assemblies"} ) 528 public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies ) 529 { 530 this( name, capacity ); 531 532 List<Fields> arguments = new ArrayList<Fields>(); 533 List<Functor> functors = new ArrayList<Functor>(); 534 List<Aggregator> aggregators = new ArrayList<Aggregator>(); 535 536 for( int i = 0; i < assemblies.length; i++ ) 537 { 538 AggregateBy assembly = assemblies[ i ]; 539 540 Collections.addAll( arguments, assembly.getArgumentFields() ); 541 Collections.addAll( functors, assembly.getFunctors() ); 542 Collections.addAll( aggregators, assembly.getAggregators() ); 543 } 544 545 initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) ); 546 } 547 548 protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int capacity ) 549 { 550 this( name, capacity ); 551 initialize( groupingFields, pipes, argumentFields, functor, aggregator ); 552 } 553 554 protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator ) 555 { 556 initialize( groupingFields, pipes, Fields.fields( argumentFields ), 557 new Functor[]{functor}, 558 new Aggregator[]{aggregator} ); 559 } 560 561 protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators ) 562 { 563 setPrevious( pipes ); 564 565 this.groupingFields = groupingFields; 566 this.argumentFields = argumentFields; 567 this.functors = functors; 568 this.aggregators = aggregators; 569 570 verify(); 571 572 Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields ); 573 Fields argumentSelector = Fields.merge( this.groupingFields, sortFields ); 574 575 if( argumentSelector.equals( Fields.NONE ) ) 576 argumentSelector = Fields.ALL; 577 578 Pipe[] functions = new Pipe[ pipes.length ]; 579 580 CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, capacity ); 581 582 for( int i = 0; i < functions.length; i++ ) 583 functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS ); 584 585 groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null ); 586 587 Pipe pipe = groupBy; 588 589 for( int i = 0; i < aggregators.length; i++ ) 590 pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL ); 591 592 setTails( pipe ); 593 } 594 595 /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */ 596 protected void verify() 597 { 598 599 } 600 601 /** 602 * Method getGroupingFields returns the Fields this instances will be grouping against. 603 * 604 * @return the current grouping fields 605 */ 606 public Fields getGroupingFields() 607 { 608 return groupingFields; 609 } 610 611 /** 612 * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the 613 * field declaration of the given Aggregator operations. 614 * <p/> 615 * Note the actual Fields values are returned, not planner resolved Fields. 616 * 617 * @return and array of Fields 618 */ 619 public Fields[] getFieldDeclarations() 620 { 621 Fields[] fields = new Fields[ this.aggregators.length ]; 622 623 for( int i = 0; i < aggregators.length; i++ ) 624 fields[ i ] = aggregators[ i ].getFieldDeclaration(); 625 626 return fields; 627 } 628 629 protected Fields[] getArgumentFields() 630 { 631 return argumentFields; 632 } 633 634 protected Functor[] getFunctors() 635 { 636 return functors; 637 } 638 639 protected Aggregator[] getAggregators() 640 { 641 return aggregators; 642 } 643 644 /** 645 * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties 646 * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}. 647 * 648 * @return GroupBy type 649 */ 650 public GroupBy getGroupBy() 651 { 652 return groupBy; 653 } 654 655 @Property( name = "threshold", visibility = Visibility.PUBLIC ) 656 @PropertyDescription( "Threshold of the aggregation." ) 657 @PropertyConfigured( value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000" ) 658 @Deprecated 659 public int getThreshold() 660 { 661 return capacity; 662 } 663 664 @Property( name = "capacity", visibility = Visibility.PUBLIC ) 665 @PropertyDescription( "Capacity of the aggregation cache." ) 666 @PropertyConfigured( value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000" ) 667 public int getCapacity() 668 { 669 return capacity; 670 } 671 }