001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation;
022
023import java.io.Serializable;
024
025import cascading.flow.FlowProcess;
026import cascading.flow.planner.Scope;
027import cascading.pipe.Each;
028import cascading.pipe.Every;
029import cascading.pipe.Pipe;
030import cascading.tuple.Fields;
031import cascading.tuple.Tuple;
032import cascading.util.TraceUtil;
033import cascading.util.Traceable;
034
035/**
036 * Class BaseOperation is the base class of predicate types that are applied to {@link Tuple} streams via
037 * the {@link Each} or {@link Every} {@link Pipe}.
038 * </p>
039 * Specific examples of Operations are {@link Function}, {@link Filter}, {@link Aggregator}, {@link Buffer},
040 * and {@link Assertion}.
041 * <p/>
042 * By default, {@link #isSafe()} returns {@code true}.
043 */
044public abstract class BaseOperation<Context> implements Serializable, Operation<Context>, Traceable
045  {
046  /** Field fieldDeclaration */
047  protected Fields fieldDeclaration;
048  /** Field numArgs */
049  protected int numArgs = ANY;
050  /** Field trace */
051  protected String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
052
053  // initialize this operation based on its subclass
054  {
055  if( this instanceof Filter || this instanceof Assertion )
056    fieldDeclaration = Fields.ALL;
057  else
058    fieldDeclaration = Fields.UNKNOWN;
059  }
060
061  /**
062   * Constructs a new instance that returns an {@link Fields#UNKNOWN} {@link Tuple} and accepts any number of arguments.
063   * </p>
064   * It is a best practice to always declare the field names and number of arguments via one of the other constructors.
065   */
066  protected BaseOperation()
067    {
068    }
069
070  /**
071   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts any number of arguments.
072   *
073   * @param fieldDeclaration of type Fields
074   */
075  protected BaseOperation( Fields fieldDeclaration )
076    {
077    this.fieldDeclaration = fieldDeclaration;
078    verify();
079    }
080
081  /**
082   * Constructs a new instance that returns an unknown field set and accepts the given numArgs number of arguments.
083   *
084   * @param numArgs of type numArgs
085   */
086  protected BaseOperation( int numArgs )
087    {
088    this.numArgs = numArgs;
089    verify();
090    }
091
092  /**
093   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts numArgs number of arguments.
094   *
095   * @param numArgs          of type numArgs
096   * @param fieldDeclaration of type Fields
097   */
098  protected BaseOperation( int numArgs, Fields fieldDeclaration )
099    {
100    this.numArgs = numArgs;
101    this.fieldDeclaration = fieldDeclaration;
102    verify();
103    }
104
105  /** Validates the state of this instance. */
106  private final void verify()
107    {
108    if( this instanceof Filter && fieldDeclaration != Fields.ALL )
109      throw new IllegalArgumentException( "fieldDeclaration must be set to Fields.ALL for filter operations" );
110
111    if( this instanceof Assertion && fieldDeclaration != Fields.ALL )
112      throw new IllegalArgumentException( "fieldDeclaration must be set to Fields.ALL for assertion operations" );
113
114    if( fieldDeclaration == null )
115      throw new IllegalArgumentException( "fieldDeclaration may not be null" );
116
117    if( numArgs < 0 )
118      throw new IllegalArgumentException( "numArgs may not be negative" );
119    }
120
121  /** Method prepare does nothing, and may safely be overridden. */
122  @Override
123  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
124    {
125    // do nothing
126    }
127
128  @Override
129  public void flush( FlowProcess flowProcess, OperationCall<Context> contextOperationCall )
130    {
131    }
132
133  /** Method cleanup does nothing, and may safely be overridden. */
134  @Override
135  public void cleanup( FlowProcess flowProcess, OperationCall<Context> operationCall )
136    {
137    // do nothing
138    }
139
140  @Override
141  public Fields getFieldDeclaration()
142    {
143    return fieldDeclaration;
144    }
145
146  @Override
147  public int getNumArgs()
148    {
149    return numArgs;
150    }
151
152  @Override
153  public boolean isSafe()
154    {
155    return true;
156    }
157
158  @Override
159  public String getTrace()
160    {
161    return trace;
162    }
163
164  @Override
165  public String toString()
166    {
167    return toStringInternal( (Operation) this );
168    }
169
170  public static String toStringInternal( Operation operation )
171    {
172    StringBuilder buffer = new StringBuilder();
173
174    Class<? extends Operation> type = operation.getClass();
175    if( type.getSimpleName().length() != 0 )
176      buffer.append( type.getSimpleName() );
177    else
178      buffer.append( type.getName() ); // should get something for an anonymous inner class
179
180    if( operation.getFieldDeclaration() != null )
181      buffer.append( "[decl:" ).append( operation.getFieldDeclaration() ).append( "]" );
182
183    if( operation.getNumArgs() != ANY )
184      buffer.append( "[args:" ).append( operation.getNumArgs() ).append( "]" );
185
186    return buffer.toString();
187    }
188
189  public static void printOperationInternal( Operation operation, StringBuffer buffer, Scope scope )
190    {
191    Class<? extends Operation> type = operation.getClass();
192
193    if( type.getSimpleName().length() != 0 )
194      buffer.append( type.getSimpleName() );
195    else
196      buffer.append( type.getName() ); // should get something for an anonymous inner class
197
198    if( scope.getOperationDeclaredFields() != null )
199      buffer.append( "[decl:" ).append( scope.getOperationDeclaredFields().printVerbose() ).append( "]" );
200
201    if( operation.getNumArgs() != ANY )
202      buffer.append( "[args:" ).append( operation.getNumArgs() ).append( "]" );
203    }
204
205  @Override
206  public boolean equals( Object object )
207    {
208    if( this == object )
209      return true;
210    if( !( object instanceof BaseOperation ) )
211      return false;
212
213    BaseOperation that = (BaseOperation) object;
214
215    if( numArgs != that.numArgs )
216      return false;
217    if( fieldDeclaration != null ? !fieldDeclaration.equals( that.fieldDeclaration ) : that.fieldDeclaration != null )
218      return false;
219
220    return true;
221    }
222
223  @Override
224  public int hashCode()
225    {
226    int result = fieldDeclaration != null ? fieldDeclaration.hashCode() : 0;
227    result = 31 * result + numArgs;
228    return result;
229    }
230  }