001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation;
022
023import java.beans.ConstructorProperties;
024import java.lang.reflect.Type;
025import java.util.Arrays;
026
027import cascading.flow.FlowProcess;
028import cascading.tuple.Fields;
029import cascading.tuple.Tuple;
030import cascading.tuple.TupleEntry;
031import cascading.tuple.TupleEntryCollector;
032import cascading.util.Util;
033
034/**
035 * The Identity function simply passes incoming arguments back out again. Optionally argument fields can be renamed, and/or
036 * coerced into specific types.
037 * <p/>
038 * During coercion, if the given type is a primitive ({@code long}), and the tuple value is null, {@code 0} is returned.
039 * If the type is an Object ({@code java.lang.Long}), and the tuple value is {@code null}, {@code null} is returned.
040 */
041public class Identity extends BaseOperation<Identity.Functor> implements Function<Identity.Functor>
042  {
043  /** Field types */
044  private Type[] types = null;
045
046  /**
047   * Constructor Identity creates a new Identity instance that will pass the argument values to its output. Use this
048   * constructor for a simple copy Pipe.
049   */
050  public Identity()
051    {
052    super( Fields.ARGS );
053    }
054
055  /**
056   * Constructor Identity creates a new Identity instance that will coerce the values to the give types.
057   *
058   * @param types of type Class...
059   */
060  @ConstructorProperties({"types"})
061  public Identity( Class... types )
062    {
063    super( Fields.ARGS );
064
065    if( types.length == 0 )
066      throw new IllegalArgumentException( "number of types must not be zero" );
067
068    this.types = Arrays.copyOf( types, types.length );
069    }
070
071  /**
072   * Constructor Identity creates a new Identity instance that will rename the argument fields to the given
073   * fieldDeclaration.
074   *
075   * @param fieldDeclaration of type Fields
076   */
077  @ConstructorProperties({"fieldDeclaration"})
078  public Identity( Fields fieldDeclaration )
079    {
080    super( fieldDeclaration ); // don't need to set size, default is ANY
081
082    this.types = fieldDeclaration.getTypes();
083    }
084
085  /**
086   * Constructor Identity creates a new Identity instance that will rename the argument fields to the given
087   * fieldDeclaration, and coerce the values to the give types.
088   *
089   * @param fieldDeclaration of type Fields
090   * @param types            of type Class...
091   */
092  @ConstructorProperties({"fieldDeclaration", "types"})
093  public Identity( Fields fieldDeclaration, Class... types )
094    {
095    super( fieldDeclaration );
096    this.types = Arrays.copyOf( types, types.length );
097
098    if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != types.length )
099      throw new IllegalArgumentException( "fieldDeclaration and types must be the same size" );
100    }
101
102  public Type[] getTypes()
103    {
104    return Util.copy( types );
105    }
106
107  @Override
108  public void prepare( FlowProcess flowProcess, OperationCall<Functor> operationCall )
109    {
110    Functor functor;
111
112    if( types != null )
113      {
114      functor = new Functor()
115      {
116      Tuple result = Tuple.size( types.length );
117
118      @Override
119      public void operate( FunctionCall<Functor> functionCall )
120        {
121        TupleEntry input = functionCall.getArguments();
122        TupleEntryCollector outputCollector = functionCall.getOutputCollector();
123
124        outputCollector.add( input.getCoercedTuple( types, result ) );
125        }
126      };
127      }
128    else
129      {
130      functor = new Functor()
131      {
132      @Override
133      public void operate( FunctionCall<Functor> functionCall )
134        {
135        TupleEntryCollector outputCollector = functionCall.getOutputCollector();
136
137        outputCollector.add( functionCall.getArguments().getTuple() );
138        }
139      };
140      operationCall.setContext( functor );
141      }
142
143    operationCall.setContext( functor );
144
145    }
146
147  @Override
148  public void operate( FlowProcess flowProcess, FunctionCall<Functor> functionCall )
149    {
150    functionCall.getContext().operate( functionCall );
151    }
152
153  @Override
154  public boolean equals( Object object )
155    {
156    if( this == object )
157      return true;
158    if( !( object instanceof Identity ) )
159      return false;
160    if( !super.equals( object ) )
161      return false;
162
163    Identity identity = (Identity) object;
164
165    if( !Arrays.equals( types, identity.types ) )
166      return false;
167
168    return true;
169    }
170
171  @Override
172  public int hashCode()
173    {
174    int result = super.hashCode();
175    result = 31 * result + ( types != null ? Arrays.hashCode( types ) : 0 );
176    return result;
177    }
178
179  interface Functor
180    {
181    void operate( FunctionCall<Functor> functionCall );
182    }
183  }