/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation;

import cascading.flow.FlowProcess;

/**
 * A Buffer is similar to an {@link Aggregator} in that it operates on unique groups of values. It differs
 * in that an {@link java.util.Iterator} is provided, and it is the responsibility
 * of the {@link #operate(cascading.flow.FlowProcess, BufferCall)} method to iterate over all the input
 * arguments returned by this Iterator, if any.
 * <p>
 * For the case where a Buffer follows a CoGroup, the method {@link #operate(cascading.flow.FlowProcess, BufferCall)}
 * will be called for every unique group whether or not there are values available to iterate over. This may be
 * counter-intuitive for the case of an 'inner join' where the left or right stream may have a null grouping key value.
 * Regardless, the current grouping value can be retrieved through {@link BufferCall#getGroup()}.
 * <p>
 * Buffer is very useful when header or footer values need to be inserted into a grouping, or if values need to be
 * inserted into the middle of the group values. For example, consider a stream of timestamps. A Buffer could
 * be used to add missing entries, or to calculate running or moving averages over a smaller "window" within the grouping.
 * <p>
 * By default, if a result is emitted from the Buffer before the argumentsIterator is started or after it is
 * completed ({@code argumentsIterator.hasNext() == false}), non-grouping values are forced to null (to allow for header
 * and footer tuple results).
 * <p>
 * By setting {@link BufferCall#setRetainValues(boolean)} to {@code true} in the
 * {@link Buffer#prepare(cascading.flow.FlowProcess, OperationCall)} method, the last seen Tuple values will not be
 * nulled after completion and will be treated as the current incoming Tuple when merged with the Buffer result Tuple
 * via the Every outgoing selector.
 * <p>
 * There may be only one Buffer after a {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}, and there
 * may not be any additional {@link cascading.pipe.Every} pipes before or after the Buffer's Every pipe instance. A
 * {@link cascading.flow.planner.PlannerException} will be thrown if these rules are violated.
 * <p>
 * Buffer implementations should be re-entrant. There is no guarantee a Buffer instance will be executed in a
 * unique vm, or by a single thread. Also, note the Iterator will return the same {@link cascading.tuple.TupleEntry}
 * instance, but with new values in its child {@link cascading.tuple.Tuple}.
 * <p>
 * As of Cascading 2.5, if the previous CoGroup uses a {@link cascading.pipe.joiner.BufferJoin} as the
 * {@link cascading.pipe.joiner.Joiner}, a Buffer may be used to implement differing Joiner strategies.
 * <p>
 * Instead of calling {@link cascading.operation.BufferCall#getArgumentsIterator()} (which will return null),
 * {@link cascading.operation.BufferCall#getJoinerClosure()} will return an {@link cascading.pipe.joiner.JoinerClosure}
 * instance with direct access to each CoGrouped Iterator.
 */
public interface Buffer<Context> extends Operation<Context>
  {
  /**
   * Method operate is called once for each grouping. {@link BufferCall} passes in an {@link java.util.Iterator}
   * that returns an argument {@link cascading.tuple.TupleEntry} for each value in the grouping defined by the
   * argument selector on the parent Every pipe instance.
   * <p>
   * Neither the TupleEntry entry nor entry.getTuple() should be stored directly in a collection or modified.
   * A copy of the tuple should be made via the {@code new Tuple( entry.getTuple() )} copy constructor.
   * <p>
   * This method is called for every unique group, whether or not there are values in the arguments Iterator.
   *
   * @param flowProcess of type FlowProcess
   * @param bufferCall  of type BufferCall
   */
  void operate( FlowProcess flowProcess, BufferCall<Context> bufferCall );
  }
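
The Javadoc above notes that the arguments Iterator returns the same entry instance with new values on each call, and that a copy should be made before storing anything. The sketch below illustrates why, using only the JDK. `BufferCopySketch`, `Entry`, and `reusingIterator` are hypothetical stand-ins invented for this illustration; they are not Cascading classes, and the real `TupleEntry` and `Tuple` types are not used here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BufferCopySketch {

    // Hypothetical stand-in for a mutable entry; NOT cascading.tuple.TupleEntry.
    static final class Entry {
        long value;

        Entry() {
        }

        // Copy constructor, playing the role of `new Tuple( entry.getTuple() )`.
        Entry(Entry other) {
            this.value = other.value;
        }
    }

    // Returns an Iterator that hands back one shared Entry on every call,
    // overwriting its value each time (the reuse pattern the Javadoc describes).
    static Iterator<Entry> reusingIterator(final long... values) {
        final Entry shared = new Entry();
        return new Iterator<Entry>() {
            private int index = 0;

            @Override
            public boolean hasNext() {
                return index < values.length;
            }

            @Override
            public Entry next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.value = values[index++];
                return shared;
            }
        };
    }

    // Incorrect: stores the shared instance, so every stored element aliases
    // the same object and reports the last value seen.
    static List<Long> collectByReference(Iterator<Entry> it) {
        List<Entry> held = new ArrayList<>();
        while (it.hasNext()) {
            held.add(it.next());
        }
        return valuesOf(held);
    }

    // Correct: copies each entry before the iterator advances.
    static List<Long> collectByCopy(Iterator<Entry> it) {
        List<Entry> held = new ArrayList<>();
        while (it.hasNext()) {
            held.add(new Entry(it.next()));
        }
        return valuesOf(held);
    }

    private static List<Long> valuesOf(List<Entry> entries) {
        List<Long> out = new ArrayList<>();
        for (Entry e : entries) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectByReference(reusingIterator(1, 2, 3))); // [3, 3, 3]
        System.out.println(collectByCopy(reusingIterator(1, 2, 3)));      // [1, 2, 3]
    }
}
```

In an actual Buffer, the same concern would apply to anything held across calls to the arguments Iterator, such as a sliding window used for a moving average.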