001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation;
023
024import java.io.Serializable;
025
026import cascading.flow.FlowProcess;
027import cascading.flow.planner.Scope;
028import cascading.pipe.Each;
029import cascading.pipe.Every;
030import cascading.pipe.Pipe;
031import cascading.tuple.Fields;
032import cascading.tuple.Tuple;
033import cascading.util.TraceUtil;
034import cascading.util.Traceable;
035
036/**
037 * Class BaseOperation is the base class of predicate types that are applied to {@link Tuple} streams via
038 * the {@link Each} or {@link Every} {@link Pipe}.
039 * <p>
040 * Specific examples of Operations are {@link Function}, {@link Filter}, {@link Aggregator}, {@link Buffer},
041 * and {@link Assertion}.
042 * <p>
043 * By default, {@link #isSafe()} returns {@code true}.
044 */
045public abstract class BaseOperation<Context> implements Serializable, Operation<Context>, Traceable
046  {
047  /** Field fieldDeclaration */
048  protected Fields fieldDeclaration;
049  /** Field numArgs */
050  protected int numArgs = ANY;
051  /** Field trace */
052  protected String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
053
054  // initialize this operation based on its subclass
055  {
056  if( this instanceof Filter || this instanceof Assertion )
057    fieldDeclaration = Fields.ALL;
058  else
059    fieldDeclaration = Fields.UNKNOWN;
060  }
061
062  /**
063   * Constructs a new instance that returns an {@link Fields#UNKNOWN} {@link Tuple} and accepts any number of arguments.
064   * <p>
065   * It is a best practice to always declare the field names and number of arguments via one of the other constructors.
066   */
067  protected BaseOperation()
068    {
069    }
070
071  /**
072   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts any number of arguments.
073   *
074   * @param fieldDeclaration of type Fields
075   */
076  protected BaseOperation( Fields fieldDeclaration )
077    {
078    this.fieldDeclaration = fieldDeclaration;
079    verify();
080    }
081
082  /**
083   * Constructs a new instance that returns an unknown field set and accepts the given numArgs number of arguments.
084   *
085   * @param numArgs of type numArgs
086   */
087  protected BaseOperation( int numArgs )
088    {
089    this.numArgs = numArgs;
090    verify();
091    }
092
093  /**
094   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts numArgs number of arguments.
095   *
096   * @param numArgs          of type numArgs
097   * @param fieldDeclaration of type Fields
098   */
099  protected BaseOperation( int numArgs, Fields fieldDeclaration )
100    {
101    this.numArgs = numArgs;
102    this.fieldDeclaration = fieldDeclaration;
103    verify();
104    }
105
106  /** Validates the state of this instance. */
107  private void verify()
108    {
109    if( this instanceof Filter && fieldDeclaration != Fields.ALL )
110      throw new IllegalArgumentException( "fieldDeclaration must be set to Fields.ALL for filter operations" );
111
112    if( this instanceof Assertion && fieldDeclaration != Fields.ALL )
113      throw new IllegalArgumentException( "fieldDeclaration must be set to Fields.ALL for assertion operations" );
114
115    if( fieldDeclaration == null )
116      throw new IllegalArgumentException( "fieldDeclaration may not be null" );
117
118    if( numArgs < 0 )
119      throw new IllegalArgumentException( "numArgs may not be negative" );
120    }
121
122  /** Method prepare does nothing, and may safely be overridden. */
123  @Override
124  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
125    {
126    // do nothing
127    }
128
129  @Override
130  public void flush( FlowProcess flowProcess, OperationCall<Context> contextOperationCall )
131    {
132    }
133
134  /** Method cleanup does nothing, and may safely be overridden. */
135  @Override
136  public void cleanup( FlowProcess flowProcess, OperationCall<Context> operationCall )
137    {
138    // do nothing
139    }
140
141  @Override
142  public Fields getFieldDeclaration()
143    {
144    return fieldDeclaration;
145    }
146
147  @Override
148  public int getNumArgs()
149    {
150    return numArgs;
151    }
152
153  @Override
154  public boolean isSafe()
155    {
156    return true;
157    }
158
159  @Override
160  public String getTrace()
161    {
162    return trace;
163    }
164
165  @Override
166  public String toString()
167    {
168    return toStringInternal( (Operation) this );
169    }
170
171  public static String toStringInternal( Operation operation )
172    {
173    StringBuilder buffer = new StringBuilder();
174
175    Class<? extends Operation> type = operation.getClass();
176    if( type.getSimpleName().length() != 0 )
177      buffer.append( type.getSimpleName() );
178    else
179      buffer.append( type.getName() ); // should get something for an anonymous inner class
180
181    if( operation.getFieldDeclaration() != null )
182      buffer.append( "[decl:" ).append( operation.getFieldDeclaration() ).append( "]" );
183
184    if( operation.getNumArgs() != ANY )
185      buffer.append( "[args:" ).append( operation.getNumArgs() ).append( "]" );
186
187    return buffer.toString();
188    }
189
190  public static void printOperationInternal( Operation operation, StringBuffer buffer, Scope scope )
191    {
192    Class<? extends Operation> type = operation.getClass();
193
194    if( type.getSimpleName().length() != 0 )
195      buffer.append( type.getSimpleName() );
196    else
197      buffer.append( type.getName() ); // should get something for an anonymous inner class
198
199    if( scope.getOperationDeclaredFields() != null )
200      buffer.append( "[decl:" ).append( scope.getOperationDeclaredFields().printVerbose() ).append( "]" );
201
202    if( operation.getNumArgs() != ANY )
203      buffer.append( "[args:" ).append( operation.getNumArgs() ).append( "]" );
204    }
205
206  @Override
207  public boolean equals( Object object )
208    {
209    if( this == object )
210      return true;
211    if( !( object instanceof BaseOperation ) )
212      return false;
213
214    BaseOperation that = (BaseOperation) object;
215
216    if( numArgs != that.numArgs )
217      return false;
218    if( fieldDeclaration != null ? !fieldDeclaration.equals( that.fieldDeclaration ) : that.fieldDeclaration != null )
219      return false;
220
221    return true;
222    }
223
224  @Override
225  public int hashCode()
226    {
227    int result = fieldDeclaration != null ? fieldDeclaration.hashCode() : 0;
228    result = 31 * result + numArgs;
229    return result;
230    }
231  }