001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.assembly;
023
024import java.beans.ConstructorProperties;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.aggregator.MaxValue;
028import cascading.pipe.Pipe;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.tuple.TupleEntry;
032
033/**
034 * Class MaxBy is used to find the maximum value in a grouping.
035 * <p>
036 * Typically finding the max value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
037 * {@link cascading.operation.aggregator.MaxValue} {@link cascading.operation.Aggregator} operation.
038 * <p>
039 * This SubAssembly also uses the {@link cascading.pipe.assembly.MaxBy.MaxPartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
040 * to track the maximum value before the GroupBy operator to reduce IO over the network.
041 * <p>
042 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
043 * in a much simpler mechanism.
044 * <p>
045 * The {@code threshold} value tells the underlying MaxPartials functions how many unique key sums to accumulate
046 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
047 * bounded by the size of your map task JVM and the typical size of each group key.
048 * <p>
049 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
050 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
051 *
052 * @see AggregateBy
053 */
054public class MaxBy extends AggregateBy
055  {
056  /** DEFAULT_THRESHOLD */
057  @Deprecated
058  public static final int DEFAULT_THRESHOLD = 10000;
059
060  public static class MaxPartials implements Functor
061    {
062    private final Fields declaredFields;
063
064    public MaxPartials( Fields declaredFields )
065      {
066      this.declaredFields = declaredFields;
067
068      if( declaredFields.size() != 1 )
069        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
070      }
071
072    @Override
073    public Fields getDeclaredFields()
074      {
075      return declaredFields;
076      }
077
078    @Override
079    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
080      {
081      if( context == null )
082        return args.getTupleCopy();
083      else if( args.getObject( 0 ) == null )
084        return context;
085
086      Comparable lhs = (Comparable) context.getObject( 0 );
087      Comparable rhs = (Comparable) args.getObject( 0 );
088
089      if( ( lhs == null ) || ( lhs.compareTo( rhs ) < 0 ) )
090        context.set( 0, rhs );
091
092      return context;
093      }
094
095    @Override
096    public Tuple complete( FlowProcess flowProcess, Tuple context )
097      {
098      return context;
099      }
100    }
101
102  /**
103   * Constructor MaxBy creates a new MaxBy instance. Use this constructor when used with a {@link AggregateBy}
104   * instance.
105   *
106   * @param valueField of type Fields
107   * @param maxField   of type Fields
108   */
109  @ConstructorProperties({"valueField", "maxField"})
110  public MaxBy( Fields valueField, Fields maxField )
111    {
112    super( valueField, new MaxPartials( maxField ), new MaxValue( maxField ) );
113    }
114
115  //////////////
116
117  /**
118   * Constructor MaxBy creates a new MaxBy instance.
119   *
120   * @param pipe           of type Pipe
121   * @param groupingFields of type Fields
122   * @param valueField     of type Fields
123   * @param maxField       of type Fields
124   */
125  @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField"})
126  public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
127    {
128    this( null, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
129    }
130
131  /**
132   * Constructor MaxBy creates a new MaxBy instance.
133   *
134   * @param pipe           of type Pipe
135   * @param groupingFields of type Fields
136   * @param valueField     of type Fields
137   * @param maxField       of type Fields
138   * @param threshold      of type int
139   */
140  @ConstructorProperties({"pipe", "groupingFields", "valueField", "maxField", "threshold"})
141  public MaxBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
142    {
143    this( null, pipe, groupingFields, valueField, maxField, threshold );
144    }
145
146  /**
147   * Constructor MaxBy creates a new MaxBy instance.
148   *
149   * @param name           of type String
150   * @param pipe           of type Pipe
151   * @param groupingFields of type Fields
152   * @param valueField     of type Fields
153   * @param maxField       of type Fields
154   */
155  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField"})
156  public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField )
157    {
158    this( name, pipe, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
159    }
160
161  /**
162   * Constructor MaxBy creates a new MaxBy instance.
163   *
164   * @param name           of type String
165   * @param pipe           of type Pipe
166   * @param groupingFields of type Fields
167   * @param valueField     of type Fields
168   * @param maxField       of type Fields
169   * @param threshold      of type int
170   */
171  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "maxField", "threshold"})
172  public MaxBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
173    {
174    this( name, Pipe.pipes( pipe ), groupingFields, valueField, maxField, threshold );
175    }
176
177  /**
178   * Constructor MaxBy creates a new MaxBy instance.
179   *
180   * @param pipes          of type Pipe[]
181   * @param groupingFields of type Fields
182   * @param valueField     of type Fields
183   * @param maxField       of type Fields
184   */
185  @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField"})
186  public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
187    {
188    this( null, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
189    }
190
191  /**
192   * Constructor MaxBy creates a new MaxBy instance.
193   *
194   * @param pipes          of type Pipe[]
195   * @param groupingFields of type Fields
196   * @param valueField     of type Fields
197   * @param maxField       of type Fields
198   * @param threshold      of type int
199   */
200  @ConstructorProperties({"pipes", "groupingFields", "valueField", "maxField", "threshold"})
201  public MaxBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
202    {
203    this( null, pipes, groupingFields, valueField, maxField, threshold );
204    }
205
206  /**
207   * Constructor MaxBy creates a new MaxBy instance.
208   *
209   * @param name           of type String
210   * @param pipes          of type Pipe[]
211   * @param groupingFields of type Fields
212   * @param valueField     of type Fields
213   * @param maxField       of type Fields
214   */
215  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField"})
216  public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField )
217    {
218    this( name, pipes, groupingFields, valueField, maxField, USE_DEFAULT_THRESHOLD );
219    }
220
221  /**
222   * Constructor MaxBy creates a new MaxBy instance.
223   *
224   * @param name           of type String
225   * @param pipes          of type Pipe[]
226   * @param groupingFields of type Fields
227   * @param valueField     of type Fields
228   * @param maxField       of type Fields
229   * @param threshold      of type int
230   */
231  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "maxField", "threshold"})
232  public MaxBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields maxField, int threshold )
233    {
234    super( name, pipes, groupingFields, valueField, new MaxPartials( maxField ), new MaxValue( maxField ), threshold );
235    }
236  }