001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.Collections;
028    import java.util.Comparator;
029    import java.util.List;
030    import java.util.Map;
031    
032    import cascading.CascadingException;
033    import cascading.flow.FlowProcess;
034    import cascading.management.annotation.Property;
035    import cascading.management.annotation.PropertyConfigured;
036    import cascading.management.annotation.PropertyDescription;
037    import cascading.management.annotation.Visibility;
038    import cascading.operation.Aggregator;
039    import cascading.operation.BaseOperation;
040    import cascading.operation.Function;
041    import cascading.operation.FunctionCall;
042    import cascading.operation.OperationCall;
043    import cascading.pipe.Each;
044    import cascading.pipe.Every;
045    import cascading.pipe.GroupBy;
046    import cascading.pipe.Pipe;
047    import cascading.pipe.SubAssembly;
048    import cascading.provider.FactoryLoader;
049    import cascading.tuple.Fields;
050    import cascading.tuple.Tuple;
051    import cascading.tuple.TupleEntry;
052    import cascading.tuple.TupleEntryCollector;
053    import cascading.tuple.util.TupleHasher;
054    import cascading.tuple.util.TupleViews;
055    import cascading.util.cache.BaseCacheFactory;
056    import cascading.util.cache.CacheEvictionCallback;
057    import cascading.util.cache.CascadingCache;
058    
059    /**
060     * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
061     * <p/>
062     * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
063     * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
064     * completed Reduce side. Summing is associative and commutative.
065     * <p/>
066     * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting
067     * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
068     * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over
069     * two, and a hack)
070     * <p/>
071     * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized,
072     * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of
073     * memory and a little or no IO.
074     * <p/>
075     * Further, Combiners are limited to only associative/commutative operations.
076     * <p/>
077     * Additionally the Cascading planner can move the Map side optimization
078     * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which
079     * is over HDFS).
080     * <p/>
081     * The second role of the AggregateBy class is to allow for composition of AggregateBy
082     * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
083     * in parallel on the same grouping keys.
 * <p/>
085     * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
086     * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
087     * class allowing them all to share the same LRU value map for more efficiency.
088     * <p/>
089     * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
090     * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
 * for secondary sorting (see {@link GroupBy} {@code sortFields}). This feature is used by {@link FirstBy} to
092     * control which Tuple is seen first for a grouping.
093     * <p/>
094     * To tune the LRU, set the {@code capacity} value to a high enough value to utilize available memory. Or set a
095     * default value via the {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} property. The current default
096     * ({@link cascading.util.cache.BaseCacheFactory#DEFAULT_CAPACITY})
 * is {@code 10,000} unique keys.
098     * <p/>
099     * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
100     * by setting {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CACHE_FACTORY} property to the name of a sub-class of
101     * {@link cascading.util.cache.BaseCacheFactory}.
102     * <p/>
 * Note using an AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
104     * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
105     * <p/>
106     * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally.
107     * <p/>
108     * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache.
109     *
110     * @see SumBy
111     * @see CountBy
112     * @see Unique
113     * @see cascading.util.cache.LRUHashMapCacheFactory
114     * @see cascading.util.cache.DirectMappedCacheFactory
115     * @see cascading.util.cache.LRUHashMapCache
116     * @see cascading.util.cache.DirectMappedCache
117     */
118    public class AggregateBy extends SubAssembly
119      {
120      @Deprecated
  /** @deprecated use {@link AggregateByProps#AGGREGATE_BY_CAPACITY} instead. */
122      public static final String AGGREGATE_BY_THRESHOLD = "cascading.aggregateby.threshold";
123    
124      public static final int USE_DEFAULT_THRESHOLD = 0;
125    
126      @Deprecated
127      public static final int DEFAULT_THRESHOLD = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY;
128    
129      private String name;
130      private int capacity;
131      private Fields groupingFields;
132      private Fields[] argumentFields;
133      private Functor[] functors;
134      private Aggregator[] aggregators;
135      private transient GroupBy groupBy;
136    
  /**
   * Enum Cache names the counters {@link CompositeFunction} increments to report on cache behavior.
   * <p/>
   * {@code Num_Keys_Flushed} is incremented when the cache evicts an entry, {@code Num_Keys_Hit} when an incoming
   * grouping key is already cached, and {@code Num_Keys_Missed} when a new cache entry must be created.
   */
  public enum Cache
    {
      Num_Keys_Flushed,
      Num_Keys_Hit,
      Num_Keys_Missed
    }
143    
  /**
   * Enum Flush is a deprecated counter group that is not referenced anywhere else in this file.
   *
   * @deprecated see {@link Cache#Num_Keys_Flushed}, which is the counter incremented on cache eviction.
   */
  @Deprecated
  public enum Flush
    {
      Num_Keys_Flushed
    }
149    
  /**
   * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
   * <p/>
   * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
   */
  public interface Functor extends Serializable
    {
    /**
     * Method getDeclaredFields returns the declaredFields of this Functor object.
     *
     * @return the declaredFields (type Fields) of this Functor object.
     */
    Fields getDeclaredFields();

    /**
     * Method aggregate operates on the given args in tandem (optionally) with the given context values.
     * <p/>
     * The context argument is the result of the previous call to this method. Use it to store values between aggregate
     * calls (the current count, or sum of the args).
     * <p/>
     * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
     * invocations context will be the value returned on the previous invocation.
     *
     * @param flowProcess of type FlowProcess
     * @param args        of type TupleEntry
     * @param context     of type Tuple
     * @return Tuple, the context to hand to the next aggregate or complete call for the same grouping key
     */
    Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );

    /**
     * Method complete allows the final aggregate computation to be performed before the return value is collected.
     * <p/>
     * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
     * <p/>
     * It is safe to return the context object as the result value.
     *
     * @param flowProcess of type FlowProcess
     * @param context     of type Tuple
     * @return Tuple, the values emitted for this Functor's declared fields
     */
    Tuple complete( FlowProcess flowProcess, Tuple context );
    }
191    
192      /**
193       * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
194       *
195       * @see Functor
196       */
197      public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
198        {
199        private int capacity = 0;
200        private final Fields groupingFields;
201        private final Fields[] argumentFields;
202        private final Fields[] functorFields;
203        private final Functor[] functors;
204        private final TupleHasher tupleHasher;
205    
    /**
     * Class Context holds the per-operation state created in {@code prepare} and consumed by {@code operate}
     * and {@code flush}.
     */
    public static class Context
      {
      // cache of grouping key -> one partial-result Tuple per functor
      CascadingCache<Tuple, Tuple[]> lru;
      // one reusable narrow-view TupleEntry per functor, re-pointed at each incoming tuple
      TupleEntry[] arguments;
      // reusable composite view over the grouping key and all functor results, used when emitting
      Tuple result;
      }
212    
    /**
     * Constructor CompositeFunction creates a new CompositeFunction instance managing a single {@link Functor}.
     * <p/>
     * The single argument selector and functor are wrapped into one element arrays and passed to the array based
     * constructor.
     *
     * @param groupingFields of type Fields
     * @param argumentFields of type Fields
     * @param functor        of type Functor
     * @param capacity       of type int
     */
    public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int capacity )
      {
      this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, capacity );
      }
225    
226        /**
227         * Constructor CompositeFunction creates a new CompositeFunction instance.
228         *
229         * @param groupingFields of type Fields
230         * @param argumentFields of type Fields[]
231         * @param functors       of type Functor[]
232         * @param capacity       of type int
233         */
234        public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int capacity )
235          {
236          super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
237          this.groupingFields = groupingFields;
238          this.argumentFields = argumentFields;
239          this.functors = functors;
240          this.capacity = capacity;
241    
242          this.functorFields = new Fields[ functors.length ];
243    
244          for( int i = 0; i < functors.length; i++ )
245            this.functorFields[ i ] = functors[ i ].getDeclaredFields();
246    
247          Comparator[] hashers = TupleHasher.merge( functorFields );
248          if( !TupleHasher.isNull( hashers ) )
249            this.tupleHasher = new TupleHasher( null, hashers );
250          else
251            this.tupleHasher = null;
252          }
253    
254        private static Fields getFields( Fields groupingFields, Functor[] functors )
255          {
256          Fields fields = groupingFields;
257    
258          for( Functor functor : functors )
259            fields = fields.append( functor.getDeclaredFields() );
260    
261          return fields;
262          }
263    
264        @Override
265        public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
266          {
267          Fields[] fields = new Fields[ functors.length + 1 ];
268    
269          fields[ 0 ] = groupingFields;
270    
271          for( int i = 0; i < functors.length; i++ )
272            fields[ i + 1 ] = functors[ i ].getDeclaredFields();
273    
274          final Context context = new Context();
275    
276          context.arguments = new TupleEntry[ functors.length ];
277    
278          for( int i = 0; i < context.arguments.length; i++ )
279            {
280            Fields resolvedArgumentFields = operationCall.getArgumentFields();
281    
282            int[] pos;
283    
284            if( argumentFields[ i ].isAll() )
285              pos = resolvedArgumentFields.getPos();
286            else
287              pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL
288    
289            Tuple narrow = TupleViews.createNarrow( pos );
290    
291            Fields currentFields;
292    
293            if( this.argumentFields[ i ].isSubstitution() )
294              currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
295            else
296              currentFields = Fields.asDeclaration( this.argumentFields[ i ] );
297    
298            context.arguments[ i ] = new TupleEntry( currentFields, narrow );
299            }
300    
301          context.result = TupleViews.createComposite( fields );
302    
303          class AggregateByEviction implements CacheEvictionCallback<Tuple, Tuple[]>
304            {
305            @Override
306            public void evict( Map.Entry<Tuple, Tuple[]> entry )
307              {
308              completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, entry );
309              flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
310              }
311            }
312    
313          FactoryLoader loader = FactoryLoader.getInstance();
314    
315          BaseCacheFactory<Tuple, Tuple[], ?> factory = loader.loadFactoryFrom( flowProcess, AggregateByProps.AGGREGATE_BY_CACHE_FACTORY, AggregateByProps.DEFAULT_CACHE_FACTORY_CLASS );
316    
317          if( factory == null )
318            throw new CascadingException( "unable to load cache factory, please check your '" + AggregateByProps.AGGREGATE_BY_CACHE_FACTORY + "' setting." );
319    
320          CascadingCache<Tuple, Tuple[]> cache = factory.create( flowProcess );
321    
322          cache.setCacheEvictionCallback( new AggregateByEviction() );
323    
324          Integer cacheCapacity = capacity;
325          if( capacity == 0 )
326            {
327            cacheCapacity = flowProcess.getIntegerProperty( AggregateByProps.AGGREGATE_BY_CAPACITY );
328            // backward compatibility
329            if( cacheCapacity == null )
330              cacheCapacity = flowProcess.getIntegerProperty( AGGREGATE_BY_THRESHOLD );
331    
332            if( cacheCapacity == null )
333              cacheCapacity = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY;
334            }
335          cache.setCapacity( cacheCapacity.intValue() );
336          cache.initialize();
337    
338          context.lru = cache;
339    
340          operationCall.setContext( context );
341          }
342    
343        @Override
344        public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
345          {
346          TupleEntry arguments = functionCall.getArguments();
347          Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) );
348    
349          Context context = functionCall.getContext();
350          Tuple[] functorContext = context.lru.get( key );
351    
352          if( functorContext == null )
353            {
354            functorContext = new Tuple[ functors.length ];
355            context.lru.put( key, functorContext );
356            flowProcess.increment( Cache.Num_Keys_Missed, 1 );
357            }
358          else
359            {
360            flowProcess.increment( Cache.Num_Keys_Hit, 1 );
361            }
362    
363          for( int i = 0; i < functors.length; i++ )
364            {
365            TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
366            functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
367            }
368          }
369    
370        @Override
371        public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
372          {
373          // need to drain context
374          TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();
375    
376          Tuple result = operationCall.getContext().result;
377          Map<Tuple, Tuple[]> context = operationCall.getContext().lru;
378    
379          for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
380            completeFunctors( flowProcess, collector, result, entry );
381    
382          operationCall.setContext( null );
383          }
384    
385        private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
386          {
387          Tuple[] results = new Tuple[ functors.length + 1 ];
388    
389          results[ 0 ] = entry.getKey();
390    
391          Tuple[] values = entry.getValue();
392    
393          for( int i = 0; i < functors.length; i++ )
394            results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );
395    
396          TupleViews.reset( result, results );
397    
398          outputCollector.add( result );
399          }
400    
401        @Override
402        public boolean equals( Object object )
403          {
404          if( this == object )
405            return true;
406          if( !( object instanceof CompositeFunction ) )
407            return false;
408          if( !super.equals( object ) )
409            return false;
410    
411          CompositeFunction that = (CompositeFunction) object;
412    
413          if( !Arrays.equals( argumentFields, that.argumentFields ) )
414            return false;
415          if( !Arrays.equals( functorFields, that.functorFields ) )
416            return false;
417          if( !Arrays.equals( functors, that.functors ) )
418            return false;
419          if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
420            return false;
421    
422          return true;
423          }
424    
425        @Override
426        public int hashCode()
427          {
428          int result = super.hashCode();
429          result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
430          result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
431          result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
432          result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
433          return result;
434          }
435        }
436    
437      /**
438       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
439       *
440       * @param name     of type String
441       * @param capacity of type int
442       */
443      protected AggregateBy( String name, int capacity )
444        {
445        this.name = name;
446        this.capacity = capacity;
447        }
448    
449      /**
450       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
451       *
452       * @param argumentFields of type Fields
453       * @param functor        of type Functor
454       * @param aggregator     of type Aggregator
455       */
456      protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
457        {
458        this.argumentFields = Fields.fields( argumentFields );
459        this.functors = new Functor[]{functor};
460        this.aggregators = new Aggregator[]{aggregator};
461        }
462    
  /**
   * Constructor AggregateBy creates a new AggregateBy instance composing the given assemblies over a single pipe.
   * <p/>
   * Delegates with a {@code null} name and a capacity of {@code 0}, which equals {@link #USE_DEFAULT_THRESHOLD}.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"pipe", "groupingFields", "assemblies"} )
  public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
    }
475    
  /**
   * Constructor AggregateBy creates a new AggregateBy instance composing the given assemblies over a single pipe
   * with the given cache capacity.
   * <p/>
   * Delegates with a {@code null} name.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param capacity       of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"pipe", "groupingFields", "capacity", "assemblies"} )
  public AggregateBy( Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
    }
489    
  /**
   * Constructor AggregateBy creates a new named AggregateBy instance composing the given assemblies over a single
   * pipe with the given cache capacity.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param capacity       of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipe", "groupingFields", "capacity", "assemblies"} )
  public AggregateBy( String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
    {
    this( name, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
    }
503    
  /**
   * Constructor AggregateBy creates a new named AggregateBy instance composing the given assemblies over many pipes.
   * <p/>
   * Delegates with a capacity of {@code 0}, which equals {@link #USE_DEFAULT_THRESHOLD}.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipes", "groupingFields", "assemblies"} )
  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
    {
    this( name, pipes, groupingFields, 0, assemblies );
    }
517    
518      /**
519       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
520       *
521       * @param name           of type String
522       * @param pipes          of type Pipe[]
523       * @param groupingFields of type Fields
524       * @param capacity       of type int
525       * @param assemblies     of type CompositeAggregator...
526       */
527      @ConstructorProperties( {"name", "pipes", "groupingFields", "capacity", "assemblies"} )
528      public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies )
529        {
530        this( name, capacity );
531    
532        List<Fields> arguments = new ArrayList<Fields>();
533        List<Functor> functors = new ArrayList<Functor>();
534        List<Aggregator> aggregators = new ArrayList<Aggregator>();
535    
536        for( int i = 0; i < assemblies.length; i++ )
537          {
538          AggregateBy assembly = assemblies[ i ];
539    
540          Collections.addAll( arguments, assembly.getArgumentFields() );
541          Collections.addAll( functors, assembly.getFunctors() );
542          Collections.addAll( aggregators, assembly.getAggregators() );
543          }
544    
545        initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
546        }
547    
  /**
   * Constructor AggregateBy creates a new named AggregateBy instance over many pipes for a single argument
   * selector, {@link Functor}, and {@link Aggregator}, and initializes it immediately.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param argumentFields of type Fields
   * @param functor        of type Functor
   * @param aggregator     of type Aggregator
   * @param capacity       of type int
   */
  protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int capacity )
    {
    this( name, capacity );
    initialize( groupingFields, pipes, argumentFields, functor, aggregator );
    }
553    
554      protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
555        {
556        initialize( groupingFields, pipes, Fields.fields( argumentFields ),
557          new Functor[]{functor},
558          new Aggregator[]{aggregator} );
559        }
560    
561      protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
562        {
563        setPrevious( pipes );
564    
565        this.groupingFields = groupingFields;
566        this.argumentFields = argumentFields;
567        this.functors = functors;
568        this.aggregators = aggregators;
569    
570        verify();
571    
572        Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
573        Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );
574    
575        if( argumentSelector.equals( Fields.NONE ) )
576          argumentSelector = Fields.ALL;
577    
578        Pipe[] functions = new Pipe[ pipes.length ];
579    
580        CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, capacity );
581    
582        for( int i = 0; i < functions.length; i++ )
583          functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );
584    
585        groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );
586    
587        Pipe pipe = groupBy;
588    
589        for( int i = 0; i < aggregators.length; i++ )
590          pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );
591    
592        setTails( pipe );
593        }
594    
  /**
   * Method verify should be overridden by sub-classes if any values must be tested before the calling constructor
   * returns.
   * <p/>
   * It is called by the array based initialize method after the grouping fields, argument fields, functors, and
   * aggregators have been stored, and before any pipes are created. This default implementation does nothing.
   */
  protected void verify()
    {

    }
600    
  /**
   * Method getGroupingFields returns the Fields this instance will be grouping against.
   * <p/>
   * The value is only assigned by the array based initialize method, so it is {@code null} for instances that
   * were never initialized.
   *
   * @return the current grouping fields
   */
  public Fields getGroupingFields()
    {
    return groupingFields;
    }
610    
611      /**
612       * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
613       * field declaration of the given Aggregator operations.
614       * <p/>
615       * Note the actual Fields values are returned, not planner resolved Fields.
616       *
617       * @return and array of Fields
618       */
619      public Fields[] getFieldDeclarations()
620        {
621        Fields[] fields = new Fields[ this.aggregators.length ];
622    
623        for( int i = 0; i < aggregators.length; i++ )
624          fields[ i ] = aggregators[ i ].getFieldDeclaration();
625    
626        return fields;
627        }
628    
  /**
   * Method getArgumentFields returns the argument selectors of this instance, one per functor.
   * <p/>
   * The internal array is returned directly, not a copy.
   *
   * @return the argumentFields (type Fields[]) of this AggregateBy object.
   */
  protected Fields[] getArgumentFields()
    {
    return argumentFields;
    }
633    
  /**
   * Method getFunctors returns the {@link Functor} instances of this instance.
   * <p/>
   * The internal array is returned directly, not a copy.
   *
   * @return the functors (type Functor[]) of this AggregateBy object.
   */
  protected Functor[] getFunctors()
    {
    return functors;
    }
638    
  /**
   * Method getAggregators returns the {@link Aggregator} instances of this instance.
   * <p/>
   * The internal array is returned directly, not a copy.
   *
   * @return the aggregators (type Aggregator[]) of this AggregateBy object.
   */
  protected Aggregator[] getAggregators()
    {
    return aggregators;
    }
643    
  /**
   * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
   * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
   * <p/>
   * The GroupBy is created by the array based initialize method and is held in a {@code transient} field, so this
   * method returns {@code null} if initialize has not been called.
   *
   * @return GroupBy type
   */
  public GroupBy getGroupBy()
    {
    return groupBy;
    }
654    
  /**
   * Method getThreshold returns the capacity this instance was constructed with.
   *
   * @return the same value as {@link #getCapacity()}
   * @deprecated use {@link #getCapacity()} instead.
   */
  @Property( name = "threshold", visibility = Visibility.PUBLIC )
  @PropertyDescription( "Threshold of the aggregation." )
  @PropertyConfigured( value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000" )
  @Deprecated
  public int getThreshold()
    {
    return capacity;
    }
663    
  /**
   * Method getCapacity returns the cache capacity this instance was constructed with.
   * <p/>
   * The value is returned as given. A value of {@link #USE_DEFAULT_THRESHOLD} means the effective capacity is
   * resolved from properties when the {@link CompositeFunction} is prepared.
   *
   * @return the capacity (type int) of this AggregateBy object.
   */
  @Property( name = "capacity", visibility = Visibility.PUBLIC )
  @PropertyDescription( "Capacity of the aggregation cache." )
  @PropertyConfigured( value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000" )
  public int getCapacity()
    {
    return capacity;
    }
671      }