001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation;
022    
023    import java.beans.ConstructorProperties;
024    import java.lang.reflect.Type;
025    import java.util.Arrays;
026    
027    import cascading.flow.FlowProcess;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    import cascading.tuple.TupleEntryCollector;
032    import cascading.util.Util;
033    
034    /**
035     * The Identity function simply passes incoming arguments back out again. Optionally argument fields can be renamed, and/or
036     * coerced into specific types.
037     * <p/>
038     * During coercion, if the given type is a primitive ({@code long}), and the tuple value is null, {@code 0} is returned.
039     * If the type is an Object ({@code java.lang.Long}), and the tuple value is {@code null}, {@code null} is returned.
040     */
041    public class Identity extends BaseOperation<Identity.Functor> implements Function<Identity.Functor>
042      {
043      /** Field types */
044      private Type[] types = null;
045    
046      /**
047       * Constructor Identity creates a new Identity instance that will pass the argument values to its output. Use this
048       * constructor for a simple copy Pipe.
049       */
050      public Identity()
051        {
052        super( Fields.ARGS );
053        }
054    
055      /**
056       * Constructor Identity creates a new Identity instance that will coerce the values to the give types.
057       *
058       * @param types of type Class...
059       */
060      @ConstructorProperties({"types"})
061      public Identity( Class... types )
062        {
063        super( Fields.ARGS );
064    
065        if( types.length == 0 )
066          throw new IllegalArgumentException( "number of types must not be zero" );
067    
068        this.types = Arrays.copyOf( types, types.length );
069        }
070    
071      /**
072       * Constructor Identity creates a new Identity instance that will rename the argument fields to the given
073       * fieldDeclaration.
074       *
075       * @param fieldDeclaration of type Fields
076       */
077      @ConstructorProperties({"fieldDeclaration"})
078      public Identity( Fields fieldDeclaration )
079        {
080        super( fieldDeclaration ); // don't need to set size, default is ANY
081    
082        this.types = fieldDeclaration.getTypes();
083        }
084    
085      /**
086       * Constructor Identity creates a new Identity instance that will rename the argument fields to the given
087       * fieldDeclaration, and coerce the values to the give types.
088       *
089       * @param fieldDeclaration of type Fields
090       * @param types            of type Class...
091       */
092      @ConstructorProperties({"fieldDeclaration", "types"})
093      public Identity( Fields fieldDeclaration, Class... types )
094        {
095        super( fieldDeclaration );
096        this.types = Arrays.copyOf( types, types.length );
097    
098        if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != types.length )
099          throw new IllegalArgumentException( "fieldDeclaration and types must be the same size" );
100        }
101    
102      public Type[] getTypes()
103        {
104        return Util.copy( types );
105        }
106    
107      @Override
108      public void prepare( FlowProcess flowProcess, OperationCall<Functor> operationCall )
109        {
110        Functor functor;
111    
112        if( types != null )
113          {
114          functor = new Functor()
115          {
116          Tuple result = Tuple.size( types.length );
117    
118          @Override
119          public void operate( FunctionCall<Functor> functionCall )
120            {
121            TupleEntry input = functionCall.getArguments();
122            TupleEntryCollector outputCollector = functionCall.getOutputCollector();
123    
124            outputCollector.add( input.getCoercedTuple( types, result ) );
125            }
126          };
127          }
128        else
129          {
130          functor = new Functor()
131          {
132          @Override
133          public void operate( FunctionCall<Functor> functionCall )
134            {
135            TupleEntryCollector outputCollector = functionCall.getOutputCollector();
136    
137            outputCollector.add( functionCall.getArguments().getTuple() );
138            }
139          };
140          operationCall.setContext( functor );
141          }
142    
143        operationCall.setContext( functor );
144    
145        }
146    
147      @Override
148      public void operate( FlowProcess flowProcess, FunctionCall<Functor> functionCall )
149        {
150        functionCall.getContext().operate( functionCall );
151        }
152    
153      @Override
154      public boolean equals( Object object )
155        {
156        if( this == object )
157          return true;
158        if( !( object instanceof Identity ) )
159          return false;
160        if( !super.equals( object ) )
161          return false;
162    
163        Identity identity = (Identity) object;
164    
165        if( !Arrays.equals( types, identity.types ) )
166          return false;
167    
168        return true;
169        }
170    
171      @Override
172      public int hashCode()
173        {
174        int result = super.hashCode();
175        result = 31 * result + ( types != null ? Arrays.hashCode( types ) : 0 );
176        return result;
177        }
178    
179      interface Functor
180        {
181        void operate( FunctionCall<Functor> functionCall );
182        }
183      }