001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.assembly;
023
024import java.beans.ConstructorProperties;
025import java.lang.reflect.Type;
026
027import cascading.flow.FlowProcess;
028import cascading.operation.aggregator.Sum;
029import cascading.pipe.Pipe;
030import cascading.tuple.Fields;
031import cascading.tuple.Tuple;
032import cascading.tuple.TupleEntry;
033import cascading.tuple.coerce.Coercions;
034import cascading.tuple.type.CoercibleType;
035
036/**
037 * Class SumBy is used to sum values associated with duplicate keys in a tuple stream.
038 * <p>
 * Typically finding the sum of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
 * {@link cascading.operation.aggregator.Sum} {@link cascading.operation.Aggregator} operation.
041 * <p>
 * If all the values to be summed are {@code null}, the result value depends on how {@code null} is coerced by the
 * given {@code sumType}: for a primitive type, {@code 0} is returned; otherwise {@code null}.
044 * <p>
045 * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor}
046 * to sum field values before the GroupBy operator to reduce IO over the network.
047 * <p>
048 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
049 * in a much simpler mechanism.
050 * <p>
051 * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate
052 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
053 * bounded by the size of your map task JVM and the typical size of each group key.
054 * <p>
055 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
056 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
057 *
058 * @see AggregateBy
059 */
060public class SumBy extends AggregateBy
061  {
062  /**
063   * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream.
064   * <p>
065   * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
066   * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
067   * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
068   *
069   * @see SumBy
070   */
071  public static class SumPartials implements Functor
072    {
073    private final Fields declaredFields;
074    private final Type sumType;
075    private final CoercibleType canonical;
076
077    /** Constructor SumPartials creates a new SumPartials instance. */
078    public SumPartials( Fields declaredFields )
079      {
080      this.declaredFields = declaredFields;
081
082      if( !declaredFields.hasTypes() )
083        throw new IllegalArgumentException( "result type must be declared " );
084
085      this.sumType = declaredFields.getType( 0 );
086
087      if( declaredFields.size() != 1 )
088        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
089
090      this.canonical = Coercions.coercibleTypeFor( this.sumType );
091      }
092
093    public SumPartials( Fields declaredFields, Class sumType )
094      {
095      this.declaredFields = declaredFields;
096      this.sumType = sumType;
097
098      if( declaredFields.size() != 1 )
099        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
100
101      this.canonical = Coercions.coercibleTypeFor( this.sumType );
102      }
103
104    @Override
105    public Fields getDeclaredFields()
106      {
107      return declaredFields;
108      }
109
110    @Override
111    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
112      {
113      if( context == null )
114        return args.getTupleCopy();
115      else if( args.getObject( 0 ) == null )
116        return context;
117
118      context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) );
119
120      return context;
121      }
122
123    @Override
124    public Tuple complete( FlowProcess flowProcess, Tuple context )
125      {
126      context.set( 0, canonical.canonical( context.getObject( 0 ) ) );
127
128      return context;
129      }
130    }
131
132  /**
133   * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
134   * instance.
135   *
136   * @param valueField of type Fields
137   * @param sumField   of type Fields
138   */
139  @ConstructorProperties({"valueField", "sumField"})
140  public SumBy( Fields valueField, Fields sumField )
141    {
142    super( valueField, new SumPartials( sumField ), new Sum( sumField ) );
143    }
144
145  //////////////
146
147  /**
148   * Constructor SumBy creates a new SumBy instance.
149   *
150   * @param pipe           of type Pipe
151   * @param groupingFields of type Fields
152   * @param valueField     of type Fields
153   * @param sumField       of type Fields
154   */
155  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"})
156  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
157    {
158    this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
159    }
160
161  /**
162   * Constructor SumBy creates a new SumBy instance.
163   *
164   * @param pipe           of type Pipe
165   * @param groupingFields of type Fields
166   * @param valueField     of type Fields
167   * @param sumField       of type Fields
168   * @param threshold      of type int
169   */
170  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"})
171  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
172    {
173    this( null, pipe, groupingFields, valueField, sumField, threshold );
174    }
175
176  /**
177   * Constructor SumBy creates a new SumBy instance.
178   *
179   * @param name           of type String
180   * @param pipe           of type Pipe
181   * @param groupingFields of type Fields
182   * @param valueField     of type Fields
183   * @param sumField       of type Fields
184   */
185  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"})
186  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
187    {
188    this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
189    }
190
191  /**
192   * Constructor SumBy creates a new SumBy instance.
193   *
194   * @param name           of type String
195   * @param pipe           of type Pipe
196   * @param groupingFields of type Fields
197   * @param valueField     of type Fields
198   * @param sumField       of type Fields
199   * @param threshold      of type int
200   */
201  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"})
202  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
203    {
204    this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold );
205    }
206
207  /**
208   * Constructor SumBy creates a new SumBy instance.
209   *
210   * @param pipes          of type Pipe[]
211   * @param groupingFields of type Fields
212   * @param valueField     of type Fields
213   * @param sumField       of type Fields
214   */
215  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"})
216  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
217    {
218    this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
219    }
220
221  /**
222   * Constructor SumBy creates a new SumBy instance.
223   *
224   * @param pipes          of type Pipe[]
225   * @param groupingFields of type Fields
226   * @param valueField     of type Fields
227   * @param sumField       of type Fields
228   * @param threshold      of type int
229   */
230  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"})
231  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
232    {
233    this( null, pipes, groupingFields, valueField, sumField, threshold );
234    }
235
236  /**
237   * Constructor SumBy creates a new SumBy instance.
238   *
239   * @param name           of type String
240   * @param pipes          of type Pipe[]
241   * @param groupingFields of type Fields
242   * @param valueField     of type Fields
243   * @param sumField       of type Fields
244   */
245  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"})
246  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
247    {
248    this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
249    }
250
251  /**
252   * Constructor SumBy creates a new SumBy instance.
253   *
254   * @param name           of type String
255   * @param pipes          of type Pipe[]
256   * @param groupingFields of type Fields
257   * @param valueField     of type Fields
258   * @param sumField       of type Fields
259   * @param threshold      of type int
260   */
261  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"})
262  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
263    {
264    super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold );
265    }
266
267///////////
268
269  /**
270   * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
271   * instance.
272   *
273   * @param valueField of type Fields
274   * @param sumField   of type Fields
275   * @param sumType    of type Class
276   */
277  @ConstructorProperties({"valueField", "sumField", "sumType"})
278  public SumBy( Fields valueField, Fields sumField, Class sumType )
279    {
280    super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) );
281    }
282
283//////////////
284
285  /**
286   * Constructor SumBy creates a new SumBy instance.
287   *
288   * @param pipe           of type Pipe
289   * @param groupingFields of type Fields
290   * @param valueField     of type Fields
291   * @param sumField       of type Fields
292   * @param sumType        of type Class
293   */
294  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"})
295  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
296    {
297    this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
298    }
299
300  /**
301   * Constructor SumBy creates a new SumBy instance.
302   *
303   * @param pipe           of type Pipe
304   * @param groupingFields of type Fields
305   * @param valueField     of type Fields
306   * @param sumField       of type Fields
307   * @param sumType        of type Class
308   * @param threshold      of type int
309   */
310  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
311  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
312    {
313    this( null, pipe, groupingFields, valueField, sumField, sumType, threshold );
314    }
315
316  /**
317   * Constructor SumBy creates a new SumBy instance.
318   *
319   * @param name           of type String
320   * @param pipe           of type Pipe
321   * @param groupingFields of type Fields
322   * @param valueField     of type Fields
323   * @param sumField       of type Fields
324   * @param sumType        of type Class
325   */
326  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"})
327  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
328    {
329    this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
330    }
331
332  /**
333   * Constructor SumBy creates a new SumBy instance.
334   *
335   * @param name           of type String
336   * @param pipe           of type Pipe
337   * @param groupingFields of type Fields
338   * @param valueField     of type Fields
339   * @param sumField       of type Fields
340   * @param sumType        of type Class
341   * @param threshold      of type int
342   */
343  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
344  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
345    {
346    this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold );
347    }
348
349  /**
350   * Constructor SumBy creates a new SumBy instance.
351   *
352   * @param pipes          of type Pipe[]
353   * @param groupingFields of type Fields
354   * @param valueField     of type Fields
355   * @param sumField       of type Fields
356   * @param sumType        of type Class
357   */
358  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"})
359  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
360    {
361    this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
362    }
363
364  /**
365   * Constructor SumBy creates a new SumBy instance.
366   *
367   * @param pipes          of type Pipe[]
368   * @param groupingFields of type Fields
369   * @param valueField     of type Fields
370   * @param sumField       of type Fields
371   * @param sumType        of type Class
372   * @param threshold      of type int
373   */
374  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
375  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
376    {
377    this( null, pipes, groupingFields, valueField, sumField, sumType, threshold );
378    }
379
380  /**
381   * Constructor SumBy creates a new SumBy instance.
382   *
383   * @param name           of type String
384   * @param pipes          of type Pipe[]
385   * @param groupingFields of type Fields
386   * @param valueField     of type Fields
387   * @param sumField       of type Fields
388   * @param sumType        of type Class
389   */
390  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"})
391  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
392    {
393    this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
394    }
395
396  /**
397   * Constructor SumBy creates a new SumBy instance.
398   *
399   * @param name           of type String
400   * @param pipes          of type Pipe[]
401   * @param groupingFields of type Fields
402   * @param valueField     of type Fields
403   * @param sumField       of type Fields
404   * @param sumType        of type Class
405   * @param threshold      of type int
406   */
407  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
408  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
409    {
410    super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold );
411    }
412  }