001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.assembly;
023
024import java.beans.ConstructorProperties;
025import java.lang.reflect.Type;
026
027import cascading.flow.FlowProcess;
028import cascading.operation.Aggregator;
029import cascading.operation.AggregatorCall;
030import cascading.operation.BaseOperation;
031import cascading.operation.OperationCall;
032import cascading.pipe.Pipe;
033import cascading.tuple.Fields;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036import cascading.tuple.coerce.Coercions;
037import cascading.tuple.type.CoercibleType;
038
039/**
040 * Class AverageBy is used to average values associated with duplicate keys in a tuple stream.
041 * <p>
042 * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average}
043 * {@link cascading.operation.Aggregator} operation.
044 * <p>
045 * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value,
046 * otherwise the result will be a {@link Double}.
047 * <p>
048 * If {@code include} is {@link Include#NO_NULLS}, {@code null} values will not be included in the average.
049 * By default (and for backwards compatibility) {@code null} values are included (converted to zero), {@link Include#ALL}.
050 * <p>
051 * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
052 * and the {@link AverageFinal} Aggregator to count and sum as many field values as possible before the GroupBy operator to reduce IO over the network.
053 * <p>
054 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
055 * in a much simpler mechanism.
056 * <p>
057 * The {@code threshold} value tells the underlying AveragePartials functions how many unique key sums and counts to accumulate
058 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
059 * bounded by the size of your map task JVM and the typical size of each group key.
060 * <p>
061 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
062 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
063 *
064 * @see cascading.pipe.assembly.AggregateBy
065 */
066public class AverageBy extends AggregateBy
067  {
068  public enum Include // selects whether null values take part in the average
069    {
070      ALL, // null values are included; the mode AveragePartials uses when none is given
071      NO_NULLS // null values are left out of both the running sum and the running count
072    }
073
074  /**
075   * Class AveragePartials is the {@link cascading.pipe.assembly.AggregateBy.Functor} that keeps a running sum and count of the values observed for a key.
076   *
077   * @see cascading.pipe.assembly.AverageBy
078   */
079  public static class AveragePartials implements Functor
080    {
081    private final Fields declaredFields;
082    private final Include include;
083
084    /**
085     * Constructor AveragePartials creates a new AveragePartials instance that includes null values, {@link Include#ALL}.
086     *
087     * @param declaredFields of type Fields
088     */
089    public AveragePartials( Fields declaredFields )
090      {
091      this( declaredFields, Include.ALL );
092      }
093
094    /** Constructor AveragePartials creates a new AveragePartials instance using the given {@code include} mode. */
095    public AveragePartials( Fields declaredFields, Include include )
096      {
097      this.include = include;
098      this.declaredFields = declaredFields;
099      }
100
101    @Override
102    public Fields getDeclaredFields()
103      {
104      // the sum and count fields share a prefix made of this package name and the first declared field
105      String prefix = AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 );
106      return new Fields( prefix + ".sum", Double.class ).append( new Fields( prefix + ".count", Long.class ) );
107      }
108
109    @Override
110    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
111      {
112      // position 0 holds the running sum, position 1 the running count
113      Tuple partials = context != null ? context : Tuple.size( 2 );
114      boolean skipped = include == Include.NO_NULLS && args.getObject( 0 ) == null;
115
116      if( !skipped )
117        {
118        partials.set( 0, partials.getDouble( 0 ) + args.getDouble( 0 ) );
119        partials.set( 1, partials.getLong( 1 ) + 1 );
120        }
121
122      return partials;
123      }
124
125    @Override
126    public Tuple complete( FlowProcess flowProcess, Tuple context )
127      {
128      return context; // the accumulated partials are returned as is
129      }
130    }
131
132  /**
133   * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used
134   * in tandem with a {@link AveragePartials} Functor.
135   */
136  public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context>
137    {
138    /** Class Context is used to hold intermediate values. */
139    protected static class Context
140      {
141      long nulls = 0L;
142      double sum = 0.0D;
143      long count = 0L;
144      Type type = Double.class;
145      CoercibleType canonical;
146
147      Tuple tuple = Tuple.size( 1 );
148
149      public Context( Fields fieldDeclaration )
150        {
151        if( fieldDeclaration.hasTypes() )
152          this.type = fieldDeclaration.getType( 0 );
153
154        this.canonical = Coercions.coercibleTypeFor( this.type );
155        }
156
157      public Context reset()
158        {
159        nulls = 0L;
160        sum = 0.0D;
161        count = 0L;
162        tuple.set( 0, null );
163
164        return this;
165        }
166
167      public Tuple result()
168        {
169        // we only saw null from upstream, so return null
170        if( count == 0 && nulls != 0 )
171          return tuple;
172
173        tuple.set( 0, canonical.canonical( sum / count ) );
174
175        return tuple;
176        }
177      }
178
179    /**
180     * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name.
181     *
182     * @param fieldDeclaration of type Fields
183     */
184    public AverageFinal( Fields fieldDeclaration )
185      {
186      super( 2, makeFieldDeclaration( fieldDeclaration ) );
187
188      if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
189        throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
190      }
191
192    private static Fields makeFieldDeclaration( Fields fieldDeclaration )
193      {
194      if( fieldDeclaration.hasTypes() )
195        return fieldDeclaration;
196
197      return fieldDeclaration.applyTypes( Double.class );
198      }
199
200    @Override
201    public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
202      {
203      operationCall.setContext( new Context( getFieldDeclaration() ) );
204      }
205
206    @Override
207    public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
208      {
209      aggregatorCall.getContext().reset();
210      }
211
212    @Override
213    public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
214      {
215      Context context = aggregatorCall.getContext();
216      TupleEntry arguments = aggregatorCall.getArguments();
217
218      if( arguments.getObject( 0 ) == null )
219        {
220        context.nulls++;
221        return;
222        }
223
224      context.sum += arguments.getDouble( 0 );
225      context.count += arguments.getLong( 1 );
226      }
227
228    @Override
229    public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
230      {
231      aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() );
232      }
233    }
234
235  /////////
236
237  /**
238   * Constructor AverageBy creates a new AverageBy instance that includes {@code null} values, {@link Include#ALL}.
239   * Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} instance.
240   *
241   * @param valueField   of type Fields
242   * @param averageField of type Fields, declares the field for the resulting average
243   */
244  @ConstructorProperties({"valueField", "averageField"})
245  public AverageBy( Fields valueField, Fields averageField )
246    {
247    super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) );
248    }
249
250  /**
251   * Constructor AverageBy creates a new AverageBy instance that treats {@code null} values per the given {@code include}.
252   * Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} instance.
253   *
254   * @param valueField   of type Fields
255   * @param averageField of type Fields, declares the field for the resulting average
256   * @param include      of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
257   */
258  @ConstructorProperties({"valueField", "averageField", "include"})
259  public AverageBy( Fields valueField, Fields averageField, Include include )
260    {
261    super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) );
262    }
263
264  //////////////
265
266  /**
267   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name, the default threshold and {@link Include#ALL}.
268   *
269   * @param pipe           of type Pipe
270   * @param groupingFields of type Fields
271   * @param valueField     of type Fields
272   * @param averageField   of type Fields, declares the field for the resulting average
273   */
274  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"})
275  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
276    {
277    this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
278    }
279
280  /**
281   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name and {@link Include#ALL}.
282   *
283   * @param pipe           of type Pipe
284   * @param groupingFields of type Fields
285   * @param valueField     of type Fields
286   * @param averageField   of type Fields, declares the field for the resulting average
287   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
288   */
289  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"})
290  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
291    {
292    this( null, pipe, groupingFields, valueField, averageField, threshold );
293    }
294
295  /**
296   * Constructor AverageBy creates a new AverageBy instance with the default threshold and {@link Include#ALL}.
297   *
298   * @param name           of type String
299   * @param pipe           of type Pipe
300   * @param groupingFields of type Fields
301   * @param valueField     of type Fields
302   * @param averageField   of type Fields, declares the field for the resulting average
303   */
304  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"})
305  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
306    {
307    this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
308    }
309
310  /**
311   * Constructor AverageBy creates a new AverageBy instance for a single pipe, using {@link Include#ALL}.
312   *
313   * @param name           of type String
314   * @param pipe           of type Pipe, handed on as the result of {@code Pipe.pipes( pipe )}
315   * @param groupingFields of type Fields
316   * @param valueField     of type Fields
317   * @param averageField   of type Fields, declares the field for the resulting average
318   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
319   */
320  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"})
321  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
322    {
323    this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold );
324    }
325
326  /**
327   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name, the default threshold and {@link Include#ALL}.
328   *
329   * @param pipes          of type Pipe[]
330   * @param groupingFields of type Fields
331   * @param valueField     of type Fields
332   * @param averageField   of type Fields, declares the field for the resulting average
333   */
334  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"})
335  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
336    {
337    this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
338    }
339
340  /**
341   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name and {@link Include#ALL}.
342   *
343   * @param pipes          of type Pipe[]
344   * @param groupingFields of type Fields
345   * @param valueField     of type Fields
346   * @param averageField   of type Fields, declares the field for the resulting average
347   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
348   */
349  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"})
350  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
351    {
352    this( null, pipes, groupingFields, valueField, averageField, threshold );
353    }
354
355  /**
356   * Constructor AverageBy creates a new AverageBy instance with the default threshold and {@link Include#ALL}.
357   *
358   * @param name           of type String
359   * @param pipes          of type Pipe[]
360   * @param groupingFields of type Fields
361   * @param valueField     of type Fields
362   * @param averageField   of type Fields, declares the field for the resulting average
363   */
364  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"})
365  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
366    {
367    this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
368    }
369
370  /**
371   * Constructor AverageBy creates a new AverageBy instance that includes {@code null} values, {@link Include#ALL}.
372   *
373   * @param name           of type String
374   * @param pipes          of type Pipe[]
375   * @param groupingFields of type Fields
376   * @param valueField     of type Fields
377   * @param averageField   of type Fields, declares the field for the resulting average
378   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
379   */
380  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"})
381  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
382    {
383    super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold );
384    }
385
386  /**
387   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name and the default threshold.
388   *
389   * @param pipe           of type Pipe
390   * @param groupingFields of type Fields
391   * @param valueField     of type Fields
392   * @param averageField   of type Fields, declares the field for the resulting average
393   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
394   */
395  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"})
396  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
397    {
398    this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
399    }
400
401  /**
402   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name.
403   *
404   * @param pipe           of type Pipe
405   * @param groupingFields of type Fields
406   * @param valueField     of type Fields
407   * @param averageField   of type Fields, declares the field for the resulting average
408   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
409   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
410   */
411  @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
412  public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
413    {
414    this( null, pipe, groupingFields, valueField, averageField, include, threshold );
415    }
416
417  /**
418   * Constructor AverageBy creates a new AverageBy instance with the default threshold.
419   *
420   * @param name           of type String
421   * @param pipe           of type Pipe
422   * @param groupingFields of type Fields
423   * @param valueField     of type Fields
424   * @param averageField   of type Fields, declares the field for the resulting average
425   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
426   */
427  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"})
428  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
429    {
430    this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
431    }
432
433  /**
434   * Constructor AverageBy creates a new AverageBy instance for a single pipe.
435   *
436   * @param name           of type String
437   * @param pipe           of type Pipe, handed on as the result of {@code Pipe.pipes( pipe )}
438   * @param groupingFields of type Fields
439   * @param valueField     of type Fields
440   * @param averageField   of type Fields, declares the field for the resulting average
441   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
442   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
443   */
444  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
445  public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
446    {
447    this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold );
448    }
449
450  /**
451   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name and the default threshold.
452   *
453   * @param pipes          of type Pipe[]
454   * @param groupingFields of type Fields
455   * @param valueField     of type Fields
456   * @param averageField   of type Fields, declares the field for the resulting average
457   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
458   */
459  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"})
460  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
461    {
462    this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
463    }
464
465  /**
466   * Constructor AverageBy creates a new AverageBy instance with a {@code null} name.
467   *
468   * @param pipes          of type Pipe[]
469   * @param groupingFields of type Fields
470   * @param valueField     of type Fields
471   * @param averageField   of type Fields, declares the field for the resulting average
472   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
473   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
474   */
475  @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
476  public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
477    {
478    this( null, pipes, groupingFields, valueField, averageField, include, threshold );
479    }
480
481  /**
482   * Constructor AverageBy creates a new AverageBy instance with the default threshold.
483   *
484   * @param name           of type String
485   * @param pipes          of type Pipe[]
486   * @param groupingFields of type Fields
487   * @param valueField     of type Fields
488   * @param averageField   of type Fields, declares the field for the resulting average
489   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
490   */
491  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"})
492  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
493    {
494    this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
495    }
496
497  /**
498   * Constructor AverageBy creates a new AverageBy instance that treats {@code null} values per the given {@code include}.
499   *
500   * @param name           of type String
501   * @param pipes          of type Pipe[]
502   * @param groupingFields of type Fields
503   * @param valueField     of type Fields
504   * @param averageField   of type Fields, declares the field for the resulting average
505   * @param include        of type Include, {@link Include#NO_NULLS} leaves {@code null} values out of the average
506   * @param threshold      of type int, how many unique keys to accumulate before the least recently used is emitted
507   */
508  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
509  public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
510    {
511    super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold );
512    }
513  }