001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.Collections;
028    import java.util.LinkedHashMap;
029    import java.util.List;
030    import java.util.Map;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.management.annotation.Property;
034    import cascading.management.annotation.PropertyConfigured;
035    import cascading.management.annotation.PropertyDescription;
036    import cascading.management.annotation.Visibility;
037    import cascading.operation.Aggregator;
038    import cascading.operation.BaseOperation;
039    import cascading.operation.Function;
040    import cascading.operation.FunctionCall;
041    import cascading.operation.OperationCall;
042    import cascading.pipe.Each;
043    import cascading.pipe.Every;
044    import cascading.pipe.GroupBy;
045    import cascading.pipe.Pipe;
046    import cascading.pipe.SubAssembly;
047    import cascading.tuple.Fields;
048    import cascading.tuple.Tuple;
049    import cascading.tuple.TupleEntry;
050    import cascading.tuple.TupleEntryCollector;
051    import cascading.tuple.util.TupleViews;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
057     * <p/>
058     * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
059     * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
060     * completed Reduce side. Summing is associative and commutative.
061     * <p/>
062     * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting
063     * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
064     * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over
065     * two, and a hack)
066     * <p/>
067     * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized,
068     * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of
069     * memory and a little or no IO.
070     * <p/>
071     * Further, Combiners are limited to only associative/commutative operations.
072     * <p/>
073     * Additionally the Cascading planner can move the Map side optimization
074     * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which
075     * is over HDFS).
076     * <p/>
077     * The second role of the AggregateBy class is to allow for composition of AggregateBy
078     * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
079     * in parallel on the same grouping keys.
080     * </p>
081     * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
082     * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
083     * class allowing them all to share the same LRU value map for more efficiency.
084     * <p/>
085     * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
086     * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
087     * to for secondary sorting (see {@link GroupBy} {@code sortFields}. This feature is used by {@link FirstBy} to
088     * control which Tuple is seen first for a grouping.
089     * <p/>
090     * <p/>
091     * To tune the LRU, set the {@code threshold} value to a high enough value to utilize available memory. Or set a
092     * default value via the {@link #AGGREGATE_BY_THRESHOLD} property. The current default ({@link CompositeFunction#DEFAULT_THRESHOLD})
093     * is {@code 10, 000} unique keys. Note "flushes" from the LRU will be logged in threshold increments along with memory
094     * information.
095     * <p/>
096     * Note using a AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
097     * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
098     * <p/>
099     * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally.
100     * <p/>
101     * Keep in mind the {@link cascading.tuple.Hasher} interface is not honored here (for storing keys in the cache). Thus
102     * arrays of primitives and object, like {@code byte[]} will not be properly stored. This is a known issue and will
103     * be resolved in a future release.
104     *
105     * @see SumBy
106     * @see CountBy
107     * @see Unique
108     */
109    public class AggregateBy extends SubAssembly
110      {
111      private static final Logger LOG = LoggerFactory.getLogger( AggregateBy.class );
112    
113      public static final int USE_DEFAULT_THRESHOLD = 0;
114      public static final int DEFAULT_THRESHOLD = CompositeFunction.DEFAULT_THRESHOLD;
115      public static final String AGGREGATE_BY_THRESHOLD = "cascading.aggregateby.threshold";
116    
117      private String name;
118      private int threshold;
119      private Fields groupingFields;
120      private Fields[] argumentFields;
121      private Functor[] functors;
122      private Aggregator[] aggregators;
123      private transient GroupBy groupBy;
124    
125      public enum Flush
126        {
127          Num_Keys_Flushed
128        }
129    
130      /**
131       * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
132       * <p/>
133       * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
134       */
135      public interface Functor extends Serializable
136        {
137        /**
138         * Method getDeclaredFields returns the declaredFields of this Functor object.
139         *
140         * @return the declaredFields (type Fields) of this Functor object.
141         */
142        Fields getDeclaredFields();
143    
144        /**
145         * Method aggregate operates on the given args in tandem (optionally) with the given context values.
146         * <p/>
147         * The context argument is the result of the previous call to this method. Use it to store values between aggregate
148         * calls (the current count, or sum of the args).
149         * <p/>
150         * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
151         * invocations context will be the value returned on the previous invocation.
152         *
153         * @param flowProcess of type FlowProcess
154         * @param args        of type TupleEntry
155         * @param context     of type Tuple   @return Tuple
156         */
157        Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );
158    
159        /**
160         * Method complete allows the final aggregate computation to be performed before the return value is collected.
161         * <p/>
162         * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
163         * <p/>
164         * It is safe to return the context object as the result value.
165         *
166         * @param flowProcess of type FlowProcess
167         * @param context     of type Tuple  @return Tuple
168         */
169        Tuple complete( FlowProcess flowProcess, Tuple context );
170        }
171    
172      /**
173       * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
174       *
175       * @see Functor
176       */
177      public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
178        {
179        public static final int DEFAULT_THRESHOLD = 10000;
180    
181        private int threshold = 0;
182        private final Fields groupingFields;
183        private final Fields[] argumentFields;
184        private final Fields[] functorFields;
185        private final Functor[] functors;
186    
187        public static class Context
188          {
189          LinkedHashMap<Tuple, Tuple[]> lru;
190          TupleEntry[] arguments;
191          Tuple result;
192          }
193    
194        /**
195         * Constructor CompositeFunction creates a new CompositeFunction instance.
196         *
197         * @param groupingFields of type Fields
198         * @param argumentFields of type Fields
199         * @param functor        of type Functor
200         * @param threshold      of type int
201         */
202        public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int threshold )
203          {
204          this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, threshold );
205          }
206    
207        /**
208         * Constructor CompositeFunction creates a new CompositeFunction instance.
209         *
210         * @param groupingFields of type Fields
211         * @param argumentFields of type Fields[]
212         * @param functors       of type Functor[]
213         * @param threshold      of type int
214         */
215        public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int threshold )
216          {
217          super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
218          this.groupingFields = groupingFields;
219          this.argumentFields = argumentFields;
220          this.functors = functors;
221          this.threshold = threshold;
222    
223          this.functorFields = new Fields[ functors.length ];
224    
225          for( int i = 0; i < functors.length; i++ )
226            this.functorFields[ i ] = functors[ i ].getDeclaredFields();
227          }
228    
229        private static Fields getFields( Fields groupingFields, Functor[] functors )
230          {
231          Fields fields = groupingFields;
232    
233          for( Functor functor : functors )
234            fields = fields.append( functor.getDeclaredFields() );
235    
236          return fields;
237          }
238    
239        @Override
240        public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
241          {
242          if( threshold == 0 )
243            {
244            Integer value = flowProcess.getIntegerProperty( AGGREGATE_BY_THRESHOLD );
245    
246            if( value != null && value > 0 )
247              threshold = value;
248            else
249              threshold = DEFAULT_THRESHOLD;
250            }
251    
252          LOG.info( "using threshold value: {}", threshold );
253    
254          Fields[] fields = new Fields[ functors.length + 1 ];
255    
256          fields[ 0 ] = groupingFields;
257    
258          for( int i = 0; i < functors.length; i++ )
259            fields[ i + 1 ] = functors[ i ].getDeclaredFields();
260    
261          final Context context = new Context();
262    
263          context.arguments = new TupleEntry[ functors.length ];
264    
265          for( int i = 0; i < context.arguments.length; i++ )
266            {
267            Fields resolvedArgumentFields = operationCall.getArgumentFields();
268    
269            int[] pos;
270    
271            if( argumentFields[ i ].isAll() )
272              pos = resolvedArgumentFields.getPos();
273            else
274              pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL
275    
276            Tuple narrow = TupleViews.createNarrow( pos );
277    
278            Fields currentFields;
279    
280            if( this.argumentFields[ i ].isSubstitution() )
281              currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
282            else
283              currentFields = Fields.asDeclaration( this.argumentFields[ i ] );
284    
285            context.arguments[ i ] = new TupleEntry( currentFields, narrow );
286            }
287    
288          context.result = TupleViews.createComposite( fields );
289    
290          context.lru = new LinkedHashMap<Tuple, Tuple[]>( threshold, 0.75f, true )
291          {
292          long flushes = 0;
293    
294          @Override
295          protected boolean removeEldestEntry( Map.Entry<Tuple, Tuple[]> eldest )
296            {
297            boolean doRemove = size() > threshold;
298    
299            if( doRemove )
300              {
301              completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, eldest );
302              flowProcess.increment( Flush.Num_Keys_Flushed, 1 );
303    
304              if( flushes % threshold == 0 ) // every multiple, write out data
305                {
306                Runtime runtime = Runtime.getRuntime();
307                long freeMem = runtime.freeMemory() / 1024 / 1024;
308                long maxMem = runtime.maxMemory() / 1024 / 1024;
309                long totalMem = runtime.totalMemory() / 1024 / 1024;
310    
311                LOG.info( "flushed keys num times: {}, with threshold: {}", flushes + 1, threshold );
312                LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
313    
314                float percent = (float) totalMem / (float) maxMem;
315    
316                if( percent < 0.80F )
317                  LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing current LRU threshold with system property \"{}\"", (int) ( percent * 100.0F ), AGGREGATE_BY_THRESHOLD );
318                }
319    
320              flushes++;
321              }
322    
323            return doRemove;
324            }
325          };
326    
327          operationCall.setContext( context );
328          }
329    
330        @Override
331        public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
332          {
333          TupleEntry arguments = functionCall.getArguments();
334          Tuple key = arguments.selectTupleCopy( groupingFields );
335    
336          Context context = functionCall.getContext();
337          Tuple[] functorContext = context.lru.get( key );
338    
339          if( functorContext == null )
340            {
341            functorContext = new Tuple[ functors.length ];
342            context.lru.put( key, functorContext );
343            }
344    
345          for( int i = 0; i < functors.length; i++ )
346            {
347            TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
348            functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
349            }
350          }
351    
352        @Override
353        public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
354          {
355          // need to drain context
356          TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();
357    
358          Tuple result = operationCall.getContext().result;
359          LinkedHashMap<Tuple, Tuple[]> context = operationCall.getContext().lru;
360    
361          for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
362            completeFunctors( flowProcess, collector, result, entry );
363    
364          operationCall.setContext( null );
365          }
366    
367        private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
368          {
369          Tuple[] results = new Tuple[ functors.length + 1 ];
370    
371          results[ 0 ] = entry.getKey();
372    
373          Tuple[] values = entry.getValue();
374    
375          for( int i = 0; i < functors.length; i++ )
376            results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );
377    
378          TupleViews.reset( result, results );
379    
380          outputCollector.add( result );
381          }
382    
383        @Override
384        public boolean equals( Object object )
385          {
386          if( this == object )
387            return true;
388          if( !( object instanceof CompositeFunction ) )
389            return false;
390          if( !super.equals( object ) )
391            return false;
392    
393          CompositeFunction that = (CompositeFunction) object;
394    
395          if( !Arrays.equals( argumentFields, that.argumentFields ) )
396            return false;
397          if( !Arrays.equals( functorFields, that.functorFields ) )
398            return false;
399          if( !Arrays.equals( functors, that.functors ) )
400            return false;
401          if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
402            return false;
403    
404          return true;
405          }
406    
407        @Override
408        public int hashCode()
409          {
410          int result = super.hashCode();
411          result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
412          result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
413          result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
414          result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
415          return result;
416          }
417        }
418    
419      /**
420       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
421       *
422       * @param name      of type String
423       * @param threshold of type int
424       */
425      protected AggregateBy( String name, int threshold )
426        {
427        this.name = name;
428        this.threshold = threshold;
429        }
430    
431      /**
432       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
433       *
434       * @param argumentFields of type Fields
435       * @param functor        of type Functor
436       * @param aggregator     of type Aggregator
437       */
438      protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
439        {
440        this.argumentFields = Fields.fields( argumentFields );
441        this.functors = new Functor[]{functor};
442        this.aggregators = new Aggregator[]{aggregator};
443        }
444    
445      /**
446       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
447       *
448       * @param pipe           of type Pipe
449       * @param groupingFields of type Fields
450       * @param assemblies     of type CompositeAggregator...
451       */
452      @ConstructorProperties({"pipe", "groupingFields", "assemblies"})
453      public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
454        {
455        this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
456        }
457    
458      /**
459       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
460       *
461       * @param pipe           of type Pipe
462       * @param groupingFields of type Fields
463       * @param threshold      of type int
464       * @param assemblies     of type CompositeAggregator...
465       */
466      @ConstructorProperties({"pipe", "groupingFields", "threshold", "assemblies"})
467      public AggregateBy( Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
468        {
469        this( null, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
470        }
471    
472      /**
473       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
474       *
475       * @param pipe           of type Pipe
476       * @param groupingFields of type Fields
477       * @param threshold      of type int
478       * @param assemblies     of type CompositeAggregator...
479       */
480      @ConstructorProperties({"name", "pipe", "groupingFields", "threshold", "assemblies"})
481      public AggregateBy( String name, Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
482        {
483        this( name, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
484        }
485    
486      /**
487       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
488       *
489       * @param name           of type String
490       * @param pipes          of type Pipe[]
491       * @param groupingFields of type Fields
492       * @param assemblies     of type CompositeAggregator...
493       */
494      @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"})
495      public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
496        {
497        this( name, pipes, groupingFields, 0, assemblies );
498        }
499    
500      /**
501       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
502       *
503       * @param name           of type String
504       * @param pipes          of type Pipe[]
505       * @param groupingFields of type Fields
506       * @param threshold      of type int
507       * @param assemblies     of type CompositeAggregator...
508       */
509      @ConstructorProperties({"name", "pipes", "groupingFields", "threshold", "assemblies"})
510      public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int threshold, AggregateBy... assemblies )
511        {
512        this( name, threshold );
513    
514        List<Fields> arguments = new ArrayList<Fields>();
515        List<Functor> functors = new ArrayList<Functor>();
516        List<Aggregator> aggregators = new ArrayList<Aggregator>();
517    
518        for( int i = 0; i < assemblies.length; i++ )
519          {
520          AggregateBy assembly = assemblies[ i ];
521    
522          Collections.addAll( arguments, assembly.getArgumentFields() );
523          Collections.addAll( functors, assembly.getFunctors() );
524          Collections.addAll( aggregators, assembly.getAggregators() );
525          }
526    
527        initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
528        }
529    
530      protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int threshold )
531        {
532        this( name, threshold );
533        initialize( groupingFields, pipes, argumentFields, functor, aggregator );
534        }
535    
536      protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
537        {
538        initialize( groupingFields, pipes, Fields.fields( argumentFields ),
539          new Functor[]{functor},
540          new Aggregator[]{aggregator} );
541        }
542    
543      protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
544        {
545        setPrevious( pipes );
546    
547        this.groupingFields = groupingFields;
548        this.argumentFields = argumentFields;
549        this.functors = functors;
550        this.aggregators = aggregators;
551    
552        verify();
553    
554        Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
555        Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );
556    
557        if( argumentSelector.equals( Fields.NONE ) )
558          argumentSelector = Fields.ALL;
559    
560        Pipe[] functions = new Pipe[ pipes.length ];
561    
562        CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, threshold );
563    
564        for( int i = 0; i < functions.length; i++ )
565          functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );
566    
567        groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );
568    
569        Pipe pipe = groupBy;
570    
571        for( int i = 0; i < aggregators.length; i++ )
572          pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );
573    
574        setTails( pipe );
575        }
576    
577      /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
578      protected void verify()
579        {
580    
581        }
582    
583      /**
584       * Method getGroupingFields returns the Fields this instances will be grouping against.
585       *
586       * @return the current grouping fields
587       */
588      public Fields getGroupingFields()
589        {
590        return groupingFields;
591        }
592    
593      /**
594       * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
595       * field declaration of the given Aggregator operations.
596       * <p/>
597       * Note the actual Fields values are returned, not planner resolved Fields.
598       *
599       * @return and array of Fields
600       */
601      public Fields[] getFieldDeclarations()
602        {
603        Fields[] fields = new Fields[ this.aggregators.length ];
604    
605        for( int i = 0; i < aggregators.length; i++ )
606          fields[ i ] = aggregators[ i ].getFieldDeclaration();
607    
608        return fields;
609        }
610    
611      protected Fields[] getArgumentFields()
612        {
613        return argumentFields;
614        }
615    
616      protected Functor[] getFunctors()
617        {
618        return functors;
619        }
620    
621      protected Aggregator[] getAggregators()
622        {
623        return aggregators;
624        }
625    
626      /**
627       * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
628       * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
629       *
630       * @return GroupBy type
631       */
632      public GroupBy getGroupBy()
633        {
634        return groupBy;
635        }
636    
637      @Property(name = "threshold", visibility = Visibility.PUBLIC)
638      @PropertyDescription("Threshold of the aggregation.")
639      @PropertyConfigured(value = AGGREGATE_BY_THRESHOLD, defaultValue = "10000")
640      public int getThreshold()
641        {
642        return threshold;
643        }
644      }