001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.lang.reflect.Type; 025 026 import cascading.flow.FlowProcess; 027 import cascading.operation.Aggregator; 028 import cascading.operation.AggregatorCall; 029 import cascading.operation.BaseOperation; 030 import cascading.operation.OperationCall; 031 import cascading.pipe.Pipe; 032 import cascading.tuple.Fields; 033 import cascading.tuple.Tuple; 034 import cascading.tuple.TupleEntry; 035 import cascading.tuple.coerce.Coercions; 036 import cascading.tuple.type.CoercibleType; 037 038 /** 039 * Class AverageBy is used to average values associated with duplicate keys in a tuple stream. 040 * <p/> 041 * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average} 042 * {@link cascading.operation.Aggregator} operation. 043 * <p/> 044 * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value, 045 * otherwise the result will be a {@link Double}. 046 * <p/> 047 * If {@code include} is {@link Include#NO_NULLS}, {@code null} values will not be included in the average (converted to zero). 048 * By default (and for backwards compatibility) {@code null} values are included, {@link Include#ALL}. 049 * <p/> 050 * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 051 * and private {@link AverageFinal} Aggregator to count and sum as many field values before the GroupBy operator to reduce IO over the network. 052 * <p/> 053 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 054 * in a much simpler mechanism. 055 * <p/> 056 * The {@code threshold} value tells the underlying AveragePartials functions how many unique key sums and counts to accumulate 057 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 058 * bounded by the size of your map task JVM and the typical size of each group key. 059 * <p/> 060 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 061 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 062 * 063 * @see cascading.pipe.assembly.AggregateBy 064 */ 065 public class AverageBy extends AggregateBy 066 { 067 /** DEFAULT_THRESHOLD */ 068 @Deprecated 069 public static final int DEFAULT_THRESHOLD = 10000; 070 071 public enum Include 072 { 073 ALL, 074 NO_NULLS 075 } 076 077 /** 078 * Class AveragePartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count and sum observed duplicates from the tuple stream. 079 * 080 * @see cascading.pipe.assembly.AverageBy 081 */ 082 public static class AveragePartials implements Functor 083 { 084 private final Fields declaredFields; 085 private final Include include; 086 087 /** 088 * Constructor AveragePartials creates a new AveragePartials instance. 089 * 090 * @param declaredFields of type Fields 091 */ 092 public AveragePartials( Fields declaredFields ) 093 { 094 this.declaredFields = declaredFields; 095 this.include = Include.ALL; 096 } 097 098 public AveragePartials( Fields declaredFields, Include include ) 099 { 100 this.declaredFields = declaredFields; 101 this.include = include; 102 } 103 104 @Override 105 public Fields getDeclaredFields() 106 { 107 Fields sumName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".sum", Double.class ); 108 Fields countName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".count", Long.class ); 109 110 return sumName.append( countName ); 111 } 112 113 @Override 114 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 115 { 116 if( context == null ) 117 context = Tuple.size( 2 ); 118 119 if( include == Include.NO_NULLS && args.getObject( 0 ) == null ) 120 return context; 121 122 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 123 context.set( 1, context.getLong( 1 ) + 1 ); 124 125 return context; 126 } 127 128 @Override 129 public Tuple complete( FlowProcess flowProcess, Tuple context ) 130 { 131 return context; 132 } 133 } 134 135 /** 136 * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used 137 * in tandem with a {@link AveragePartials} Functor. 138 */ 139 public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context> 140 { 141 /** Class Context is used to hold intermediate values. */ 142 protected static class Context 143 { 144 long nulls = 0L; 145 double sum = 0.0D; 146 long count = 0L; 147 Type type = Double.class; 148 CoercibleType canonical; 149 150 Tuple tuple = Tuple.size( 1 ); 151 152 public Context( Fields fieldDeclaration ) 153 { 154 if( fieldDeclaration.hasTypes() ) 155 this.type = fieldDeclaration.getType( 0 ); 156 157 this.canonical = Coercions.coercibleTypeFor( this.type ); 158 } 159 160 public Context reset() 161 { 162 nulls = 0L; 163 sum = 0.0D; 164 count = 0L; 165 tuple.set( 0, null ); 166 167 return this; 168 } 169 170 public Tuple result() 171 { 172 // we only saw null from upstream, so return null 173 if( count == 0 && nulls != 0 ) 174 return tuple; 175 176 tuple.set( 0, canonical.canonical( sum / count ) ); 177 178 return tuple; 179 } 180 } 181 182 /** 183 * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name. 184 * 185 * @param fieldDeclaration of type Fields 186 */ 187 public AverageFinal( Fields fieldDeclaration ) 188 { 189 super( 2, makeFieldDeclaration( fieldDeclaration ) ); 190 191 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 192 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 193 } 194 195 private static Fields makeFieldDeclaration( Fields fieldDeclaration ) 196 { 197 if( fieldDeclaration.hasTypes() ) 198 return fieldDeclaration; 199 200 return fieldDeclaration.applyTypes( Double.class ); 201 } 202 203 @Override 204 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 205 { 206 operationCall.setContext( new Context( getFieldDeclaration() ) ); 207 } 208 209 @Override 210 public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 211 { 212 aggregatorCall.getContext().reset(); 213 } 214 215 @Override 216 public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 217 { 218 Context context = aggregatorCall.getContext(); 219 TupleEntry arguments = aggregatorCall.getArguments(); 220 221 if( arguments.getObject( 0 ) == null ) 222 { 223 context.nulls++; 224 return; 225 } 226 227 context.sum += arguments.getDouble( 0 ); 228 context.count += arguments.getLong( 1 ); 229 } 230 231 @Override 232 public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 233 { 234 aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() ); 235 } 236 } 237 238 ///////// 239 240 /** 241 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 242 * instance. 243 * 244 * @param valueField of type Fields 245 * @param averageField of type Fields 246 */ 247 @ConstructorProperties({"valueField", "averageField"}) 248 public AverageBy( Fields valueField, Fields averageField ) 249 { 250 super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) ); 251 } 252 253 /** 254 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 255 * instance. 256 * 257 * @param valueField of type Fields 258 * @param averageField of type Fields 259 * @param include of type boolean 260 */ 261 @ConstructorProperties({"valueField", "averageField", "include"}) 262 public AverageBy( Fields valueField, Fields averageField, Include include ) 263 { 264 super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) ); 265 } 266 267 ////////////// 268 269 /** 270 * Constructor AverageBy creates a new AverageBy instance. 271 * 272 * @param pipe of type Pipe 273 * @param groupingFields of type Fields 274 * @param valueField of type Fields 275 * @param averageField of type Fields 276 */ 277 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"}) 278 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 279 { 280 this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 281 } 282 283 /** 284 * Constructor AverageBy creates a new AverageBy instance. 285 * 286 * @param pipe of type Pipe 287 * @param groupingFields of type Fields 288 * @param valueField of type Fields 289 * @param averageField of type Fields 290 * @param threshold of type int 291 */ 292 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"}) 293 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 294 { 295 this( null, pipe, groupingFields, valueField, averageField, threshold ); 296 } 297 298 /** 299 * Constructor AverageBy creates a new AverageBy instance. 300 * 301 * @param name of type String 302 * @param pipe of type Pipe 303 * @param groupingFields of type Fields 304 * @param valueField of type Fields 305 * @param averageField of type Fields 306 */ 307 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"}) 308 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 309 { 310 this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 311 } 312 313 /** 314 * Constructor AverageBy creates a new AverageBy instance. 315 * 316 * @param name of type String 317 * @param pipe of type Pipe 318 * @param groupingFields of type Fields 319 * @param valueField of type Fields 320 * @param averageField of type Fields 321 * @param threshold of type int 322 */ 323 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"}) 324 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 325 { 326 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold ); 327 } 328 329 /** 330 * Constructor AverageBy creates a new AverageBy instance. 331 * 332 * @param pipes of type Pipe[] 333 * @param groupingFields of type Fields 334 * @param valueField of type Fields 335 * @param averageField of type Fields 336 */ 337 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"}) 338 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 339 { 340 this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 341 } 342 343 /** 344 * Constructor AverageBy creates a new AverageBy instance. 345 * 346 * @param pipes of type Pipe[] 347 * @param groupingFields of type Fields 348 * @param valueField of type Fields 349 * @param averageField of type Fields 350 * @param threshold of type int 351 */ 352 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"}) 353 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 354 { 355 this( null, pipes, groupingFields, valueField, averageField, threshold ); 356 } 357 358 /** 359 * Constructor AverageBy creates a new AverageBy instance. 360 * 361 * @param name of type String 362 * @param pipes of type Pipe[] 363 * @param groupingFields of type Fields 364 * @param valueField of type Fields 365 * @param averageField of type Fields 366 */ 367 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"}) 368 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 369 { 370 this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 371 } 372 373 /** 374 * Constructor AverageBy creates a new AverageBy instance. 375 * 376 * @param name of type String 377 * @param pipes of type Pipe[] 378 * @param groupingFields of type Fields 379 * @param valueField of type Fields 380 * @param averageField of type Fields 381 * @param threshold of type int 382 */ 383 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"}) 384 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 385 { 386 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold ); 387 } 388 389 /** 390 * Constructor AverageBy creates a new AverageBy instance. 391 * 392 * @param pipe of type Pipe 393 * @param groupingFields of type Fields 394 * @param valueField of type Fields 395 * @param averageField of type Fields 396 * @param include of type boolean 397 */ 398 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"}) 399 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 400 { 401 this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 402 } 403 404 /** 405 * Constructor AverageBy creates a new AverageBy instance. 406 * 407 * @param pipe of type Pipe 408 * @param groupingFields of type Fields 409 * @param valueField of type Fields 410 * @param averageField of type Fields 411 * @param include of type boolean 412 * @param threshold of type int 413 */ 414 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 415 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 416 { 417 this( null, pipe, groupingFields, valueField, averageField, include, threshold ); 418 } 419 420 /** 421 * Constructor AverageBy creates a new AverageBy instance. 422 * 423 * @param name of type String 424 * @param pipe of type Pipe 425 * @param groupingFields of type Fields 426 * @param valueField of type Fields 427 * @param averageField of type Fields 428 * @param include of type boolean 429 */ 430 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"}) 431 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 432 { 433 this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 434 } 435 436 /** 437 * Constructor AverageBy creates a new AverageBy instance. 438 * 439 * @param name of type String 440 * @param pipe of type Pipe 441 * @param groupingFields of type Fields 442 * @param valueField of type Fields 443 * @param averageField of type Fields 444 * @param include of type boolean 445 * @param threshold of type int 446 */ 447 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 448 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 449 { 450 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold ); 451 } 452 453 /** 454 * Constructor AverageBy creates a new AverageBy instance. 455 * 456 * @param pipes of type Pipe[] 457 * @param groupingFields of type Fields 458 * @param valueField of type Fields 459 * @param averageField of type Fields 460 * @param include of type boolean 461 */ 462 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"}) 463 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 464 { 465 this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 466 } 467 468 /** 469 * Constructor AverageBy creates a new AverageBy instance. 470 * 471 * @param pipes of type Pipe[] 472 * @param groupingFields of type Fields 473 * @param valueField of type Fields 474 * @param averageField of type Fields 475 * @param include of type boolean 476 * @param threshold of type int 477 */ 478 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 479 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 480 { 481 this( null, pipes, groupingFields, valueField, averageField, include, threshold ); 482 } 483 484 /** 485 * Constructor AverageBy creates a new AverageBy instance. 486 * 487 * @param name of type String 488 * @param pipes of type Pipe[] 489 * @param groupingFields of type Fields 490 * @param valueField of type Fields 491 * @param averageField of type Fields 492 * @param include of type boolean 493 */ 494 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"}) 495 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 496 { 497 this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 498 } 499 500 /** 501 * Constructor AverageBy creates a new AverageBy instance. 502 * 503 * @param name of type String 504 * @param pipes of type Pipe[] 505 * @param groupingFields of type Fields 506 * @param valueField of type Fields 507 * @param averageField of type Fields 508 * @param include of type boolean 509 * @param threshold of type int 510 */ 511 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 512 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 513 { 514 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold ); 515 } 516 }