001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe;
023
024import java.beans.ConstructorProperties;
025
026import cascading.operation.Aggregator;
027import cascading.operation.Filter;
028import cascading.operation.Function;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031
032/**
033 * The GroupBy pipe groups the {@link Tuple} stream by the given groupFields.
034 * <p>
035 * If more than one {@link Pipe} instance is provided on the constructor, all branches will be merged. It is required
036 * that all Pipe instances output the same field names, otherwise the {@link cascading.flow.FlowConnector} will fail to create a
037 * {@link cascading.flow.Flow} instance. Again, the Pipe instances are merged together as if one Tuple stream and not joined.
038 * See {@link CoGroup} for joining by common fields.
039 * <p>
040 * Typically an {@link Every} follows GroupBy to apply an {@link Aggregator} function to every grouping. The
041 * {@link Each} operator may also follow GroupBy to apply a {@link Function} or {@link Filter} to the resulting
042 * stream. But an Each cannot come immediately before an Every.
043 * <p>
044 * Optionally a stream can be further sorted by providing sortFields. This allows an Aggregator to receive
 * values in the order of the sortFields.
046 * <p>
 * Note that local sorting always happens on the groupFields; sortFields provide a secondary sort of the grouped values
 * within the current grouping. sortFields are particularly useful if the Aggregators following the GroupBy need to see
 * their arguments in order.
050 * <p>
051 * For more control over sorting at the group or secondary sort level, use {@link cascading.tuple.Fields}
052 * containing {@link java.util.Comparator} instances for the appropriate fields when setting the groupFields or
053 * sortFields values. Fields allows you to set a custom {@link java.util.Comparator} instance for each field name or
054 * position. It is required that each Comparator class also be {@link java.io.Serializable}.
055 * <p>
 * Note that on MapReduce systems, distributed group sorting is not 'total'. That is, groups are sorted
 * as seen by each Reducer, but they are not sorted across Reducers. See the MapReduce algorithm for details.
058 * <p>
059 * See the {@link cascading.tuple.Hasher} interface when a custom {@link java.util.Comparator} on the grouping keys is
060 * being provided that makes two values with differing hashCode values equal. For example,
 * {@code new BigDecimal( 100.0D )} and {@code new Double( 100.0D )} are equal using a custom Comparator, but
062 * {@link Object#hashCode()} will be different, thus forcing each value into differing partitions.
063 * <p>
064 * Note that grouping one String key with a lowercase value with another String key with an uppercase value using a
065 * "case insensitive" Comparator will not have consistent results. The grouping will execute and be correct,
066 * but the actual values in the key columns may be replaced with "equivalent" values from other streams.
067 * <p>
 * That is, if two streams are merged and then grouped on a key, where in one stream the key values are uppercase and in
 * the other stream they are lowercase, the resulting key value for the grouping may arbitrarily be either upper or
070 * lower case.
071 * <p>
072 * If the original key values must be retained, consider normalizing the keys with a Function and then grouping on the
073 * resulting field.
074 */
075public class GroupBy extends Splice implements Group
076  {
077  /**
078   * Creates a new GroupBy instance that will group on {@link Fields#ALL} fields.
079   *
080   * @param pipe of type Pipe
081   */
082  @ConstructorProperties({"pipe"})
083  public GroupBy( Pipe pipe )
084    {
085    super( pipe );
086    }
087
088  /**
089   * Creates a new GroupBy instance that will group on the given groupFields field names.
090   *
091   * @param pipe        of type Pipe
092   * @param groupFields of type Fields
093   */
094  @ConstructorProperties({"pipe", "groupFields"})
095  public GroupBy( Pipe pipe, Fields groupFields )
096    {
097    super( pipe, groupFields );
098    }
099
100  /**
101   * Creates a new GroupBy instance that will group on the given groupFields field names.
102   *
103   * @param pipe         of type Pipe
104   * @param groupFields  of type Fields
105   * @param reverseOrder of type boolean
106   */
107  @ConstructorProperties({"pipe", "groupFields", "reverseOrder"})
108  public GroupBy( Pipe pipe, Fields groupFields, boolean reverseOrder )
109    {
110    super( pipe, groupFields, null, reverseOrder );
111    }
112
113  /**
114   * Creates a new GroupBy instance that will group on the given groupFields field names.
115   *
116   * @param groupName   of type String
117   * @param pipe        of type Pipe
118   * @param groupFields of type Fields
119   */
120  @ConstructorProperties({"groupName", "pipe", "groupFields"})
121  public GroupBy( String groupName, Pipe pipe, Fields groupFields )
122    {
123    super( groupName, pipe, groupFields );
124    }
125
126  /**
127   * Creates a new GroupBy instance that will group on the given groupFields field names.
128   *
129   * @param groupName    of type String
130   * @param pipe         of type Pipe
131   * @param groupFields  of type Fields
132   * @param reverseOrder of type boolean
133   */
134  @ConstructorProperties({"groupName", "pipe", "groupFields", "reverseOrder"})
135  public GroupBy( String groupName, Pipe pipe, Fields groupFields, boolean reverseOrder )
136    {
137    super( groupName, pipe, groupFields, null, reverseOrder );
138    }
139
140  /**
141   * Creates a new GroupBy instance that will group on the given groupFields field names
142   * and sorts the grouped values on the given sortFields fields names.
143   *
144   * @param pipe        of type Pipe
145   * @param groupFields of type Fields
146   * @param sortFields  of type Fields
147   */
148  @ConstructorProperties({"pipe", "groupFields", "sortFields"})
149  public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields )
150    {
151    super( pipe, groupFields, sortFields );
152    }
153
154  /**
155   * Creates a new GroupBy instance that will group on the given groupFields field names
156   * and sorts the grouped values on the given sortFields fields names.
157   *
158   * @param groupName   of type String
159   * @param pipe        of type Pipe
160   * @param groupFields of type Fields
161   * @param sortFields  of type Fields
162   */
163  @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields"})
164  public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields )
165    {
166    super( groupName, pipe, groupFields, sortFields );
167    }
168
169  /**
170   * Creates a new GroupBy instance that will group on the given groupFields field names
171   * and sorts the grouped values on the given sortFields fields names.
172   *
173   * @param pipe         of type Pipe
174   * @param groupFields  of type Fields
175   * @param sortFields   of type Fields
176   * @param reverseOrder of type boolean
177   */
178  @ConstructorProperties({"pipe", "groupFields", "sortFields", "reverseOrder"})
179  public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
180    {
181    super( pipe, groupFields, sortFields, reverseOrder );
182    }
183
184  /**
185   * Creates a new GroupBy instance that will group on the given groupFields field names
186   * and sorts the grouped values on the given sortFields fields names.
187   *
188   * @param groupName    of type String
189   * @param pipe         of type Pipe
190   * @param groupFields  of type Fields
191   * @param sortFields   of type Fields
192   * @param reverseOrder of type boolean
193   */
194  @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields", "reverseOrder"})
195  public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
196    {
197    super( groupName, pipe, groupFields, sortFields, reverseOrder );
198    }
199
200  //////////
201  // MERGE
202  //////////
203
204  /**
205   * Creates a new GroupBy instance that will first merge the given pipes, then group on Fields.FIRST.
206   * <p>
207   * The assumption is that the first fields in all streams are logically the same field, which should be true
208   * as merging assumes all incoming streams have the same fields in the same order.
209   * <p>
210   * To get the best performance, choose a field(s) that has many unique values, by using the constructor that takes
211   * a groupFields argument. If the first field has few unique values, data will only be sent to that number of reducers,
212   * or less, in the cluster, making the reduce phase a larger bottleneck.
213   *
214   * @param pipes of type Pipe
215   */
216  @ConstructorProperties({"pipes"})
217  public GroupBy( Pipe[] pipes )
218    {
219    super( pipes, Fields.FIRST );
220    }
221
222  /**
223   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
224   *
225   * @param pipes       of type Pipe
226   * @param groupFields of type Fields
227   */
228  @ConstructorProperties({"pipes", "groupFields"})
229  public GroupBy( Pipe[] pipes, Fields groupFields )
230    {
231    super( pipes, groupFields );
232    }
233
234  /**
235   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
236   *
237   * @param lhsPipe     of type Pipe
238   * @param rhsPipe     of type Pipe
239   * @param groupFields of type Fields
240   */
241  public GroupBy( Pipe lhsPipe, Pipe rhsPipe, Fields groupFields )
242    {
243    super( Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
244    }
245
246  /**
247   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
248   *
249   * @param groupName   of type String
250   * @param pipes       of type Pipe
251   * @param groupFields of type Fields
252   */
253  @ConstructorProperties({"groupName", "pipes", "groupFields"})
254  public GroupBy( String groupName, Pipe[] pipes, Fields groupFields )
255    {
256    super( groupName, pipes, groupFields );
257    }
258
259  /**
260   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
261   *
262   * @param groupName   of type String
263   * @param lhsPipe     of type Pipe
264   * @param rhsPipe     of type Pipe
265   * @param groupFields of type Fields
266   */
267  public GroupBy( String groupName, Pipe lhsPipe, Pipe rhsPipe, Fields groupFields )
268    {
269    super( groupName, Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
270    }
271
272  /**
273   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
274   * and sorts the grouped values on the given sortFields fields names.
275   *
276   * @param pipes       of type Pipe
277   * @param groupFields of type Fields
278   * @param sortFields  of type Fields
279   */
280  @ConstructorProperties({"pipes", "groupFields", "sortFields"})
281  public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields )
282    {
283    super( pipes, groupFields, sortFields );
284    }
285
286  /**
287   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
288   * and sorts the grouped values on the given sortFields fields names.
289   *
290   * @param groupName   of type String
291   * @param pipes       of type Pipe
292   * @param groupFields of type Fields
293   * @param sortFields  of type Fields
294   */
295  @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields"})
296  public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields )
297    {
298    super( groupName, pipes, groupFields, sortFields );
299    }
300
301  /**
302   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
303   * and sorts the grouped values on the given sortFields fields names.
304   *
305   * @param pipes        of type Pipe
306   * @param groupFields  of type Fields
307   * @param sortFields   of type Fields
308   * @param reverseOrder of type boolean
309   */
310  @ConstructorProperties({"pipes", "groupFields", "sortFields", "reverseOrder"})
311  public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
312    {
313    super( pipes, groupFields, sortFields, reverseOrder );
314    }
315
316  /**
317   * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names
318   * and sorts the grouped values on the given sortFields fields names.
319   *
320   * @param groupName    of type String
321   * @param pipes        of type Pipe
322   * @param groupFields  of type Fields
323   * @param sortFields   of type Fields
324   * @param reverseOrder of type boolean
325   */
326  @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields", "reverseOrder"})
327  public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
328    {
329    super( groupName, pipes, groupFields, sortFields, reverseOrder );
330    }
331  }