001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.expression;
022    
023    import java.io.IOException;
024    import java.lang.reflect.InvocationTargetException;
025    import java.util.Arrays;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.management.annotation.Property;
029    import cascading.management.annotation.PropertyDescription;
030    import cascading.management.annotation.Visibility;
031    import cascading.operation.BaseOperation;
032    import cascading.operation.OperationCall;
033    import cascading.operation.OperationException;
034    import cascading.tuple.Fields;
035    import cascading.tuple.Tuple;
036    import cascading.tuple.TupleEntry;
037    import cascading.tuple.Tuples;
038    import cascading.tuple.coerce.Coercions;
039    import cascading.tuple.type.CoercibleType;
040    import cascading.tuple.util.TupleViews;
041    import cascading.util.Util;
042    import org.codehaus.commons.compiler.CompileException;
043    import org.codehaus.janino.ScriptEvaluator;
044    
045    /**
046     *
047     */
048    public abstract class ScriptOperation extends BaseOperation<ScriptOperation.Context>
049      {
050      /** Field expression */
051      protected final String block;
052      /** Field parameterTypes */
053      protected Class[] parameterTypes;
054      /** Field parameterNames */
055      protected String[] parameterNames;
056      /** returnType */
057      protected Class returnType = Object.class;
058    
059      public ScriptOperation( int numArgs, Fields fieldDeclaration, String block )
060        {
061        super( numArgs, fieldDeclaration );
062        this.block = block;
063        this.returnType = fieldDeclaration.getTypeClass( 0 ) == null ? this.returnType : fieldDeclaration.getTypeClass( 0 );
064        }
065    
066      public ScriptOperation( int numArgs, Fields fieldDeclaration, String block, Class returnType )
067        {
068        super( numArgs, fieldDeclaration );
069        this.block = block;
070        this.returnType = returnType == null ? this.returnType : returnType;
071        }
072    
073      public ScriptOperation( int numArgs, Fields fieldDeclaration, String block, Class returnType, Class[] expectedTypes )
074        {
075        super( numArgs, fieldDeclaration );
076        this.block = block;
077        this.returnType = returnType == null ? this.returnType : returnType;
078    
079        if( expectedTypes == null )
080          throw new IllegalArgumentException( "expectedTypes may not be null" );
081    
082        this.parameterTypes = Arrays.copyOf( expectedTypes, expectedTypes.length );
083        }
084    
085      public ScriptOperation( int numArgs, Fields fieldDeclaration, String block, Class returnType, String[] parameterNames, Class[] parameterTypes )
086        {
087        super( numArgs, fieldDeclaration );
088        this.parameterNames = parameterNames == null ? null : Arrays.copyOf( parameterNames, parameterNames.length );
089        this.block = block;
090        this.returnType = returnType == null ? this.returnType : returnType;
091        this.parameterTypes = Arrays.copyOf( parameterTypes, parameterTypes.length );
092    
093        if( getParameterNamesInternal().length != getParameterTypesInternal().length )
094          throw new IllegalArgumentException( "parameterNames must be same length as parameterTypes" );
095        }
096    
097      public ScriptOperation( int numArgs, String block, Class returnType )
098        {
099        super( numArgs );
100        this.block = block;
101        this.returnType = returnType == null ? this.returnType : returnType;
102        }
103    
104      public ScriptOperation( int numArgs, String block, Class returnType, Class[] expectedTypes )
105        {
106        super( numArgs );
107        this.block = block;
108        this.returnType = returnType == null ? this.returnType : returnType;
109    
110        if( expectedTypes == null || expectedTypes.length == 0 )
111          throw new IllegalArgumentException( "expectedTypes may not be null or empty" );
112    
113        this.parameterTypes = Arrays.copyOf( expectedTypes, expectedTypes.length );
114        }
115    
116      public ScriptOperation( int numArgs, String block, Class returnType, String[] parameterNames, Class[] parameterTypes )
117        {
118        super( numArgs );
119        this.parameterNames = parameterNames == null ? null : Arrays.copyOf( parameterNames, parameterNames.length );
120        this.block = block;
121        this.returnType = returnType == null ? this.returnType : returnType;
122        this.parameterTypes = Arrays.copyOf( parameterTypes, parameterTypes.length );
123    
124        if( getParameterNamesInternal().length != getParameterTypesInternal().length )
125          throw new IllegalArgumentException( "parameterNames must be same length as parameterTypes" );
126        }
127    
128      @Property(name = "source", visibility = Visibility.PRIVATE)
129      @PropertyDescription("The Java source to execute.")
130      public String getBlock()
131        {
132        return block;
133        }
134    
135      private boolean hasParameterNames()
136        {
137        return parameterNames != null;
138        }
139    
140      @Property(name = "parameterNames", visibility = Visibility.PUBLIC)
141      @PropertyDescription("The declared parameter names.")
142      public String[] getParameterNames()
143        {
144        return Util.copy( parameterNames );
145        }
146    
147      private String[] getParameterNamesInternal()
148        {
149        if( parameterNames != null )
150          return parameterNames;
151    
152        try
153          {
154          parameterNames = guessParameterNames();
155          }
156        catch( IOException exception )
157          {
158          throw new OperationException( "could not read expression: " + block, exception );
159          }
160        catch( CompileException exception )
161          {
162          throw new OperationException( "could not compile expression: " + block, exception );
163          }
164    
165        return parameterNames;
166        }
167    
168      protected String[] guessParameterNames() throws CompileException, IOException
169        {
170        throw new OperationException( "parameter names are required" );
171        }
172    
173      private Fields getParameterFields()
174        {
175        return makeFields( getParameterNamesInternal() );
176        }
177    
178      private boolean hasParameterTypes()
179        {
180        return parameterTypes != null;
181        }
182    
183      @Property(name = "parameterTypes", visibility = Visibility.PUBLIC)
184      @PropertyDescription("The declared parameter types.")
185      public Class[] getParameterTypes()
186        {
187        return Util.copy( parameterTypes );
188        }
189    
190      private Class[] getParameterTypesInternal()
191        {
192        if( !hasParameterNames() )
193          return parameterTypes;
194    
195        if( hasParameterNames() && parameterNames.length == parameterTypes.length )
196          return parameterTypes;
197    
198        if( parameterNames.length > 0 && parameterTypes.length != 1 )
199          throw new IllegalStateException( "wrong number of parameter types, expects: " + parameterNames.length );
200    
201        Class[] types = new Class[ parameterNames.length ];
202    
203        Arrays.fill( types, parameterTypes[ 0 ] );
204    
205        parameterTypes = types;
206    
207        return parameterTypes;
208        }
209    
210      protected ScriptEvaluator getEvaluator( Class returnType, String[] parameterNames, Class[] parameterTypes )
211        {
212        try
213          {
214          return new ScriptEvaluator( block, returnType, parameterNames, parameterTypes );
215          }
216        catch( CompileException exception )
217          {
218          throw new OperationException( "could not compile script: " + block, exception );
219          }
220        }
221    
222      private Fields makeFields( String[] parameters )
223        {
224        Comparable[] fields = new Comparable[ parameters.length ];
225    
226        for( int i = 0; i < parameters.length; i++ )
227          {
228          String parameter = parameters[ i ];
229    
230          if( parameter.startsWith( "$" ) )
231            fields[ i ] = parse( parameter ); // returns parameter if not a number after $
232          else
233            fields[ i ] = parameter;
234          }
235    
236        return new Fields( fields );
237        }
238    
239      private Comparable parse( String parameter )
240        {
241        try
242          {
243          return Integer.parseInt( parameter.substring( 1 ) );
244          }
245        catch( NumberFormatException exception )
246          {
247          return parameter;
248          }
249        }
250    
251      @Override
252      public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
253        {
254        if( operationCall.getContext() == null )
255          operationCall.setContext( new Context() );
256    
257        Context context = operationCall.getContext();
258    
259        Fields argumentFields = operationCall.getArgumentFields();
260    
261        if( hasParameterNames() && hasParameterTypes() )
262          {
263          context.parameterNames = getParameterNamesInternal();
264          context.parameterFields = argumentFields.select( getParameterFields() ); // inherit argument types
265          context.parameterTypes = getParameterTypesInternal();
266          }
267        else if( hasParameterTypes() )
268          {
269          context.parameterNames = toNames( argumentFields );
270          context.parameterFields = argumentFields.applyTypes( getParameterTypesInternal() );
271          context.parameterTypes = getParameterTypesInternal();
272          }
273        else
274          {
275          context.parameterNames = toNames( argumentFields );
276          context.parameterFields = argumentFields;
277          context.parameterTypes = argumentFields.getTypesClasses();
278    
279          if( argumentFields.isNone() )
280            context.parameterTypes = new Class[ 0 ]; // to match names
281    
282          if( context.parameterTypes == null )
283            throw new IllegalArgumentException( "field types may not be empty, incoming tuple stream should declare field types" );
284          }
285    
286        context.parameterCoercions = Coercions.coercibleArray( context.parameterFields );
287        context.parameterArray = new Object[ context.parameterTypes.length ]; // re-use object array
288        context.scriptEvaluator = getEvaluator( getReturnType(), context.parameterNames, context.parameterTypes );
289        context.intermediate = TupleViews.createNarrow( argumentFields.getPos( context.parameterFields ) );
290        context.result = Tuple.size( 1 ); // re-use the output tuple
291        }
292    
293      private String[] toNames( Fields argumentFields )
294        {
295        String[] names = new String[ argumentFields.size() ];
296    
297        for( int i = 0; i < names.length; i++ )
298          {
299          Comparable comparable = argumentFields.get( i );
300          if( comparable instanceof String )
301            names[ i ] = (String) comparable;
302          else
303            names[ i ] = "$" + comparable;
304          }
305    
306        return names;
307        }
308    
309      public Class getReturnType()
310        {
311        return returnType;
312        }
313    
314      /**
315       * Performs the actual expression evaluation.
316       *
317       * @param context
318       * @param input   of type TupleEntry
319       * @return Comparable
320       */
321      protected Object evaluate( Context context, TupleEntry input )
322        {
323        try
324          {
325          if( context.parameterTypes.length == 0 )
326            return context.scriptEvaluator.evaluate( null );
327    
328          Tuple parameterTuple = TupleViews.reset( context.intermediate, input.getTuple() );
329          Object[] arguments = Tuples.asArray( parameterTuple, context.parameterCoercions, context.parameterTypes, context.parameterArray );
330    
331          return context.scriptEvaluator.evaluate( arguments );
332          }
333        catch( InvocationTargetException exception )
334          {
335          throw new OperationException( "could not evaluate expression: " + block, exception.getTargetException() );
336          }
337        }
338    
339      @Override
340      public boolean equals( Object object )
341        {
342        if( this == object )
343          return true;
344        if( !( object instanceof ExpressionOperation ) )
345          return false;
346        if( !super.equals( object ) )
347          return false;
348    
349        ExpressionOperation that = (ExpressionOperation) object;
350    
351        if( block != null ? !block.equals( that.block ) : that.block != null )
352          return false;
353        if( !Arrays.equals( parameterNames, that.parameterNames ) )
354          return false;
355        if( !Arrays.equals( parameterTypes, that.parameterTypes ) )
356          return false;
357    
358        return true;
359        }
360    
361      @Override
362      public int hashCode()
363        {
364        int result = super.hashCode();
365        result = 31 * result + ( block != null ? block.hashCode() : 0 );
366        result = 31 * result + ( parameterTypes != null ? Arrays.hashCode( parameterTypes ) : 0 );
367        result = 31 * result + ( parameterNames != null ? Arrays.hashCode( parameterNames ) : 0 );
368        return result;
369        }
370    
371      public static class Context
372        {
373        private Class[] parameterTypes;
374        private ScriptEvaluator scriptEvaluator;
375        private Fields parameterFields;
376        private CoercibleType[] parameterCoercions;
377        private String[] parameterNames;
378        private Object[] parameterArray;
379        private Tuple intermediate;
380        protected Tuple result;
381        }
382      }