001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.operation.aggregator; 022 023import java.beans.ConstructorProperties; 024import java.lang.reflect.Type; 025 026import cascading.flow.FlowProcess; 027import cascading.operation.Aggregator; 028import cascading.operation.AggregatorCall; 029import cascading.operation.BaseOperation; 030import cascading.operation.OperationCall; 031import cascading.tuple.Fields; 032import cascading.tuple.Tuple; 033import cascading.tuple.TupleEntry; 034import cascading.tuple.coerce.Coercions; 035import cascading.tuple.type.CoercibleType; 036import cascading.util.Pair; 037 038/** Class Sum is an {@link Aggregator} that returns the sum of all numeric values in the current group. */ 039public class Sum extends BaseOperation<Pair<Double[], Tuple>> implements Aggregator<Pair<Double[], Tuple>> 040 { 041 /** Field FIELD_NAME */ 042 public static final String FIELD_NAME = "sum"; 043 044 /** Field type */ 045 private Type type = Double.class; 046 private CoercibleType canonical; 047 048 /** Constructor Sum creates a new Sum instance that accepts one argument and returns a single field named "sum". */ 049 public Sum() 050 { 051 super( 1, new Fields( FIELD_NAME, Double.class ) ); 052 this.canonical = Coercions.coercibleTypeFor( this.type ); 053 } 054 055 /** 056 * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts 057 * only 1 argument. 058 * <p/> 059 * If the given {@code fieldDeclaration} has a type, it will be used to coerce the result value. 060 * 061 * @param fieldDeclaration of type Fields 062 */ 063 @ConstructorProperties({"fieldDeclaration"}) 064 public Sum( Fields fieldDeclaration ) 065 { 066 super( 1, fieldDeclaration ); 067 068 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 069 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 070 071 if( fieldDeclaration.hasTypes() ) 072 this.type = fieldDeclaration.getType( 0 ); 073 074 this.canonical = Coercions.coercibleTypeFor( this.type ); 075 } 076 077 /** 078 * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts 079 * only 1 argument. The return result is coerced into the given Class type. 080 * 081 * @param fieldDeclaration of type Fields 082 * @param type of type Class 083 */ 084 @ConstructorProperties({"fieldDeclaration", "type"}) 085 public Sum( Fields fieldDeclaration, Class type ) 086 { 087 this( fieldDeclaration.applyTypes( type ) ); 088 this.type = type; 089 this.canonical = Coercions.coercibleTypeFor( this.type ); 090 } 091 092 public Type getType() 093 { 094 return type; 095 } 096 097 @Override 098 public void prepare( FlowProcess flowProcess, OperationCall<Pair<Double[], Tuple>> operationCall ) 099 { 100 operationCall.setContext( new Pair<Double[], Tuple>( new Double[]{null}, Tuple.size( 1 ) ) ); 101 } 102 103 @Override 104 public void start( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 105 { 106 aggregatorCall.getContext().getLhs()[ 0 ] = null; 107 aggregatorCall.getContext().getRhs().set( 0, null ); 108 } 109 110 @Override 111 public void aggregate( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 112 { 113 TupleEntry arguments = aggregatorCall.getArguments(); 114 115 if( arguments.getObject( 0 ) == null ) 116 return; 117 118 Double[] sum = aggregatorCall.getContext().getLhs(); 119 120 double value = sum[ 0 ] == null ? 0 : sum[ 0 ]; 121 sum[ 0 ] = value + arguments.getDouble( 0 ); 122 } 123 124 @Override 125 public void complete( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 126 { 127 aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) ); 128 } 129 130 protected Tuple getResult( AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 131 { 132 aggregatorCall.getContext().getRhs().set( 0, canonical.canonical( aggregatorCall.getContext().getLhs()[ 0 ] ) ); 133 134 return aggregatorCall.getContext().getRhs(); 135 } 136 137 @Override 138 public boolean equals( Object object ) 139 { 140 if( this == object ) 141 return true; 142 if( !( object instanceof Sum ) ) 143 return false; 144 if( !super.equals( object ) ) 145 return false; 146 147 Sum sum = (Sum) object; 148 149 if( type != null ? !type.equals( sum.type ) : sum.type != null ) 150 return false; 151 152 return true; 153 } 154 155 @Override 156 public int hashCode() 157 { 158 int result = super.hashCode(); 159 result = 31 * result + ( type != null ? type.hashCode() : 0 ); 160 return result; 161 } 162 }