001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.pipe.assembly; 023 024import java.beans.ConstructorProperties; 025import java.lang.reflect.Type; 026 027import cascading.flow.FlowProcess; 028import cascading.operation.Aggregator; 029import cascading.operation.AggregatorCall; 030import cascading.operation.BaseOperation; 031import cascading.operation.OperationCall; 032import cascading.pipe.Pipe; 033import cascading.tuple.Fields; 034import cascading.tuple.Tuple; 035import cascading.tuple.TupleEntry; 036import cascading.tuple.coerce.Coercions; 037import cascading.tuple.type.CoercibleType; 038 039/** 040 * Class AverageBy is used to average values associated with duplicate keys in a tuple stream. 041 * <p> 042 * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average} 043 * {@link cascading.operation.Aggregator} operation. 044 * <p> 045 * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value, 046 * otherwise the result will be a {@link Double}. 047 * <p> 048 * If {@code include} is {@link Include#NO_NULLS}, {@code null} values will not be included in the average (converted to zero). 049 * By default (and for backwards compatibility) {@code null} values are included, {@link Include#ALL}. 050 * <p> 051 * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 052 * and private {@link AverageFinal} Aggregator to count and sum as many field values before the GroupBy operator to reduce IO over the network. 053 * <p> 054 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 055 * in a much simpler mechanism. 056 * <p> 057 * The {@code threshold} value tells the underlying AveragePartials functions how many unique key sums and counts to accumulate 058 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 059 * bounded by the size of your map task JVM and the typical size of each group key. 060 * <p> 061 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 062 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 063 * 064 * @see cascading.pipe.assembly.AggregateBy 065 */ 066public class AverageBy extends AggregateBy 067 { 068 public enum Include 069 { 070 ALL, 071 NO_NULLS 072 } 073 074 /** 075 * Class AveragePartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count and sum observed duplicates from the tuple stream. 076 * 077 * @see cascading.pipe.assembly.AverageBy 078 */ 079 public static class AveragePartials implements Functor 080 { 081 private final Fields declaredFields; 082 private final Include include; 083 084 /** 085 * Constructor AveragePartials creates a new AveragePartials instance. 086 * 087 * @param declaredFields of type Fields 088 */ 089 public AveragePartials( Fields declaredFields ) 090 { 091 this.declaredFields = declaredFields; 092 this.include = Include.ALL; 093 } 094 095 public AveragePartials( Fields declaredFields, Include include ) 096 { 097 this.declaredFields = declaredFields; 098 this.include = include; 099 } 100 101 @Override 102 public Fields getDeclaredFields() 103 { 104 Fields sumName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".sum", Double.class ); 105 Fields countName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".count", Long.class ); 106 107 return sumName.append( countName ); 108 } 109 110 @Override 111 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 112 { 113 if( context == null ) 114 context = Tuple.size( 2 ); 115 116 if( include == Include.NO_NULLS && args.getObject( 0 ) == null ) 117 return context; 118 119 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 120 context.set( 1, context.getLong( 1 ) + 1 ); 121 122 return context; 123 } 124 125 @Override 126 public Tuple complete( FlowProcess flowProcess, Tuple context ) 127 { 128 return context; 129 } 130 } 131 132 /** 133 * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used 134 * in tandem with a {@link AveragePartials} Functor. 135 */ 136 public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context> 137 { 138 /** Class Context is used to hold intermediate values. */ 139 protected static class Context 140 { 141 long nulls = 0L; 142 double sum = 0.0D; 143 long count = 0L; 144 Type type = Double.class; 145 CoercibleType canonical; 146 147 Tuple tuple = Tuple.size( 1 ); 148 149 public Context( Fields fieldDeclaration ) 150 { 151 if( fieldDeclaration.hasTypes() ) 152 this.type = fieldDeclaration.getType( 0 ); 153 154 this.canonical = Coercions.coercibleTypeFor( this.type ); 155 } 156 157 public Context reset() 158 { 159 nulls = 0L; 160 sum = 0.0D; 161 count = 0L; 162 tuple.set( 0, null ); 163 164 return this; 165 } 166 167 public Tuple result() 168 { 169 // we only saw null from upstream, so return null 170 if( count == 0 && nulls != 0 ) 171 return tuple; 172 173 tuple.set( 0, canonical.canonical( sum / count ) ); 174 175 return tuple; 176 } 177 } 178 179 /** 180 * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name. 181 * 182 * @param fieldDeclaration of type Fields 183 */ 184 public AverageFinal( Fields fieldDeclaration ) 185 { 186 super( 2, makeFieldDeclaration( fieldDeclaration ) ); 187 188 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 189 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 190 } 191 192 private static Fields makeFieldDeclaration( Fields fieldDeclaration ) 193 { 194 if( fieldDeclaration.hasTypes() ) 195 return fieldDeclaration; 196 197 return fieldDeclaration.applyTypes( Double.class ); 198 } 199 200 @Override 201 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 202 { 203 operationCall.setContext( new Context( getFieldDeclaration() ) ); 204 } 205 206 @Override 207 public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 208 { 209 aggregatorCall.getContext().reset(); 210 } 211 212 @Override 213 public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 214 { 215 Context context = aggregatorCall.getContext(); 216 TupleEntry arguments = aggregatorCall.getArguments(); 217 218 if( arguments.getObject( 0 ) == null ) 219 { 220 context.nulls++; 221 return; 222 } 223 224 context.sum += arguments.getDouble( 0 ); 225 context.count += arguments.getLong( 1 ); 226 } 227 228 @Override 229 public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 230 { 231 aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() ); 232 } 233 } 234 235 ///////// 236 237 /** 238 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 239 * instance. 240 * 241 * @param valueField of type Fields 242 * @param averageField of type Fields 243 */ 244 @ConstructorProperties({"valueField", "averageField"}) 245 public AverageBy( Fields valueField, Fields averageField ) 246 { 247 super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) ); 248 } 249 250 /** 251 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 252 * instance. 253 * 254 * @param valueField of type Fields 255 * @param averageField of type Fields 256 * @param include of type boolean 257 */ 258 @ConstructorProperties({"valueField", "averageField", "include"}) 259 public AverageBy( Fields valueField, Fields averageField, Include include ) 260 { 261 super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) ); 262 } 263 264 ////////////// 265 266 /** 267 * Constructor AverageBy creates a new AverageBy instance. 268 * 269 * @param pipe of type Pipe 270 * @param groupingFields of type Fields 271 * @param valueField of type Fields 272 * @param averageField of type Fields 273 */ 274 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"}) 275 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 276 { 277 this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 278 } 279 280 /** 281 * Constructor AverageBy creates a new AverageBy instance. 282 * 283 * @param pipe of type Pipe 284 * @param groupingFields of type Fields 285 * @param valueField of type Fields 286 * @param averageField of type Fields 287 * @param threshold of type int 288 */ 289 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"}) 290 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 291 { 292 this( null, pipe, groupingFields, valueField, averageField, threshold ); 293 } 294 295 /** 296 * Constructor AverageBy creates a new AverageBy instance. 297 * 298 * @param name of type String 299 * @param pipe of type Pipe 300 * @param groupingFields of type Fields 301 * @param valueField of type Fields 302 * @param averageField of type Fields 303 */ 304 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"}) 305 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 306 { 307 this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 308 } 309 310 /** 311 * Constructor AverageBy creates a new AverageBy instance. 312 * 313 * @param name of type String 314 * @param pipe of type Pipe 315 * @param groupingFields of type Fields 316 * @param valueField of type Fields 317 * @param averageField of type Fields 318 * @param threshold of type int 319 */ 320 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"}) 321 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 322 { 323 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold ); 324 } 325 326 /** 327 * Constructor AverageBy creates a new AverageBy instance. 328 * 329 * @param pipes of type Pipe[] 330 * @param groupingFields of type Fields 331 * @param valueField of type Fields 332 * @param averageField of type Fields 333 */ 334 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"}) 335 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 336 { 337 this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 338 } 339 340 /** 341 * Constructor AverageBy creates a new AverageBy instance. 342 * 343 * @param pipes of type Pipe[] 344 * @param groupingFields of type Fields 345 * @param valueField of type Fields 346 * @param averageField of type Fields 347 * @param threshold of type int 348 */ 349 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"}) 350 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 351 { 352 this( null, pipes, groupingFields, valueField, averageField, threshold ); 353 } 354 355 /** 356 * Constructor AverageBy creates a new AverageBy instance. 357 * 358 * @param name of type String 359 * @param pipes of type Pipe[] 360 * @param groupingFields of type Fields 361 * @param valueField of type Fields 362 * @param averageField of type Fields 363 */ 364 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"}) 365 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 366 { 367 this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 368 } 369 370 /** 371 * Constructor AverageBy creates a new AverageBy instance. 372 * 373 * @param name of type String 374 * @param pipes of type Pipe[] 375 * @param groupingFields of type Fields 376 * @param valueField of type Fields 377 * @param averageField of type Fields 378 * @param threshold of type int 379 */ 380 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"}) 381 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 382 { 383 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold ); 384 } 385 386 /** 387 * Constructor AverageBy creates a new AverageBy instance. 388 * 389 * @param pipe of type Pipe 390 * @param groupingFields of type Fields 391 * @param valueField of type Fields 392 * @param averageField of type Fields 393 * @param include of type boolean 394 */ 395 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"}) 396 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 397 { 398 this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 399 } 400 401 /** 402 * Constructor AverageBy creates a new AverageBy instance. 403 * 404 * @param pipe of type Pipe 405 * @param groupingFields of type Fields 406 * @param valueField of type Fields 407 * @param averageField of type Fields 408 * @param include of type boolean 409 * @param threshold of type int 410 */ 411 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 412 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 413 { 414 this( null, pipe, groupingFields, valueField, averageField, include, threshold ); 415 } 416 417 /** 418 * Constructor AverageBy creates a new AverageBy instance. 419 * 420 * @param name of type String 421 * @param pipe of type Pipe 422 * @param groupingFields of type Fields 423 * @param valueField of type Fields 424 * @param averageField of type Fields 425 * @param include of type boolean 426 */ 427 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"}) 428 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 429 { 430 this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 431 } 432 433 /** 434 * Constructor AverageBy creates a new AverageBy instance. 435 * 436 * @param name of type String 437 * @param pipe of type Pipe 438 * @param groupingFields of type Fields 439 * @param valueField of type Fields 440 * @param averageField of type Fields 441 * @param include of type boolean 442 * @param threshold of type int 443 */ 444 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 445 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 446 { 447 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold ); 448 } 449 450 /** 451 * Constructor AverageBy creates a new AverageBy instance. 452 * 453 * @param pipes of type Pipe[] 454 * @param groupingFields of type Fields 455 * @param valueField of type Fields 456 * @param averageField of type Fields 457 * @param include of type boolean 458 */ 459 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"}) 460 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 461 { 462 this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 463 } 464 465 /** 466 * Constructor AverageBy creates a new AverageBy instance. 467 * 468 * @param pipes of type Pipe[] 469 * @param groupingFields of type Fields 470 * @param valueField of type Fields 471 * @param averageField of type Fields 472 * @param include of type boolean 473 * @param threshold of type int 474 */ 475 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 476 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 477 { 478 this( null, pipes, groupingFields, valueField, averageField, include, threshold ); 479 } 480 481 /** 482 * Constructor AverageBy creates a new AverageBy instance. 483 * 484 * @param name of type String 485 * @param pipes of type Pipe[] 486 * @param groupingFields of type Fields 487 * @param valueField of type Fields 488 * @param averageField of type Fields 489 * @param include of type boolean 490 */ 491 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"}) 492 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 493 { 494 this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 495 } 496 497 /** 498 * Constructor AverageBy creates a new AverageBy instance. 499 * 500 * @param name of type String 501 * @param pipes of type Pipe[] 502 * @param groupingFields of type Fields 503 * @param valueField of type Fields 504 * @param averageField of type Fields 505 * @param include of type boolean 506 * @param threshold of type int 507 */ 508 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 509 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 510 { 511 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold ); 512 } 513 }