001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.assembly;
023
024import java.beans.ConstructorProperties;
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.List;
031import java.util.Map;
032
033import cascading.CascadingException;
034import cascading.flow.FlowProcess;
035import cascading.management.annotation.Property;
036import cascading.management.annotation.PropertyConfigured;
037import cascading.management.annotation.PropertyDescription;
038import cascading.management.annotation.Visibility;
039import cascading.operation.Aggregator;
040import cascading.operation.BaseOperation;
041import cascading.operation.Function;
042import cascading.operation.FunctionCall;
043import cascading.operation.OperationCall;
044import cascading.pipe.Each;
045import cascading.pipe.Every;
046import cascading.pipe.GroupBy;
047import cascading.pipe.Pipe;
048import cascading.pipe.SubAssembly;
049import cascading.provider.FactoryLoader;
050import cascading.tuple.Fields;
051import cascading.tuple.Tuple;
052import cascading.tuple.TupleEntry;
053import cascading.tuple.TupleEntryCollector;
054import cascading.tuple.util.TupleHasher;
055import cascading.tuple.util.TupleViews;
056import cascading.util.cache.BaseCacheFactory;
057import cascading.util.cache.CacheEvictionCallback;
058import cascading.util.cache.CascadingCache;
059
060/**
061 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
062 * <p>
063 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
064 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
065 * completed Reduce side. Summing is associative and commutative.
066 * <p>
067 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting
068 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
069 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over
070 * two, and a hack)
071 * <p>
072 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized,
073 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of
074 * memory and a little or no IO.
075 * <p>
076 * Further, Combiners are limited to only associative/commutative operations.
077 * <p>
078 * Additionally the Cascading planner can move the Map side optimization
079 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which
080 * is over HDFS).
081 * <p>
082 * The second role of the AggregateBy class is to allow for composition of AggregateBy
083 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
084 * in parallel on the same grouping keys.
085 * <p>
086 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
087 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
088 * class allowing them all to share the same LRU value map for more efficiency.
089 * <p>
090 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
091 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
 * for secondary sorting (see {@link GroupBy} {@code sortFields}). This feature is used by {@link FirstBy} to
093 * control which Tuple is seen first for a grouping.
094 * <p>
095 * To tune the LRU, set the {@code capacity} value to a high enough value to utilize available memory. Or set a
096 * default value via the {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} property. The current default
097 * ({@link cascading.util.cache.BaseCacheFactory#DEFAULT_CAPACITY})
 * is {@code 10,000} unique keys.
099 * <p>
100 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
101 * by setting {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CACHE_FACTORY} property to the name of a sub-class of
102 * {@link cascading.util.cache.BaseCacheFactory}.
103 * <p>
 * Note that using an AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
105 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
106 * <p>
 * Also note that {@link Unique} is not an AggregateBy and is slightly more optimized internally.
108 * <p>
109 * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache.
110 *
111 * @see SumBy
112 * @see CountBy
113 * @see Unique
114 * @see cascading.util.cache.LRUHashMapCacheFactory
115 * @see cascading.util.cache.DirectMappedCacheFactory
116 * @see cascading.util.cache.LRUHashMapCache
117 * @see cascading.util.cache.DirectMappedCache
118 */
119public class AggregateBy extends SubAssembly
120  {
121  public static final int USE_DEFAULT_THRESHOLD = 0;
122
123  private String name;
124  private int capacity;
125  private Fields groupingFields;
126  private Fields[] argumentFields;
127  private Functor[] functors;
128  private Aggregator[] aggregators;
129  private transient GroupBy groupBy;
130
131  public enum Cache
132    {
133      Num_Keys_Flushed,
134      Num_Keys_Hit,
135      Num_Keys_Missed
136    }
137
138  /**
139   * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
140   * <p>
141   * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
142   */
143  public interface Functor extends Serializable
144    {
145    /**
146     * Method getDeclaredFields returns the declaredFields of this Functor object.
147     *
148     * @return the declaredFields (type Fields) of this Functor object.
149     */
150    Fields getDeclaredFields();
151
152    /**
153     * Method aggregate operates on the given args in tandem (optionally) with the given context values.
154     * <p>
155     * The context argument is the result of the previous call to this method. Use it to store values between aggregate
156     * calls (the current count, or sum of the args).
157     * <p>
158     * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
159     * invocations context will be the value returned on the previous invocation.
160     *
161     * @param flowProcess of type FlowProcess
162     * @param args        of type TupleEntry
163     * @param context     of type Tuple   @return Tuple
164     */
165    Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );
166
167    /**
168     * Method complete allows the final aggregate computation to be performed before the return value is collected.
169     * <p>
170     * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
171     * <p>
172     * It is safe to return the context object as the result value.
173     *
174     * @param flowProcess of type FlowProcess
175     * @param context     of type Tuple  @return Tuple
176     */
177    Tuple complete( FlowProcess flowProcess, Tuple context );
178    }
179
180  /**
181   * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
182   *
183   * @see Functor
184   */
185  public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
186    {
187    private int capacity = 0;
188    private final Fields groupingFields;
189    private final Fields[] argumentFields;
190    private final Fields[] functorFields;
191    private final Functor[] functors;
192    private final TupleHasher tupleHasher;
193
194    public static class Context
195      {
196      CascadingCache<Tuple, Tuple[]> lru;
197      TupleEntry[] arguments;
198      Tuple result;
199      }
200
201    /**
202     * Constructor CompositeFunction creates a new CompositeFunction instance.
203     *
204     * @param groupingFields of type Fields
205     * @param argumentFields of type Fields
206     * @param functor        of type Functor
207     * @param capacity       of type int
208     */
209    public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int capacity )
210      {
211      this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, capacity );
212      }
213
214    /**
215     * Constructor CompositeFunction creates a new CompositeFunction instance.
216     *
217     * @param groupingFields of type Fields
218     * @param argumentFields of type Fields[]
219     * @param functors       of type Functor[]
220     * @param capacity       of type int
221     */
222    public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int capacity )
223      {
224      super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
225      this.groupingFields = groupingFields;
226      this.argumentFields = argumentFields;
227      this.functors = functors;
228      this.capacity = capacity;
229
230      this.functorFields = new Fields[ functors.length ];
231
232      for( int i = 0; i < functors.length; i++ )
233        this.functorFields[ i ] = functors[ i ].getDeclaredFields();
234
235      Comparator[] hashers = TupleHasher.merge( functorFields );
236      if( !TupleHasher.isNull( hashers ) )
237        this.tupleHasher = new TupleHasher( null, hashers );
238      else
239        this.tupleHasher = null;
240      }
241
242    private static Fields getFields( Fields groupingFields, Functor[] functors )
243      {
244      Fields fields = groupingFields;
245
246      for( Functor functor : functors )
247        fields = fields.append( functor.getDeclaredFields() );
248
249      return fields;
250      }
251
252    @Override
253    public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
254      {
255      Fields[] fields = new Fields[ functors.length + 1 ];
256
257      fields[ 0 ] = groupingFields;
258
259      for( int i = 0; i < functors.length; i++ )
260        fields[ i + 1 ] = functors[ i ].getDeclaredFields();
261
262      final Context context = new Context();
263
264      context.arguments = new TupleEntry[ functors.length ];
265
266      for( int i = 0; i < context.arguments.length; i++ )
267        {
268        Fields resolvedArgumentFields = operationCall.getArgumentFields();
269
270        int[] pos;
271
272        if( argumentFields[ i ].isAll() )
273          pos = resolvedArgumentFields.getPos();
274        else
275          pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL
276
277        Tuple narrow = TupleViews.createNarrow( pos );
278
279        Fields currentFields;
280
281        if( this.argumentFields[ i ].isSubstitution() )
282          currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
283        else
284          currentFields = Fields.asDeclaration( this.argumentFields[ i ] );
285
286        context.arguments[ i ] = new TupleEntry( currentFields, narrow );
287        }
288
289      context.result = TupleViews.createComposite( fields );
290
291      class AggregateByEviction implements CacheEvictionCallback<Tuple, Tuple[]>
292        {
293        @Override
294        public void evict( Map.Entry<Tuple, Tuple[]> entry )
295          {
296          completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, entry );
297          flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
298          }
299        }
300
301      FactoryLoader loader = FactoryLoader.getInstance();
302
303      BaseCacheFactory<Tuple, Tuple[], ?> factory = loader.loadFactoryFrom( flowProcess, AggregateByProps.AGGREGATE_BY_CACHE_FACTORY, AggregateByProps.DEFAULT_CACHE_FACTORY_CLASS );
304
305      if( factory == null )
306        throw new CascadingException( "unable to load cache factory, please check your '" + AggregateByProps.AGGREGATE_BY_CACHE_FACTORY + "' setting." );
307
308      CascadingCache<Tuple, Tuple[]> cache = factory.create( flowProcess );
309
310      cache.setCacheEvictionCallback( new AggregateByEviction() );
311
312      Integer cacheCapacity = capacity;
313      if( capacity == 0 )
314        {
315        cacheCapacity = flowProcess.getIntegerProperty( AggregateByProps.AGGREGATE_BY_CAPACITY );
316
317        if( cacheCapacity == null )
318          cacheCapacity = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY;
319        }
320      cache.setCapacity( cacheCapacity.intValue() );
321      cache.initialize();
322
323      context.lru = cache;
324
325      operationCall.setContext( context );
326      }
327
328    @Override
329    public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
330      {
331      TupleEntry arguments = functionCall.getArguments();
332      Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) );
333
334      Context context = functionCall.getContext();
335      Tuple[] functorContext = context.lru.get( key );
336
337      if( functorContext == null )
338        {
339        functorContext = new Tuple[ functors.length ];
340        context.lru.put( key, functorContext );
341        flowProcess.increment( Cache.Num_Keys_Missed, 1 );
342        }
343      else
344        {
345        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
346        }
347
348      for( int i = 0; i < functors.length; i++ )
349        {
350        TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
351        functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
352        }
353      }
354
355    @Override
356    public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
357      {
358      // need to drain context
359      TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();
360
361      Tuple result = operationCall.getContext().result;
362      Map<Tuple, Tuple[]> context = operationCall.getContext().lru;
363
364      for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
365        completeFunctors( flowProcess, collector, result, entry );
366
367      context.clear();
368      }
369
370    @Override
371    public void cleanup( FlowProcess flowProcess, OperationCall<Context> operationCall )
372      {
373      operationCall.setContext( null );
374      }
375
376    private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
377      {
378      Tuple[] results = new Tuple[ functors.length + 1 ];
379
380      results[ 0 ] = entry.getKey();
381
382      Tuple[] values = entry.getValue();
383
384      for( int i = 0; i < functors.length; i++ )
385        results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );
386
387      TupleViews.reset( result, results );
388
389      outputCollector.add( result );
390      }
391
392    @Override
393    public boolean equals( Object object )
394      {
395      if( this == object )
396        return true;
397      if( !( object instanceof CompositeFunction ) )
398        return false;
399      if( !super.equals( object ) )
400        return false;
401
402      CompositeFunction that = (CompositeFunction) object;
403
404      if( !Arrays.equals( argumentFields, that.argumentFields ) )
405        return false;
406      if( !Arrays.equals( functorFields, that.functorFields ) )
407        return false;
408      if( !Arrays.equals( functors, that.functors ) )
409        return false;
410      if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
411        return false;
412
413      return true;
414      }
415
416    @Override
417    public int hashCode()
418      {
419      int result = super.hashCode();
420      result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
421      result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
422      result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
423      result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
424      return result;
425      }
426    }
427
428  /**
429   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
430   *
431   * @param name     of type String
432   * @param capacity of type int
433   */
434  protected AggregateBy( String name, int capacity )
435    {
436    this.name = name;
437    this.capacity = capacity;
438    }
439
440  /**
441   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
442   *
443   * @param argumentFields of type Fields
444   * @param functor        of type Functor
445   * @param aggregator     of type Aggregator
446   */
447  protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
448    {
449    this.argumentFields = Fields.fields( argumentFields );
450    this.functors = new Functor[]{functor};
451    this.aggregators = new Aggregator[]{aggregator};
452    }
453
454  /**
455   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
456   *
457   * @param pipe           of type Pipe
458   * @param groupingFields of type Fields
459   * @param assemblies     of type CompositeAggregator...
460   */
461  @ConstructorProperties({"pipe", "groupingFields", "assemblies"})
462  public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
463    {
464    this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
465    }
466
467  /**
468   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
469   *
470   * @param pipe           of type Pipe
471   * @param groupingFields of type Fields
472   * @param capacity       of type int
473   * @param assemblies     of type CompositeAggregator...
474   */
475  @ConstructorProperties({"pipe", "groupingFields", "capacity", "assemblies"})
476  public AggregateBy( Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
477    {
478    this( null, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
479    }
480
481  /**
482   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
483   *
484   * @param pipe           of type Pipe
485   * @param groupingFields of type Fields
486   * @param capacity       of type int
487   * @param assemblies     of type CompositeAggregator...
488   */
489  @ConstructorProperties({"name", "pipe", "groupingFields", "capacity", "assemblies"})
490  public AggregateBy( String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
491    {
492    this( name, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
493    }
494
495  /**
496   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
497   *
498   * @param name           of type String
499   * @param pipes          of type Pipe[]
500   * @param groupingFields of type Fields
501   * @param assemblies     of type CompositeAggregator...
502   */
503  @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"})
504  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
505    {
506    this( name, pipes, groupingFields, 0, assemblies );
507    }
508
509  /**
510   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
511   *
512   * @param name           of type String
513   * @param pipes          of type Pipe[]
514   * @param groupingFields of type Fields
515   * @param capacity       of type int
516   * @param assemblies     of type CompositeAggregator...
517   */
518  @ConstructorProperties({"name", "pipes", "groupingFields", "capacity", "assemblies"})
519  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies )
520    {
521    this( name, capacity );
522
523    List<Fields> arguments = new ArrayList<Fields>();
524    List<Functor> functors = new ArrayList<Functor>();
525    List<Aggregator> aggregators = new ArrayList<Aggregator>();
526
527    for( int i = 0; i < assemblies.length; i++ )
528      {
529      AggregateBy assembly = assemblies[ i ];
530
531      Collections.addAll( arguments, assembly.getArgumentFields() );
532      Collections.addAll( functors, assembly.getFunctors() );
533      Collections.addAll( aggregators, assembly.getAggregators() );
534      }
535
536    initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
537    }
538
539  protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int capacity )
540    {
541    this( name, capacity );
542    initialize( groupingFields, pipes, argumentFields, functor, aggregator );
543    }
544
545  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
546    {
547    initialize( groupingFields, pipes, Fields.fields( argumentFields ),
548      new Functor[]{functor},
549      new Aggregator[]{aggregator} );
550    }
551
552  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
553    {
554    setPrevious( pipes );
555
556    this.groupingFields = groupingFields;
557    this.argumentFields = argumentFields;
558    this.functors = functors;
559    this.aggregators = aggregators;
560
561    verify();
562
563    Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
564    Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );
565
566    if( argumentSelector.equals( Fields.NONE ) )
567      argumentSelector = Fields.ALL;
568
569    Pipe[] functions = new Pipe[ pipes.length ];
570
571    CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, capacity );
572
573    for( int i = 0; i < functions.length; i++ )
574      functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );
575
576    groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );
577
578    Pipe pipe = groupBy;
579
580    for( int i = 0; i < aggregators.length; i++ )
581      pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );
582
583    setTails( pipe );
584    }
585
586  /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
587  protected void verify()
588    {
589
590    }
591
592  /**
593   * Method getGroupingFields returns the Fields this instances will be grouping against.
594   *
595   * @return the current grouping fields
596   */
597  public Fields getGroupingFields()
598    {
599    return groupingFields;
600    }
601
602  /**
603   * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
604   * field declaration of the given Aggregator operations.
605   * <p>
606   * Note the actual Fields values are returned, not planner resolved Fields.
607   *
608   * @return and array of Fields
609   */
610  public Fields[] getFieldDeclarations()
611    {
612    Fields[] fields = new Fields[ this.aggregators.length ];
613
614    for( int i = 0; i < aggregators.length; i++ )
615      fields[ i ] = aggregators[ i ].getFieldDeclaration();
616
617    return fields;
618    }
619
620  protected Fields[] getArgumentFields()
621    {
622    return argumentFields;
623    }
624
625  protected Functor[] getFunctors()
626    {
627    return functors;
628    }
629
630  protected Aggregator[] getAggregators()
631    {
632    return aggregators;
633    }
634
635  /**
636   * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
637   * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
638   *
639   * @return GroupBy type
640   */
641  public GroupBy getGroupBy()
642    {
643    return groupBy;
644    }
645
646  @Property(name = "capacity", visibility = Visibility.PUBLIC)
647  @PropertyDescription("Capacity of the aggregation cache.")
648  @PropertyConfigured(value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000")
649  public int getCapacity()
650    {
651    return capacity;
652    }
653  }