public class AggregateBy extends SubAssembly
Class AggregateBy is a SubAssembly that serves two roles for handling aggregate operations.
The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and completed Reduce side. Summing is associative and commutative.
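As a rough illustration of why associativity matters here, the following plain-Java sketch sums values per key within each simulated map task and then sums those partial results on the "reduce" side. The result matches a single global sum. This is not Cascading code: PartialSumSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (hypothetical names, not Cascading API) of partial
// Map side summing completed on the Reduce side.
class PartialSumSketch {

    // Map side: sum values per key within a single map task (input split).
    static Map<String, Long> mapSide(List<Map.Entry<String, Long>> split) {
        Map<String, Long> partial = new HashMap<>();
        for (Map.Entry<String, Long> record : split) {
            partial.merge(record.getKey(), record.getValue(), Long::sum);
        }
        return partial;
    }

    // Reduce side: sum the partial sums emitted by every map task.
    static Map<String, Long> reduceSide(List<Map<String, Long>> partials) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, sum) -> totals.merge(key, sum, Long::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> split1 =
                List.of(Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L));
        List<Map.Entry<String, Long>> split2 =
                List.of(Map.entry("b", 4L), Map.entry("a", 5L));
        Map<String, Long> totals = reduceSide(List.of(mapSide(split1), mapSide(split2)));
        System.out.println(totals.get("a") + " " + totals.get("b")); // prints "9 6"
    }
}
```

Because addition is associative and commutative, it does not matter how the records were split across map tasks.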
AggregateBy also supports operations that are not associative/commutative, like 'counting'. Counting results in 'counting' value occurrences on the Map side but summing those counts on the Reduce side. (Yes, counting can be transposed to summing on both the Map and Reduce sides by emitting 1's before the first sum, but that takes three operations instead of two, and is a hack.)
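A similar plain-Java sketch, again with hypothetical names rather than Cascading API, shows the counting case: the Map side counts occurrences per key, while the Reduce side must sum those partial counts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (hypothetical names, not Cascading API): when counting,
// the Map side and the Reduce side run different operations.
class CountThenSumSketch {

    // Map side: count occurrences of each key within one input split.
    static Map<String, Long> countMapSide(List<String> split) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : split) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Reduce side: sum the partial counts. Counting them again would only
    // return how many partial records arrived for each key.
    static Map<String, Long> sumReduceSide(List<Map<String, Long>> partialCounts) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : partialCounts) {
            partial.forEach((key, count) -> totals.merge(key, count, Long::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Long> totals = sumReduceSide(List.of(
                countMapSide(List.of("x", "y", "x")),
                countMapSide(List.of("x", "z"))));
        System.out.println(totals.get("x")); // prints 3
    }
}
```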
Think of this mechanism as a MapReduce Combiner, but more efficient: no values are serialized, deserialized, saved to disk, or multi-pass sorted in the process, all of which consume CPU resources. Instead, it trades memory for little or no IO.
Further, Combiners are limited to only associative/commutative operations.
Additionally, the Cascading planner can move the Map side optimization into the previous Reduce operation, further improving IO performance, since data passed between the preceding Reduce phase and the following Map phase goes over HDFS.
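The memory-for-IO trade can be sketched in plain Java, assuming a simple access-ordered LinkedHashMap as the LRU rather than Cascading's actual cache classes. Partial sums stay in memory; only when the cache is full is the least recently used key evicted and its partial sum emitted. LruPartialSum is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (hypothetical, not Cascading's cache) of Map side summing
// in a bounded LRU cache that emits a key's partial sum when it is evicted.
class LruPartialSum {
    private final List<Map.Entry<String, Long>> emitted = new ArrayList<>();
    private final LinkedHashMap<String, Long> cache;

    LruPartialSum(int capacity) {
        // accessOrder = true: iteration starts at the least recently accessed key.
        this.cache = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > capacity) {
                    // Emit the evicted partial sum downstream instead of losing it.
                    emitted.add(Map.entry(eldest.getKey(), eldest.getValue()));
                    return true;
                }
                return false;
            }
        };
    }

    // Fold one value into the cached partial sum for its key.
    void add(String key, long value) {
        cache.merge(key, value, Long::sum);
    }

    // At the end of the task, flush every partial sum still in the cache.
    List<Map.Entry<String, Long>> finish() {
        cache.forEach((key, sum) -> emitted.add(Map.entry(key, sum)));
        cache.clear();
        return emitted;
    }
}
```

With a capacity of 1, adding ("a", 1), ("b", 2), ("a", 3) emits three partial records, a=1, b=2, a=3, where a large capacity would emit only two. Summing them on the Reduce side still gives a=4 and b=2, so a smaller capacity costs IO but not correctness.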
The second role of the AggregateBy class is to allow for composition of AggregateBy sub-classes. That is, SumBy and CountBy AggregateBy sub-classes can be performed in parallel on the same grouping keys.
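Conceptually, composing a sum and a count means both aggregations read the same grouped values in one pass and produce one row per grouping key. The plain-Java sketch below illustrates only that idea, with a TreeMap standing in for the sorted grouping; ComposedAggregates is a hypothetical name and not how Cascading is implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (hypothetical, not Cascading API): a sum and a count,
// in the spirit of SumBy and CountBy, computed together over one grouping.
class ComposedAggregates {

    // One output row per grouping key: {sum, count}.
    static Map<String, long[]> sumAndCount(List<Map.Entry<String, Long>> records) {
        Map<String, long[]> rows = new TreeMap<>(); // sorted keys, like a GroupBy
        for (Map.Entry<String, Long> record : records) {
            long[] row = rows.computeIfAbsent(record.getKey(), k -> new long[2]);
            row[0] += record.getValue(); // sum aggregation
            row[1] += 1;                 // count aggregation
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, long[]> rows = sumAndCount(List.of(
                Map.entry("a", 2L), Map.entry("b", 5L), Map.entry("a", 4L)));
        // prints "a 6 2" then "b 5 1"
        rows.forEach((key, row) -> System.out.println(key + " " + row[0] + " " + row[1]));
    }
}
```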
Custom AggregateBy classes can be created by sub-classing this class and implementing a special AggregateBy.Functor for use on the Map side. Multiple Functor instances are managed by the AggregateBy.CompositeFunction class, allowing them all to share the same LRU value map for more efficiency.
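The Functor/CompositeFunction split can be sketched in plain Java as follows. SimpleFunctor and SimpleCompositeFunction are hypothetical, heavily simplified stand-ins. The real AggregateBy.Functor works on Cascading tuples and has different signatures, and LRU eviction is omitted here for brevity. The point is that every functor keeps its context in a single shared map entry per grouping key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for AggregateBy.Functor: folds one value
// into a per-key context, then completes that context into a result.
interface SimpleFunctor {
    long aggregate(long value, long context);

    default long complete(long context) {
        return context;
    }
}

// Hypothetical stand-in for AggregateBy.CompositeFunction: all functors share
// one map entry per grouping key, so each key is hashed and stored only once.
class SimpleCompositeFunction {
    private final List<SimpleFunctor> functors;
    private final Map<String, long[]> contexts = new LinkedHashMap<>();

    SimpleCompositeFunction(List<SimpleFunctor> functors) {
        this.functors = functors;
    }

    // Map side: fold one (key, value) pair into every functor's context.
    // Contexts start at 0, which suits sums and counts but not, say, a minimum.
    void operate(String key, long value) {
        long[] ctx = contexts.computeIfAbsent(key, k -> new long[functors.size()]);
        for (int i = 0; i < ctx.length; i++) {
            ctx[i] = functors.get(i).aggregate(value, ctx[i]);
        }
    }

    // Emit one row per key holding the completed result of every functor.
    Map<String, long[]> flush() {
        Map<String, long[]> rows = new LinkedHashMap<>();
        for (Map.Entry<String, long[]> entry : contexts.entrySet()) {
            long[] ctx = entry.getValue();
            long[] row = new long[ctx.length];
            for (int i = 0; i < ctx.length; i++) {
                row[i] = functors.get(i).complete(ctx[i]);
            }
            rows.put(entry.getKey(), row);
        }
        contexts.clear();
        return rows;
    }
}
```

For example, a sum functor `(v, c) -> c + v` and a count functor `(v, c) -> c + 1` run over ("a", 2), ("b", 5), ("a", 4) yield a row {6, 2} for "a" and {5, 1} for "b".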
AggregateBy instances return argumentFields, which are used internally to control the values passed to internal Functor instances. If any argumentFields also have Comparators, they will be used for secondary sorting (see GroupBy sortFields). This feature is used by FirstBy to control which Tuple is seen first for a grouping.
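The effect of a Comparator on the argument fields can be illustrated in plain Java: group records by key, sort each group with the Comparator (the secondary sort), and take the first element, which is the idea FirstBy builds on. FirstBySketch and its Event type are hypothetical and model only the ordering, not Cascading's planner or GroupBy.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (hypothetical names, not Cascading API) of how a
// secondary-sort Comparator decides which value is "first" per group.
class FirstBySketch {

    // Hypothetical record type: a page view by a user at a point in time.
    static final class Event {
        final String user;
        final long timestamp;
        final String page;

        Event(String user, long timestamp, String page) {
            this.user = user;
            this.timestamp = timestamp;
            this.page = page;
        }
    }

    // Group by user, order each group with the comparator, keep the first Event.
    static Map<String, Event> firstPerUser(List<Event> events, Comparator<Event> sortOrder) {
        Map<String, List<Event>> groups = new TreeMap<>();
        for (Event event : events) {
            groups.computeIfAbsent(event.user, k -> new ArrayList<>()).add(event);
        }
        Map<String, Event> firsts = new TreeMap<>();
        for (Map.Entry<String, List<Event>> group : groups.entrySet()) {
            List<Event> values = group.getValue();
            values.sort(sortOrder); // secondary sort within the group
            firsts.put(group.getKey(), values.get(0));
        }
        return firsts;
    }
}
```

Sorting ascending by timestamp selects each user's earliest event; passing the reversed comparator selects the latest one instead.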
To tune the LRU, set the capacity value high enough to make use of available memory, or set a default value via the AggregateByProps.AGGREGATE_BY_CAPACITY property. The current default (BaseCacheFactory.DEFAULT_CAPACITY) is 10,000 unique keys.
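A hedged sketch of the precedence described above, assuming that an explicit per-instance capacity wins over a configured default, which in turn wins over the built-in 10,000. The property key, the CapacityResolver class, and the use of 0 as a "use the default" sentinel are assumptions for illustration only; the real names and values are AggregateByProps.AGGREGATE_BY_CAPACITY and USE_DEFAULT_THRESHOLD.

```java
import java.util.Properties;

// Hypothetical sketch of capacity resolution; not Cascading's implementation.
class CapacityResolver {
    static final int BUILT_IN_DEFAULT = 10_000;                              // documented default
    static final String CAPACITY_PROPERTY = "example.aggregateby.capacity"; // hypothetical key
    static final int USE_DEFAULT = 0;                                        // assumed sentinel

    // Explicit capacity first, then the configured property, then the built-in default.
    static int resolve(int requested, Properties props) {
        if (requested != USE_DEFAULT) {
            return requested;
        }
        String configured = props.getProperty(CAPACITY_PROPERTY);
        return configured != null ? Integer.parseInt(configured.trim()) : BUILT_IN_DEFAULT;
    }
}
```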
The LRU cache is pluggable and defaults to LRUHashMapCache. It can be changed by setting the AggregateByProps.AGGREGATE_BY_CACHE_FACTORY property to the name of a sub-class of BaseCacheFactory.
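Selecting a cache implementation by class name is a common plug-in pattern. The sketch below shows it with reflection. CacheFactory and LruCacheFactory are hypothetical stand-ins, not BaseCacheFactory or LRUHashMapCache, and their methods are not Cascading's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a cache factory base class (not Cascading's
// BaseCacheFactory): subclasses create a bounded key/value cache.
abstract class CacheFactory {
    abstract Map<Object, Object> create(int capacity);

    // Instantiate the factory named in configuration via its no-arg constructor.
    static CacheFactory forName(String className) {
        try {
            return (CacheFactory) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot create cache factory " + className, e);
        }
    }
}

// One possible implementation: an access-ordered LinkedHashMap that drops the
// least recently used entry once the capacity is exceeded.
class LruCacheFactory extends CacheFactory {
    @Override
    Map<Object, Object> create(int capacity) {
        return new LinkedHashMap<Object, Object>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
                return size() > capacity;
            }
        };
    }
}
```

Only the configured class name changes when swapping implementations; code that uses the cache depends on the abstract factory alone.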
Note that using an AggregateBy instance automatically inserts a GroupBy into the resulting Flow. Passing multiple AggregateBy instances to a parent AggregateBy instance still results in a single GroupBy.
Also note that Unique
is not a CompositeAggregator and is slightly more optimized internally.
As of Cascading 2.6, AggregateBy honors the Hasher interface for storing keys in the cache.
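To illustrate what custom hashing of cache keys makes possible, the sketch below wraps each key so that a HashMap uses a caller-supplied hash function and a Comparator for equality. Here, keys that differ only in letter case are treated as the same key. KeyHasher and HashedKey are hypothetical names; Cascading's own Hasher interface and the way AggregateBy applies it may differ.

```java
import java.util.Comparator;
import java.util.Locale;

// Hypothetical stand-in for a hashing strategy (compare Cascading's Hasher):
// supplies a custom hash code for a key value.
interface KeyHasher<V> {
    int hash(V value);
}

// Wraps a cache key so hash-based maps use the custom hash and a Comparator
// for equality. The two must agree: keys that compare as equal need equal hashes.
final class HashedKey<V> {
    private final V value;
    private final KeyHasher<V> hasher;
    private final Comparator<V> comparator;

    HashedKey(V value, KeyHasher<V> hasher, Comparator<V> comparator) {
        this.value = value;
        this.hasher = hasher;
        this.comparator = comparator;
    }

    // Example policy: keys that differ only in letter case are the same key.
    static HashedKey<String> caseInsensitive(String value) {
        KeyHasher<String> hasher = s -> s.toLowerCase(Locale.ROOT).hashCode();
        Comparator<String> comparator = Comparator.comparing(s -> s.toLowerCase(Locale.ROOT));
        return new HashedKey<>(value, hasher, comparator);
    }

    @Override
    public int hashCode() {
        return hasher.hash(value);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof HashedKey)) {
            return false;
        }
        @SuppressWarnings("unchecked")
        HashedKey<V> that = (HashedKey<V>) other; // assumes both wrap the same value type
        return comparator.compare(value, that.value) == 0;
    }
}
```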
Modifier and Type | Class and Description |
---|---|
static class | AggregateBy.Cache |
static class | AggregateBy.CompositeFunction - Class CompositeFunction takes multiple Functor instances and manages them as a single Function. |
static interface | AggregateBy.Functor - Interface Functor provides a means to create a simple function for use with the AggregateBy.CompositeFunction class. |
Modifier and Type | Field and Description |
---|---|
static int | USE_DEFAULT_THRESHOLD |

Fields inherited from class Pipe: configDef, nodeConfigDef, parent, stepConfigDef
Modifier | Constructor and Description |
---|---|
protected | AggregateBy(Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator) - Constructor AggregateBy creates a new AggregateBy instance. |
public | AggregateBy(Pipe pipe, Fields groupingFields, AggregateBy... assemblies) - Constructor AggregateBy creates a new AggregateBy instance. |
public | AggregateBy(Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies) - Constructor AggregateBy creates a new AggregateBy instance. |
protected | AggregateBy(java.lang.String name, int capacity) - Constructor AggregateBy creates a new AggregateBy instance. |
public | AggregateBy(java.lang.String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies) - Constructor AggregateBy creates a new AggregateBy instance. |
protected | AggregateBy(java.lang.String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator, int capacity) |
public | AggregateBy(java.lang.String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies) - Constructor AggregateBy creates a new AggregateBy instance. |
public | AggregateBy(java.lang.String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies) - Constructor AggregateBy creates a new AggregateBy instance. |
Modifier and Type | Method and Description |
---|---|
protected Aggregator[] | getAggregators() |
protected Fields[] | getArgumentFields() |
int | getCapacity() |
Fields[] | getFieldDeclarations() - Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the field declaration of the given Aggregator operations. |
protected AggregateBy.Functor[] | getFunctors() |
GroupBy | getGroupBy() - Method getGroupBy returns the internal GroupBy instance so that any custom properties can be set on it via Pipe.getStepConfigDef(). |
Fields | getGroupingFields() - Method getGroupingFields returns the Fields this instance will be grouping against. |
protected void | initialize(Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, AggregateBy.Functor[] functors, Aggregator[] aggregators) |
protected void | initialize(Fields groupingFields, Pipe[] pipes, Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator) |
protected void | verify() - Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. |
Methods inherited from class SubAssembly: getName, getPrevious, getTailNames, getTails, setPrevious, setTails, unwind
Methods inherited from class Pipe: equals, getConfigDef, getHeads, getNodeConfigDef, getParent, getStepConfigDef, getTrace, hasConfigDef, hashCode, hasNodeConfigDef, hasStepConfigDef, id, named, names, outgoingScopeFor, pipes, print, printInternal, resolveIncomingOperationArgumentFields, resolveIncomingOperationPassThroughFields, setParent, toString
public static final int USE_DEFAULT_THRESHOLD
protected AggregateBy(java.lang.String name, int capacity)
Parameters:
name - of type String
capacity - of type int

protected AggregateBy(Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator)
Parameters:
argumentFields - of type Fields
functor - of type Functor
aggregator - of type Aggregator

@ConstructorProperties(value={"pipe","groupingFields","assemblies"}) public AggregateBy(Pipe pipe, Fields groupingFields, AggregateBy... assemblies)
Parameters:
pipe - of type Pipe
groupingFields - of type Fields
assemblies - of type AggregateBy...

@ConstructorProperties(value={"pipe","groupingFields","capacity","assemblies"}) public AggregateBy(Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies)
Parameters:
pipe - of type Pipe
groupingFields - of type Fields
capacity - of type int
assemblies - of type AggregateBy...

@ConstructorProperties(value={"name","pipe","groupingFields","capacity","assemblies"}) public AggregateBy(java.lang.String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies)
Parameters:
name - of type String
pipe - of type Pipe
groupingFields - of type Fields
capacity - of type int
assemblies - of type AggregateBy...

@ConstructorProperties(value={"name","pipes","groupingFields","assemblies"}) public AggregateBy(java.lang.String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies)
Parameters:
name - of type String
pipes - of type Pipe[]
groupingFields - of type Fields
assemblies - of type AggregateBy...

@ConstructorProperties(value={"name","pipes","groupingFields","capacity","assemblies"}) public AggregateBy(java.lang.String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies)
Parameters:
name - of type String
pipes - of type Pipe[]
groupingFields - of type Fields
capacity - of type int
assemblies - of type AggregateBy...

protected AggregateBy(java.lang.String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator, int capacity)
protected void initialize(Fields groupingFields, Pipe[] pipes, Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator)
protected void initialize(Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, AggregateBy.Functor[] functors, Aggregator[] aggregators)
protected void verify()
public Fields getGroupingFields()
public Fields[] getFieldDeclarations()
Note the actual Fields values are returned, not planner resolved Fields.
protected Fields[] getArgumentFields()
protected AggregateBy.Functor[] getFunctors()
protected Aggregator[] getAggregators()
public GroupBy getGroupBy()
Method getGroupBy returns the internal GroupBy instance so that any custom properties can be set on it via Pipe.getStepConfigDef().
public int getCapacity()
Copyright © 2007-2017 Cascading Maintainers. All Rights Reserved.