001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.operation.aggregator.MinValue;
027    import cascading.pipe.Pipe;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    
032    /**
033     * Class MinBy is used to find the minimum value in a grouping.
034     * <p/>
035     * Typically finding the min value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
036     * {@link cascading.operation.aggregator.MinValue} {@link cascading.operation.Aggregator} operation.
037     * <p/>
038     * This SubAssembly also uses the {@link cascading.pipe.assembly.MinBy.MinPartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
039     * to track the minimum value before the GroupBy operator to reduce IO over the network.
040     * <p/>
041     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
042     * in a much simpler mechanism.
043     * <p/>
 * The {@code threshold} value tells the underlying MinPartials functions how many unique key minimums to accumulate
045     * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
046     * bounded by the size of your map task JVM and the typical size of each group key.
047     * <p/>
048     * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
049     * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
050     *
051     * @see cascading.pipe.assembly.AggregateBy
052     */
053    public class MinBy extends AggregateBy
054      {
055      /** DEFAULT_THRESHOLD */
056      @Deprecated
057      public static final int DEFAULT_THRESHOLD = 10000;
058    
059      public static class MinPartials implements Functor
060        {
061        private final Fields declaredFields;
062    
063        /** Constructor MinPartials creates a new MinPartials instance. */
064        public MinPartials( Fields declaredFields )
065          {
066          this.declaredFields = declaredFields;
067    
068          if( declaredFields.size() != 1 )
069            throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
070          }
071    
072        @Override
073        public Fields getDeclaredFields()
074          {
075          return declaredFields;
076          }
077    
078        @Override
079        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
080          {
081          if( context == null )
082            return args.getTupleCopy();
083          else if( args.getObject( 0 ) == null )
084            return context;
085    
086          Comparable lhs = (Comparable) context.getObject( 0 );
087          Comparable rhs = (Comparable) args.getObject( 0 );
088    
089          if( ( lhs == null ) || ( lhs.compareTo( rhs ) > 0 ) )
090            context.set( 0, rhs );
091    
092          return context;
093          }
094    
095        @Override
096        public Tuple complete( FlowProcess flowProcess, Tuple context )
097          {
098          return context;
099          }
100        }
101    
102      /**
103       * Constructor MinBy creates a new MinBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
104       * instance.
105       *
106       * @param valueField of type Fields
107       * @param minField   of type Fields
108       */
109      @ConstructorProperties({"valueField", "minField"})
110      public MinBy( Fields valueField, Fields minField )
111        {
112        super( valueField, new MinPartials( minField ), new MinValue( minField ) );
113        }
114    
115      //////////////
116    
117      /**
118       * Constructor MinBy creates a new MinBy instance.
119       *
120       * @param pipe           of type Pipe
121       * @param groupingFields of type Fields
122       * @param valueField     of type Fields
123       * @param minField       of type Fields
124       */
125      @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField"})
126      public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField )
127        {
128        this( null, pipe, groupingFields, valueField, minField, 0 );
129        }
130    
131      /**
132       * Constructor MinBy creates a new MinBy instance.
133       *
134       * @param pipe           of type Pipe
135       * @param groupingFields of type Fields
136       * @param valueField     of type Fields
137       * @param minField       of type Fields
138       * @param threshold      of type int
139       */
140      @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField", "threshold"})
141      public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold )
142        {
143        this( null, pipe, groupingFields, valueField, minField, threshold );
144        }
145    
146      /**
147       * Constructor MinBy creates a new MinBy instance.
148       *
149       * @param name           of type String
150       * @param pipe           of type Pipe
151       * @param groupingFields of type Fields
152       * @param valueField     of type Fields
153       * @param minField       of type Fields
154       */
155      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField"})
156      public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField )
157        {
158        this( name, pipe, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
159        }
160    
161      /**
162       * Constructor MinBy creates a new MinBy instance.
163       *
164       * @param name           of type String
165       * @param pipe           of type Pipe
166       * @param groupingFields of type Fields
167       * @param valueField     of type Fields
168       * @param minField       of type Fields
169       * @param threshold      of type int
170       */
171      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField", "threshold"})
172      public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold )
173        {
174        this( name, Pipe.pipes( pipe ), groupingFields, valueField, minField, threshold );
175        }
176    
177      /**
178       * Constructor MinBy creates a new MinBy instance.
179       *
180       * @param pipes          of type Pipe[]
181       * @param groupingFields of type Fields
182       * @param valueField     of type Fields
183       * @param minField       of type Fields
184       */
185      @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField"})
186      public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField )
187        {
188        this( null, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
189        }
190    
191      /**
192       * Constructor MinBy creates a new MinBy instance.
193       *
194       * @param pipes          of type Pipe[]
195       * @param groupingFields of type Fields
196       * @param valueField     of type Fields
197       * @param minField       of type Fields
198       * @param threshold      of type int
199       */
200      @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField", "threshold"})
201      public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold )
202        {
203        this( null, pipes, groupingFields, valueField, minField, threshold );
204        }
205    
206      /**
207       * Constructor MinBy creates a new MinBy instance.
208       *
209       * @param name           of type String
210       * @param pipes          of type Pipe[]
211       * @param groupingFields of type Fields
212       * @param valueField     of type Fields
213       * @param minField       of type Fields
214       */
215      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField"})
216      public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField )
217        {
218        this( name, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD );
219        }
220    
221      /**
222       * Constructor MinBy creates a new MinBy instance.
223       *
224       * @param name           of type String
225       * @param pipes          of type Pipe[]
226       * @param groupingFields of type Fields
227       * @param valueField     of type Fields
228       * @param minField       of type Fields
229       * @param threshold      of type int
230       */
231      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField", "threshold"})
232      public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold )
233        {
234        super( name, pipes, groupingFields, valueField, new MinPartials( minField ), new MinValue( minField ), threshold );
235        }
236      }