001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.beans.ConstructorProperties;
024import java.util.Set;
025
026import cascading.flow.planner.Scope;
027import cascading.operation.Assertion;
028import cascading.operation.AssertionLevel;
029import cascading.operation.Debug;
030import cascading.operation.DebugLevel;
031import cascading.operation.Filter;
032import cascading.operation.Function;
033import cascading.operation.ValueAssertion;
034import cascading.tuple.Fields;
035import cascading.tuple.Tuple;
036
037/**
038 * The Each operator applies either a {@link Function} or a {@link Filter} to each entry in the {@link Tuple}
039 * stream. Any number of Each operators can follow an Each, {@link Splice}, or {@link Every}
040 * operator.
041 */
042public class Each extends Operator
043  {
044  /** Field FUNCTION_SELECTOR */
045  private static final Fields FUNCTION_SELECTOR = Fields.RESULTS;
046  /** Field FILTER_SELECTOR */
047  private static final Fields FILTER_SELECTOR = Fields.RESULTS;
048
049  ///////////////////
050  // TAKE FUNCTIONS
051  ///////////////////
052
053  /**
054   * Pass all fields to the given function, only return fields declared by the function.
055   *
056   * @param name     name for this branch of Pipes
057   * @param function Function to be applied to each input Tuple
058   */
059  @ConstructorProperties({"name", "function"})
060  public Each( String name, Function function )
061    {
062    super( name, function, FUNCTION_SELECTOR );
063    }
064
065  /**
066   * Only pass argumentFields to the given function, only return fields declared by the function.
067   *
068   * @param name             name for this branch of Pipes
069   * @param argumentSelector field selector that selects Function arguments from the input Tuple
070   * @param function         Function to be applied to each input Tuple
071   */
072  @ConstructorProperties({"name", "argumentSelector", "function"})
073  public Each( String name, Fields argumentSelector, Function function )
074    {
075    super( name, argumentSelector, function, FUNCTION_SELECTOR );
076    }
077
078  /**
079   * Only pass argumentFields to the given function, only return fields selected by the outputSelector.
080   *
081   * @param name             name for this branch of Pipes
082   * @param argumentSelector field selector that selects Function arguments from the input Tuple
083   * @param function         Function to be applied to each input Tuple
084   * @param outputSelector   field selector that selects the output Tuple from the input and Function results Tuples
085   */
086  @ConstructorProperties({"name", "argumentSelector", "function", "outputSelector"})
087  public Each( String name, Fields argumentSelector, Function function, Fields outputSelector )
088    {
089    super( name, argumentSelector, function, outputSelector );
090    }
091
092  /**
093   * Only return fields selected by the outputSelector.
094   *
095   * @param name           name for this branch of Pipes
096   * @param function       Function to be applied to each input Tuple
097   * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples
098   */
099  @ConstructorProperties({"name", "function", "outputSelector"})
100  public Each( String name, Function function, Fields outputSelector )
101    {
102    super( name, function, outputSelector );
103    }
104
105  /**
106   * Pass all fields to the given function, only return fields declared by the function.
107   *
108   * @param previous previous Pipe to receive input Tuples from
109   * @param function Function to be applied to each input Tuple
110   */
111  @ConstructorProperties({"previous", "function"})
112  public Each( Pipe previous, Function function )
113    {
114    super( previous, function, FUNCTION_SELECTOR );
115    }
116
117  /**
118   * Only pass argumentFields to the given function, only return fields declared by the function.
119   *
120   * @param previous         previous Pipe to receive input Tuples from
121   * @param argumentSelector field selector that selects Function arguments from the input Tuple
122   * @param function         Function to be applied to each input Tuple
123   */
124  @ConstructorProperties({"previous", "argumentSelector", "function"})
125  public Each( Pipe previous, Fields argumentSelector, Function function )
126    {
127    super( previous, argumentSelector, function, FUNCTION_SELECTOR );
128    }
129
130  /**
131   * Only pass argumentFields to the given function, only return fields selected by the outputSelector.
132   *
133   * @param previous         previous Pipe to receive input Tuples from
134   * @param argumentSelector field selector that selects Function arguments from the input Tuple
135   * @param function         Function to be applied to each input Tuple
136   * @param outputSelector   field selector that selects the output Tuple from the input and Function results Tuples
137   */
138  @ConstructorProperties({"previous", "argumentSelector", "function", "outputSelector"})
139  public Each( Pipe previous, Fields argumentSelector, Function function, Fields outputSelector )
140    {
141    super( previous, argumentSelector, function, outputSelector );
142    }
143
144  /**
145   * Only pass argumentFields to the given function, only return fields selected by the outputSelector.
146   *
147   * @param previous       previous Pipe to receive input Tuples from
148   * @param function       Function to be applied to each input Tuple
149   * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples
150   */
151  @ConstructorProperties({"previous", "function", "outputSelector"})
152  public Each( Pipe previous, Function function, Fields outputSelector )
153    {
154    super( previous, function, outputSelector );
155    }
156
157  /////////////////
158  // TAKE FILTERS
159  /////////////////
160
161  /**
162   * Constructor Each creates a new Each instance.
163   *
164   * @param name   name for this branch of Pipes
165   * @param filter Filter to be applied to each input Tuple
166   */
167  @ConstructorProperties({"name", "filter"})
168  public Each( String name, Filter filter )
169    {
170    super( name, filter, FILTER_SELECTOR );
171    }
172
173  /**
174   * Constructor Each creates a new Each instance.
175   *
176   * @param name             name for this branch of Pipes
177   * @param argumentSelector field selector that selects Function arguments from the input Tuple
178   * @param filter           Filter to be applied to each input Tuple
179   */
180  @ConstructorProperties({"name", "argumentSelector", "filter"})
181  public Each( String name, Fields argumentSelector, Filter filter )
182    {
183    super( name, argumentSelector, filter, FILTER_SELECTOR );
184    }
185
186  /**
187   * Constructor Each creates a new Each instance.
188   *
189   * @param previous previous Pipe to receive input Tuples from
190   * @param filter   Filter to be applied to each input Tuple
191   */
192  @ConstructorProperties({"previous", "filter"})
193  public Each( Pipe previous, Filter filter )
194    {
195    super( previous, filter, FILTER_SELECTOR );
196    }
197
198  /**
199   * Constructor Each creates a new Each instance.
200   *
201   * @param previous         previous Pipe to receive input Tuples from
202   * @param argumentSelector field selector that selects Function arguments from the input Tuple
203   * @param filter           Filter to be applied to each input Tuple
204   */
205  @ConstructorProperties({"previous", "argumentSelector", "filter"})
206  public Each( Pipe previous, Fields argumentSelector, Filter filter )
207    {
208    super( previous, argumentSelector, filter, FILTER_SELECTOR );
209    }
210
211  ///////////////
212  // ASSERTIONS
213  ///////////////
214
215  /**
216   * Constructor Each creates a new Each instance.
217   *
218   * @param name           name for this branch of Pipes
219   * @param assertionLevel AssertionLevel to associate with the Assertion
220   * @param assertion      Assertion to be applied to each input Tuple
221   */
222  @ConstructorProperties({"name", "assertionLevel", "assertion"})
223  public Each( String name, AssertionLevel assertionLevel, Assertion assertion )
224    {
225    super( name, assertionLevel, assertion, FILTER_SELECTOR );
226    }
227
228  /**
229   * @param name             name for this branch of Pipes
230   * @param argumentSelector field selector that selects Function arguments from the input Tuple
231   * @param assertionLevel   AssertionLevel to associate with the Assertion
232   * @param assertion        Assertion to be applied to each input Tuple
233   */
234  @ConstructorProperties({"name", "argumentSelector", "assertionLevel", "assertion"})
235  public Each( String name, Fields argumentSelector, AssertionLevel assertionLevel, Assertion assertion )
236    {
237    super( name, argumentSelector, assertionLevel, assertion, FILTER_SELECTOR );
238    }
239
240  /**
241   * @param previous       previous Pipe to receive input Tuples from
242   * @param assertionLevel AssertionLevel to associate with the Assertion
243   * @param assertion      Assertion to be applied to each input Tuple
244   */
245  @ConstructorProperties({"previous", "assertionLevel", "assertion"})
246  public Each( Pipe previous, AssertionLevel assertionLevel, Assertion assertion )
247    {
248    super( previous, assertionLevel, assertion, FILTER_SELECTOR );
249    }
250
251  /**
252   * @param previous         previous Pipe to receive input Tuples from
253   * @param argumentSelector field selector that selects Function arguments from the input Tuple
254   * @param assertionLevel   AssertionLevel to associate with the Assertion
255   * @param assertion        Assertion to be applied to each input Tuple
256   */
257  @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"})
258  public Each( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, Assertion assertion )
259    {
260    super( previous, argumentSelector, assertionLevel, assertion, FILTER_SELECTOR );
261    }
262
263  //////////
264  //DEBUG
265  //////////
266
267  /**
268   * @param name             name for this branch of Pipes
269   * @param argumentSelector field selector that selects Function arguments from the input Tuple
270   * @param debugLevel       DebugLevel to associate with the Debug
271   * @param debug            Debug to be applied to each input Tuple
272   */
273  @ConstructorProperties({"name", "argumentSelector", "debugLevel", "debug"})
274  public Each( String name, Fields argumentSelector, DebugLevel debugLevel, Debug debug )
275    {
276    super( name, argumentSelector, debugLevel, debug, FILTER_SELECTOR );
277    }
278
279  /**
280   * @param previous   previous Pipe to receive input Tuples from
281   * @param debugLevel DebugLevel to associate with the Debug
282   * @param debug      Debug to be applied to each input Tuple
283   */
284  @ConstructorProperties({"previous", "debugLevel", "debug"})
285  public Each( Pipe previous, DebugLevel debugLevel, Debug debug )
286    {
287    super( previous, debugLevel, debug, FILTER_SELECTOR );
288    }
289
290  /**
291   * @param previous         previous Pipe to receive input Tuples from
292   * @param argumentSelector field selector that selects Function arguments from the input Tuple
293   * @param debugLevel       DebugLevel to associate with the Debug
294   * @param debug            Debug to be applied to each input Tuple
295   */
296  @ConstructorProperties({"previous", "argumentSelector", "debugLevel", "debug"})
297  public Each( Pipe previous, Fields argumentSelector, DebugLevel debugLevel, Debug debug )
298    {
299    super( previous, argumentSelector, debugLevel, debug, FILTER_SELECTOR );
300    }
301
302  @Override
303  protected void verifyOperation()
304    {
305    // backwards compatibility with 1.0
306    if( plannerLevel == null && operation instanceof Debug )
307      plannerLevel = DebugLevel.DEFAULT;
308
309    super.verifyOperation();
310
311    if( !argumentSelector.isArgSelector() )
312      throw new IllegalArgumentException( "invalid argument selector: " + argumentSelector );
313
314    if( !operation.getFieldDeclaration().isDeclarator() )
315      throw new IllegalArgumentException( "invalid field declaration: " + operation.getFieldDeclaration() );
316
317    if( !outputSelector.isOutSelector() )
318      throw new IllegalArgumentException( "invalid output selector: " + outputSelector );
319    }
320
321  public Function getFunction()
322    {
323    return (Function) operation;
324    }
325
326  public Filter getFilter()
327    {
328    return (Filter) operation;
329    }
330
331  public ValueAssertion getValueAssertion()
332    {
333    return (ValueAssertion) operation;
334    }
335
336  public boolean isFunction()
337    {
338    return operation instanceof Function;
339    }
340
341  public boolean isFilter()
342    {
343    return operation instanceof Filter;
344    }
345
346  public boolean isValueAssertion()
347    {
348    return operation instanceof ValueAssertion;
349    }
350
351  // FIELDS
352
353  @Override
354  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
355    {
356    return incomingScope.getIncomingFunctionArgumentFields();
357    }
358
359  @Override
360  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
361    {
362    return incomingScope.getIncomingFunctionPassThroughFields();
363    }
364
365  @Override
366  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
367    {
368    Fields argumentFields = resolveArgumentSelector( incomingScopes );
369
370    verifyArguments( argumentFields );
371
372    Fields declaredFields = resolveDeclared( incomingScopes, argumentFields );
373
374    verifyDeclaredFields( declaredFields );
375
376    Fields outgoingValuesFields = resolveOutgoingValuesSelector( incomingScopes, argumentFields, declaredFields );
377
378    verifyOutputSelector( outgoingValuesFields );
379
380    Fields outgoingGroupingFields = Fields.asDeclaration( outgoingValuesFields );
381
382    // the incoming fields eligible to be outgoing
383    Fields passThroughFields = resolveIncomingOperationPassThroughFields( getFirst( incomingScopes ) );
384    Fields remainderFields = resolveRemainderFields( incomingScopes, argumentFields );
385
386    return new Scope( getName(), Scope.Kind.EACH, passThroughFields, remainderFields, argumentFields, declaredFields, outgoingGroupingFields, outgoingValuesFields );
387    }
388
389  Fields resolveOutgoingValuesSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields )
390    {
391    try
392      {
393      return resolveOutgoingSelector( incomingScopes, argumentFields, declaredFields );
394      }
395    catch( Exception exception )
396      {
397      if( exception instanceof OperatorException )
398        throw (OperatorException) exception;
399
400      throw new OperatorException( this, "could not resolve outgoing values selector in: " + this, exception );
401      }
402    }
403  }