001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.pipe.assembly; 023 024import java.beans.ConstructorProperties; 025import java.lang.reflect.Type; 026 027import cascading.flow.FlowProcess; 028import cascading.operation.aggregator.Sum; 029import cascading.pipe.Pipe; 030import cascading.tuple.Fields; 031import cascading.tuple.Tuple; 032import cascading.tuple.TupleEntry; 033import cascading.tuple.coerce.Coercions; 034import cascading.tuple.type.CoercibleType; 035 036/** 037 * Class SumBy is used to sum values associated with duplicate keys in a tuple stream. 038 * <p> 039 * Typically finding the sum of field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Sum} 040 * {@link cascading.operation.Aggregator} operation. 041 * <p> 042 * If all the values to be summed are all {@code null}, the result value is a function of how null is coerced by the 043 * given {@code sumType}. If a primitive type, {@code 0} will be returned. Otherwise {@code null}. 044 * <p> 045 * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor} 046 * to sum field values before the GroupBy operator to reduce IO over the network. 047 * <p> 048 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 049 * in a much simpler mechanism. 050 * <p> 051 * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate 052 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 053 * bounded by the size of your map task JVM and the typical size of each group key. 054 * <p> 055 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 056 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 057 * 058 * @see AggregateBy 059 */ 060public class SumBy extends AggregateBy 061 { 062 /** 063 * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream. 064 * <p> 065 * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum} 066 * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values 067 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 068 * 069 * @see SumBy 070 */ 071 public static class SumPartials implements Functor 072 { 073 private final Fields declaredFields; 074 private final Type sumType; 075 private final CoercibleType canonical; 076 077 /** Constructor SumPartials creates a new SumPartials instance. */ 078 public SumPartials( Fields declaredFields ) 079 { 080 this.declaredFields = declaredFields; 081 082 if( !declaredFields.hasTypes() ) 083 throw new IllegalArgumentException( "result type must be declared " ); 084 085 this.sumType = declaredFields.getType( 0 ); 086 087 if( declaredFields.size() != 1 ) 088 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 089 090 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 091 } 092 093 public SumPartials( Fields declaredFields, Class sumType ) 094 { 095 this.declaredFields = declaredFields; 096 this.sumType = sumType; 097 098 if( declaredFields.size() != 1 ) 099 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 100 101 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 102 } 103 104 @Override 105 public Fields getDeclaredFields() 106 { 107 return declaredFields; 108 } 109 110 @Override 111 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 112 { 113 if( context == null ) 114 return args.getTupleCopy(); 115 else if( args.getObject( 0 ) == null ) 116 return context; 117 118 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 119 120 return context; 121 } 122 123 @Override 124 public Tuple complete( FlowProcess flowProcess, Tuple context ) 125 { 126 context.set( 0, canonical.canonical( context.getObject( 0 ) ) ); 127 128 return context; 129 } 130 } 131 132 /** 133 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 134 * instance. 135 * 136 * @param valueField of type Fields 137 * @param sumField of type Fields 138 */ 139 @ConstructorProperties({"valueField", "sumField"}) 140 public SumBy( Fields valueField, Fields sumField ) 141 { 142 super( valueField, new SumPartials( sumField ), new Sum( sumField ) ); 143 } 144 145 ////////////// 146 147 /** 148 * Constructor SumBy creates a new SumBy instance. 149 * 150 * @param pipe of type Pipe 151 * @param groupingFields of type Fields 152 * @param valueField of type Fields 153 * @param sumField of type Fields 154 */ 155 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"}) 156 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 157 { 158 this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 159 } 160 161 /** 162 * Constructor SumBy creates a new SumBy instance. 163 * 164 * @param pipe of type Pipe 165 * @param groupingFields of type Fields 166 * @param valueField of type Fields 167 * @param sumField of type Fields 168 * @param threshold of type int 169 */ 170 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"}) 171 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 172 { 173 this( null, pipe, groupingFields, valueField, sumField, threshold ); 174 } 175 176 /** 177 * Constructor SumBy creates a new SumBy instance. 178 * 179 * @param name of type String 180 * @param pipe of type Pipe 181 * @param groupingFields of type Fields 182 * @param valueField of type Fields 183 * @param sumField of type Fields 184 */ 185 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"}) 186 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 187 { 188 this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 189 } 190 191 /** 192 * Constructor SumBy creates a new SumBy instance. 193 * 194 * @param name of type String 195 * @param pipe of type Pipe 196 * @param groupingFields of type Fields 197 * @param valueField of type Fields 198 * @param sumField of type Fields 199 * @param threshold of type int 200 */ 201 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"}) 202 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 203 { 204 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold ); 205 } 206 207 /** 208 * Constructor SumBy creates a new SumBy instance. 209 * 210 * @param pipes of type Pipe[] 211 * @param groupingFields of type Fields 212 * @param valueField of type Fields 213 * @param sumField of type Fields 214 */ 215 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"}) 216 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 217 { 218 this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 219 } 220 221 /** 222 * Constructor SumBy creates a new SumBy instance. 223 * 224 * @param pipes of type Pipe[] 225 * @param groupingFields of type Fields 226 * @param valueField of type Fields 227 * @param sumField of type Fields 228 * @param threshold of type int 229 */ 230 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"}) 231 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 232 { 233 this( null, pipes, groupingFields, valueField, sumField, threshold ); 234 } 235 236 /** 237 * Constructor SumBy creates a new SumBy instance. 238 * 239 * @param name of type String 240 * @param pipes of type Pipe[] 241 * @param groupingFields of type Fields 242 * @param valueField of type Fields 243 * @param sumField of type Fields 244 */ 245 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"}) 246 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 247 { 248 this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 249 } 250 251 /** 252 * Constructor SumBy creates a new SumBy instance. 253 * 254 * @param name of type String 255 * @param pipes of type Pipe[] 256 * @param groupingFields of type Fields 257 * @param valueField of type Fields 258 * @param sumField of type Fields 259 * @param threshold of type int 260 */ 261 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"}) 262 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 263 { 264 super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold ); 265 } 266 267/////////// 268 269 /** 270 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 271 * instance. 272 * 273 * @param valueField of type Fields 274 * @param sumField of type Fields 275 * @param sumType of type Class 276 */ 277 @ConstructorProperties({"valueField", "sumField", "sumType"}) 278 public SumBy( Fields valueField, Fields sumField, Class sumType ) 279 { 280 super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) ); 281 } 282 283////////////// 284 285 /** 286 * Constructor SumBy creates a new SumBy instance. 287 * 288 * @param pipe of type Pipe 289 * @param groupingFields of type Fields 290 * @param valueField of type Fields 291 * @param sumField of type Fields 292 * @param sumType of type Class 293 */ 294 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"}) 295 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 296 { 297 this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 298 } 299 300 /** 301 * Constructor SumBy creates a new SumBy instance. 302 * 303 * @param pipe of type Pipe 304 * @param groupingFields of type Fields 305 * @param valueField of type Fields 306 * @param sumField of type Fields 307 * @param sumType of type Class 308 * @param threshold of type int 309 */ 310 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 311 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 312 { 313 this( null, pipe, groupingFields, valueField, sumField, sumType, threshold ); 314 } 315 316 /** 317 * Constructor SumBy creates a new SumBy instance. 318 * 319 * @param name of type String 320 * @param pipe of type Pipe 321 * @param groupingFields of type Fields 322 * @param valueField of type Fields 323 * @param sumField of type Fields 324 * @param sumType of type Class 325 */ 326 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"}) 327 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 328 { 329 this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 330 } 331 332 /** 333 * Constructor SumBy creates a new SumBy instance. 334 * 335 * @param name of type String 336 * @param pipe of type Pipe 337 * @param groupingFields of type Fields 338 * @param valueField of type Fields 339 * @param sumField of type Fields 340 * @param sumType of type Class 341 * @param threshold of type int 342 */ 343 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 344 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 345 { 346 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold ); 347 } 348 349 /** 350 * Constructor SumBy creates a new SumBy instance. 351 * 352 * @param pipes of type Pipe[] 353 * @param groupingFields of type Fields 354 * @param valueField of type Fields 355 * @param sumField of type Fields 356 * @param sumType of type Class 357 */ 358 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"}) 359 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 360 { 361 this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 362 } 363 364 /** 365 * Constructor SumBy creates a new SumBy instance. 366 * 367 * @param pipes of type Pipe[] 368 * @param groupingFields of type Fields 369 * @param valueField of type Fields 370 * @param sumField of type Fields 371 * @param sumType of type Class 372 * @param threshold of type int 373 */ 374 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 375 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 376 { 377 this( null, pipes, groupingFields, valueField, sumField, sumType, threshold ); 378 } 379 380 /** 381 * Constructor SumBy creates a new SumBy instance. 382 * 383 * @param name of type String 384 * @param pipes of type Pipe[] 385 * @param groupingFields of type Fields 386 * @param valueField of type Fields 387 * @param sumField of type Fields 388 * @param sumType of type Class 389 */ 390 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"}) 391 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 392 { 393 this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 394 } 395 396 /** 397 * Constructor SumBy creates a new SumBy instance. 398 * 399 * @param name of type String 400 * @param pipes of type Pipe[] 401 * @param groupingFields of type Fields 402 * @param valueField of type Fields 403 * @param sumField of type Fields 404 * @param sumType of type Class 405 * @param threshold of type int 406 */ 407 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 408 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 409 { 410 super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold ); 411 } 412 }