001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.io.Serializable; 025 import java.util.ArrayList; 026 import java.util.Arrays; 027 import java.util.Collections; 028 import java.util.LinkedHashMap; 029 import java.util.List; 030 import java.util.Map; 031 032 import cascading.flow.FlowProcess; 033 import cascading.management.annotation.Property; 034 import cascading.management.annotation.PropertyConfigured; 035 import cascading.management.annotation.PropertyDescription; 036 import cascading.management.annotation.Visibility; 037 import cascading.operation.Aggregator; 038 import cascading.operation.BaseOperation; 039 import cascading.operation.Function; 040 import cascading.operation.FunctionCall; 041 import cascading.operation.OperationCall; 042 import cascading.pipe.Each; 043 import cascading.pipe.Every; 044 import cascading.pipe.GroupBy; 045 import cascading.pipe.Pipe; 046 import cascading.pipe.SubAssembly; 047 import cascading.tuple.Fields; 048 import cascading.tuple.Tuple; 049 import cascading.tuple.TupleEntry; 050 import cascading.tuple.TupleEntryCollector; 051 import cascading.tuple.util.TupleViews; 052 import org.slf4j.Logger; 053 import org.slf4j.LoggerFactory; 054 055 /** 056 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations. 057 * <p/> 058 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the 059 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and 060 * completed Reduce side. Summing is associative and commutative. 061 * <p/> 062 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting 063 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be 064 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over 065 * two, and a hack) 066 * <p/> 067 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized, 068 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of 069 * memory and a little or no IO. 070 * <p/> 071 * Further, Combiners are limited to only associative/commutative operations. 072 * <p/> 073 * Additionally the Cascading planner can move the Map side optimization 074 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which 075 * is over HDFS). 076 * <p/> 077 * The second role of the AggregateBy class is to allow for composition of AggregateBy 078 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed 079 * in parallel on the same grouping keys. 080 * </p> 081 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special 082 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction} 083 * class allowing them all to share the same LRU value map for more efficiency. 084 * <p/> 085 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to 086 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used 087 * to for secondary sorting (see {@link GroupBy} {@code sortFields}. This feature is used by {@link FirstBy} to 088 * control which Tuple is seen first for a grouping. 089 * <p/> 090 * <p/> 091 * To tune the LRU, set the {@code threshold} value to a high enough value to utilize available memory. Or set a 092 * default value via the {@link #AGGREGATE_BY_THRESHOLD} property. The current default ({@link CompositeFunction#DEFAULT_THRESHOLD}) 093 * is {@code 10, 000} unique keys. Note "flushes" from the LRU will be logged in threshold increments along with memory 094 * information. 095 * <p/> 096 * Note using a AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}. 097 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy. 098 * <p/> 099 * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally. 100 * <p/> 101 * Keep in mind the {@link cascading.tuple.Hasher} interface is not honored here (for storing keys in the cache). Thus 102 * arrays of primitives and object, like {@code byte[]} will not be properly stored. This is a known issue and will 103 * be resolved in a future release. 104 * 105 * @see SumBy 106 * @see CountBy 107 * @see Unique 108 */ 109 public class AggregateBy extends SubAssembly 110 { 111 private static final Logger LOG = LoggerFactory.getLogger( AggregateBy.class ); 112 113 public static final int USE_DEFAULT_THRESHOLD = 0; 114 public static final int DEFAULT_THRESHOLD = CompositeFunction.DEFAULT_THRESHOLD; 115 public static final String AGGREGATE_BY_THRESHOLD = "cascading.aggregateby.threshold"; 116 117 private String name; 118 private int threshold; 119 private Fields groupingFields; 120 private Fields[] argumentFields; 121 private Functor[] functors; 122 private Aggregator[] aggregators; 123 private transient GroupBy groupBy; 124 125 public enum Flush 126 { 127 Num_Keys_Flushed 128 } 129 130 /** 131 * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class. 132 * <p/> 133 * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs. 134 */ 135 public interface Functor extends Serializable 136 { 137 /** 138 * Method getDeclaredFields returns the declaredFields of this Functor object. 139 * 140 * @return the declaredFields (type Fields) of this Functor object. 141 */ 142 Fields getDeclaredFields(); 143 144 /** 145 * Method aggregate operates on the given args in tandem (optionally) with the given context values. 146 * <p/> 147 * The context argument is the result of the previous call to this method. Use it to store values between aggregate 148 * calls (the current count, or sum of the args). 149 * <p/> 150 * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent 151 * invocations context will be the value returned on the previous invocation. 152 * 153 * @param flowProcess of type FlowProcess 154 * @param args of type TupleEntry 155 * @param context of type Tuple @return Tuple 156 */ 157 Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ); 158 159 /** 160 * Method complete allows the final aggregate computation to be performed before the return value is collected. 161 * <p/> 162 * The number of values in the returned {@link Tuple} instance must match the number of declaredFields. 163 * <p/> 164 * It is safe to return the context object as the result value. 165 * 166 * @param flowProcess of type FlowProcess 167 * @param context of type Tuple @return Tuple 168 */ 169 Tuple complete( FlowProcess flowProcess, Tuple context ); 170 } 171 172 /** 173 * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}. 174 * 175 * @see Functor 176 */ 177 public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context> 178 { 179 public static final int DEFAULT_THRESHOLD = 10000; 180 181 private int threshold = 0; 182 private final Fields groupingFields; 183 private final Fields[] argumentFields; 184 private final Fields[] functorFields; 185 private final Functor[] functors; 186 187 public static class Context 188 { 189 LinkedHashMap<Tuple, Tuple[]> lru; 190 TupleEntry[] arguments; 191 Tuple result; 192 } 193 194 /** 195 * Constructor CompositeFunction creates a new CompositeFunction instance. 196 * 197 * @param groupingFields of type Fields 198 * @param argumentFields of type Fields 199 * @param functor of type Functor 200 * @param threshold of type int 201 */ 202 public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int threshold ) 203 { 204 this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, threshold ); 205 } 206 207 /** 208 * Constructor CompositeFunction creates a new CompositeFunction instance. 209 * 210 * @param groupingFields of type Fields 211 * @param argumentFields of type Fields[] 212 * @param functors of type Functor[] 213 * @param threshold of type int 214 */ 215 public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int threshold ) 216 { 217 super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information 218 this.groupingFields = groupingFields; 219 this.argumentFields = argumentFields; 220 this.functors = functors; 221 this.threshold = threshold; 222 223 this.functorFields = new Fields[ functors.length ]; 224 225 for( int i = 0; i < functors.length; i++ ) 226 this.functorFields[ i ] = functors[ i ].getDeclaredFields(); 227 } 228 229 private static Fields getFields( Fields groupingFields, Functor[] functors ) 230 { 231 Fields fields = groupingFields; 232 233 for( Functor functor : functors ) 234 fields = fields.append( functor.getDeclaredFields() ); 235 236 return fields; 237 } 238 239 @Override 240 public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall ) 241 { 242 if( threshold == 0 ) 243 { 244 Integer value = flowProcess.getIntegerProperty( AGGREGATE_BY_THRESHOLD ); 245 246 if( value != null && value > 0 ) 247 threshold = value; 248 else 249 threshold = DEFAULT_THRESHOLD; 250 } 251 252 LOG.info( "using threshold value: {}", threshold ); 253 254 Fields[] fields = new Fields[ functors.length + 1 ]; 255 256 fields[ 0 ] = groupingFields; 257 258 for( int i = 0; i < functors.length; i++ ) 259 fields[ i + 1 ] = functors[ i ].getDeclaredFields(); 260 261 final Context context = new Context(); 262 263 context.arguments = new TupleEntry[ functors.length ]; 264 265 for( int i = 0; i < context.arguments.length; i++ ) 266 { 267 Fields resolvedArgumentFields = operationCall.getArgumentFields(); 268 269 int[] pos; 270 271 if( argumentFields[ i ].isAll() ) 272 pos = resolvedArgumentFields.getPos(); 273 else 274 pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL 275 276 Tuple narrow = TupleViews.createNarrow( pos ); 277 278 Fields currentFields; 279 280 if( this.argumentFields[ i ].isSubstitution() ) 281 currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator 282 else 283 currentFields = Fields.asDeclaration( this.argumentFields[ i ] ); 284 285 context.arguments[ i ] = new TupleEntry( currentFields, narrow ); 286 } 287 288 context.result = TupleViews.createComposite( fields ); 289 290 context.lru = new LinkedHashMap<Tuple, Tuple[]>( threshold, 0.75f, true ) 291 { 292 long flushes = 0; 293 294 @Override 295 protected boolean removeEldestEntry( Map.Entry<Tuple, Tuple[]> eldest ) 296 { 297 boolean doRemove = size() > threshold; 298 299 if( doRemove ) 300 { 301 completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, eldest ); 302 flowProcess.increment( Flush.Num_Keys_Flushed, 1 ); 303 304 if( flushes % threshold == 0 ) // every multiple, write out data 305 { 306 Runtime runtime = Runtime.getRuntime(); 307 long freeMem = runtime.freeMemory() / 1024 / 1024; 308 long maxMem = runtime.maxMemory() / 1024 / 1024; 309 long totalMem = runtime.totalMemory() / 1024 / 1024; 310 311 LOG.info( "flushed keys num times: {}, with threshold: {}", flushes + 1, threshold ); 312 LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem ); 313 314 float percent = (float) totalMem / (float) maxMem; 315 316 if( percent < 0.80F ) 317 LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing current LRU threshold with system property \"{}\"", (int) ( percent * 100.0F ), AGGREGATE_BY_THRESHOLD ); 318 } 319 320 flushes++; 321 } 322 323 return doRemove; 324 } 325 }; 326 327 operationCall.setContext( context ); 328 } 329 330 @Override 331 public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall ) 332 { 333 TupleEntry arguments = functionCall.getArguments(); 334 Tuple key = arguments.selectTupleCopy( groupingFields ); 335 336 Context context = functionCall.getContext(); 337 Tuple[] functorContext = context.lru.get( key ); 338 339 if( functorContext == null ) 340 { 341 functorContext = new Tuple[ functors.length ]; 342 context.lru.put( key, functorContext ); 343 } 344 345 for( int i = 0; i < functors.length; i++ ) 346 { 347 TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() ); 348 functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] ); 349 } 350 } 351 352 @Override 353 public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall ) 354 { 355 // need to drain context 356 TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector(); 357 358 Tuple result = operationCall.getContext().result; 359 LinkedHashMap<Tuple, Tuple[]> context = operationCall.getContext().lru; 360 361 for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() ) 362 completeFunctors( flowProcess, collector, result, entry ); 363 364 operationCall.setContext( null ); 365 } 366 367 private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry ) 368 { 369 Tuple[] results = new Tuple[ functors.length + 1 ]; 370 371 results[ 0 ] = entry.getKey(); 372 373 Tuple[] values = entry.getValue(); 374 375 for( int i = 0; i < functors.length; i++ ) 376 results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] ); 377 378 TupleViews.reset( result, results ); 379 380 outputCollector.add( result ); 381 } 382 383 @Override 384 public boolean equals( Object object ) 385 { 386 if( this == object ) 387 return true; 388 if( !( object instanceof CompositeFunction ) ) 389 return false; 390 if( !super.equals( object ) ) 391 return false; 392 393 CompositeFunction that = (CompositeFunction) object; 394 395 if( !Arrays.equals( argumentFields, that.argumentFields ) ) 396 return false; 397 if( !Arrays.equals( functorFields, that.functorFields ) ) 398 return false; 399 if( !Arrays.equals( functors, that.functors ) ) 400 return false; 401 if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null ) 402 return false; 403 404 return true; 405 } 406 407 @Override 408 public int hashCode() 409 { 410 int result = super.hashCode(); 411 result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 ); 412 result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 ); 413 result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 ); 414 result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 ); 415 return result; 416 } 417 } 418 419 /** 420 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 421 * 422 * @param name of type String 423 * @param threshold of type int 424 */ 425 protected AggregateBy( String name, int threshold ) 426 { 427 this.name = name; 428 this.threshold = threshold; 429 } 430 431 /** 432 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 433 * 434 * @param argumentFields of type Fields 435 * @param functor of type Functor 436 * @param aggregator of type Aggregator 437 */ 438 protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator ) 439 { 440 this.argumentFields = Fields.fields( argumentFields ); 441 this.functors = new Functor[]{functor}; 442 this.aggregators = new Aggregator[]{aggregator}; 443 } 444 445 /** 446 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 447 * 448 * @param pipe of type Pipe 449 * @param groupingFields of type Fields 450 * @param assemblies of type CompositeAggregator... 451 */ 452 @ConstructorProperties({"pipe", "groupingFields", "assemblies"}) 453 public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies ) 454 { 455 this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies ); 456 } 457 458 /** 459 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 460 * 461 * @param pipe of type Pipe 462 * @param groupingFields of type Fields 463 * @param threshold of type int 464 * @param assemblies of type CompositeAggregator... 465 */ 466 @ConstructorProperties({"pipe", "groupingFields", "threshold", "assemblies"}) 467 public AggregateBy( Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies ) 468 { 469 this( null, Pipe.pipes( pipe ), groupingFields, threshold, assemblies ); 470 } 471 472 /** 473 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 474 * 475 * @param pipe of type Pipe 476 * @param groupingFields of type Fields 477 * @param threshold of type int 478 * @param assemblies of type CompositeAggregator... 479 */ 480 @ConstructorProperties({"name", "pipe", "groupingFields", "threshold", "assemblies"}) 481 public AggregateBy( String name, Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies ) 482 { 483 this( name, Pipe.pipes( pipe ), groupingFields, threshold, assemblies ); 484 } 485 486 /** 487 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 488 * 489 * @param name of type String 490 * @param pipes of type Pipe[] 491 * @param groupingFields of type Fields 492 * @param assemblies of type CompositeAggregator... 493 */ 494 @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"}) 495 public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies ) 496 { 497 this( name, pipes, groupingFields, 0, assemblies ); 498 } 499 500 /** 501 * Constructor CompositeAggregator creates a new CompositeAggregator instance. 502 * 503 * @param name of type String 504 * @param pipes of type Pipe[] 505 * @param groupingFields of type Fields 506 * @param threshold of type int 507 * @param assemblies of type CompositeAggregator... 508 */ 509 @ConstructorProperties({"name", "pipes", "groupingFields", "threshold", "assemblies"}) 510 public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int threshold, AggregateBy... assemblies ) 511 { 512 this( name, threshold ); 513 514 List<Fields> arguments = new ArrayList<Fields>(); 515 List<Functor> functors = new ArrayList<Functor>(); 516 List<Aggregator> aggregators = new ArrayList<Aggregator>(); 517 518 for( int i = 0; i < assemblies.length; i++ ) 519 { 520 AggregateBy assembly = assemblies[ i ]; 521 522 Collections.addAll( arguments, assembly.getArgumentFields() ); 523 Collections.addAll( functors, assembly.getFunctors() ); 524 Collections.addAll( aggregators, assembly.getAggregators() ); 525 } 526 527 initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) ); 528 } 529 530 protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int threshold ) 531 { 532 this( name, threshold ); 533 initialize( groupingFields, pipes, argumentFields, functor, aggregator ); 534 } 535 536 protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator ) 537 { 538 initialize( groupingFields, pipes, Fields.fields( argumentFields ), 539 new Functor[]{functor}, 540 new Aggregator[]{aggregator} ); 541 } 542 543 protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators ) 544 { 545 setPrevious( pipes ); 546 547 this.groupingFields = groupingFields; 548 this.argumentFields = argumentFields; 549 this.functors = functors; 550 this.aggregators = aggregators; 551 552 verify(); 553 554 Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields ); 555 Fields argumentSelector = Fields.merge( this.groupingFields, sortFields ); 556 557 if( argumentSelector.equals( Fields.NONE ) ) 558 argumentSelector = Fields.ALL; 559 560 Pipe[] functions = new Pipe[ pipes.length ]; 561 562 CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, threshold ); 563 564 for( int i = 0; i < functions.length; i++ ) 565 functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS ); 566 567 groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null ); 568 569 Pipe pipe = groupBy; 570 571 for( int i = 0; i < aggregators.length; i++ ) 572 pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL ); 573 574 setTails( pipe ); 575 } 576 577 /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */ 578 protected void verify() 579 { 580 581 } 582 583 /** 584 * Method getGroupingFields returns the Fields this instances will be grouping against. 585 * 586 * @return the current grouping fields 587 */ 588 public Fields getGroupingFields() 589 { 590 return groupingFields; 591 } 592 593 /** 594 * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the 595 * field declaration of the given Aggregator operations. 596 * <p/> 597 * Note the actual Fields values are returned, not planner resolved Fields. 598 * 599 * @return and array of Fields 600 */ 601 public Fields[] getFieldDeclarations() 602 { 603 Fields[] fields = new Fields[ this.aggregators.length ]; 604 605 for( int i = 0; i < aggregators.length; i++ ) 606 fields[ i ] = aggregators[ i ].getFieldDeclaration(); 607 608 return fields; 609 } 610 611 protected Fields[] getArgumentFields() 612 { 613 return argumentFields; 614 } 615 616 protected Functor[] getFunctors() 617 { 618 return functors; 619 } 620 621 protected Aggregator[] getAggregators() 622 { 623 return aggregators; 624 } 625 626 /** 627 * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties 628 * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}. 629 * 630 * @return GroupBy type 631 */ 632 public GroupBy getGroupBy() 633 { 634 return groupBy; 635 } 636 637 @Property(name = "threshold", visibility = Visibility.PUBLIC) 638 @PropertyDescription("Threshold of the aggregation.") 639 @PropertyConfigured(value = AGGREGATE_BY_THRESHOLD, defaultValue = "10000") 640 public int getThreshold() 641 { 642 return threshold; 643 } 644 }