001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe;
023
024import java.beans.ConstructorProperties;
025import java.util.Set;
026
027import cascading.flow.planner.Scope;
028import cascading.operation.Aggregator;
029import cascading.operation.AssertionLevel;
030import cascading.operation.Buffer;
031import cascading.operation.GroupAssertion;
032import cascading.tuple.Fields;
033
034/**
035 * The Every operator applies an {@link Aggregator} or {@link Buffer} to every grouping.
036 * <p>
037 * Any number of Every instances may follow other Every, {@link GroupBy}, or {@link CoGroup} instances if they apply an
038 * Aggregator, not a Buffer. If a Buffer, only one Every may follow a GroupBy or CoGroup.
039 * <p>
040 * Every operators create aggregate values for every grouping they encounter. This aggregate value is added to the current
041 * grouping Tuple.
042 * <p>
043 * In the case of a CoGroup, the grouping Tuple will be all the grouping keys from all joined streams,
044 * and if an "outer" type join is used, one value on the groupingTuple may be null.
045 * <p>
 * Subsequent Every instances can continue to append values to the grouping Tuple. When an Each follows
 * an Every, the Each applies its operation to the grouping Tuple; all child values in the grouping are discarded,
 * and only the aggregate values are propagated.
049 */
050public class Every extends Operator
051  {
052  /** Field AGGREGATOR_ARGUMENTS */
053  private static final Fields AGGREGATOR_ARGUMENTS = Fields.ALL;
054  /** Field AGGREGATOR_SELECTOR */
055  private static final Fields AGGREGATOR_SELECTOR = Fields.ALL;
056  /** Field ASSERTION_SELECTOR */
057  private static final Fields ASSERTION_SELECTOR = Fields.RESULTS;
058
059  /**
060   * Constructor Every creates a new Every instance.
061   *
062   * @param previous   previous Pipe to receive input Tuples from
063   * @param aggregator Aggregator to be applied to every input Tuple grouping
064   */
065  @ConstructorProperties({"previous", "aggregator"})
066  public Every( Pipe previous, Aggregator aggregator )
067    {
068    super( previous, AGGREGATOR_ARGUMENTS, aggregator, AGGREGATOR_SELECTOR );
069    }
070
071  /**
072   * Constructor Every creates a new Every instance.
073   *
074   * @param previous         previous Pipe to receive input Tuples from
075   * @param argumentSelector field selector that selects Function arguments from the input Tuple
076   * @param aggregator       Aggregator to be applied to every input Tuple grouping
077   */
078  @ConstructorProperties({"previous", "argumentSelector", "aggregator"})
079  public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator )
080    {
081    super( previous, argumentSelector, aggregator, AGGREGATOR_SELECTOR );
082    }
083
084  /**
085   * Constructor Every creates a new Every instance.
086   *
087   * @param previous         previous Pipe to receive input Tuples from
088   * @param argumentSelector field selector that selects Function arguments from the input Tuple
089   * @param aggregator       Aggregator to be applied to every input Tuple grouping
090   * @param outputSelector   field selector that selects the output Tuple from the grouping and Aggregator results Tuples
091   */
092  @ConstructorProperties({"previous", "argumentSelector", "aggregator", "outputSelector"})
093  public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator, Fields outputSelector )
094    {
095    super( previous, argumentSelector, aggregator, outputSelector );
096    }
097
098  /**
099   * Constructor Every creates a new Every instance.
100   *
101   * @param previous       previous Pipe to receive input Tuples from
102   * @param aggregator     Aggregator to be applied to every input Tuple grouping
103   * @param outputSelector field selector that selects the output Tuple from the grouping and Aggregator results Tuples
104   */
105  @ConstructorProperties({"previous", "aggregator", "outputSelector"})
106  public Every( Pipe previous, Aggregator aggregator, Fields outputSelector )
107    {
108    super( previous, AGGREGATOR_ARGUMENTS, aggregator, outputSelector );
109    }
110
111  /**
112   * Constructor Every creates a new Every instance.
113   *
114   * @param previous previous Pipe to receive input Tuples from
115   * @param buffer   Buffer to be applied to every input Tuple grouping
116   */
117  @ConstructorProperties({"previous", "buffer"})
118  public Every( Pipe previous, Buffer buffer )
119    {
120    super( previous, AGGREGATOR_ARGUMENTS, buffer, AGGREGATOR_SELECTOR );
121    }
122
123  /**
124   * Constructor Every creates a new Every instance.
125   *
126   * @param previous         previous Pipe to receive input Tuples from
127   * @param argumentSelector field selector that selects Function arguments from the input Tuple
128   * @param buffer           Buffer to be applied to every input Tuple grouping
129   */
130  @ConstructorProperties({"previous", "argumentSelector", "buffer"})
131  public Every( Pipe previous, Fields argumentSelector, Buffer buffer )
132    {
133    super( previous, argumentSelector, buffer, AGGREGATOR_SELECTOR );
134    }
135
136  /**
137   * Constructor Every creates a new Every instance.
138   *
139   * @param previous         previous Pipe to receive input Tuples from
140   * @param argumentSelector field selector that selects Function arguments from the input Tuple
141   * @param buffer           Buffer to be applied to every input Tuple grouping
142   * @param outputSelector   field selector that selects the output Tuple from the grouping and Buffer results Tuples
143   */
144  @ConstructorProperties({"previous", "argumentSelector", "buffer", "outputSelector"})
145  public Every( Pipe previous, Fields argumentSelector, Buffer buffer, Fields outputSelector )
146    {
147    super( previous, argumentSelector, buffer, outputSelector );
148    }
149
150  /**
151   * Constructor Every creates a new Every instance.
152   *
153   * @param previous       previous Pipe to receive input Tuples from
154   * @param buffer         Buffer to be applied to every input Tuple grouping
155   * @param outputSelector field selector that selects the output Tuple from the grouping and Buffer results Tuples
156   */
157  @ConstructorProperties({"previous", "buffer", "outputSelector"})
158  public Every( Pipe previous, Buffer buffer, Fields outputSelector )
159    {
160    super( previous, AGGREGATOR_ARGUMENTS, buffer, outputSelector );
161    }
162
163  /**
164   * Constructor Every creates a new Every instance.
165   *
166   * @param previous       previous Pipe to receive input Tuples from
167   * @param assertionLevel of type AssertionLevel
168   * @param assertion      GroupAssertion to be applied to every input Tuple grouping
169   */
170  @ConstructorProperties({"previous", "assertionLevel", "assertion"})
171  public Every( Pipe previous, AssertionLevel assertionLevel, GroupAssertion assertion )
172    {
173    super( previous, AGGREGATOR_ARGUMENTS, assertionLevel, assertion, ASSERTION_SELECTOR );
174    }
175
176  /**
177   * Constructor Every creates a new Every instance.
178   *
179   * @param previous         previous Pipe to receive input Tuples from
180   * @param argumentSelector field selector that selects Function arguments from the input Tuple
181   * @param assertionLevel   AssertionLevel to associate with the Assertion
182   * @param assertion        GroupAssertion to be applied to every input Tuple grouping
183   */
184  @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"})
185  public Every( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, GroupAssertion assertion )
186    {
187    super( previous, argumentSelector, assertionLevel, assertion, ASSERTION_SELECTOR );
188    }
189
190  /**
191   * Method isBuffer returns true if this Every instance holds a {@link cascading.operation.Buffer} operation.
192   *
193   * @return boolean
194   */
195  public boolean isBuffer()
196    {
197    return operation instanceof Buffer;
198    }
199
200  /**
201   * Method isReducer returns true if this Every instance holds a {@link Aggregator} operation.
202   *
203   * @return boolean
204   */
205  public boolean isAggregator()
206    {
207    return operation instanceof Aggregator;
208    }
209
210  public boolean isGroupAssertion()
211    {
212    return operation instanceof GroupAssertion;
213    }
214
215  public Aggregator getAggregator()
216    {
217    return (Aggregator) operation;
218    }
219
220  public Buffer getBuffer()
221    {
222    return (Buffer) operation;
223    }
224
225  public GroupAssertion getGroupAssertion()
226    {
227    return (GroupAssertion) operation;
228    }
229
230  @Override
231  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
232    {
233    if( isBuffer() )
234      return incomingScope.getIncomingBufferArgumentFields();
235    else
236      return incomingScope.getIncomingAggregatorArgumentFields();
237    }
238
239  @Override
240  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
241    {
242    if( isBuffer() )
243      return incomingScope.getIncomingBufferPassThroughFields();
244    else
245      return incomingScope.getIncomingAggregatorPassThroughFields();
246    }
247
248  @Override
249  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
250    {
251    Scope incomingScope = getFirst( incomingScopes );
252
253    if( !isBuffer() && incomingScope.getOutValuesFields().isNone() )
254      throw new OperatorException( this, "only a Buffer may be preceded by a CoGroup declaring Fields.NONE as the join fields" );
255
256    Fields argumentFields = resolveArgumentSelector( incomingScopes );
257
258    verifyArguments( argumentFields );
259
260    // we currently don't support using result from a previous Every in the current Every
261    verifyAggregatorArguments( argumentFields, incomingScope );
262
263    Fields declaredFields = resolveDeclared( incomingScopes, argumentFields );
264
265    verifyDeclaredFields( declaredFields );
266
267    Fields outgoingGroupingFields = resolveOutgoingGroupingSelector( incomingScopes, argumentFields, declaredFields );
268
269    verifyOutputSelector( outgoingGroupingFields );
270
271    Fields outgoingValuesFields = incomingScope.getOutValuesFields();
272
273    // the incoming fields eligible to be outgoing, for Every only the grouping fields.
274    Fields passThroughFields = resolveIncomingOperationPassThroughFields( incomingScope );
275    Fields remainderFields = resolveRemainderFields( incomingScopes, argumentFields );
276
277    return new Scope( getName(), Scope.Kind.EVERY, passThroughFields, remainderFields, argumentFields, declaredFields, outgoingGroupingFields, outgoingValuesFields );
278    }
279
280  private void verifyAggregatorArguments( Fields argumentFields, Scope incomingScope )
281    {
282    if( ( !isBuffer() ) && incomingScope.isEvery() && argumentFields.contains( incomingScope.getOperationDeclaredFields() ) )
283      throw new OperatorException( this, "arguments may not select a declared field from a previous Every" );
284    }
285
286  Fields resolveOutgoingGroupingSelector( Set<Scope> incomingScopes, Fields argumentSelector, Fields declared )
287    {
288    try
289      {
290      return resolveOutgoingSelector( incomingScopes, argumentSelector, declared );
291      }
292    catch( Exception exception )
293      {
294      if( exception instanceof OperatorException )
295        throw (OperatorException) exception;
296
297      if( isBuffer() )
298        throw new OperatorException( this, "could not resolve outgoing values selector in: " + this, exception );
299      else
300        throw new OperatorException( this, "could not resolve outgoing grouping selector in: " + this, exception );
301      }
302    }
303  }