001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe; 022 023import java.beans.ConstructorProperties; 024 025import cascading.operation.Aggregator; 026import cascading.operation.Filter; 027import cascading.operation.Function; 028import cascading.tuple.Fields; 029import cascading.tuple.Tuple; 030 031/** 032 * The GroupBy pipe groups the {@link Tuple} stream by the given groupFields. 033 * </p> 034 * If more than one {@link Pipe} instance is provided on the constructor, all branches will be merged. It is required 035 * that all Pipe instances output the same field names, otherwise the {@link cascading.flow.FlowConnector} will fail to create a 036 * {@link cascading.flow.Flow} instance. Again, the Pipe instances are merged together as if one Tuple stream and not joined. 037 * See {@link CoGroup} for joining by common fields. 038 * </p> 039 * Typically an {@link Every} follows GroupBy to apply an {@link Aggregator} function to every grouping. The 040 * {@link Each} operator may also follow GroupBy to apply a {@link Function} or {@link Filter} to the resulting 041 * stream. But an Each cannot come immediately before an Every. 042 * <p/> 043 * Optionally a stream can be further sorted by providing sortFields. This allows an Aggregator to receive 044 * values in the order of the sortedFields. 045 * <p/> 046 * Note that local sorting always happens on the groupFields, sortFields are a secondary sorting on the grouped values within the 047 * current grouping. sortFields is particularly useful if the Aggregators following the GroupBy would like to see their arguments 048 * in order. 049 * <p/> 050 * For more control over sorting at the group or secondary sort level, use {@link cascading.tuple.Fields} 051 * containing {@link java.util.Comparator} instances for the appropriate fields when setting the groupFields or 052 * sortFields values. Fields allows you to set a custom {@link java.util.Comparator} instance for each field name or 053 * position. It is required that each Comparator class also be {@link java.io.Serializable}. 054 * <p/> 055 * It should be noted for MapReduce systems, distributed group sorting is not 'total'. That is groups are sorted 056 * as seen by each Reducer, but they are not sorted across Reducers. See the MapReduce algorithm for details. 057 * <p/> 058 * See the {@link cascading.tuple.Hasher} interface when a custom {@link java.util.Comparator} on the grouping keys is 059 * being provided that makes two values with differing hashCode values equal. For example, 060 * {@code new BigDecimal( 100.0D )} and {@code new Double 100.0D )} are equal using a custom Comparator, but 061 * {@link Object#hashCode()} will be different, thus forcing each value into differing partitions. 062 * <p/> 063 * Note that grouping one String key with a lowercase value with another String key with an uppercase value using a 064 * "case insensitive" Comparator will not have consistent results. The grouping will execute and be correct, 065 * but the actual values in the key columns may be replaced with "equivalent" values from other streams. 066 * <p/> 067 * That is, if two streams are merged and then grouped on a key, where one stream the key values are uppercase and the 068 * other stream values are lowercase, the resulting key value for the grouping may arbitrarily be either upper or 069 * lower case. 070 * <p/> 071 * If the original key values must be retained, consider normalizing the keys with a Function and then grouping on the 072 * resulting field. 073 */ 074public class GroupBy extends Splice implements Group 075 { 076 /** 077 * Creates a new GroupBy instance that will group on {@link Fields#ALL} fields. 078 * 079 * @param pipe of type Pipe 080 */ 081 @ConstructorProperties({"pipe"}) 082 public GroupBy( Pipe pipe ) 083 { 084 super( pipe ); 085 } 086 087 /** 088 * Creates a new GroupBy instance that will group on the given groupFields field names. 089 * 090 * @param pipe of type Pipe 091 * @param groupFields of type Fields 092 */ 093 @ConstructorProperties({"pipe", "groupFields"}) 094 public GroupBy( Pipe pipe, Fields groupFields ) 095 { 096 super( pipe, groupFields ); 097 } 098 099 /** 100 * Creates a new GroupBy instance that will group on the given groupFields field names. 101 * 102 * @param pipe of type Pipe 103 * @param groupFields of type Fields 104 * @param reverseOrder of type boolean 105 */ 106 @ConstructorProperties({"pipe", "groupFields", "reverseOrder"}) 107 public GroupBy( Pipe pipe, Fields groupFields, boolean reverseOrder ) 108 { 109 super( pipe, groupFields, null, reverseOrder ); 110 } 111 112 /** 113 * Creates a new GroupBy instance that will group on the given groupFields field names. 114 * 115 * @param groupName of type String 116 * @param pipe of type Pipe 117 * @param groupFields of type Fields 118 */ 119 @ConstructorProperties({"groupName", "pipe", "groupFields"}) 120 public GroupBy( String groupName, Pipe pipe, Fields groupFields ) 121 { 122 super( groupName, pipe, groupFields ); 123 } 124 125 /** 126 * Creates a new GroupBy instance that will group on the given groupFields field names. 127 * 128 * @param groupName of type String 129 * @param pipe of type Pipe 130 * @param groupFields of type Fields 131 * @param reverseOrder of type boolean 132 */ 133 @ConstructorProperties({"groupName", "pipe", "groupFields", "reverseOrder"}) 134 public GroupBy( String groupName, Pipe pipe, Fields groupFields, boolean reverseOrder ) 135 { 136 super( groupName, pipe, groupFields, null, reverseOrder ); 137 } 138 139 /** 140 * Creates a new GroupBy instance that will group on the given groupFields field names 141 * and sorts the grouped values on the given sortFields fields names. 142 * 143 * @param pipe of type Pipe 144 * @param groupFields of type Fields 145 * @param sortFields of type Fields 146 */ 147 @ConstructorProperties({"pipe", "groupFields", "sortFields"}) 148 public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields ) 149 { 150 super( pipe, groupFields, sortFields ); 151 } 152 153 /** 154 * Creates a new GroupBy instance that will group on the given groupFields field names 155 * and sorts the grouped values on the given sortFields fields names. 156 * 157 * @param groupName of type String 158 * @param pipe of type Pipe 159 * @param groupFields of type Fields 160 * @param sortFields of type Fields 161 */ 162 @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields"}) 163 public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields ) 164 { 165 super( groupName, pipe, groupFields, sortFields ); 166 } 167 168 /** 169 * Creates a new GroupBy instance that will group on the given groupFields field names 170 * and sorts the grouped values on the given sortFields fields names. 171 * 172 * @param pipe of type Pipe 173 * @param groupFields of type Fields 174 * @param sortFields of type Fields 175 * @param reverseOrder of type boolean 176 */ 177 @ConstructorProperties({"pipe", "groupFields", "sortFields", "reverseOrder"}) 178 public GroupBy( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 179 { 180 super( pipe, groupFields, sortFields, reverseOrder ); 181 } 182 183 /** 184 * Creates a new GroupBy instance that will group on the given groupFields field names 185 * and sorts the grouped values on the given sortFields fields names. 186 * 187 * @param groupName of type String 188 * @param pipe of type Pipe 189 * @param groupFields of type Fields 190 * @param sortFields of type Fields 191 * @param reverseOrder of type boolean 192 */ 193 @ConstructorProperties({"groupName", "pipe", "groupFields", "sortFields", "reverseOrder"}) 194 public GroupBy( String groupName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 195 { 196 super( groupName, pipe, groupFields, sortFields, reverseOrder ); 197 } 198 199 ////////// 200 // MERGE 201 ////////// 202 203 /** 204 * Creates a new GroupBy instance that will first merge the given pipes, then group on Fields.FIRST. 205 * <p/> 206 * The assumption is that the first fields in all streams are logically the same field, which should be true 207 * as merging assumes all incoming streams have the same fields in the same order. 208 * <p/> 209 * To get the best performance, choose a field(s) that has many unique values, by using the constructor that takes 210 * a groupFields argument. If the first field has few unique values, data will only be sent to that number of reducers, 211 * or less, in the cluster, making the reduce phase a larger bottleneck. 212 * 213 * @param pipes of type Pipe 214 */ 215 @ConstructorProperties({"pipes"}) 216 public GroupBy( Pipe[] pipes ) 217 { 218 super( pipes, Fields.FIRST ); 219 } 220 221 /** 222 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 223 * 224 * @param pipes of type Pipe 225 * @param groupFields of type Fields 226 */ 227 @ConstructorProperties({"pipes", "groupFields"}) 228 public GroupBy( Pipe[] pipes, Fields groupFields ) 229 { 230 super( pipes, groupFields ); 231 } 232 233 /** 234 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 235 * 236 * @param lhsPipe of type Pipe 237 * @param rhsPipe of type Pipe 238 * @param groupFields of type Fields 239 */ 240 public GroupBy( Pipe lhsPipe, Pipe rhsPipe, Fields groupFields ) 241 { 242 super( Pipe.pipes( lhsPipe, rhsPipe ), groupFields ); 243 } 244 245 /** 246 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 247 * 248 * @param groupName of type String 249 * @param pipes of type Pipe 250 * @param groupFields of type Fields 251 */ 252 @ConstructorProperties({"groupName", "pipes", "groupFields"}) 253 public GroupBy( String groupName, Pipe[] pipes, Fields groupFields ) 254 { 255 super( groupName, pipes, groupFields ); 256 } 257 258 /** 259 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names. 260 * 261 * @param groupName of type String 262 * @param lhsPipe of type Pipe 263 * @param rhsPipe of type Pipe 264 * @param groupFields of type Fields 265 */ 266 public GroupBy( String groupName, Pipe lhsPipe, Pipe rhsPipe, Fields groupFields ) 267 { 268 super( groupName, Pipe.pipes( lhsPipe, rhsPipe ), groupFields ); 269 } 270 271 /** 272 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 273 * and sorts the grouped values on the given sortFields fields names. 274 * 275 * @param pipes of type Pipe 276 * @param groupFields of type Fields 277 * @param sortFields of type Fields 278 */ 279 @ConstructorProperties({"pipes", "groupFields", "sortFields"}) 280 public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields ) 281 { 282 super( pipes, groupFields, sortFields ); 283 } 284 285 /** 286 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 287 * and sorts the grouped values on the given sortFields fields names. 288 * 289 * @param groupName of type String 290 * @param pipes of type Pipe 291 * @param groupFields of type Fields 292 * @param sortFields of type Fields 293 */ 294 @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields"}) 295 public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields ) 296 { 297 super( groupName, pipes, groupFields, sortFields ); 298 } 299 300 /** 301 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 302 * and sorts the grouped values on the given sortFields fields names. 303 * 304 * @param pipes of type Pipe 305 * @param groupFields of type Fields 306 * @param sortFields of type Fields 307 * @param reverseOrder of type boolean 308 */ 309 @ConstructorProperties({"pipes", "groupFields", "sortFields", "reverseOrder"}) 310 public GroupBy( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 311 { 312 super( pipes, groupFields, sortFields, reverseOrder ); 313 } 314 315 /** 316 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names 317 * and sorts the grouped values on the given sortFields fields names. 318 * 319 * @param groupName of type String 320 * @param pipes of type Pipe 321 * @param groupFields of type Fields 322 * @param sortFields of type Fields 323 * @param reverseOrder of type boolean 324 */ 325 @ConstructorProperties({"groupName", "pipes", "groupFields", "sortFields", "reverseOrder"}) 326 public GroupBy( String groupName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 327 { 328 super( groupName, pipes, groupFields, sortFields, reverseOrder ); 329 } 330 }