001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.List;
030import java.util.Map;
031
032import cascading.CascadingException;
033import cascading.flow.FlowProcess;
034import cascading.management.annotation.Property;
035import cascading.management.annotation.PropertyConfigured;
036import cascading.management.annotation.PropertyDescription;
037import cascading.management.annotation.Visibility;
038import cascading.operation.Aggregator;
039import cascading.operation.BaseOperation;
040import cascading.operation.Function;
041import cascading.operation.FunctionCall;
042import cascading.operation.OperationCall;
043import cascading.pipe.Each;
044import cascading.pipe.Every;
045import cascading.pipe.GroupBy;
046import cascading.pipe.Pipe;
047import cascading.pipe.SubAssembly;
048import cascading.provider.FactoryLoader;
049import cascading.tuple.Fields;
050import cascading.tuple.Tuple;
051import cascading.tuple.TupleEntry;
052import cascading.tuple.TupleEntryCollector;
053import cascading.tuple.util.TupleHasher;
054import cascading.tuple.util.TupleViews;
055import cascading.util.cache.BaseCacheFactory;
056import cascading.util.cache.CacheEvictionCallback;
057import cascading.util.cache.CascadingCache;
058
059/**
060 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
061 * <p/>
062 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
063 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
064 * completed Reduce side. Summing is associative and commutative.
065 * <p/>
066 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting
067 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
068 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over
069 * two, and a hack)
070 * <p/>
071 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized,
072 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of
073 * memory and a little or no IO.
074 * <p/>
075 * Further, Combiners are limited to only associative/commutative operations.
076 * <p/>
077 * Additionally the Cascading planner can move the Map side optimization
078 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which
079 * is over HDFS).
080 * <p/>
081 * The second role of the AggregateBy class is to allow for composition of AggregateBy
082 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
083 * in parallel on the same grouping keys.
084 * </p>
085 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
086 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
087 * class allowing them all to share the same LRU value map for more efficiency.
088 * <p/>
089 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
090 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
091 * to for secondary sorting (see {@link GroupBy} {@code sortFields}. This feature is used by {@link FirstBy} to
092 * control which Tuple is seen first for a grouping.
093 * <p/>
094 * To tune the LRU, set the {@code capacity} value to a high enough value to utilize available memory. Or set a
095 * default value via the {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} property. The current default
096 * ({@link cascading.util.cache.BaseCacheFactory#DEFAULT_CAPACITY})
097 * is {@code 10, 000} unique keys.
098 * <p/>
099 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
100 * by setting {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CACHE_FACTORY} property to the name of a sub-class of
101 * {@link cascading.util.cache.BaseCacheFactory}.
102 * <p/>
103 * Note using a AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
104 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
105 * <p/>
106 * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally.
107 * <p/>
108 * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache.
109 *
110 * @see SumBy
111 * @see CountBy
112 * @see Unique
113 * @see cascading.util.cache.LRUHashMapCacheFactory
114 * @see cascading.util.cache.DirectMappedCacheFactory
115 * @see cascading.util.cache.LRUHashMapCache
116 * @see cascading.util.cache.DirectMappedCache
117 */
118public class AggregateBy extends SubAssembly
119  {
120  public static final int USE_DEFAULT_THRESHOLD = 0;
121
122  private String name;
123  private int capacity;
124  private Fields groupingFields;
125  private Fields[] argumentFields;
126  private Functor[] functors;
127  private Aggregator[] aggregators;
128  private transient GroupBy groupBy;
129
130  public enum Cache
131    {
132      Num_Keys_Flushed,
133      Num_Keys_Hit,
134      Num_Keys_Missed
135    }
136
137  /**
138   * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
139   * <p/>
140   * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
141   */
142  public interface Functor extends Serializable
143    {
144    /**
145     * Method getDeclaredFields returns the declaredFields of this Functor object.
146     *
147     * @return the declaredFields (type Fields) of this Functor object.
148     */
149    Fields getDeclaredFields();
150
151    /**
152     * Method aggregate operates on the given args in tandem (optionally) with the given context values.
153     * <p/>
154     * The context argument is the result of the previous call to this method. Use it to store values between aggregate
155     * calls (the current count, or sum of the args).
156     * <p/>
157     * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
158     * invocations context will be the value returned on the previous invocation.
159     *
160     * @param flowProcess of type FlowProcess
161     * @param args        of type TupleEntry
162     * @param context     of type Tuple   @return Tuple
163     */
164    Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );
165
166    /**
167     * Method complete allows the final aggregate computation to be performed before the return value is collected.
168     * <p/>
169     * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
170     * <p/>
171     * It is safe to return the context object as the result value.
172     *
173     * @param flowProcess of type FlowProcess
174     * @param context     of type Tuple  @return Tuple
175     */
176    Tuple complete( FlowProcess flowProcess, Tuple context );
177    }
178
179  /**
180   * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
181   *
182   * @see Functor
183   */
184  public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
185    {
186    private int capacity = 0;
187    private final Fields groupingFields;
188    private final Fields[] argumentFields;
189    private final Fields[] functorFields;
190    private final Functor[] functors;
191    private final TupleHasher tupleHasher;
192
193    public static class Context
194      {
195      CascadingCache<Tuple, Tuple[]> lru;
196      TupleEntry[] arguments;
197      Tuple result;
198      }
199
200    /**
201     * Constructor CompositeFunction creates a new CompositeFunction instance.
202     *
203     * @param groupingFields of type Fields
204     * @param argumentFields of type Fields
205     * @param functor        of type Functor
206     * @param capacity       of type int
207     */
208    public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int capacity )
209      {
210      this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, capacity );
211      }
212
213    /**
214     * Constructor CompositeFunction creates a new CompositeFunction instance.
215     *
216     * @param groupingFields of type Fields
217     * @param argumentFields of type Fields[]
218     * @param functors       of type Functor[]
219     * @param capacity       of type int
220     */
221    public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int capacity )
222      {
223      super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
224      this.groupingFields = groupingFields;
225      this.argumentFields = argumentFields;
226      this.functors = functors;
227      this.capacity = capacity;
228
229      this.functorFields = new Fields[ functors.length ];
230
231      for( int i = 0; i < functors.length; i++ )
232        this.functorFields[ i ] = functors[ i ].getDeclaredFields();
233
234      Comparator[] hashers = TupleHasher.merge( functorFields );
235      if( !TupleHasher.isNull( hashers ) )
236        this.tupleHasher = new TupleHasher( null, hashers );
237      else
238        this.tupleHasher = null;
239      }
240
241    private static Fields getFields( Fields groupingFields, Functor[] functors )
242      {
243      Fields fields = groupingFields;
244
245      for( Functor functor : functors )
246        fields = fields.append( functor.getDeclaredFields() );
247
248      return fields;
249      }
250
251    @Override
252    public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
253      {
254      Fields[] fields = new Fields[ functors.length + 1 ];
255
256      fields[ 0 ] = groupingFields;
257
258      for( int i = 0; i < functors.length; i++ )
259        fields[ i + 1 ] = functors[ i ].getDeclaredFields();
260
261      final Context context = new Context();
262
263      context.arguments = new TupleEntry[ functors.length ];
264
265      for( int i = 0; i < context.arguments.length; i++ )
266        {
267        Fields resolvedArgumentFields = operationCall.getArgumentFields();
268
269        int[] pos;
270
271        if( argumentFields[ i ].isAll() )
272          pos = resolvedArgumentFields.getPos();
273        else
274          pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL
275
276        Tuple narrow = TupleViews.createNarrow( pos );
277
278        Fields currentFields;
279
280        if( this.argumentFields[ i ].isSubstitution() )
281          currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
282        else
283          currentFields = Fields.asDeclaration( this.argumentFields[ i ] );
284
285        context.arguments[ i ] = new TupleEntry( currentFields, narrow );
286        }
287
288      context.result = TupleViews.createComposite( fields );
289
290      class AggregateByEviction implements CacheEvictionCallback<Tuple, Tuple[]>
291        {
292        @Override
293        public void evict( Map.Entry<Tuple, Tuple[]> entry )
294          {
295          completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, entry );
296          flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
297          }
298        }
299
300      FactoryLoader loader = FactoryLoader.getInstance();
301
302      BaseCacheFactory<Tuple, Tuple[], ?> factory = loader.loadFactoryFrom( flowProcess, AggregateByProps.AGGREGATE_BY_CACHE_FACTORY, AggregateByProps.DEFAULT_CACHE_FACTORY_CLASS );
303
304      if( factory == null )
305        throw new CascadingException( "unable to load cache factory, please check your '" + AggregateByProps.AGGREGATE_BY_CACHE_FACTORY + "' setting." );
306
307      CascadingCache<Tuple, Tuple[]> cache = factory.create( flowProcess );
308
309      cache.setCacheEvictionCallback( new AggregateByEviction() );
310
311      Integer cacheCapacity = capacity;
312      if( capacity == 0 )
313        {
314        cacheCapacity = flowProcess.getIntegerProperty( AggregateByProps.AGGREGATE_BY_CAPACITY );
315
316        if( cacheCapacity == null )
317          cacheCapacity = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY;
318        }
319      cache.setCapacity( cacheCapacity.intValue() );
320      cache.initialize();
321
322      context.lru = cache;
323
324      operationCall.setContext( context );
325      }
326
327    @Override
328    public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
329      {
330      TupleEntry arguments = functionCall.getArguments();
331      Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) );
332
333      Context context = functionCall.getContext();
334      Tuple[] functorContext = context.lru.get( key );
335
336      if( functorContext == null )
337        {
338        functorContext = new Tuple[ functors.length ];
339        context.lru.put( key, functorContext );
340        flowProcess.increment( Cache.Num_Keys_Missed, 1 );
341        }
342      else
343        {
344        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
345        }
346
347      for( int i = 0; i < functors.length; i++ )
348        {
349        TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
350        functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
351        }
352      }
353
354    @Override
355    public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
356      {
357      // need to drain context
358      TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();
359
360      Tuple result = operationCall.getContext().result;
361      Map<Tuple, Tuple[]> context = operationCall.getContext().lru;
362
363      for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
364        completeFunctors( flowProcess, collector, result, entry );
365
366      context.clear();
367      }
368
369    @Override
370    public void cleanup( FlowProcess flowProcess, OperationCall<Context> operationCall )
371      {
372      operationCall.setContext( null );
373      }
374
375    private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
376      {
377      Tuple[] results = new Tuple[ functors.length + 1 ];
378
379      results[ 0 ] = entry.getKey();
380
381      Tuple[] values = entry.getValue();
382
383      for( int i = 0; i < functors.length; i++ )
384        results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );
385
386      TupleViews.reset( result, results );
387
388      outputCollector.add( result );
389      }
390
391    @Override
392    public boolean equals( Object object )
393      {
394      if( this == object )
395        return true;
396      if( !( object instanceof CompositeFunction ) )
397        return false;
398      if( !super.equals( object ) )
399        return false;
400
401      CompositeFunction that = (CompositeFunction) object;
402
403      if( !Arrays.equals( argumentFields, that.argumentFields ) )
404        return false;
405      if( !Arrays.equals( functorFields, that.functorFields ) )
406        return false;
407      if( !Arrays.equals( functors, that.functors ) )
408        return false;
409      if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
410        return false;
411
412      return true;
413      }
414
415    @Override
416    public int hashCode()
417      {
418      int result = super.hashCode();
419      result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
420      result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
421      result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
422      result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
423      return result;
424      }
425    }
426
427  /**
428   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
429   *
430   * @param name     of type String
431   * @param capacity of type int
432   */
433  protected AggregateBy( String name, int capacity )
434    {
435    this.name = name;
436    this.capacity = capacity;
437    }
438
439  /**
440   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
441   *
442   * @param argumentFields of type Fields
443   * @param functor        of type Functor
444   * @param aggregator     of type Aggregator
445   */
446  protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
447    {
448    this.argumentFields = Fields.fields( argumentFields );
449    this.functors = new Functor[]{functor};
450    this.aggregators = new Aggregator[]{aggregator};
451    }
452
453  /**
454   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
455   *
456   * @param pipe           of type Pipe
457   * @param groupingFields of type Fields
458   * @param assemblies     of type CompositeAggregator...
459   */
460  @ConstructorProperties({"pipe", "groupingFields", "assemblies"})
461  public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
462    {
463    this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
464    }
465
466  /**
467   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
468   *
469   * @param pipe           of type Pipe
470   * @param groupingFields of type Fields
471   * @param capacity       of type int
472   * @param assemblies     of type CompositeAggregator...
473   */
474  @ConstructorProperties({"pipe", "groupingFields", "capacity", "assemblies"})
475  public AggregateBy( Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
476    {
477    this( null, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
478    }
479
480  /**
481   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
482   *
483   * @param pipe           of type Pipe
484   * @param groupingFields of type Fields
485   * @param capacity       of type int
486   * @param assemblies     of type CompositeAggregator...
487   */
488  @ConstructorProperties({"name", "pipe", "groupingFields", "capacity", "assemblies"})
489  public AggregateBy( String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
490    {
491    this( name, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
492    }
493
494  /**
495   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
496   *
497   * @param name           of type String
498   * @param pipes          of type Pipe[]
499   * @param groupingFields of type Fields
500   * @param assemblies     of type CompositeAggregator...
501   */
502  @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"})
503  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
504    {
505    this( name, pipes, groupingFields, 0, assemblies );
506    }
507
508  /**
509   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
510   *
511   * @param name           of type String
512   * @param pipes          of type Pipe[]
513   * @param groupingFields of type Fields
514   * @param capacity       of type int
515   * @param assemblies     of type CompositeAggregator...
516   */
517  @ConstructorProperties({"name", "pipes", "groupingFields", "capacity", "assemblies"})
518  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies )
519    {
520    this( name, capacity );
521
522    List<Fields> arguments = new ArrayList<Fields>();
523    List<Functor> functors = new ArrayList<Functor>();
524    List<Aggregator> aggregators = new ArrayList<Aggregator>();
525
526    for( int i = 0; i < assemblies.length; i++ )
527      {
528      AggregateBy assembly = assemblies[ i ];
529
530      Collections.addAll( arguments, assembly.getArgumentFields() );
531      Collections.addAll( functors, assembly.getFunctors() );
532      Collections.addAll( aggregators, assembly.getAggregators() );
533      }
534
535    initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
536    }
537
538  protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int capacity )
539    {
540    this( name, capacity );
541    initialize( groupingFields, pipes, argumentFields, functor, aggregator );
542    }
543
544  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
545    {
546    initialize( groupingFields, pipes, Fields.fields( argumentFields ),
547      new Functor[]{functor},
548      new Aggregator[]{aggregator} );
549    }
550
551  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
552    {
553    setPrevious( pipes );
554
555    this.groupingFields = groupingFields;
556    this.argumentFields = argumentFields;
557    this.functors = functors;
558    this.aggregators = aggregators;
559
560    verify();
561
562    Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
563    Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );
564
565    if( argumentSelector.equals( Fields.NONE ) )
566      argumentSelector = Fields.ALL;
567
568    Pipe[] functions = new Pipe[ pipes.length ];
569
570    CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, capacity );
571
572    for( int i = 0; i < functions.length; i++ )
573      functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );
574
575    groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );
576
577    Pipe pipe = groupBy;
578
579    for( int i = 0; i < aggregators.length; i++ )
580      pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );
581
582    setTails( pipe );
583    }
584
585  /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
586  protected void verify()
587    {
588
589    }
590
591  /**
592   * Method getGroupingFields returns the Fields this instances will be grouping against.
593   *
594   * @return the current grouping fields
595   */
596  public Fields getGroupingFields()
597    {
598    return groupingFields;
599    }
600
601  /**
602   * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
603   * field declaration of the given Aggregator operations.
604   * <p/>
605   * Note the actual Fields values are returned, not planner resolved Fields.
606   *
607   * @return and array of Fields
608   */
609  public Fields[] getFieldDeclarations()
610    {
611    Fields[] fields = new Fields[ this.aggregators.length ];
612
613    for( int i = 0; i < aggregators.length; i++ )
614      fields[ i ] = aggregators[ i ].getFieldDeclaration();
615
616    return fields;
617    }
618
619  protected Fields[] getArgumentFields()
620    {
621    return argumentFields;
622    }
623
624  protected Functor[] getFunctors()
625    {
626    return functors;
627    }
628
629  protected Aggregator[] getAggregators()
630    {
631    return aggregators;
632    }
633
634  /**
635   * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
636   * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
637   *
638   * @return GroupBy type
639   */
640  public GroupBy getGroupBy()
641    {
642    return groupBy;
643    }
644
645  @Property(name = "capacity", visibility = Visibility.PUBLIC)
646  @PropertyDescription("Capacity of the aggregation cache.")
647  @PropertyConfigured(value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000")
648  public int getCapacity()
649    {
650    return capacity;
651    }
652  }