001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation.aggregator;
023
024import java.beans.ConstructorProperties;
025import java.lang.reflect.Type;
026
027import cascading.flow.FlowProcess;
028import cascading.operation.Aggregator;
029import cascading.operation.AggregatorCall;
030import cascading.operation.BaseOperation;
031import cascading.operation.OperationCall;
032import cascading.tuple.Fields;
033import cascading.tuple.Tuple;
034import cascading.tuple.TupleEntry;
035import cascading.tuple.coerce.Coercions;
036import cascading.tuple.type.CoercibleType;
037import cascading.util.Pair;
038
039/** Class Sum is an {@link Aggregator} that returns the sum of all numeric values in the current group. */
040public class Sum extends BaseOperation<Pair<Double[], Tuple>> implements Aggregator<Pair<Double[], Tuple>>
041  {
042  /** Field FIELD_NAME */
043  public static final String FIELD_NAME = "sum";
044
045  /** Field type */
046  private Type type = Double.class;
047  private CoercibleType canonical;
048
049  /** Constructor Sum creates a new Sum instance that accepts one argument and returns a single field named "sum". */
050  public Sum()
051    {
052    super( 1, new Fields( FIELD_NAME, Double.class ) );
053    this.canonical = Coercions.coercibleTypeFor( this.type );
054    }
055
056  /**
057   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts
058   * only 1 argument.
059   * <p>
060   * If the given {@code fieldDeclaration} has a type, it will be used to coerce the result value.
061   *
062   * @param fieldDeclaration of type Fields
063   */
064  @ConstructorProperties({"fieldDeclaration"})
065  public Sum( Fields fieldDeclaration )
066    {
067    super( 1, fieldDeclaration );
068
069    if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
070      throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
071
072    if( fieldDeclaration.hasTypes() )
073      this.type = fieldDeclaration.getType( 0 );
074
075    this.canonical = Coercions.coercibleTypeFor( this.type );
076    }
077
078  /**
079   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts
080   * only 1 argument. The return result is coerced into the given Class type.
081   *
082   * @param fieldDeclaration of type Fields
083   * @param type             of type Class
084   */
085  @ConstructorProperties({"fieldDeclaration", "type"})
086  public Sum( Fields fieldDeclaration, Class type )
087    {
088    this( fieldDeclaration.applyTypes( type ) );
089    this.type = type;
090    this.canonical = Coercions.coercibleTypeFor( this.type );
091    }
092
093  public Type getType()
094    {
095    return type;
096    }
097
098  @Override
099  public void prepare( FlowProcess flowProcess, OperationCall<Pair<Double[], Tuple>> operationCall )
100    {
101    operationCall.setContext( new Pair<Double[], Tuple>( new Double[]{null}, Tuple.size( 1 ) ) );
102    }
103
104  @Override
105  public void start( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
106    {
107    aggregatorCall.getContext().getLhs()[ 0 ] = null;
108    aggregatorCall.getContext().getRhs().set( 0, null );
109    }
110
111  @Override
112  public void aggregate( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
113    {
114    TupleEntry arguments = aggregatorCall.getArguments();
115
116    if( arguments.getObject( 0 ) == null )
117      return;
118
119    Double[] sum = aggregatorCall.getContext().getLhs();
120
121    double value = sum[ 0 ] == null ? 0 : sum[ 0 ];
122    sum[ 0 ] = value + arguments.getDouble( 0 );
123    }
124
125  @Override
126  public void complete( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
127    {
128    aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
129    }
130
131  protected Tuple getResult( AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
132    {
133    aggregatorCall.getContext().getRhs().set( 0, canonical.canonical( aggregatorCall.getContext().getLhs()[ 0 ] ) );
134
135    return aggregatorCall.getContext().getRhs();
136    }
137
138  @Override
139  public boolean equals( Object object )
140    {
141    if( this == object )
142      return true;
143    if( !( object instanceof Sum ) )
144      return false;
145    if( !super.equals( object ) )
146      return false;
147
148    Sum sum = (Sum) object;
149
150    if( type != null ? !type.equals( sum.type ) : sum.type != null )
151      return false;
152
153    return true;
154    }
155
156  @Override
157  public int hashCode()
158    {
159    int result = super.hashCode();
160    result = 31 * result + ( type != null ? type.hashCode() : 0 );
161    return result;
162    }
163  }