001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.lang.reflect.Type; 025 026 import cascading.flow.FlowProcess; 027 import cascading.operation.aggregator.Sum; 028 import cascading.pipe.Pipe; 029 import cascading.tuple.Fields; 030 import cascading.tuple.Tuple; 031 import cascading.tuple.TupleEntry; 032 import cascading.tuple.coerce.Coercions; 033 import cascading.tuple.type.CoercibleType; 034 035 /** 036 * Class SumBy is used to sum values associated with duplicate keys in a tuple stream. 037 * <p/> 038 * Typically finding the sum of field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Sum} 039 * {@link cascading.operation.Aggregator} operation. 040 * <p/> 041 * If all the values to be summed are all {@code null}, the result value is a function of how null is coerced by the 042 * given {@code sumType}. If a primitive type, {@code 0} will be returned. Otherwise {@code null}. 043 * <p/> 044 * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor} 045 * to sum field values before the GroupBy operator to reduce IO over the network. 046 * <p/> 047 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 048 * in a much simpler mechanism. 049 * <p/> 050 * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate 051 * in the LRU cache, before emitting the least recently used entry. 052 * <p/> 053 * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD} 054 * will be used. 055 * 056 * @see AggregateBy 057 */ 058 public class SumBy extends AggregateBy 059 { 060 /** DEFAULT_THRESHOLD */ 061 @Deprecated 062 public static final int DEFAULT_THRESHOLD = 10000; 063 064 /** 065 * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream. 066 * <p/> 067 * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum} 068 * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values 069 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 070 * 071 * @see SumBy 072 */ 073 public static class SumPartials implements Functor 074 { 075 private final Fields declaredFields; 076 private final Type sumType; 077 private final CoercibleType canonical; 078 079 /** Constructor SumPartials creates a new SumPartials instance. */ 080 public SumPartials( Fields declaredFields ) 081 { 082 this.declaredFields = declaredFields; 083 084 if( !declaredFields.hasTypes() ) 085 throw new IllegalArgumentException( "result type must be declared " ); 086 087 this.sumType = declaredFields.getType( 0 ); 088 089 if( declaredFields.size() != 1 ) 090 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 091 092 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 093 } 094 095 public SumPartials( Fields declaredFields, Class sumType ) 096 { 097 this.declaredFields = declaredFields; 098 this.sumType = sumType; 099 100 if( declaredFields.size() != 1 ) 101 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 102 103 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 104 } 105 106 @Override 107 public Fields getDeclaredFields() 108 { 109 return declaredFields; 110 } 111 112 @Override 113 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 114 { 115 if( context == null ) 116 return args.getTupleCopy(); 117 else if( args.getObject( 0 ) == null ) 118 return context; 119 120 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 121 122 return context; 123 } 124 125 @Override 126 public Tuple complete( FlowProcess flowProcess, Tuple context ) 127 { 128 context.set( 0, canonical.canonical( context.getObject( 0 ) ) ); 129 130 return context; 131 } 132 } 133 134 /** 135 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 136 * instance. 137 * 138 * @param valueField of type Fields 139 * @param sumField of type Fields 140 */ 141 @ConstructorProperties({"valueField", "sumField"}) 142 public SumBy( Fields valueField, Fields sumField ) 143 { 144 super( valueField, new SumPartials( sumField ), new Sum( sumField ) ); 145 } 146 147 ////////////// 148 149 /** 150 * Constructor SumBy creates a new SumBy instance. 151 * 152 * @param pipe of type Pipe 153 * @param groupingFields of type Fields 154 * @param valueField of type Fields 155 * @param sumField of type Fields 156 */ 157 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"}) 158 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 159 { 160 this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 161 } 162 163 /** 164 * Constructor SumBy creates a new SumBy instance. 165 * 166 * @param pipe of type Pipe 167 * @param groupingFields of type Fields 168 * @param valueField of type Fields 169 * @param sumField of type Fields 170 * @param threshold of type int 171 */ 172 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"}) 173 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 174 { 175 this( null, pipe, groupingFields, valueField, sumField, threshold ); 176 } 177 178 /** 179 * Constructor SumBy creates a new SumBy instance. 180 * 181 * @param name of type String 182 * @param pipe of type Pipe 183 * @param groupingFields of type Fields 184 * @param valueField of type Fields 185 * @param sumField of type Fields 186 */ 187 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"}) 188 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 189 { 190 this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 191 } 192 193 /** 194 * Constructor SumBy creates a new SumBy instance. 195 * 196 * @param name of type String 197 * @param pipe of type Pipe 198 * @param groupingFields of type Fields 199 * @param valueField of type Fields 200 * @param sumField of type Fields 201 * @param threshold of type int 202 */ 203 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"}) 204 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 205 { 206 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold ); 207 } 208 209 /** 210 * Constructor SumBy creates a new SumBy instance. 211 * 212 * @param pipes of type Pipe[] 213 * @param groupingFields of type Fields 214 * @param valueField of type Fields 215 * @param sumField of type Fields 216 */ 217 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"}) 218 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 219 { 220 this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 221 } 222 223 /** 224 * Constructor SumBy creates a new SumBy instance. 225 * 226 * @param pipes of type Pipe[] 227 * @param groupingFields of type Fields 228 * @param valueField of type Fields 229 * @param sumField of type Fields 230 * @param threshold of type int 231 */ 232 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"}) 233 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 234 { 235 this( null, pipes, groupingFields, valueField, sumField, threshold ); 236 } 237 238 /** 239 * Constructor SumBy creates a new SumBy instance. 240 * 241 * @param name of type String 242 * @param pipes of type Pipe[] 243 * @param groupingFields of type Fields 244 * @param valueField of type Fields 245 * @param sumField of type Fields 246 */ 247 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"}) 248 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 249 { 250 this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 251 } 252 253 /** 254 * Constructor SumBy creates a new SumBy instance. 255 * 256 * @param name of type String 257 * @param pipes of type Pipe[] 258 * @param groupingFields of type Fields 259 * @param valueField of type Fields 260 * @param sumField of type Fields 261 * @param threshold of type int 262 */ 263 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"}) 264 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 265 { 266 super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold ); 267 } 268 269 /////////// 270 271 /** 272 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 273 * instance. 274 * 275 * @param valueField of type Fields 276 * @param sumField of type Fields 277 * @param sumType of type Class 278 */ 279 @ConstructorProperties({"valueField", "sumField", "sumType"}) 280 public SumBy( Fields valueField, Fields sumField, Class sumType ) 281 { 282 super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) ); 283 } 284 285 ////////////// 286 287 /** 288 * Constructor SumBy creates a new SumBy instance. 289 * 290 * @param pipe of type Pipe 291 * @param groupingFields of type Fields 292 * @param valueField of type Fields 293 * @param sumField of type Fields 294 * @param sumType of type Class 295 */ 296 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"}) 297 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 298 { 299 this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 300 } 301 302 /** 303 * Constructor SumBy creates a new SumBy instance. 304 * 305 * @param pipe of type Pipe 306 * @param groupingFields of type Fields 307 * @param valueField of type Fields 308 * @param sumField of type Fields 309 * @param sumType of type Class 310 * @param threshold of type int 311 */ 312 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 313 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 314 { 315 this( null, pipe, groupingFields, valueField, sumField, sumType, threshold ); 316 } 317 318 /** 319 * Constructor SumBy creates a new SumBy instance. 320 * 321 * @param name of type String 322 * @param pipe of type Pipe 323 * @param groupingFields of type Fields 324 * @param valueField of type Fields 325 * @param sumField of type Fields 326 * @param sumType of type Class 327 */ 328 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"}) 329 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 330 { 331 this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 332 } 333 334 /** 335 * Constructor SumBy creates a new SumBy instance. 336 * 337 * @param name of type String 338 * @param pipe of type Pipe 339 * @param groupingFields of type Fields 340 * @param valueField of type Fields 341 * @param sumField of type Fields 342 * @param sumType of type Class 343 * @param threshold of type int 344 */ 345 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 346 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 347 { 348 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold ); 349 } 350 351 /** 352 * Constructor SumBy creates a new SumBy instance. 353 * 354 * @param pipes of type Pipe[] 355 * @param groupingFields of type Fields 356 * @param valueField of type Fields 357 * @param sumField of type Fields 358 * @param sumType of type Class 359 */ 360 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"}) 361 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 362 { 363 this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 364 } 365 366 /** 367 * Constructor SumBy creates a new SumBy instance. 368 * 369 * @param pipes of type Pipe[] 370 * @param groupingFields of type Fields 371 * @param valueField of type Fields 372 * @param sumField of type Fields 373 * @param sumType of type Class 374 * @param threshold of type int 375 */ 376 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 377 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 378 { 379 this( null, pipes, groupingFields, valueField, sumField, sumType, threshold ); 380 } 381 382 /** 383 * Constructor SumBy creates a new SumBy instance. 384 * 385 * @param name of type String 386 * @param pipes of type Pipe[] 387 * @param groupingFields of type Fields 388 * @param valueField of type Fields 389 * @param sumField of type Fields 390 * @param sumType of type Class 391 */ 392 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"}) 393 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 394 { 395 this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 396 } 397 398 /** 399 * Constructor SumBy creates a new SumBy instance. 400 * 401 * @param name of type String 402 * @param pipes of type Pipe[] 403 * @param groupingFields of type Fields 404 * @param valueField of type Fields 405 * @param sumField of type Fields 406 * @param sumType of type Class 407 * @param threshold of type int 408 */ 409 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 410 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 411 { 412 super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold ); 413 } 414 }