001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation;
023
024import cascading.flow.FlowProcess;
025
026/**
027 * A Buffer is similar to an {@link Aggregator} in that it operates on unique groups of values. It differs
028 * in that an {@link java.util.Iterator} is provided and it is the responsibility
029 * of the {@link #operate(cascading.flow.FlowProcess, BufferCall)} method to iterate over all the input
030 * arguments returned by this Iterator, if any.
031 * <p>
032 * For the case where a Buffer follows a CoGroup, the method {@link #operate(cascading.flow.FlowProcess, BufferCall)}
033 * will be called for every unique group whether or not there are values available to iterate over. This may be
034 * counter-intuitive for the case of an 'inner join' where the left or right stream may have a null grouping key value.
035 * Regardless, the current grouping value can be retrieved through {@link BufferCall#getGroup()}.
036 * <p>
037 * Buffer is very useful when header or footer values need to be inserted into a grouping, or if values need to be
038 * inserted into the middle of the group values. For example, consider a stream of timestamps. A Buffer could
039 * be used to add missing entries, or to calculate running or moving averages over a smaller "window" within the grouping.
040 * <p>
041 * By default, if a result is emitted from the Buffer before the argumentsIterator is started or after it is
042 * completed ({@code argumentsIterator.hasNext() == false}), non-grouping values are forced to null (to allow for header
043 * and footer tuple results).
044 * <p>
045 * By setting {@link BufferCall#setRetainValues(boolean)} to {@code true} in the
046 * {@link Buffer#prepare(cascading.flow.FlowProcess, OperationCall)} method, the last seen Tuple values will not be
047 * nulled after completion and will be treated as the current incoming Tuple when merged with the Buffer result Tuple
048 * via the Every outgoing selector.
049 * <p>
050 * There may be only one Buffer after a {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}. And there
051 * may not be any additional {@link cascading.pipe.Every} pipes before or after the Buffer's Every pipe instance. A
052 * {@link cascading.flow.planner.PlannerException} will be thrown if these rules are violated.
053 * <p>
054 * Buffer implementations should be re-entrant. There is no guarantee a Buffer instance will be executed in a
055 * unique vm, or by a single thread. Also, note the Iterator will return the same {@link cascading.tuple.TupleEntry}
056 * instance, but with new values in its child {@link cascading.tuple.Tuple}.
057 * <p>
058 * As of Cascading 2.5, if the previous CoGroup uses a {@link cascading.pipe.joiner.BufferJoin} as the
059 * {@link cascading.pipe.joiner.Joiner}, a Buffer may be used to implement differing Joiner strategies.
060 * <p>
061 * Instead of calling {@link cascading.operation.BufferCall#getArgumentsIterator()} (which will return null),
062 * {@link cascading.operation.BufferCall#getJoinerClosure()} will return an {@link cascading.pipe.joiner.JoinerClosure}
063 * instance with direct access to each CoGrouped Iterator.
064 */
065public interface Buffer<Context> extends Operation<Context>
066  {
067  /**
068   * Method operate is called once for each grouping. The {@link BufferCall} provides an {@link java.util.Iterator}
069   * that returns an argument {@link cascading.tuple.TupleEntry} for each value in the grouping defined by the
070   * argument selector on the parent Every pipe instance.
071   * <p>
072   * Neither the TupleEntry entry nor entry.getTuple() should be stored directly in a collection or modified.
073   * If a value must be retained, make a copy via the {@code new Tuple( entry.getTuple() )} copy constructor.
074   * <p>
075   * This method is called for every unique group, whether or not there are values in the arguments Iterator.
076   *
077   * @param flowProcess the {@link FlowProcess} supplied for this call
078   * @param bufferCall  the {@link BufferCall} exposing the current group and its arguments Iterator
079   */
080  void operate( FlowProcess flowProcess, BufferCall<Context> bufferCall );
081  }