001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024import java.lang.reflect.Type;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.Aggregator;
028import cascading.operation.AggregatorCall;
029import cascading.operation.BaseOperation;
030import cascading.operation.OperationCall;
031import cascading.pipe.Pipe;
032import cascading.tuple.Fields;
033import cascading.tuple.Tuple;
034import cascading.tuple.TupleEntry;
035import cascading.tuple.coerce.Coercions;
036import cascading.tuple.type.CoercibleType;
037
038/**
039 * Class AverageBy is used to average values associated with duplicate keys in a tuple stream.
040 * <p/>
041 * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average}
042 * {@link cascading.operation.Aggregator} operation.
043 * <p/>
044 * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value,
045 * otherwise the result will be a {@link Double}.
046 * <p/>
 * If {@code include} is {@link Include#NO_NULLS}, {@code null} values are skipped and contribute to neither the sum nor the count.
 * By default (and for backwards compatibility) {@code null} values are included (converted to zero and counted), {@link Include#ALL}.
049 * <p/>
 * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
 * and the {@link AverageFinal} Aggregator to count and sum as many field values as possible before the GroupBy operator to reduce IO over the network.
052 * <p/>
053 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
054 * in a much simpler mechanism.
055 * <p/>
056 * The {@code threshold} value tells the underlying AveragePartials functions how many unique key sums and counts to accumulate
057 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
058 * bounded by the size of your map task JVM and the typical size of each group key.
059 * <p/>
060 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
061 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
062 *
063 * @see cascading.pipe.assembly.AggregateBy
064 */
065public class AverageBy extends AggregateBy
066  {
067  public enum Include
068    {
069      ALL,
070      NO_NULLS
071    }
072
073  /**
074   * Class AveragePartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count and sum observed duplicates from the tuple stream.
075   *
076   * @see cascading.pipe.assembly.AverageBy
077   */
078  public static class AveragePartials implements Functor
079    {
080    private final Fields declaredFields;
081    private final Include include;
082
083    /**
084     * Constructor AveragePartials creates a new AveragePartials instance.
085     *
086     * @param declaredFields of type Fields
087     */
088    public AveragePartials( Fields declaredFields )
089      {
090      this.declaredFields = declaredFields;
091      this.include = Include.ALL;
092      }
093
094    public AveragePartials( Fields declaredFields, Include include )
095      {
096      this.declaredFields = declaredFields;
097      this.include = include;
098      }
099
100    @Override
101    public Fields getDeclaredFields()
102      {
103      Fields sumName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".sum", Double.class );
104      Fields countName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".count", Long.class );
105
106      return sumName.append( countName );
107      }
108
109    @Override
110    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
111      {
112      if( context == null )
113        context = Tuple.size( 2 );
114
115      if( include == Include.NO_NULLS && args.getObject( 0 ) == null )
116        return context;
117
118      context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) );
119      context.set( 1, context.getLong( 1 ) + 1 );
120
121      return context;
122      }
123
124    @Override
125    public Tuple complete( FlowProcess flowProcess, Tuple context )
126      {
127      return context;
128      }
129    }
130
131  /**
132   * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used
133   * in tandem with a {@link AveragePartials} Functor.
134   */
135  public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context>
136    {
137    /** Class Context is used to hold intermediate values. */
138    protected static class Context
139      {
140      long nulls = 0L;
141      double sum = 0.0D;
142      long count = 0L;
143      Type type = Double.class;
144      CoercibleType canonical;
145
146      Tuple tuple = Tuple.size( 1 );
147
148      public Context( Fields fieldDeclaration )
149        {
150        if( fieldDeclaration.hasTypes() )
151          this.type = fieldDeclaration.getType( 0 );
152
153        this.canonical = Coercions.coercibleTypeFor( this.type );
154        }
155
156      public Context reset()
157        {
158        nulls = 0L;
159        sum = 0.0D;
160        count = 0L;
161        tuple.set( 0, null );
162
163        return this;
164        }
165
166      public Tuple result()
167        {
168        // we only saw null from upstream, so return null
169        if( count == 0 && nulls != 0 )
170          return tuple;
171
172        tuple.set( 0, canonical.canonical( sum / count ) );
173
174        return tuple;
175        }
176      }
177
178    /**
179     * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name.
180     *
181     * @param fieldDeclaration of type Fields
182     */
183    public AverageFinal( Fields fieldDeclaration )
184      {
185      super( 2, makeFieldDeclaration( fieldDeclaration ) );
186
187      if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
188        throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
189      }
190
191    private static Fields makeFieldDeclaration( Fields fieldDeclaration )
192      {
193      if( fieldDeclaration.hasTypes() )
194        return fieldDeclaration;
195
196      return fieldDeclaration.applyTypes( Double.class );
197      }
198
199    @Override
200    public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
201      {
202      operationCall.setContext( new Context( getFieldDeclaration() ) );
203      }
204
205    @Override
206    public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
207      {
208      aggregatorCall.getContext().reset();
209      }
210
211    @Override
212    public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
213      {
214      Context context = aggregatorCall.getContext();
215      TupleEntry arguments = aggregatorCall.getArguments();
216
217      if( arguments.getObject( 0 ) == null )
218        {
219        context.nulls++;
220        return;
221        }
222
223      context.sum += arguments.getDouble( 0 );
224      context.count += arguments.getLong( 1 );
225      }
226
227    @Override
228    public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
229      {
230      aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() );
231      }
232    }
233
234  /////////
235
236  /**
237   * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
238   * instance.
239   *
240   * @param valueField   of type Fields
241   * @param averageField of type Fields
242   */
243  @ConstructorProperties({"valueField", "averageField"})
244  public AverageBy( Fields valueField, Fields averageField )
245    {
246    super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) );
247    }
248
249  /**
250   * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
251   * instance.
252   *
253   * @param valueField   of type Fields
254   * @param averageField of type Fields
255   * @param include      of type boolean
256   */
257  @ConstructorProperties({"valueField", "averageField", "include"})
258  public AverageBy( Fields valueField, Fields averageField, Include include )
259    {
260    super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) );
261    }
262
263  //////////////
264
265  /**
266   * Constructor AverageBy creates a new AverageBy instance.
267   *
268   * @param pipe           of type Pipe
269   * @param groupingFields of type Fields
270   * @param valueField     of type Fields
271   * @param averageField   of type Fields
272   */
273  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"})
274  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
275    {
276    this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
277    }
278
279  /**
280   * Constructor AverageBy creates a new AverageBy instance.
281   *
282   * @param pipe           of type Pipe
283   * @param groupingFields of type Fields
284   * @param valueField     of type Fields
285   * @param averageField   of type Fields
286   * @param threshold      of type int
287   */
288  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"})
289  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
290    {
291    this( null, pipe, groupingFields, valueField, averageField, threshold );
292    }
293
294  /**
295   * Constructor AverageBy creates a new AverageBy instance.
296   *
297   * @param name           of type String
298   * @param pipe           of type Pipe
299   * @param groupingFields of type Fields
300   * @param valueField     of type Fields
301   * @param averageField   of type Fields
302   */
303  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"})
304  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
305    {
306    this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
307    }
308
309  /**
310   * Constructor AverageBy creates a new AverageBy instance.
311   *
312   * @param name           of type String
313   * @param pipe           of type Pipe
314   * @param groupingFields of type Fields
315   * @param valueField     of type Fields
316   * @param averageField   of type Fields
317   * @param threshold      of type int
318   */
319  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"})
320  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
321    {
322    this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold );
323    }
324
325  /**
326   * Constructor AverageBy creates a new AverageBy instance.
327   *
328   * @param pipes          of type Pipe[]
329   * @param groupingFields of type Fields
330   * @param valueField     of type Fields
331   * @param averageField   of type Fields
332   */
333  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"})
334  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
335    {
336    this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
337    }
338
339  /**
340   * Constructor AverageBy creates a new AverageBy instance.
341   *
342   * @param pipes          of type Pipe[]
343   * @param groupingFields of type Fields
344   * @param valueField     of type Fields
345   * @param averageField   of type Fields
346   * @param threshold      of type int
347   */
348  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"})
349  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
350    {
351    this( null, pipes, groupingFields, valueField, averageField, threshold );
352    }
353
354  /**
355   * Constructor AverageBy creates a new AverageBy instance.
356   *
357   * @param name           of type String
358   * @param pipes          of type Pipe[]
359   * @param groupingFields of type Fields
360   * @param valueField     of type Fields
361   * @param averageField   of type Fields
362   */
363  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"})
364  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
365    {
366    this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
367    }
368
369  /**
370   * Constructor AverageBy creates a new AverageBy instance.
371   *
372   * @param name           of type String
373   * @param pipes          of type Pipe[]
374   * @param groupingFields of type Fields
375   * @param valueField     of type Fields
376   * @param averageField   of type Fields
377   * @param threshold      of type int
378   */
379  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"})
380  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
381    {
382    super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold );
383    }
384
385  /**
386   * Constructor AverageBy creates a new AverageBy instance.
387   *
388   * @param pipe           of type Pipe
389   * @param groupingFields of type Fields
390   * @param valueField     of type Fields
391   * @param averageField   of type Fields
392   * @param include        of type boolean
393   */
394  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"})
395  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
396    {
397    this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
398    }
399
400  /**
401   * Constructor AverageBy creates a new AverageBy instance.
402   *
403   * @param pipe           of type Pipe
404   * @param groupingFields of type Fields
405   * @param valueField     of type Fields
406   * @param averageField   of type Fields
407   * @param include        of type boolean
408   * @param threshold      of type int
409   */
410  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
411  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
412    {
413    this( null, pipe, groupingFields, valueField, averageField, include, threshold );
414    }
415
416  /**
417   * Constructor AverageBy creates a new AverageBy instance.
418   *
419   * @param name           of type String
420   * @param pipe           of type Pipe
421   * @param groupingFields of type Fields
422   * @param valueField     of type Fields
423   * @param averageField   of type Fields
424   * @param include        of type boolean
425   */
426  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"})
427  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
428    {
429    this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
430    }
431
432  /**
433   * Constructor AverageBy creates a new AverageBy instance.
434   *
435   * @param name           of type String
436   * @param pipe           of type Pipe
437   * @param groupingFields of type Fields
438   * @param valueField     of type Fields
439   * @param averageField   of type Fields
440   * @param include        of type boolean
441   * @param threshold      of type int
442   */
443  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
444  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
445    {
446    this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold );
447    }
448
449  /**
450   * Constructor AverageBy creates a new AverageBy instance.
451   *
452   * @param pipes          of type Pipe[]
453   * @param groupingFields of type Fields
454   * @param valueField     of type Fields
455   * @param averageField   of type Fields
456   * @param include        of type boolean
457   */
458  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"})
459  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
460    {
461    this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
462    }
463
464  /**
465   * Constructor AverageBy creates a new AverageBy instance.
466   *
467   * @param pipes          of type Pipe[]
468   * @param groupingFields of type Fields
469   * @param valueField     of type Fields
470   * @param averageField   of type Fields
471   * @param include        of type boolean
472   * @param threshold      of type int
473   */
474  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
475  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
476    {
477    this( null, pipes, groupingFields, valueField, averageField, include, threshold );
478    }
479
480  /**
481   * Constructor AverageBy creates a new AverageBy instance.
482   *
483   * @param name           of type String
484   * @param pipes          of type Pipe[]
485   * @param groupingFields of type Fields
486   * @param valueField     of type Fields
487   * @param averageField   of type Fields
488   * @param include        of type boolean
489   */
490  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"})
491  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
492    {
493    this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
494    }
495
496  /**
497   * Constructor AverageBy creates a new AverageBy instance.
498   *
499   * @param name           of type String
500   * @param pipes          of type Pipe[]
501   * @param groupingFields of type Fields
502   * @param valueField     of type Fields
503   * @param averageField   of type Fields
504   * @param include        of type boolean
505   * @param threshold      of type int
506   */
507  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
508  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
509    {
510    super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold );
511    }
512  }