001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.assembly;
023
024import java.beans.ConstructorProperties;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.aggregator.MinValue;
028import cascading.pipe.Pipe;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.tuple.TupleEntry;
032
033/**
034 * Class MinBy is used to find the minimum value in a grouping.
035 * <p>
036 * Typically finding the min value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
037 * {@link cascading.operation.aggregator.MinValue} {@link cascading.operation.Aggregator} operation.
038 * <p>
039 * This SubAssembly also uses the {@link cascading.pipe.assembly.MinBy.MinPartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
040 * to track the minimum value before the GroupBy operator to reduce IO over the network.
041 * <p>
042 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
043 * in a much simpler mechanism.
044 * <p>
045 * The {@code threshold} value tells the underlying MinPartials functions how many unique key sums to accumulate
046 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
047 * bounded by the size of your map task JVM and the typical size of each group key.
048 * <p>
049 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
050 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
051 *
052 * @see cascading.pipe.assembly.AggregateBy
053 */
054public class MinBy extends AggregateBy
055  {
056  public static class MinPartials implements Functor
057    {
058    private final Fields declaredFields;
059
060    /** Constructor MinPartials creates a new MinPartials instance. */
061    public MinPartials( Fields declaredFields )
062      {
063      this.declaredFields = declaredFields;
064
065      if( declaredFields.size() != 1 )
066        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
067      }
068
069    @Override
070    public Fields getDeclaredFields()
071      {
072      return declaredFields;
073      }
074
075    @Override
076    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
077      {
078      if( context == null )
079        return args.getTupleCopy();
080      else if( args.getObject( 0 ) == null )
081        return context;
082
083      Comparable lhs = (Comparable) context.getObject( 0 );
084      Comparable rhs = (Comparable) args.getObject( 0 );
085
086      if( ( lhs == null ) || ( lhs.compareTo( rhs ) > 0 ) )
087        context.set( 0, rhs );
088
089      return context;
090      }
091
092    @Override
093    public Tuple complete( FlowProcess flowProcess, Tuple context )
094      {
095      return context;
096      }
097    }
098
099  /**
100   * Constructor MinBy creates a new MinBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
101   * instance.
102   *
103   * @param valueField of type Fields
104   * @param minField   of type Fields
105   */
106  @ConstructorProperties({"valueField", "minField"})
107  public MinBy( Fields valueField, Fields minField )
108    {
109    super( valueField, new MinPartials( minField ), new MinValue( minField ) );
110    }
111
112  //////////////
113
114  /**
115   * Constructor MinBy creates a new MinBy instance.
116   *
117   * @param pipe           of type Pipe
118   * @param groupingFields of type Fields
119   * @param valueField     of type Fields
120   * @param minField       of type Fields
121   */
122  @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField"})
123  public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField )
124    {
125    this( null, pipe, groupingFields, valueField, minField, 0 );
126    }
127
128  /**
129   * Constructor MinBy creates a new MinBy instance.
130   *
131   * @param pipe           of type Pipe
132   * @param groupingFields of type Fields
133   * @param valueField     of type Fields
134   * @param minField       of type Fields
135   * @param threshold      of type int
136   */
137  @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField", "threshold"})
138  public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold )
139    {
140    this( null, pipe, groupingFields, valueField, minField, threshold );
141    }
142
143  /**
144   * Constructor MinBy creates a new MinBy instance.
145   *
146   * @param name           of type String
147   * @param pipe           of type Pipe
148   * @param groupingFields of type Fields
149   * @param valueField     of type Fields
150   * @param minField       of type Fields
151   */
152  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField"})
153  public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField )
154    {
155    this( name, pipe, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
156    }
157
158  /**
159   * Constructor MinBy creates a new MinBy instance.
160   *
161   * @param name           of type String
162   * @param pipe           of type Pipe
163   * @param groupingFields of type Fields
164   * @param valueField     of type Fields
165   * @param minField       of type Fields
166   * @param threshold      of type int
167   */
168  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField", "threshold"})
169  public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold )
170    {
171    this( name, Pipe.pipes( pipe ), groupingFields, valueField, minField, threshold );
172    }
173
174  /**
175   * Constructor MinBy creates a new MinBy instance.
176   *
177   * @param pipes          of type Pipe[]
178   * @param groupingFields of type Fields
179   * @param valueField     of type Fields
180   * @param minField       of type Fields
181   */
182  @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField"})
183  public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField )
184    {
185    this( null, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
186    }
187
188  /**
189   * Constructor MinBy creates a new MinBy instance.
190   *
191   * @param pipes          of type Pipe[]
192   * @param groupingFields of type Fields
193   * @param valueField     of type Fields
194   * @param minField       of type Fields
195   * @param threshold      of type int
196   */
197  @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField", "threshold"})
198  public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold )
199    {
200    this( null, pipes, groupingFields, valueField, minField, threshold );
201    }
202
203  /**
204   * Constructor MinBy creates a new MinBy instance.
205   *
206   * @param name           of type String
207   * @param pipes          of type Pipe[]
208   * @param groupingFields of type Fields
209   * @param valueField     of type Fields
210   * @param minField       of type Fields
211   */
212  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField"})
213  public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField )
214    {
215    this( name, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
216    }
217
218  /**
219   * Constructor MinBy creates a new MinBy instance.
220   *
221   * @param name           of type String
222   * @param pipes          of type Pipe[]
223   * @param groupingFields of type Fields
224   * @param valueField     of type Fields
225   * @param minField       of type Fields
226   * @param threshold      of type int
227   */
228  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField", "threshold"})
229  public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold )
230    {
231    super( name, pipes, groupingFields, valueField, new MinPartials( minField ), new MinValue( minField ), threshold );
232    }
233  }