001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.pipe; 023 024import java.beans.ConstructorProperties; 025 026import cascading.operation.Aggregator; 027import cascading.operation.Filter; 028import cascading.operation.Function; 029import cascading.tuple.Fields; 030import cascading.tuple.Tuple; 031 032/** 033 * The GroupBy pipe groups the {@link Tuple} stream by the given groupFields. 034 * <p> 035 * If more than one {@link Pipe} instance is provided on the constructor, all branches will be merged. It is required 036 * that all Pipe instances output the same field names, otherwise the {@link cascading.flow.FlowConnector} will fail to create a 037 * {@link cascading.flow.Flow} instance. Again, the Pipe instances are merged together as if one Tuple stream and not joined. 038 * See {@link CoGroup} for joining by common fields. 039 * <p> 040 * Typically an {@link Every} follows GroupBy to apply an {@link Aggregator} function to every grouping. The 041 * {@link Each} operator may also follow GroupBy to apply a {@link Function} or {@link Filter} to the resulting 042 * stream. But an Each cannot come immediately before an Every. 043 * <p> 044 * Optionally a stream can be further sorted by providing sortFields. This allows an Aggregator to receive 045 * values in the order of the sortedFields. 046 * <p> 047 * Note that local sorting always happens on the groupFields, sortFields are a secondary sorting on the grouped values within the 048 * current grouping. sortFields is particularly useful if the Aggregators following the GroupBy would like to see their arguments 049 * in order. 050 * <p> 051 * For more control over sorting at the group or secondary sort level, use {@link cascading.tuple.Fields} 052 * containing {@link java.util.Comparator} instances for the appropriate fields when setting the groupFields or 053 * sortFields values. Fields allows you to set a custom {@link java.util.Comparator} instance for each field name or 054 * position. It is required that each Comparator class also be {@link java.io.Serializable}. 055 * <p> 056 * It should be noted for MapReduce systems, distributed group sorting is not 'total'. That is groups are sorted 057 * as seen by each Reducer, but they are not sorted across Reducers. See the MapReduce algorithm for details. 058 * <p> 059 * See the {@link cascading.tuple.Hasher} interface when a custom {@link java.util.Comparator} on the grouping keys is 060 * being provided that makes two values with differing hashCode values equal. For example, 061 * {@code new BigDecimal( 100.0D )} and {@code new Double 100.0D )} are equal using a custom Comparator, but 062 * {@link Object#hashCode()} will be different, thus forcing each value into differing partitions. 063 * <p> 064 * Note that grouping one String key with a lowercase value with another String key with an uppercase value using a 065 * "case insensitive" Comparator will not have consistent results. The grouping will execute and be correct, 066 * but the actual values in the key columns may be replaced with "equivalent" values from other streams. 067 * <p> 068 * That is, if two streams are merged and then grouped on a key, where one stream the key values are uppercase and the 069 * other stream values are lowercase, the resulting key value for the grouping may arbitrarily be either upper or 070 * lower case. 071 * <p> 072 * If the original key values must be retained, consider normalizing the keys with a Function and then grouping on the 073 * resulting field. 074 */ 075public class GroupBy extends Splice implements Group 076 { 077 /** 078 * Creates a new GroupBy instance that will group on {@link Fields#ALL} fields. 079 * 080 * @param pipe of type Pipe 081 */ 082 @ConstructorProperties({"pipe"}) 083 public GroupBy( Pipe pipe ) 084 { 085 super( pipe ); 086 } 087 088 /** 089 * Creates a new GroupBy instance that will group on the given groupFields field names. 090 * 091 * @param pipe of type Pipe 092 * @param groupFields of type Fields 093 */ 094 @ConstructorProperties({"pipe", "groupFields"}) 095 public GroupBy( Pipe pipe, Fields groupFields ) 096 { 097 super( pipe, groupFields ); 098 } 099 100 /** 101 * Creates a new GroupBy instance that will group on the given groupFields field names. 102 * 103 * @param pipe of type Pipe 104 * @param groupFields of type Fields 105 * @param reverseOrder of type boolean 106 */ 107 @ConstructorProperties({"pipe", "groupFields", "reverseOrder"}) 108 public GroupBy( Pipe pipe, Fields groupFields, boolean reverseOrder ) 109 { 110 super( pipe, groupFields, null, reverseOrder ); 111 } 112 113 /** 114 * Creates a new GroupBy instance that will group on the given groupFields field names. 115 * 116 * @param groupName of type String 117 * @param pipe of type Pipe 118 * @param groupFields of type Fields 119 */ 120 @ConstructorProperties({"groupName", "pipe", "groupFields"}) 121 public GroupBy( String groupName, Pipe pipe, Fields groupFields ) 122 { 123 super( groupName, pipe, groupFields ); 124 } 125 126 /** 127 * Creates a new GroupBy instance that will group on the given groupFields field names. 128 * 129 * @param groupName of type String 130 * @param pipe of type Pipe 131 * @param groupFields of type Fields 132 * @param reverseOrder of type boolean 133 */ 134 @ConstructorProperties({"groupName", "pipe", "groupFields", "reverseOrder"}) 135 public GroupBy( String groupName, Pipe pipe, Fields groupFields, boolean reverseOrder ) 136 { 137 super( groupName, pipe, groupFields, null, reverseOrder ); 138 } 139 140 /** 141 * Creates a new GroupBy instance that will group on the given groupFields field names 142 * and sorts the grouped values on the given sortFields fields names. 143 * 144 * @param pipe of type Pipe 145 * @param groupFields of type Fields 146 * @param sortFields of type Fields 147 */ 148 @ConstructorProperties({"pipe", "groupFields", "sortFields"}) 149 public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields ) 150 { 151 super( pipe, groupFields, sortFields ); 152 } 153 154 /** 155 * Creates a new GroupBy instance that will group on the given groupFields field names 156 * and sorts the grouped values on the given sortFields fields names. 157 * 158 * @param groupName of type String 159 * @param pipe of type Pipe 160 * @param groupFields of type Fields 161 * @param sortFields of type Fields 162 */ 163 @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields"}) 164 public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields ) 165 { 166 super( groupName, pipe, groupFields, sortFields ); 167 } 168 169 /** 170 * Creates a new GroupBy instance that will group on the given groupFields field names 171 * and sorts the grouped values on the given sortFields fields names. 172 * 173 * @param pipe of type Pipe 174 * @param groupFields of type Fields 175 * @param sortFields of type Fields 176 * @param reverseOrder of type boolean 177 */ 178 @ConstructorProperties({"pipe", "groupFields", "sortFields", "reverseOrder"}) 179 public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 180 { 181 super( pipe, groupFields, sortFields, reverseOrder ); 182 } 183 184 /** 185 * Creates a new GroupBy instance that will group on the given groupFields field names 186 * and sorts the grouped values on the given sortFields fields names. 187 * 188 * @param groupName of type String 189 * @param pipe of type Pipe 190 * @param groupFields of type Fields 191 * @param sortFields of type Fields 192 * @param reverseOrder of type boolean 193 */ 194 @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields", "reverseOrder"}) 195 public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 196 { 197 super( groupName, pipe, groupFields, sortFields, reverseOrder ); 198 } 199 200 ////////// 201 // MERGE 202 ////////// 203 204 /** 205 * Creates a new GroupBy instance that will first merge the given pipes, then group on Fields.FIRST. 206 * <p> 207 * The assumption is that the first fields in all streams are logically the same field, which should be true 208 * as merging assumes all incoming streams have the same fields in the same order. 209 * <p> 210 * To get the best performance, choose a field(s) that has many unique values, by using the constructor that takes 211 * a groupFields argument. If the first field has few unique values, data will only be sent to that number of reducers, 212 * or less, in the cluster, making the reduce phase a larger bottleneck. 213 * 214 * @param pipes of type Pipe 215 */ 216 @ConstructorProperties({"pipes"}) 217 public GroupBy( Pipe[] pipes ) 218 { 219 super( pipes, Fields.FIRST ); 220 } 221 222 /** 223 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 224 * 225 * @param pipes of type Pipe 226 * @param groupFields of type Fields 227 */ 228 @ConstructorProperties({"pipes", "groupFields"}) 229 public GroupBy( Pipe[] pipes, Fields groupFields ) 230 { 231 super( pipes, groupFields ); 232 } 233 234 /** 235 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 236 * 237 * @param lhsPipe of type Pipe 238 * @param rhsPipe of type Pipe 239 * @param groupFields of type Fields 240 */ 241 public GroupBy( Pipe lhsPipe, Pipe rhsPipe, Fields groupFields ) 242 { 243 super( Pipe.pipes( lhsPipe, rhsPipe ), groupFields ); 244 } 245 246 /** 247 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 248 * 249 * @param groupName of type String 250 * @param pipes of type Pipe 251 * @param groupFields of type Fields 252 */ 253 @ConstructorProperties({"groupName", "pipes", "groupFields"}) 254 public GroupBy( String groupName, Pipe[] pipes, Fields groupFields ) 255 { 256 super( groupName, pipes, groupFields ); 257 } 258 259 /** 260 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 261 * 262 * @param groupName of type String 263 * @param lhsPipe of type Pipe 264 * @param rhsPipe of type Pipe 265 * @param groupFields of type Fields 266 */ 267 public GroupBy( String groupName, Pipe lhsPipe, Pipe rhsPipe, Fields groupFields ) 268 { 269 super( groupName, Pipe.pipes( lhsPipe, rhsPipe ), groupFields ); 270 } 271 272 /** 273 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 274 * and sorts the grouped values on the given sortFields fields names. 275 * 276 * @param pipes of type Pipe 277 * @param groupFields of type Fields 278 * @param sortFields of type Fields 279 */ 280 @ConstructorProperties({"pipes", "groupFields", "sortFields"}) 281 public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields ) 282 { 283 super( pipes, groupFields, sortFields ); 284 } 285 286 /** 287 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 288 * and sorts the grouped values on the given sortFields fields names. 289 * 290 * @param groupName of type String 291 * @param pipes of type Pipe 292 * @param groupFields of type Fields 293 * @param sortFields of type Fields 294 */ 295 @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields"}) 296 public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields ) 297 { 298 super( groupName, pipes, groupFields, sortFields ); 299 } 300 301 /** 302 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 303 * and sorts the grouped values on the given sortFields fields names. 304 * 305 * @param pipes of type Pipe 306 * @param groupFields of type Fields 307 * @param sortFields of type Fields 308 * @param reverseOrder of type boolean 309 */ 310 @ConstructorProperties({"pipes", "groupFields", "sortFields", "reverseOrder"}) 311 public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 312 { 313 super( pipes, groupFields, sortFields, reverseOrder ); 314 } 315 316 /** 317 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 318 * and sorts the grouped values on the given sortFields fields names. 319 * 320 * @param groupName of type String 321 * @param pipes of type Pipe 322 * @param groupFields of type Fields 323 * @param sortFields of type Fields 324 * @param reverseOrder of type boolean 325 */ 326 @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields", "reverseOrder"}) 327 public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 328 { 329 super( groupName, pipes, groupFields, sortFields, reverseOrder ); 330 } 331 }