001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.lang.reflect.Type;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.aggregator.Sum;
028    import cascading.pipe.Pipe;
029    import cascading.tuple.Fields;
030    import cascading.tuple.Tuple;
031    import cascading.tuple.TupleEntry;
032    import cascading.tuple.coerce.Coercions;
033    import cascading.tuple.type.CoercibleType;
034    
035    /**
036     * Class SumBy is used to sum values associated with duplicate keys in a tuple stream.
037     * <p/>
038     * Typically finding the sum of field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Sum}
039     * {@link cascading.operation.Aggregator} operation.
040     * <p/>
041     * If all the values to be summed are all {@code null}, the result value is a function of how null is coerced by the
042     * given {@code sumType}. If a primitive type, {@code 0} will be returned. Otherwise {@code null}.
043     * <p/>
044     * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor}
045     * to sum field values before the GroupBy operator to reduce IO over the network.
046     * <p/>
047     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
048     * in a much simpler mechanism.
049     * <p/>
050     * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate
051     * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
052     * bounded by the size of your map task JVM and the typical size of each group key.
053     * <p/>
054     * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
055     * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
056     *
057     * @see AggregateBy
058     */
059    public class SumBy extends AggregateBy
060      {
061      /** DEFAULT_THRESHOLD */
062      @Deprecated
063      public static final int DEFAULT_THRESHOLD = 10000;
064    
065      /**
066       * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream.
067       * <p/>
068       * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
069       * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
070       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
071       *
072       * @see SumBy
073       */
074      public static class SumPartials implements Functor
075        {
076        private final Fields declaredFields;
077        private final Type sumType;
078        private final CoercibleType canonical;
079    
080        /** Constructor SumPartials creates a new SumPartials instance. */
081        public SumPartials( Fields declaredFields )
082          {
083          this.declaredFields = declaredFields;
084    
085          if( !declaredFields.hasTypes() )
086            throw new IllegalArgumentException( "result type must be declared " );
087    
088          this.sumType = declaredFields.getType( 0 );
089    
090          if( declaredFields.size() != 1 )
091            throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
092    
093          this.canonical = Coercions.coercibleTypeFor( this.sumType );
094          }
095    
096        public SumPartials( Fields declaredFields, Class sumType )
097          {
098          this.declaredFields = declaredFields;
099          this.sumType = sumType;
100    
101          if( declaredFields.size() != 1 )
102            throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
103    
104          this.canonical = Coercions.coercibleTypeFor( this.sumType );
105          }
106    
107        @Override
108        public Fields getDeclaredFields()
109          {
110          return declaredFields;
111          }
112    
113        @Override
114        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
115          {
116          if( context == null )
117            return args.getTupleCopy();
118          else if( args.getObject( 0 ) == null )
119            return context;
120    
121          context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) );
122    
123          return context;
124          }
125    
126        @Override
127        public Tuple complete( FlowProcess flowProcess, Tuple context )
128          {
129          context.set( 0, canonical.canonical( context.getObject( 0 ) ) );
130    
131          return context;
132          }
133        }
134    
135      /**
136       * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
137       * instance.
138       *
139       * @param valueField of type Fields
140       * @param sumField   of type Fields
141       */
142      @ConstructorProperties({"valueField", "sumField"})
143      public SumBy( Fields valueField, Fields sumField )
144        {
145        super( valueField, new SumPartials( sumField ), new Sum( sumField ) );
146        }
147    
148      //////////////
149    
150      /**
151       * Constructor SumBy creates a new SumBy instance.
152       *
153       * @param pipe           of type Pipe
154       * @param groupingFields of type Fields
155       * @param valueField     of type Fields
156       * @param sumField       of type Fields
157       */
158      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"})
159      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
160        {
161        this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
162        }
163    
164      /**
165       * Constructor SumBy creates a new SumBy instance.
166       *
167       * @param pipe           of type Pipe
168       * @param groupingFields of type Fields
169       * @param valueField     of type Fields
170       * @param sumField       of type Fields
171       * @param threshold      of type int
172       */
173      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"})
174      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
175        {
176        this( null, pipe, groupingFields, valueField, sumField, threshold );
177        }
178    
179      /**
180       * Constructor SumBy creates a new SumBy instance.
181       *
182       * @param name           of type String
183       * @param pipe           of type Pipe
184       * @param groupingFields of type Fields
185       * @param valueField     of type Fields
186       * @param sumField       of type Fields
187       */
188      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"})
189      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
190        {
191        this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
192        }
193    
194      /**
195       * Constructor SumBy creates a new SumBy instance.
196       *
197       * @param name           of type String
198       * @param pipe           of type Pipe
199       * @param groupingFields of type Fields
200       * @param valueField     of type Fields
201       * @param sumField       of type Fields
202       * @param threshold      of type int
203       */
204      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"})
205      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
206        {
207        this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold );
208        }
209    
210      /**
211       * Constructor SumBy creates a new SumBy instance.
212       *
213       * @param pipes          of type Pipe[]
214       * @param groupingFields of type Fields
215       * @param valueField     of type Fields
216       * @param sumField       of type Fields
217       */
218      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"})
219      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
220        {
221        this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
222        }
223    
224      /**
225       * Constructor SumBy creates a new SumBy instance.
226       *
227       * @param pipes          of type Pipe[]
228       * @param groupingFields of type Fields
229       * @param valueField     of type Fields
230       * @param sumField       of type Fields
231       * @param threshold      of type int
232       */
233      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"})
234      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
235        {
236        this( null, pipes, groupingFields, valueField, sumField, threshold );
237        }
238    
239      /**
240       * Constructor SumBy creates a new SumBy instance.
241       *
242       * @param name           of type String
243       * @param pipes          of type Pipe[]
244       * @param groupingFields of type Fields
245       * @param valueField     of type Fields
246       * @param sumField       of type Fields
247       */
248      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"})
249      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
250        {
251        this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
252        }
253    
254      /**
255       * Constructor SumBy creates a new SumBy instance.
256       *
257       * @param name           of type String
258       * @param pipes          of type Pipe[]
259       * @param groupingFields of type Fields
260       * @param valueField     of type Fields
261       * @param sumField       of type Fields
262       * @param threshold      of type int
263       */
264      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"})
265      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
266        {
267        super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold );
268        }
269    
270    ///////////
271    
272      /**
273       * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
274       * instance.
275       *
276       * @param valueField of type Fields
277       * @param sumField   of type Fields
278       * @param sumType    of type Class
279       */
280      @ConstructorProperties({"valueField", "sumField", "sumType"})
281      public SumBy( Fields valueField, Fields sumField, Class sumType )
282        {
283        super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) );
284        }
285    
286    //////////////
287    
288      /**
289       * Constructor SumBy creates a new SumBy instance.
290       *
291       * @param pipe           of type Pipe
292       * @param groupingFields of type Fields
293       * @param valueField     of type Fields
294       * @param sumField       of type Fields
295       * @param sumType        of type Class
296       */
297      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"})
298      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
299        {
300        this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
301        }
302    
303      /**
304       * Constructor SumBy creates a new SumBy instance.
305       *
306       * @param pipe           of type Pipe
307       * @param groupingFields of type Fields
308       * @param valueField     of type Fields
309       * @param sumField       of type Fields
310       * @param sumType        of type Class
311       * @param threshold      of type int
312       */
313      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
314      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
315        {
316        this( null, pipe, groupingFields, valueField, sumField, sumType, threshold );
317        }
318    
319      /**
320       * Constructor SumBy creates a new SumBy instance.
321       *
322       * @param name           of type String
323       * @param pipe           of type Pipe
324       * @param groupingFields of type Fields
325       * @param valueField     of type Fields
326       * @param sumField       of type Fields
327       * @param sumType        of type Class
328       */
329      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"})
330      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
331        {
332        this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
333        }
334    
335      /**
336       * Constructor SumBy creates a new SumBy instance.
337       *
338       * @param name           of type String
339       * @param pipe           of type Pipe
340       * @param groupingFields of type Fields
341       * @param valueField     of type Fields
342       * @param sumField       of type Fields
343       * @param sumType        of type Class
344       * @param threshold      of type int
345       */
346      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
347      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
348        {
349        this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold );
350        }
351    
352      /**
353       * Constructor SumBy creates a new SumBy instance.
354       *
355       * @param pipes          of type Pipe[]
356       * @param groupingFields of type Fields
357       * @param valueField     of type Fields
358       * @param sumField       of type Fields
359       * @param sumType        of type Class
360       */
361      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"})
362      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
363        {
364        this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
365        }
366    
367      /**
368       * Constructor SumBy creates a new SumBy instance.
369       *
370       * @param pipes          of type Pipe[]
371       * @param groupingFields of type Fields
372       * @param valueField     of type Fields
373       * @param sumField       of type Fields
374       * @param sumType        of type Class
375       * @param threshold      of type int
376       */
377      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
378      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
379        {
380        this( null, pipes, groupingFields, valueField, sumField, sumType, threshold );
381        }
382    
383      /**
384       * Constructor SumBy creates a new SumBy instance.
385       *
386       * @param name           of type String
387       * @param pipes          of type Pipe[]
388       * @param groupingFields of type Fields
389       * @param valueField     of type Fields
390       * @param sumField       of type Fields
391       * @param sumType        of type Class
392       */
393      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"})
394      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
395        {
396        this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
397        }
398    
399      /**
400       * Constructor SumBy creates a new SumBy instance.
401       *
402       * @param name           of type String
403       * @param pipes          of type Pipe[]
404       * @param groupingFields of type Fields
405       * @param valueField     of type Fields
406       * @param sumField       of type Fields
407       * @param sumType        of type Class
408       * @param threshold      of type int
409       */
410      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
411      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
412        {
413        super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold );
414        }
415      }