public interface Aggregator<Context> extends Operation<Context>
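An Aggregator takes the set of all values associated with a unique grouping and returns zero or more values. MaxValue, MinValue, Count, and Average are good examples of Aggregator implementations.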
Aggregator implementations should be reentrant. There is no guarantee that an Aggregator instance will be executed in a unique JVM or by a single thread. The start(cascading.flow.FlowProcess, AggregatorCall) method provides a mechanism for maintaining a 'context' object that holds intermediate values.
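As a minimal sketch of why intermediate values belong in the context object rather than in instance fields, the Java below uses hypothetical stand-ins (SimpleCall, StatelessSum, ReentrancyDemo) that mimic only the start/aggregate/complete contract. They are not the Cascading API; a real implementation would implement Aggregator and reach its state through the AggregatorCall. Because the aggregator keeps no mutable fields of its own, two groupings can be interleaved on one shared instance without interfering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AggregatorCall (not the Cascading class): it only
// carries the per-grouping context, the current argument, and emitted results.
final class SimpleCall<C> {
    private C context;
    double argument;                               // value of the current entry
    final List<Double> output = new ArrayList<>();

    C getContext() { return context; }
    void setContext(C context) { this.context = context; }
}

// A sum aggregator with no mutable instance fields: all intermediate state
// lives in the context owned by each call, so one instance can serve several
// groupings (or threads) at once.
final class StatelessSum {
    static final class Sum { double total; }

    void start(SimpleCall<Sum> call) {
        if (call.getContext() == null) call.setContext(new Sum());
        else call.getContext().total = 0;          // recycled context: reset old state
    }

    void aggregate(SimpleCall<Sum> call) {
        call.getContext().total += call.argument;
    }

    void complete(SimpleCall<Sum> call) {
        call.output.add(call.getContext().total);  // emit the final value
    }
}

class ReentrancyDemo {
    public static void main(String[] args) {
        StatelessSum sum = new StatelessSum();     // one shared instance
        SimpleCall<StatelessSum.Sum> a = new SimpleCall<>();
        SimpleCall<StatelessSum.Sum> b = new SimpleCall<>();
        sum.start(a);
        sum.start(b);
        // Interleave two groupings on the same aggregator instance.
        for (double v : new double[] {1, 2, 3}) {
            a.argument = v;
            sum.aggregate(a);
            b.argument = v * 10;
            sum.aggregate(b);
        }
        sum.complete(a);
        sum.complete(b);
        System.out.println(a.output + " " + b.output); // [6.0] [60.0]
    }
}
```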
Note that TupleEntry instances are reused internally, so they should not be stored. Instead, use the TupleEntry or Tuple copy constructors to make safe copies.
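The hazard behind this note can be illustrated without Cascading itself. In the sketch below, ReusedEntry is a hypothetical stand-in for an object the framework refills for every value; keeping a reference to its backing array leaves every stored row pointing at the last value, while clone() (playing the role the TupleEntry and Tuple copy constructors play in real code) preserves each one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a framework object that is refilled in place for
// every value handed to user code, as the note says TupleEntry instances are.
final class ReusedEntry {
    private final Object[] values = new Object[1];
    void set(Object value) { values[0] = value; }
    Object[] values() { return values; }           // exposes the shared array
}

class CopyDemo {
    // Feeds each input through one ReusedEntry, keeping either the shared
    // array itself or a defensive copy of it.
    static List<Object[]> collect(Object[] inputs, boolean copy) {
        ReusedEntry entry = new ReusedEntry();
        List<Object[]> kept = new ArrayList<>();
        for (Object input : inputs) {
            entry.set(input);
            kept.add(copy ? entry.values().clone() : entry.values());
        }
        return kept;
    }

    static String render(List<Object[]> rows) {
        StringBuilder sb = new StringBuilder();
        for (Object[] row : rows) sb.append(Arrays.toString(row));
        return sb.toString();
    }

    public static void main(String[] args) {
        Object[] inputs = {"a", "b", "c"};
        System.out.println(render(collect(inputs, false))); // [c][c][c]
        System.out.println(render(collect(inputs, true)));  // [a][b][c]
    }
}
```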
Because Aggregators can be chained, and Cascading pipelines all operation results, every Aggregator ahead of the current Aggregator must return a value before the complete(cascading.flow.FlowProcess, AggregatorCall) method on this Aggregator is called. Consequently, if any previous Aggregator returns more than one Tuple result, this complete() method will be called once for each Tuple emitted.
Thus, when emitting zero or more than one Tuple result, it is best practice to implement a Buffer instead.

See Also: AggregatorCall, OperationCall
| Modifier and Type | Method | Description |
|---|---|---|
| void | `aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall)` | Method aggregate is called for each TupleEntry value in the current grouping. |
| void | `complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall)` | Method complete will be issued last, after every TupleEntry has been passed to the aggregate(cascading.flow.FlowProcess, AggregatorCall) method. |
| void | `start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall)` | Method start initializes the aggregation procedure and is called for every unique grouping. |
Methods inherited from interface Operation: cleanup, flush, getFieldDeclaration, getNumArgs, isSafe, prepare
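The sketch below walks through the call order summarized in the table, assuming hypothetical stand-in types (ModeCall, ModeAggregator, LifecycleDemo) rather than the real Cascading classes. It follows the recycling pattern described for start(): a HashMap context is created the first time and cleared on each later grouping, aggregate() updates it once per entry, and complete() performs the final calculation (here the most frequent value, an illustrative choice) and emits a single result per grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in (not the Cascading API) holding the
// per-grouping context, the current argument, and emitted results.
final class ModeCall {
    Map<String, Integer> context;                 // intermediate state
    String argument;                              // value of the current entry
    final List<String> output = new ArrayList<>();
}

final class ModeAggregator {
    // Called once per unique grouping: create the Map the first time,
    // and clear the recycled Map on every later grouping.
    void start(ModeCall call) {
        if (call.context == null) call.context = new HashMap<>();
        else call.context.clear();
    }

    // Called once for each entry in the current grouping.
    void aggregate(ModeCall call) {
        call.context.merge(call.argument, 1, Integer::sum);
    }

    // Called last: do the final calculation and emit exactly one result.
    // Ties are broken by the lexicographically smallest value.
    void complete(ModeCall call) {
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> e : call.context.entrySet()) {
            int n = e.getValue();
            if (n > bestCount || (n == bestCount && e.getKey().compareTo(best) < 0)) {
                best = e.getKey();
                bestCount = n;
            }
        }
        call.output.add(best);
    }
}

class LifecycleDemo {
    // Hypothetical driver playing the framework's role for grouped input.
    static List<String> run(Map<String, List<String>> groups, ModeCall call) {
        ModeAggregator agg = new ModeAggregator();
        for (List<String> values : groups.values()) {
            agg.start(call);
            for (String v : values) {
                call.argument = v;
                agg.aggregate(call);
            }
            agg.complete(call);
        }
        return call.output;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("g1", Arrays.asList("red", "blue", "red"));
        groups.put("g2", Arrays.asList("green", "blue"));
        System.out.println(run(groups, new ModeCall())); // [red, blue]
    }
}
```

Emitting exactly one value from complete() keeps the sketch in line with the advice to use a Buffer when zero or several results are needed.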

void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall)
Method start initializes the aggregation procedure and is called for every unique grouping.
For example, if a Map is used to hold the intermediate data for each subsequent aggregate(cascading.flow.FlowProcess, AggregatorCall) call, new HashMap() should be set on the AggregatorCall instance when OperationCall.getContext() is null. On the next grouping, start() will be called again, but this time with the old Map instance. In this case, map.clear() should be invoked before returning.
Parameters:
flowProcess - of type FlowProcess
aggregatorCall - of type AggregatorCall

void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall)
Method aggregate is called for each TupleEntry value in the current grouping.
The TupleEntry entry, or entry.getTuple(), should not be stored directly in the context. Instead, make a copy of the tuple via the new Tuple(entry.getTuple()) copy constructor.
Parameters:
flowProcess - of type FlowProcess
aggregatorCall - of type AggregatorCall

void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall)
Method complete will be issued last, after every TupleEntry has been passed to the aggregate(cascading.flow.FlowProcess, AggregatorCall) method. Any final calculation should be completed here and passed to the outputCollector.
Parameters:
flowProcess - of type FlowProcess
aggregatorCall - of type AggregatorCall

Copyright © 2007-2015 Concurrent, Inc. All Rights Reserved.