001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.operation.aggregator.MaxValue;
027    import cascading.pipe.Pipe;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    
032    /**
033     * Class MaxBy is used to find the maximum value in a grouping.
034     * <p/>
035     * Typically finding the max value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
036     * {@link cascading.operation.aggregator.MaxValue} {@link cascading.operation.Aggregator} operation.
037     * <p/>
038     * This SubAssembly also uses the {@link cascading.pipe.assembly.MaxBy.MaxPartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
039     * to track the maximum value before the GroupBy operator to reduce IO over the network.
040     * <p/>
 * This strategy is similar to using {@code combiners}, except that no sorting or serialization is invoked, resulting
 * in a much simpler mechanism.
043     * <p/>
 * The {@code threshold} value tells the underlying MaxPartials function how many unique grouping keys (each with its
 * partial maximum value) to accumulate in the LRU cache before emitting the least recently used entry. This
 * accumulation happens map-side, and thus is bounded by the size of your map task JVM and the typical size of each
 * group key.
047     * <p/>
048     * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
049     * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
050     *
051     * @see AggregateBy
052     */
053    public class MaxBy extends AggregateBy
054      {
055      /** DEFAULT_THRESHOLD */
056      @Deprecated
057      public static final int DEFAULT_THRESHOLD = 10000;
058    
059      public static class MaxPartials implements Functor
060        {
061        private final Fields declaredFields;
062    
063        public MaxPartials( Fields declaredFields )
064          {
065          this.declaredFields = declaredFields;
066    
067          if( declaredFields.size() != 1 )
068            throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
069          }
070    
071        @Override
072        public Fields getDeclaredFields()
073          {
074          return declaredFields;
075          }
076    
077        @Override
078        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
079          {
080          if( context == null )
081            return args.getTupleCopy();
082          else if( args.getObject( 0 ) == null )
083            return context;
084    
085          Comparable lhs = (Comparable) context.getObject( 0 );
086          Comparable rhs = (Comparable) args.getObject( 0 );
087    
088          if( ( lhs == null ) || ( lhs.compareTo( rhs ) < 0 ) )
089            context.set( 0, rhs );
090    
091          return context;
092          }
093    
094        @Override
095        public Tuple complete( FlowProcess flowProcess, Tuple context )
096          {
097          return context;
098          }
099        }
100    
101      /**
102       * Constructor MaxBy creates a new MaxBy instance. Use this constructor when used with a {@link AggregateBy}
103       * instance.
104       *
105       * @param valueField of type Fields
106       * @param maxField   of type Fields
107       */
108      @ConstructorProperties({"valueField", "maxField"})
109      public MaxBy( Fields valueField, Fields maxField )
110        {
111        super( valueField, new MaxPartials( maxField ), new MaxValue( maxField ) );
112        }
113    
114      //////////////
115    
116      /**
117       * Constructor MaxBy creates a new MaxBy instance.
118       *
119       * @param pipe           of type Pipe
120       * @param groupingFields of type Fields
121       * @param valueField     of type Fields
122       * @param maxField       of type Fields
123       */
124      @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField"})
125      public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
126        {
127        this( null, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
128        }
129    
130      /**
131       * Constructor MaxBy creates a new MaxBy instance.
132       *
133       * @param pipe           of type Pipe
134       * @param groupingFields of type Fields
135       * @param valueField     of type Fields
136       * @param maxField       of type Fields
137       * @param threshold      of type int
138       */
139      @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField", "threshold"})
140      public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
141        {
142        this( null, pipe, groupingFields, valueField, maxField, threshold );
143        }
144    
145      /**
146       * Constructor MaxBy creates a new MaxBy instance.
147       *
148       * @param name           of type String
149       * @param pipe           of type Pipe
150       * @param groupingFields of type Fields
151       * @param valueField     of type Fields
152       * @param maxField       of type Fields
153       */
154      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField"})
155      public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
156        {
157        this( name, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
158        }
159    
160      /**
161       * Constructor MaxBy creates a new MaxBy instance.
162       *
163       * @param name           of type String
164       * @param pipe           of type Pipe
165       * @param groupingFields of type Fields
166       * @param valueField     of type Fields
167       * @param maxField       of type Fields
168       * @param threshold      of type int
169       */
170      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField", "threshold"})
171      public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
172        {
173        this( name, Pipe.pipes( pipe ), groupingFields, valueField, maxField, threshold );
174        }
175    
176      /**
177       * Constructor MaxBy creates a new MaxBy instance.
178       *
179       * @param pipes          of type Pipe[]
180       * @param groupingFields of type Fields
181       * @param valueField     of type Fields
182       * @param maxField       of type Fields
183       */
184      @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField"})
185      public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
186        {
187        this( null, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
188        }
189    
190      /**
191       * Constructor MaxBy creates a new MaxBy instance.
192       *
193       * @param pipes          of type Pipe[]
194       * @param groupingFields of type Fields
195       * @param valueField     of type Fields
196       * @param maxField       of type Fields
197       * @param threshold      of type int
198       */
199      @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField", "threshold"})
200      public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
201        {
202        this( null, pipes, groupingFields, valueField, maxField, threshold );
203        }
204    
205      /**
206       * Constructor MaxBy creates a new MaxBy instance.
207       *
208       * @param name           of type String
209       * @param pipes          of type Pipe[]
210       * @param groupingFields of type Fields
211       * @param valueField     of type Fields
212       * @param maxField       of type Fields
213       */
214      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField"})
215      public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
216        {
217        this( name, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
218        }
219    
220      /**
221       * Constructor MaxBy creates a new MaxBy instance.
222       *
223       * @param name           of type String
224       * @param pipes          of type Pipe[]
225       * @param groupingFields of type Fields
226       * @param valueField     of type Fields
227       * @param maxField       of type Fields
228       * @param threshold      of type int
229       */
230      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField", "threshold"})
231      public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
232        {
233        super( name, pipes, groupingFields, valueField, new MaxPartials( maxField ), new MaxValue( maxField ), threshold );
234        }
235      }