001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.util.Set;
024    
025    import cascading.flow.FlowElement;
026    import cascading.flow.planner.Scope;
027    import cascading.operation.Assertion;
028    import cascading.operation.AssertionLevel;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.Operation;
031    import cascading.operation.PlannedOperation;
032    import cascading.operation.PlannerLevel;
033    import cascading.tuple.Fields;
034    import cascading.tuple.FieldsResolverException;
035    import cascading.tuple.TupleException;
036    
037    /**
038     * An Operator is a type of {@link Pipe}. Operators pass specified arguments to a given {@link cascading.operation.BaseOperation}.
039     * </p>
040     * The argFields value select the input fields used by the operation. By default the whole input Tuple is passes as arguments.
041     * The outFields value select the fields in the result Tuple returned by this Pipe. By default, the operation results
042     * of the given operation replace the input Tuple.
043     */
044    public abstract class Operator extends Pipe
045      {
046      /** Field operation */
047      protected final Operation operation;
048      /** Field argumentSelector */
049      protected Fields argumentSelector = Fields.ALL; // use wildcard. let the operation choose
050      /** Field outputSelector */
051      protected Fields outputSelector = Fields.RESULTS;  // this is overridden by the subclasses via the ctor
052      /** Field assertionLevel */
053      protected PlannerLevel plannerLevel; // do not initialize a default
054    
055      protected Operator( Operation operation )
056        {
057        this.operation = operation;
058        verifyOperation();
059        }
060    
061      protected Operator( String name, Operation operation )
062        {
063        super( name );
064        this.operation = operation;
065        verifyOperation();
066        }
067    
068      protected Operator( String name, Operation operation, Fields outputSelector )
069        {
070        super( name );
071        this.operation = operation;
072        this.outputSelector = outputSelector;
073        verifyOperation();
074        }
075    
076      protected Operator( String name, Fields argumentSelector, Operation operation )
077        {
078        super( name );
079        this.operation = operation;
080        this.argumentSelector = argumentSelector;
081        verifyOperation();
082        }
083    
084      protected Operator( String name, Fields argumentSelector, Operation operation, Fields outputSelector )
085        {
086        super( name );
087        this.operation = operation;
088        this.argumentSelector = argumentSelector;
089        this.outputSelector = outputSelector;
090        verifyOperation();
091        }
092    
093      protected Operator( Pipe previous, Operation operation )
094        {
095        super( previous );
096        this.operation = operation;
097        verifyOperation();
098        }
099    
100      protected Operator( Pipe previous, Fields argumentSelector, Operation operation )
101        {
102        super( previous );
103        this.operation = operation;
104        this.argumentSelector = argumentSelector;
105        verifyOperation();
106        }
107    
108      protected Operator( Pipe previous, Fields argumentSelector, Operation operation, Fields outputSelector )
109        {
110        super( previous );
111        this.operation = operation;
112        this.argumentSelector = argumentSelector;
113        this.outputSelector = outputSelector;
114        verifyOperation();
115        }
116    
117      protected Operator( Pipe previous, Operation operation, Fields outputSelector )
118        {
119        super( previous );
120        this.operation = operation;
121        this.outputSelector = outputSelector;
122        verifyOperation();
123        }
124    
125      protected Operator( String name, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
126        {
127        super( name );
128        this.plannerLevel = plannerLevel;
129        this.operation = operation;
130        this.outputSelector = outputSelector;
131        verifyOperation();
132        }
133    
134      protected Operator( String name, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
135        {
136        super( name );
137        this.plannerLevel = plannerLevel;
138        this.operation = operation;
139        this.argumentSelector = argumentSelector;
140        this.outputSelector = outputSelector;
141        verifyOperation();
142        }
143    
144      protected Operator( Pipe previous, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
145        {
146        super( previous );
147        this.plannerLevel = plannerLevel;
148        this.operation = operation;
149        this.outputSelector = outputSelector;
150        verifyOperation();
151        }
152    
153      protected Operator( Pipe previous, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
154        {
155        super( previous );
156        this.plannerLevel = plannerLevel;
157        this.operation = operation;
158        this.argumentSelector = argumentSelector;
159        this.outputSelector = outputSelector;
160        verifyOperation();
161        }
162    
163      protected void verifyOperation()
164        {
165        if( operation == null )
166          throw new IllegalArgumentException( "operation may not be null" );
167    
168        if( argumentSelector == null )
169          throw new IllegalArgumentException( "argumentSelector may not be null" );
170    
171        if( outputSelector == null )
172          throw new IllegalArgumentException( "outputSelector may not be null" );
173    
174        if( operation instanceof PlannedOperation )
175          {
176          if( plannerLevel == null )
177            throw new IllegalArgumentException( "planner level may not be null" );
178          else if( plannerLevel.isNoneLevel() )
179            throw new IllegalArgumentException( "given planner level: " + plannerLevel.getClass().getName() + ", may not be NONE" );
180          }
181        }
182    
183      /**
184       * Method getOperation returns the operation managed by this Operator object.
185       *
186       * @return the operation (type Operation) of this Operator object.
187       */
188      public Operation getOperation()
189        {
190        return operation;
191        }
192    
193      /**
194       * Method getArgumentSelector returns the argumentSelector of this Operator object.
195       *
196       * @return the argumentSelector (type Fields) of this Operator object.
197       */
198      public Fields getArgumentSelector()
199        {
200        return argumentSelector;
201        }
202    
203      /**
204       * Method getFieldDeclaration returns the fieldDeclaration of this Operator object.
205       *
206       * @return the fieldDeclaration (type Fields) of this Operator object.
207       */
208      public Fields getFieldDeclaration()
209        {
210        return operation.getFieldDeclaration();
211        }
212    
213      /**
214       * Method getOutputSelector returns the outputSelector of this Operator object.
215       *
216       * @return the outputSelector (type Fields) of this Operator object.
217       */
218      public Fields getOutputSelector()
219        {
220        return outputSelector;
221        }
222    
223      /**
224       * Method getAssertionLevel returns the assertionLevel of this Operator object. Only used if the {@link cascading.operation.Operation}
225       * is an {@link Assertion}.
226       *
227       * @return the assertionLevel (type Assertion.Level) of this Operator object.
228       */
229      @Deprecated
230      public AssertionLevel getAssertionLevel()
231        {
232        return (AssertionLevel) plannerLevel;
233        }
234    
235      /**
236       * Method getPlannerLevel returns the plannerLevel of this Operator object.
237       *
238       * @return the plannerLevel (type PlannerLevel) of this Operator object.
239       */
240      public PlannerLevel getPlannerLevel()
241        {
242        return plannerLevel;
243        }
244    
245      /**
246       * Method hasPlannerLevel returns true if this Operator object holds a {@link PlannedOperation} object with an associated
247       * {@link PlannerLevel} level.
248       *
249       * @return boolean
250       */
251      public boolean hasPlannerLevel()
252        {
253        return plannerLevel != null;
254        }
255    
256      // FIELDS
257    
258      protected Fields resolveRemainderFields( Set<Scope> incomingScopes, Fields argumentFields )
259        {
260        Fields fields = resolveIncomingOperationArgumentFields( getFirst( incomingScopes ) );
261    
262        if( fields.isUnknown() )
263          return fields;
264    
265        return fields.subtract( argumentFields );
266        }
267    
268      public abstract Scope outgoingScopeFor( Set<Scope> incomingScopes );
269    
270      void verifyDeclaredFields( Fields declared )
271        {
272        if( declared.isDefined() && declared.size() == 0 )
273          throw new OperatorException( this, "field declaration: " + getFieldDeclaration().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
274        }
275    
276      void verifyOutputSelector( Fields outputSelector )
277        {
278        if( outputSelector.isDefined() && outputSelector.size() == 0 )
279          throw new OperatorException( this, "output selector: " + getOutputSelector().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
280        }
281    
282      void verifyArguments( Fields argumentSelector )
283        {
284        if( argumentSelector.isUnknown() )
285          return;
286    
287        if( operation.getNumArgs() != Operation.ANY && argumentSelector.size() < operation.getNumArgs() )
288          throw new OperatorException( this, "resolved wrong number of arguments: " + argumentSelector.printVerbose() + ", expected: " + operation.getNumArgs() );
289        }
290    
291      Fields resolveOutgoingSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields )
292        {
293        Scope incomingScope = getFirst( incomingScopes );
294        Fields outputSelector = getOutputSelector();
295    
296        if( outputSelector.isResults() )
297          return declaredFields;
298    
299        if( outputSelector.isArguments() )
300          return argumentFields;
301    
302        if( outputSelector.isGroup() )
303          return incomingScope.getOutGroupingFields();
304    
305        if( outputSelector.isValues() )
306          return incomingScope.getOutGroupingValueFields();
307    
308        Fields incomingFields = resolveIncomingOperationPassThroughFields( incomingScope );
309    
310        // not part of resolve as we need the argumentFields
311        if( outputSelector.isSwap() )
312          return Fields.asDeclaration( incomingFields.subtract( argumentFields ) ).append( declaredFields );
313    
314        try
315          {
316          return Fields.resolve( outputSelector, Fields.asDeclaration( incomingFields ), declaredFields );
317          }
318        catch( TupleException exception )
319          {
320          throw new OperatorException( this, incomingFields, declaredFields, outputSelector, exception );
321          }
322        }
323    
324      Fields resolveArgumentSelector( Set<Scope> incomingScopes )
325        {
326        Fields argumentSelector = getArgumentSelector();
327    
328        try
329          {
330          Scope incomingScope = getFirst( incomingScopes );
331    
332          if( argumentSelector.isAll() )
333            return resolveIncomingOperationArgumentFields( incomingScope );
334    
335          if( argumentSelector.isGroup() )
336            return incomingScope.getOutGroupingFields();
337    
338          if( argumentSelector.isValues() )
339            return incomingScope.getOutGroupingValueFields();
340    
341          return resolveIncomingOperationArgumentFields( incomingScope ).select( argumentSelector );
342          }
343        catch( FieldsResolverException exception )
344          {
345          throw new OperatorException( this, OperatorException.Kind.argument, exception.getSourceFields(), argumentSelector, exception );
346          }
347        catch( Exception exception )
348          {
349          throw new OperatorException( this, "unable to resolve argument selector: " + argumentSelector.printVerbose(), exception );
350          }
351        }
352    
353      Fields resolveDeclared( Set<Scope> incomingScopes, Fields arguments )
354        {
355        Fields fieldDeclaration = getFieldDeclaration();
356    
357        if( getOutputSelector().isReplace() )
358          {
359          if( arguments.isDefined() && fieldDeclaration.isDefined() && arguments.size() != fieldDeclaration.size() )
360            throw new OperatorException( this, "during REPLACE both the arguments selector and field declaration must be the same size, arguments: " + arguments.printVerbose() + " declaration: " + fieldDeclaration.printVerbose() );
361    
362          if( fieldDeclaration.isArguments() ) // there is no type info, so inherit it
363            return arguments;
364    
365          return arguments.project( fieldDeclaration );
366          }
367    
368        try
369          {
370          Scope incomingScope = getFirst( incomingScopes );
371    
372          if( fieldDeclaration.isUnknown() )
373            return fieldDeclaration;
374    
375          if( fieldDeclaration.isArguments() )
376            return Fields.asDeclaration( arguments );
377    
378          if( fieldDeclaration.isAll() )
379            return resolveIncomingOperationPassThroughFields( incomingScope );
380    
381          if( fieldDeclaration.isGroup() )
382            return incomingScope.getOutGroupingFields();
383    
384          // VALUES is the diff between all fields and group fields
385          if( fieldDeclaration.isValues() )
386            return incomingScope.getOutGroupingValueFields();
387    
388          }
389        catch( Exception exception )
390          {
391          throw new OperatorException( this, "could not resolve declared fields in:  " + this, exception );
392          }
393    
394        return fieldDeclaration;
395        }
396    
397      // OBJECT OVERRIDES
398    
399      @Override
400      public String toString()
401        {
402        return super.toString() + "[" + operation + "]";
403        }
404    
405      @Override
406      protected void printInternal( StringBuffer buffer, Scope scope )
407        {
408        super.printInternal( buffer, scope );
409        buffer.append( "[" );
410        BaseOperation.printOperationInternal( operation, buffer, scope );
411        buffer.append( "]" );
412        }
413    
414      @Override
415      public boolean isEquivalentTo( FlowElement element )
416        {
417        boolean equivalentTo = super.isEquivalentTo( element );
418    
419        if( !equivalentTo )
420          return false;
421    
422        Operator operator = (Operator) element;
423    
424        equivalentTo = argumentSelector.equals( operator.argumentSelector );
425    
426        if( !equivalentTo )
427          return false;
428    
429        if( !operation.equals( operator.operation ) )
430          return false;
431    
432        equivalentTo = outputSelector.equals( operator.outputSelector );
433    
434        if( !equivalentTo )
435          return false;
436    
437        return true;
438        }
439    
440      @SuppressWarnings({"RedundantIfStatement"})
441      public boolean equals( Object object )
442        {
443        if( this == object )
444          return true;
445        if( object == null || getClass() != object.getClass() )
446          return false;
447        if( !super.equals( object ) )
448          return false;
449    
450        Operator operator = (Operator) object;
451    
452        if( argumentSelector != null ? !argumentSelector.equals( operator.argumentSelector ) : operator.argumentSelector != null )
453          return false;
454        if( operation != null ? !operation.equals( operator.operation ) : operator.operation != null )
455          return false;
456        if( outputSelector != null ? !outputSelector.equals( operator.outputSelector ) : operator.outputSelector != null )
457          return false;
458    
459        return true;
460        }
461    
462      @Override
463      public int hashCode()
464        {
465        int result = super.hashCode();
466        result = 31 * result + ( operation != null ? operation.hashCode() : 0 );
467        result = 31 * result + ( argumentSelector != null ? argumentSelector.hashCode() : 0 );
468        result = 31 * result + ( outputSelector != null ? outputSelector.hashCode() : 0 );
469        return result;
470        }
471      }