001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.lang.reflect.Type;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.Aggregator;
028    import cascading.operation.AggregatorCall;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.OperationCall;
031    import cascading.pipe.Pipe;
032    import cascading.tuple.Fields;
033    import cascading.tuple.Tuple;
034    import cascading.tuple.TupleEntry;
035    import cascading.tuple.coerce.Coercions;
036    import cascading.tuple.type.CoercibleType;
037    
038    /**
039     * Class AverageBy is used to average values associated with duplicate keys in a tuple stream.
040     * <p/>
041     * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average}
042     * {@link cascading.operation.Aggregator} operation.
043     * <p/>
044     * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value,
045     * otherwise the result will be a {@link Double}.
046     * <p/>
047     * If {@code include} is {@link Include#NO_NULLS}, {@code null} values will not be included in the average (converted to zero).
048     * By default (and for backwards compatibility) {@code null} values are included, {@link Include#ALL}.
049     * <p/>
050     * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
051     * and private {@link AverageFinal} Aggregator to count and sum as many field values before the GroupBy operator to reduce IO over the network.
052     * <p/>
053     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
054     * in a much simpler mechanism.
055     * <p/>
056     * The {@code threshold} value tells the underlying AveragePartials functions how many unique key sums and counts to accumulate
057     * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
058     * bounded by the size of your map task JVM and the typical size of each group key.
059     * <p/>
060     * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
061     * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
062     *
063     * @see cascading.pipe.assembly.AggregateBy
064     */
065    public class AverageBy extends AggregateBy
066      {
067      /** DEFAULT_THRESHOLD */
068      @Deprecated
069      public static final int DEFAULT_THRESHOLD = 10000;
070    
071      public enum Include
072        {
073          ALL,
074          NO_NULLS
075        }
076    
077      /**
078       * Class AveragePartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count and sum observed duplicates from the tuple stream.
079       *
080       * @see cascading.pipe.assembly.AverageBy
081       */
082      public static class AveragePartials implements Functor
083        {
084        private final Fields declaredFields;
085        private final Include include;
086    
087        /**
088         * Constructor AveragePartials creates a new AveragePartials instance.
089         *
090         * @param declaredFields of type Fields
091         */
092        public AveragePartials( Fields declaredFields )
093          {
094          this.declaredFields = declaredFields;
095          this.include = Include.ALL;
096          }
097    
098        public AveragePartials( Fields declaredFields, Include include )
099          {
100          this.declaredFields = declaredFields;
101          this.include = include;
102          }
103    
104        @Override
105        public Fields getDeclaredFields()
106          {
107          Fields sumName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".sum", Double.class );
108          Fields countName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".count", Long.class );
109    
110          return sumName.append( countName );
111          }
112    
113        @Override
114        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
115          {
116          if( context == null )
117            context = Tuple.size( 2 );
118    
119          if( include == Include.NO_NULLS && args.getObject( 0 ) == null )
120            return context;
121    
122          context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) );
123          context.set( 1, context.getLong( 1 ) + 1 );
124    
125          return context;
126          }
127    
128        @Override
129        public Tuple complete( FlowProcess flowProcess, Tuple context )
130          {
131          return context;
132          }
133        }
134    
135      /**
136       * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used
137       * in tandem with a {@link AveragePartials} Functor.
138       */
139      public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context>
140        {
141        /** Class Context is used to hold intermediate values. */
142        protected static class Context
143          {
144          long nulls = 0L;
145          double sum = 0.0D;
146          long count = 0L;
147          Type type = Double.class;
148          CoercibleType canonical;
149    
150          Tuple tuple = Tuple.size( 1 );
151    
152          public Context( Fields fieldDeclaration )
153            {
154            if( fieldDeclaration.hasTypes() )
155              this.type = fieldDeclaration.getType( 0 );
156    
157            this.canonical = Coercions.coercibleTypeFor( this.type );
158            }
159    
160          public Context reset()
161            {
162            nulls = 0L;
163            sum = 0.0D;
164            count = 0L;
165            tuple.set( 0, null );
166    
167            return this;
168            }
169    
170          public Tuple result()
171            {
172            // we only saw null from upstream, so return null
173            if( count == 0 && nulls != 0 )
174              return tuple;
175    
176            tuple.set( 0, canonical.canonical( sum / count ) );
177    
178            return tuple;
179            }
180          }
181    
182        /**
183         * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name.
184         *
185         * @param fieldDeclaration of type Fields
186         */
187        public AverageFinal( Fields fieldDeclaration )
188          {
189          super( 2, makeFieldDeclaration( fieldDeclaration ) );
190    
191          if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
192            throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
193          }
194    
195        private static Fields makeFieldDeclaration( Fields fieldDeclaration )
196          {
197          if( fieldDeclaration.hasTypes() )
198            return fieldDeclaration;
199    
200          return fieldDeclaration.applyTypes( Double.class );
201          }
202    
203        @Override
204        public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
205          {
206          operationCall.setContext( new Context( getFieldDeclaration() ) );
207          }
208    
209        @Override
210        public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
211          {
212          aggregatorCall.getContext().reset();
213          }
214    
215        @Override
216        public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
217          {
218          Context context = aggregatorCall.getContext();
219          TupleEntry arguments = aggregatorCall.getArguments();
220    
221          if( arguments.getObject( 0 ) == null )
222            {
223            context.nulls++;
224            return;
225            }
226    
227          context.sum += arguments.getDouble( 0 );
228          context.count += arguments.getLong( 1 );
229          }
230    
231        @Override
232        public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
233          {
234          aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() );
235          }
236        }
237    
238      /////////
239    
240      /**
241       * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
242       * instance.
243       *
244       * @param valueField   of type Fields
245       * @param averageField of type Fields
246       */
247      @ConstructorProperties({"valueField", "averageField"})
248      public AverageBy( Fields valueField, Fields averageField )
249        {
250        super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) );
251        }
252    
253      /**
254       * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
255       * instance.
256       *
257       * @param valueField   of type Fields
258       * @param averageField of type Fields
259       * @param include      of type boolean
260       */
261      @ConstructorProperties({"valueField", "averageField", "include"})
262      public AverageBy( Fields valueField, Fields averageField, Include include )
263        {
264        super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) );
265        }
266    
267      //////////////
268    
269      /**
270       * Constructor AverageBy creates a new AverageBy instance.
271       *
272       * @param pipe           of type Pipe
273       * @param groupingFields of type Fields
274       * @param valueField     of type Fields
275       * @param averageField   of type Fields
276       */
277      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"})
278      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
279        {
280        this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
281        }
282    
283      /**
284       * Constructor AverageBy creates a new AverageBy instance.
285       *
286       * @param pipe           of type Pipe
287       * @param groupingFields of type Fields
288       * @param valueField     of type Fields
289       * @param averageField   of type Fields
290       * @param threshold      of type int
291       */
292      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"})
293      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
294        {
295        this( null, pipe, groupingFields, valueField, averageField, threshold );
296        }
297    
298      /**
299       * Constructor AverageBy creates a new AverageBy instance.
300       *
301       * @param name           of type String
302       * @param pipe           of type Pipe
303       * @param groupingFields of type Fields
304       * @param valueField     of type Fields
305       * @param averageField   of type Fields
306       */
307      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"})
308      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
309        {
310        this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
311        }
312    
313      /**
314       * Constructor AverageBy creates a new AverageBy instance.
315       *
316       * @param name           of type String
317       * @param pipe           of type Pipe
318       * @param groupingFields of type Fields
319       * @param valueField     of type Fields
320       * @param averageField   of type Fields
321       * @param threshold      of type int
322       */
323      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"})
324      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
325        {
326        this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold );
327        }
328    
329      /**
330       * Constructor AverageBy creates a new AverageBy instance.
331       *
332       * @param pipes          of type Pipe[]
333       * @param groupingFields of type Fields
334       * @param valueField     of type Fields
335       * @param averageField   of type Fields
336       */
337      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"})
338      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
339        {
340        this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
341        }
342    
343      /**
344       * Constructor AverageBy creates a new AverageBy instance.
345       *
346       * @param pipes          of type Pipe[]
347       * @param groupingFields of type Fields
348       * @param valueField     of type Fields
349       * @param averageField   of type Fields
350       * @param threshold      of type int
351       */
352      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"})
353      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
354        {
355        this( null, pipes, groupingFields, valueField, averageField, threshold );
356        }
357    
358      /**
359       * Constructor AverageBy creates a new AverageBy instance.
360       *
361       * @param name           of type String
362       * @param pipes          of type Pipe[]
363       * @param groupingFields of type Fields
364       * @param valueField     of type Fields
365       * @param averageField   of type Fields
366       */
367      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"})
368      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
369        {
370        this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
371        }
372    
373      /**
374       * Constructor AverageBy creates a new AverageBy instance.
375       *
376       * @param name           of type String
377       * @param pipes          of type Pipe[]
378       * @param groupingFields of type Fields
379       * @param valueField     of type Fields
380       * @param averageField   of type Fields
381       * @param threshold      of type int
382       */
383      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"})
384      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
385        {
386        super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold );
387        }
388    
389      /**
390       * Constructor AverageBy creates a new AverageBy instance.
391       *
392       * @param pipe           of type Pipe
393       * @param groupingFields of type Fields
394       * @param valueField     of type Fields
395       * @param averageField   of type Fields
396       * @param include        of type boolean
397       */
398      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"})
399      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
400        {
401        this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
402        }
403    
404      /**
405       * Constructor AverageBy creates a new AverageBy instance.
406       *
407       * @param pipe           of type Pipe
408       * @param groupingFields of type Fields
409       * @param valueField     of type Fields
410       * @param averageField   of type Fields
411       * @param include        of type boolean
412       * @param threshold      of type int
413       */
414      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
415      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
416        {
417        this( null, pipe, groupingFields, valueField, averageField, include, threshold );
418        }
419    
420      /**
421       * Constructor AverageBy creates a new AverageBy instance.
422       *
423       * @param name           of type String
424       * @param pipe           of type Pipe
425       * @param groupingFields of type Fields
426       * @param valueField     of type Fields
427       * @param averageField   of type Fields
428       * @param include        of type boolean
429       */
430      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"})
431      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
432        {
433        this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
434        }
435    
436      /**
437       * Constructor AverageBy creates a new AverageBy instance.
438       *
439       * @param name           of type String
440       * @param pipe           of type Pipe
441       * @param groupingFields of type Fields
442       * @param valueField     of type Fields
443       * @param averageField   of type Fields
444       * @param include        of type boolean
445       * @param threshold      of type int
446       */
447      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
448      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
449        {
450        this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold );
451        }
452    
453      /**
454       * Constructor AverageBy creates a new AverageBy instance.
455       *
456       * @param pipes          of type Pipe[]
457       * @param groupingFields of type Fields
458       * @param valueField     of type Fields
459       * @param averageField   of type Fields
460       * @param include        of type boolean
461       */
462      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"})
463      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
464        {
465        this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
466        }
467    
468      /**
469       * Constructor AverageBy creates a new AverageBy instance.
470       *
471       * @param pipes          of type Pipe[]
472       * @param groupingFields of type Fields
473       * @param valueField     of type Fields
474       * @param averageField   of type Fields
475       * @param include        of type boolean
476       * @param threshold      of type int
477       */
478      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
479      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
480        {
481        this( null, pipes, groupingFields, valueField, averageField, include, threshold );
482        }
483    
484      /**
485       * Constructor AverageBy creates a new AverageBy instance.
486       *
487       * @param name           of type String
488       * @param pipes          of type Pipe[]
489       * @param groupingFields of type Fields
490       * @param valueField     of type Fields
491       * @param averageField   of type Fields
492       * @param include        of type boolean
493       */
494      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"})
495      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
496        {
497        this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
498        }
499    
500      /**
501       * Constructor AverageBy creates a new AverageBy instance.
502       *
503       * @param name           of type String
504       * @param pipes          of type Pipe[]
505       * @param groupingFields of type Fields
506       * @param valueField     of type Fields
507       * @param averageField   of type Fields
508       * @param include        of type boolean
509       * @param threshold      of type int
510       */
511      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
512      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
513        {
514        super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold );
515        }
516      }