001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.assembly;
023
024import java.beans.ConstructorProperties;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.aggregator.First;
028import cascading.pipe.Pipe;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.tuple.TupleEntry;
032
033/**
034 * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping.
035 * <p>
036 * Typically finding the first Tuple in a tuple stream grouping relies on a {@link cascading.pipe.GroupBy} and a
037 * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation.
038 * <p>
039 * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used
040 * as the GroupBy {@code sortFields}.
041 * <p>
042 * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials}
043 * {@link cascading.pipe.assembly.AggregateBy.Functor}
044 * to collect field values before the GroupBy operator to reduce IO over the network.
045 * <p>
046 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
047 * in a much simpler mechanism.
048 * <p>
049 * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate
050 * in the LRU cache, before emitting the least recently used entry.  This accumulation happens map-side, and thus is
051 * bounded by the size of your map task JVM and the typical size of each group key.
052 * <p>
053 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
054 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
055 *
056 * @see AggregateBy
057 */
058public class FirstBy extends AggregateBy
059  {
060  /**
061   * Class CountPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count observed duplicates from the tuple stream.
062   * <p>
063   * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
064   * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
065   * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
066   *
067   * @see cascading.pipe.assembly.FirstBy
068   */
069  public static class FirstPartials implements Functor
070    {
071    private final Fields declaredFields;
072    private Boolean doComparison;
073
074    /**
075     * Constructor FirstPartials creates a new FirstPartials instance.
076     *
077     * @param declaredFields of type Fields
078     */
079    public FirstPartials( Fields declaredFields )
080      {
081      this.declaredFields = declaredFields;
082      }
083
084    @Override
085    public Fields getDeclaredFields()
086      {
087      return declaredFields;
088      }
089
090    @Override
091    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
092      {
093      if( doComparison == null )
094        doComparison = args.getFields().hasComparators(); // ensure we use resolved fields
095
096      if( context == null || ( doComparison && args.getFields().compare( context, args.getTuple() ) > 0 ) )
097        return args.getTupleCopy();
098
099      return context;
100      }
101
102    @Override
103    public Tuple complete( FlowProcess flowProcess, Tuple context )
104      {
105      return context;
106      }
107    }
108
109  /**
110   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
111   * instance.
112   *
113   * @param firstFields of type Fields
114   */
115  @ConstructorProperties({"firstFields"})
116  public FirstBy( Fields firstFields )
117    {
118    super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) );
119    }
120
121  /**
122   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
123   * instance.
124   *
125   * @param firstFields of type Fields
126   */
127  @ConstructorProperties({"argumentFields", "firstFields"})
128  public FirstBy( Fields argumentFields, Fields firstFields )
129    {
130    super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) );
131    }
132
133  ///////
134
135  /**
136   * Constructor FirstBy creates a new FirstBy instance.
137   *
138   * @param pipe           of type Pipe
139   * @param groupingFields of type Fields
140   * @param firstFields    of type Fields
141   */
142  @ConstructorProperties({"pipe", "groupingFields", "firstFields"})
143  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields )
144    {
145    this( null, pipe, groupingFields, firstFields );
146    }
147
148  /**
149   * Constructor FirstBy creates a new FirstBy instance.
150   *
151   * @param pipe           of type Pipe
152   * @param groupingFields of type Fields
153   * @param firstFields    fo type Fields
154   * @param threshold      of type int
155   */
156  @ConstructorProperties({"pipe", "groupingFields", "firstFields", "threshold"})
157  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
158    {
159    this( null, pipe, groupingFields, firstFields, threshold );
160    }
161
162  /**
163   * Constructor FirstBy creates a new FirstBy instance.
164   *
165   * @param name           of type String
166   * @param pipe           of type Pipe
167   * @param groupingFields of type Fields
168   * @param firstFields    of type Fields
169   */
170  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields"})
171  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields )
172    {
173    this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
174    }
175
176  /**
177   * Constructor FirstBy creates a new FirstBy instance.
178   *
179   * @param name           of type String
180   * @param pipe           of type Pipe
181   * @param groupingFields of type Fields
182   * @param firstFields    of type Fields
183   * @param threshold      of type int
184   */
185  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields", "threshold"})
186  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
187    {
188    this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold );
189    }
190
191  /**
192   * Constructor FirstBy creates a new FirstBy instance.
193   *
194   * @param pipes          of type Pipe[]
195   * @param groupingFields of type Fields
196   * @param firstFields    of type Fields
197   */
198  @ConstructorProperties({"pipes", "groupingFields", "firstFields"})
199  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields )
200    {
201    this( null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
202    }
203
204  /**
205   * Constructor FirstBy creates a new FirstBy instance.
206   *
207   * @param pipes          of type Pipe[]
208   * @param groupingFields of type Fields
209   * @param firstFields    of type Fields
210   * @param threshold      of type int
211   */
212  @ConstructorProperties({"pipes", "groupingFields", "firstFields", "threshold"})
213  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
214    {
215    this( null, pipes, groupingFields, firstFields, threshold );
216    }
217
218  /**
219   * Constructor FirstBy creates a new FirstBy instance.
220   *
221   * @param name           of type String
222   * @param pipes          of type Pipe[]
223   * @param groupingFields of type Fields
224   * @param firstFields    of type Fields
225   */
226  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields"})
227  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields )
228    {
229    this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
230    }
231
232  /**
233   * Constructor FirstBy creates a new FirstBy instance.
234   *
235   * @param name           of type String
236   * @param pipes          of type Pipe[]
237   * @param groupingFields of type Fields
238   * @param firstFields    of type Fields
239   * @param threshold      of type int
240   */
241  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields", "threshold"})
242  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
243    {
244    super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold );
245    }
246  }