001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.lang.reflect.Type; 025 026 import cascading.flow.FlowProcess; 027 import cascading.operation.Aggregator; 028 import cascading.operation.AggregatorCall; 029 import cascading.operation.BaseOperation; 030 import cascading.operation.OperationCall; 031 import cascading.pipe.Pipe; 032 import cascading.tuple.Fields; 033 import cascading.tuple.Tuple; 034 import cascading.tuple.TupleEntry; 035 import cascading.tuple.coerce.Coercions; 036 import cascading.tuple.type.CoercibleType; 037 038 /** 039 * Class AverageBy is used to average values associated with duplicate keys in a tuple stream. 040 * <p/> 041 * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average} 042 * {@link cascading.operation.Aggregator} operation. 043 * <p/> 044 * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value, 045 * otherwise the result will be a {@link Double}. 046 * <p/> 047 * If {@code include} is {@link Include#NO_NULLS}, {@code null} values will not be included in the average (converted to zero). 048 * By default (and for backwards compatibility) {@code null} values are included, {@link Include#ALL}. 049 * <p/> 050 * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 051 * and private {@link AverageFinal} Aggregator to count and sum as many field values before the GroupBy operator to reduce IO over the network. 052 * <p/> 053 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 054 * in a much simpler mechanism. 055 * <p/> 056 * The {@code threshold} value tells the underlying AveragePartials functions how many unique key sums and counts to accumulate 057 * in the LRU cache, before emitting the least recently used entry. 058 * <p/> 059 * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD} 060 * will be used. 061 * 062 * @see cascading.pipe.assembly.AggregateBy 063 */ 064 public class AverageBy extends AggregateBy 065 { 066 /** DEFAULT_THRESHOLD */ 067 @Deprecated 068 public static final int DEFAULT_THRESHOLD = 10000; 069 070 public enum Include 071 { 072 ALL, 073 NO_NULLS 074 } 075 076 /** 077 * Class AveragePartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count and sum observed duplicates from the tuple stream. 078 * 079 * @see cascading.pipe.assembly.AverageBy 080 */ 081 public static class AveragePartials implements Functor 082 { 083 private final Fields declaredFields; 084 private final Include include; 085 086 /** 087 * Constructor AveragePartials creates a new AveragePartials instance. 088 * 089 * @param declaredFields of type Fields 090 */ 091 public AveragePartials( Fields declaredFields ) 092 { 093 this.declaredFields = declaredFields; 094 this.include = Include.ALL; 095 } 096 097 public AveragePartials( Fields declaredFields, Include include ) 098 { 099 this.declaredFields = declaredFields; 100 this.include = include; 101 } 102 103 @Override 104 public Fields getDeclaredFields() 105 { 106 Fields sumName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".sum", Double.class ); 107 Fields countName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".count", Long.class ); 108 109 return sumName.append( countName ); 110 } 111 112 @Override 113 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 114 { 115 if( context == null ) 116 context = Tuple.size( 2 ); 117 118 if( include == Include.NO_NULLS && args.getObject( 0 ) == null ) 119 return context; 120 121 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 122 context.set( 1, context.getLong( 1 ) + 1 ); 123 124 return context; 125 } 126 127 @Override 128 public Tuple complete( FlowProcess flowProcess, Tuple context ) 129 { 130 return context; 131 } 132 } 133 134 /** 135 * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used 136 * in tandem with a {@link AveragePartials} Functor. 137 */ 138 public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context> 139 { 140 /** Class Context is used to hold intermediate values. */ 141 protected static class Context 142 { 143 long nulls = 0L; 144 double sum = 0.0D; 145 long count = 0L; 146 Type type = Double.class; 147 CoercibleType canonical; 148 149 Tuple tuple = Tuple.size( 1 ); 150 151 public Context( Fields fieldDeclaration ) 152 { 153 if( fieldDeclaration.hasTypes() ) 154 this.type = fieldDeclaration.getType( 0 ); 155 156 this.canonical = Coercions.coercibleTypeFor( this.type ); 157 } 158 159 public Context reset() 160 { 161 nulls = 0L; 162 sum = 0.0D; 163 count = 0L; 164 tuple.set( 0, null ); 165 166 return this; 167 } 168 169 public Tuple result() 170 { 171 // we only saw null from upstream, so return null 172 if( count == 0 && nulls != 0 ) 173 return tuple; 174 175 tuple.set( 0, canonical.canonical( sum / count ) ); 176 177 return tuple; 178 } 179 } 180 181 /** 182 * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name. 183 * 184 * @param fieldDeclaration of type Fields 185 */ 186 public AverageFinal( Fields fieldDeclaration ) 187 { 188 super( 2, makeFieldDeclaration( fieldDeclaration ) ); 189 190 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 191 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 192 } 193 194 private static Fields makeFieldDeclaration( Fields fieldDeclaration ) 195 { 196 if( fieldDeclaration.hasTypes() ) 197 return fieldDeclaration; 198 199 return fieldDeclaration.applyTypes( Double.class ); 200 } 201 202 @Override 203 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 204 { 205 operationCall.setContext( new Context( getFieldDeclaration() ) ); 206 } 207 208 @Override 209 public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 210 { 211 aggregatorCall.getContext().reset(); 212 } 213 214 @Override 215 public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 216 { 217 Context context = aggregatorCall.getContext(); 218 TupleEntry arguments = aggregatorCall.getArguments(); 219 220 if( arguments.getObject( 0 ) == null ) 221 { 222 context.nulls++; 223 return; 224 } 225 226 context.sum += arguments.getDouble( 0 ); 227 context.count += arguments.getLong( 1 ); 228 } 229 230 @Override 231 public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 232 { 233 aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() ); 234 } 235 } 236 237 ///////// 238 239 /** 240 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 241 * instance. 242 * 243 * @param valueField of type Fields 244 * @param averageField of type Fields 245 */ 246 @ConstructorProperties({"valueField", "averageField"}) 247 public AverageBy( Fields valueField, Fields averageField ) 248 { 249 super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) ); 250 } 251 252 /** 253 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 254 * instance. 255 * 256 * @param valueField of type Fields 257 * @param averageField of type Fields 258 * @param include of type boolean 259 */ 260 @ConstructorProperties({"valueField", "averageField", "include"}) 261 public AverageBy( Fields valueField, Fields averageField, Include include ) 262 { 263 super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) ); 264 } 265 266 ////////////// 267 268 /** 269 * Constructor AverageBy creates a new AverageBy instance. 270 * 271 * @param pipe of type Pipe 272 * @param groupingFields of type Fields 273 * @param valueField of type Fields 274 * @param averageField of type Fields 275 */ 276 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"}) 277 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 278 { 279 this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 280 } 281 282 /** 283 * Constructor AverageBy creates a new AverageBy instance. 284 * 285 * @param pipe of type Pipe 286 * @param groupingFields of type Fields 287 * @param valueField of type Fields 288 * @param averageField of type Fields 289 * @param threshold of type int 290 */ 291 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"}) 292 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 293 { 294 this( null, pipe, groupingFields, valueField, averageField, threshold ); 295 } 296 297 /** 298 * Constructor AverageBy creates a new AverageBy instance. 299 * 300 * @param name of type String 301 * @param pipe of type Pipe 302 * @param groupingFields of type Fields 303 * @param valueField of type Fields 304 * @param averageField of type Fields 305 */ 306 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"}) 307 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 308 { 309 this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 310 } 311 312 /** 313 * Constructor AverageBy creates a new AverageBy instance. 314 * 315 * @param name of type String 316 * @param pipe of type Pipe 317 * @param groupingFields of type Fields 318 * @param valueField of type Fields 319 * @param averageField of type Fields 320 * @param threshold of type int 321 */ 322 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"}) 323 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 324 { 325 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold ); 326 } 327 328 /** 329 * Constructor AverageBy creates a new AverageBy instance. 330 * 331 * @param pipes of type Pipe[] 332 * @param groupingFields of type Fields 333 * @param valueField of type Fields 334 * @param averageField of type Fields 335 */ 336 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"}) 337 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 338 { 339 this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 340 } 341 342 /** 343 * Constructor AverageBy creates a new AverageBy instance. 344 * 345 * @param pipes of type Pipe[] 346 * @param groupingFields of type Fields 347 * @param valueField of type Fields 348 * @param averageField of type Fields 349 * @param threshold of type int 350 */ 351 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"}) 352 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 353 { 354 this( null, pipes, groupingFields, valueField, averageField, threshold ); 355 } 356 357 /** 358 * Constructor AverageBy creates a new AverageBy instance. 359 * 360 * @param name of type String 361 * @param pipes of type Pipe[] 362 * @param groupingFields of type Fields 363 * @param valueField of type Fields 364 * @param averageField of type Fields 365 */ 366 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"}) 367 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 368 { 369 this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 370 } 371 372 /** 373 * Constructor AverageBy creates a new AverageBy instance. 374 * 375 * @param name of type String 376 * @param pipes of type Pipe[] 377 * @param groupingFields of type Fields 378 * @param valueField of type Fields 379 * @param averageField of type Fields 380 * @param threshold of type int 381 */ 382 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"}) 383 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 384 { 385 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold ); 386 } 387 388 /** 389 * Constructor AverageBy creates a new AverageBy instance. 390 * 391 * @param pipe of type Pipe 392 * @param groupingFields of type Fields 393 * @param valueField of type Fields 394 * @param averageField of type Fields 395 * @param include of type boolean 396 */ 397 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"}) 398 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 399 { 400 this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 401 } 402 403 /** 404 * Constructor AverageBy creates a new AverageBy instance. 405 * 406 * @param pipe of type Pipe 407 * @param groupingFields of type Fields 408 * @param valueField of type Fields 409 * @param averageField of type Fields 410 * @param include of type boolean 411 * @param threshold of type int 412 */ 413 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 414 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 415 { 416 this( null, pipe, groupingFields, valueField, averageField, include, threshold ); 417 } 418 419 /** 420 * Constructor AverageBy creates a new AverageBy instance. 421 * 422 * @param name of type String 423 * @param pipe of type Pipe 424 * @param groupingFields of type Fields 425 * @param valueField of type Fields 426 * @param averageField of type Fields 427 * @param include of type boolean 428 */ 429 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"}) 430 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 431 { 432 this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 433 } 434 435 /** 436 * Constructor AverageBy creates a new AverageBy instance. 437 * 438 * @param name of type String 439 * @param pipe of type Pipe 440 * @param groupingFields of type Fields 441 * @param valueField of type Fields 442 * @param averageField of type Fields 443 * @param include of type boolean 444 * @param threshold of type int 445 */ 446 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 447 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 448 { 449 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold ); 450 } 451 452 /** 453 * Constructor AverageBy creates a new AverageBy instance. 454 * 455 * @param pipes of type Pipe[] 456 * @param groupingFields of type Fields 457 * @param valueField of type Fields 458 * @param averageField of type Fields 459 * @param include of type boolean 460 */ 461 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"}) 462 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 463 { 464 this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 465 } 466 467 /** 468 * Constructor AverageBy creates a new AverageBy instance. 469 * 470 * @param pipes of type Pipe[] 471 * @param groupingFields of type Fields 472 * @param valueField of type Fields 473 * @param averageField of type Fields 474 * @param include of type boolean 475 * @param threshold of type int 476 */ 477 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 478 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 479 { 480 this( null, pipes, groupingFields, valueField, averageField, include, threshold ); 481 } 482 483 /** 484 * Constructor AverageBy creates a new AverageBy instance. 485 * 486 * @param name of type String 487 * @param pipes of type Pipe[] 488 * @param groupingFields of type Fields 489 * @param valueField of type Fields 490 * @param averageField of type Fields 491 * @param include of type boolean 492 */ 493 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"}) 494 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 495 { 496 this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 497 } 498 499 /** 500 * Constructor AverageBy creates a new AverageBy instance. 501 * 502 * @param name of type String 503 * @param pipes of type Pipe[] 504 * @param groupingFields of type Fields 505 * @param valueField of type Fields 506 * @param averageField of type Fields 507 * @param include of type boolean 508 * @param threshold of type int 509 */ 510 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 511 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 512 { 513 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold ); 514 } 515 }