001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.beans.ConstructorProperties;
024import java.util.Set;
025
026import cascading.flow.planner.Scope;
027import cascading.operation.Aggregator;
028import cascading.operation.AssertionLevel;
029import cascading.operation.Buffer;
030import cascading.operation.GroupAssertion;
031import cascading.tuple.Fields;
032
033/**
034 * The Every operator applies an {@link Aggregator} or {@link Buffer} to every grouping.
035 * <p/>
036 * Any number of Every instances may follow other Every, {@link GroupBy}, or {@link CoGroup} instances if they apply an
037 * Aggregator, not a Buffer. If a Buffer, only one Every may follow a GroupBy or CoGroup.
038 * <p/>
039 * Every operators create aggregate values for every grouping they encounter. This aggregate value is added to the current
040 * grouping Tuple.
041 * <p/>
042 * In the case of a CoGroup, the grouping Tuple will be all the grouping keys from all joined streams,
043 * and if an "outer" type join is used, one value on the groupingTuple may be null.
044 * <p/>
045 * Subsequent Every instances can continue to append values to the grouping Tuple. When an Each follows
046 * and Every, the Each applies its operation to the grouping Tuple (thus all child values in the grouping are discarded
047 * and only aggregate values are propagated).
048 */
049public class Every extends Operator
050  {
051  /** Field AGGREGATOR_ARGUMENTS */
052  private static final Fields AGGREGATOR_ARGUMENTS = Fields.ALL;
053  /** Field AGGREGATOR_SELECTOR */
054  private static final Fields AGGREGATOR_SELECTOR = Fields.ALL;
055  /** Field ASSERTION_SELECTOR */
056  private static final Fields ASSERTION_SELECTOR = Fields.RESULTS;
057
058  /**
059   * Constructor Every creates a new Every instance.
060   *
061   * @param previous   previous Pipe to receive input Tuples from
062   * @param aggregator Aggregator to be applied to every input Tuple grouping
063   */
064  @ConstructorProperties({"previous", "aggregator"})
065  public Every( Pipe previous, Aggregator aggregator )
066    {
067    super( previous, AGGREGATOR_ARGUMENTS, aggregator, AGGREGATOR_SELECTOR );
068    }
069
070  /**
071   * Constructor Every creates a new Every instance.
072   *
073   * @param previous         previous Pipe to receive input Tuples from
074   * @param argumentSelector field selector that selects Function arguments from the input Tuple
075   * @param aggregator       Aggregator to be applied to every input Tuple grouping
076   */
077  @ConstructorProperties({"previous", "argumentSelector", "aggregator"})
078  public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator )
079    {
080    super( previous, argumentSelector, aggregator, AGGREGATOR_SELECTOR );
081    }
082
083  /**
084   * Constructor Every creates a new Every instance.
085   *
086   * @param previous         previous Pipe to receive input Tuples from
087   * @param argumentSelector field selector that selects Function arguments from the input Tuple
088   * @param aggregator       Aggregator to be applied to every input Tuple grouping
089   * @param outputSelector   field selector that selects the output Tuple from the grouping and Aggregator results Tuples
090   */
091  @ConstructorProperties({"previous", "argumentSelector", "aggregator", "outputSelector"})
092  public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator, Fields outputSelector )
093    {
094    super( previous, argumentSelector, aggregator, outputSelector );
095    }
096
097  /**
098   * Constructor Every creates a new Every instance.
099   *
100   * @param previous       previous Pipe to receive input Tuples from
101   * @param aggregator     Aggregator to be applied to every input Tuple grouping
102   * @param outputSelector field selector that selects the output Tuple from the grouping and Aggregator results Tuples
103   */
104  @ConstructorProperties({"previous", "aggregator", "outputSelector"})
105  public Every( Pipe previous, Aggregator aggregator, Fields outputSelector )
106    {
107    super( previous, AGGREGATOR_ARGUMENTS, aggregator, outputSelector );
108    }
109
110  /**
111   * Constructor Every creates a new Every instance.
112   *
113   * @param previous previous Pipe to receive input Tuples from
114   * @param buffer   Buffer to be applied to every input Tuple grouping
115   */
116  @ConstructorProperties({"previous", "buffer"})
117  public Every( Pipe previous, Buffer buffer )
118    {
119    super( previous, AGGREGATOR_ARGUMENTS, buffer, AGGREGATOR_SELECTOR );
120    }
121
122  /**
123   * Constructor Every creates a new Every instance.
124   *
125   * @param previous         previous Pipe to receive input Tuples from
126   * @param argumentSelector field selector that selects Function arguments from the input Tuple
127   * @param buffer           Buffer to be applied to every input Tuple grouping
128   */
129  @ConstructorProperties({"previous", "argumentSelector", "buffer"})
130  public Every( Pipe previous, Fields argumentSelector, Buffer buffer )
131    {
132    super( previous, argumentSelector, buffer, AGGREGATOR_SELECTOR );
133    }
134
135  /**
136   * Constructor Every creates a new Every instance.
137   *
138   * @param previous         previous Pipe to receive input Tuples from
139   * @param argumentSelector field selector that selects Function arguments from the input Tuple
140   * @param buffer           Buffer to be applied to every input Tuple grouping
141   * @param outputSelector   field selector that selects the output Tuple from the grouping and Buffer results Tuples
142   */
143  @ConstructorProperties({"previous", "argumentSelector", "buffer", "outputSelector"})
144  public Every( Pipe previous, Fields argumentSelector, Buffer buffer, Fields outputSelector )
145    {
146    super( previous, argumentSelector, buffer, outputSelector );
147    }
148
149  /**
150   * Constructor Every creates a new Every instance.
151   *
152   * @param previous       previous Pipe to receive input Tuples from
153   * @param buffer         Buffer to be applied to every input Tuple grouping
154   * @param outputSelector field selector that selects the output Tuple from the grouping and Buffer results Tuples
155   */
156  @ConstructorProperties({"previous", "buffer", "outputSelector"})
157  public Every( Pipe previous, Buffer buffer, Fields outputSelector )
158    {
159    super( previous, AGGREGATOR_ARGUMENTS, buffer, outputSelector );
160    }
161
162  /**
163   * Constructor Every creates a new Every instance.
164   *
165   * @param previous       previous Pipe to receive input Tuples from
166   * @param assertionLevel of type AssertionLevel
167   * @param assertion      GroupAssertion to be applied to every input Tuple grouping
168   */
169  @ConstructorProperties({"previous", "assertionLevel", "assertion"})
170  public Every( Pipe previous, AssertionLevel assertionLevel, GroupAssertion assertion )
171    {
172    super( previous, AGGREGATOR_ARGUMENTS, assertionLevel, assertion, ASSERTION_SELECTOR );
173    }
174
175  /**
176   * Constructor Every creates a new Every instance.
177   *
178   * @param previous         previous Pipe to receive input Tuples from
179   * @param argumentSelector field selector that selects Function arguments from the input Tuple
180   * @param assertionLevel   AssertionLevel to associate with the Assertion
181   * @param assertion        GroupAssertion to be applied to every input Tuple grouping
182   */
183  @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"})
184  public Every( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, GroupAssertion assertion )
185    {
186    super( previous, argumentSelector, assertionLevel, assertion, ASSERTION_SELECTOR );
187    }
188
189  /**
190   * Method isBuffer returns true if this Every instance holds a {@link cascading.operation.Buffer} operation.
191   *
192   * @return boolean
193   */
194  public boolean isBuffer()
195    {
196    return operation instanceof Buffer;
197    }
198
199  /**
200   * Method isReducer returns true if this Every instance holds a {@link Aggregator} operation.
201   *
202   * @return boolean
203   */
204  public boolean isAggregator()
205    {
206    return operation instanceof Aggregator;
207    }
208
209  public boolean isGroupAssertion()
210    {
211    return operation instanceof GroupAssertion;
212    }
213
214  public Aggregator getAggregator()
215    {
216    return (Aggregator) operation;
217    }
218
219  public Buffer getBuffer()
220    {
221    return (Buffer) operation;
222    }
223
224  public GroupAssertion getGroupAssertion()
225    {
226    return (GroupAssertion) operation;
227    }
228
229  @Override
230  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
231    {
232    if( isBuffer() )
233      return incomingScope.getIncomingBufferArgumentFields();
234    else
235      return incomingScope.getIncomingAggregatorArgumentFields();
236    }
237
238  @Override
239  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
240    {
241    if( isBuffer() )
242      return incomingScope.getIncomingBufferPassThroughFields();
243    else
244      return incomingScope.getIncomingAggregatorPassThroughFields();
245    }
246
247  @Override
248  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
249    {
250    Scope incomingScope = getFirst( incomingScopes );
251
252    if( !isBuffer() && incomingScope.getOutValuesFields().isNone() )
253      throw new OperatorException( this, "only a Buffer may be preceded by a CoGroup declaring Fields.NONE as the join fields" );
254
255    Fields argumentFields = resolveArgumentSelector( incomingScopes );
256
257    verifyArguments( argumentFields );
258
259    // we currently don't support using result from a previous Every in the current Every
260    verifyAggregatorArguments( argumentFields, incomingScope );
261
262    Fields declaredFields = resolveDeclared( incomingScopes, argumentFields );
263
264    verifyDeclaredFields( declaredFields );
265
266    Fields outgoingGroupingFields = resolveOutgoingGroupingSelector( incomingScopes, argumentFields, declaredFields );
267
268    verifyOutputSelector( outgoingGroupingFields );
269
270    Fields outgoingValuesFields = incomingScope.getOutValuesFields();
271
272    // the incoming fields eligible to be outgoing, for Every only the grouping fields.
273    Fields passThroughFields = resolveIncomingOperationPassThroughFields( incomingScope );
274    Fields remainderFields = resolveRemainderFields( incomingScopes, argumentFields );
275
276    return new Scope( getName(), Scope.Kind.EVERY, passThroughFields, remainderFields, argumentFields, declaredFields, outgoingGroupingFields, outgoingValuesFields );
277    }
278
279  private void verifyAggregatorArguments( Fields argumentFields, Scope incomingScope )
280    {
281    if( ( !isBuffer() ) && incomingScope.isEvery() && argumentFields.contains( incomingScope.getOperationDeclaredFields() ) )
282      throw new OperatorException( this, "arguments may not select a declared field from a previous Every" );
283    }
284
285  Fields resolveOutgoingGroupingSelector( Set<Scope> incomingScopes, Fields argumentSelector, Fields declared )
286    {
287    try
288      {
289      return resolveOutgoingSelector( incomingScopes, argumentSelector, declared );
290      }
291    catch( Exception exception )
292      {
293      if( exception instanceof OperatorException )
294        throw (OperatorException) exception;
295
296      if( isBuffer() )
297        throw new OperatorException( this, "could not resolve outgoing values selector in: " + this, exception );
298      else
299        throw new OperatorException( this, "could not resolve outgoing grouping selector in: " + this, exception );
300      }
301    }
302  }