001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.operation.aggregator; 022 023import java.beans.ConstructorProperties; 024import java.lang.reflect.Type; 025 026import cascading.flow.FlowProcess; 027import cascading.operation.Aggregator; 028import cascading.operation.AggregatorCall; 029import cascading.operation.BaseOperation; 030import cascading.operation.OperationCall; 031import cascading.tuple.Fields; 032import cascading.tuple.Tuple; 033import cascading.tuple.TupleEntry; 034import cascading.tuple.coerce.Coercions; 035import cascading.tuple.type.CoercibleType; 036 037/** Class Average is an {@link Aggregator} that returns the average of all numeric values in the current group. */ 038public class Average extends BaseOperation<Average.Context> implements Aggregator<Average.Context> 039 { 040 /** Field FIELD_NAME */ 041 public static final String FIELD_NAME = "average"; 042 043 /** Field type */ 044 private Type type = Double.class; 045 private CoercibleType canonical; 046 047 /** Class Context is used to hold intermediate values. */ 048 protected static class Context 049 { 050 private final CoercibleType canonical; 051 052 Tuple tuple = Tuple.size( 1 ); 053 double sum = 0.0D; 054 long count = 0L; 055 056 public Context( CoercibleType canonical ) 057 { 058 this.canonical = canonical; 059 } 060 061 public Context reset() 062 { 063 sum = 0.0D; 064 count = 0L; 065 066 return this; 067 } 068 069 public Tuple result() 070 { 071 tuple.set( 0, canonical.canonical( sum / count ) ); 072 073 return tuple; 074 } 075 } 076 077 /** Constructs a new instance that returns the average of the values encountered in the field name "average". */ 078 public Average() 079 { 080 super( 1, new Fields( FIELD_NAME, Double.class ) ); 081 082 this.canonical = Coercions.coercibleTypeFor( this.type ); 083 } 084 085 /** 086 * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name. 087 * 088 * @param fieldDeclaration of type Fields 089 */ 090 @ConstructorProperties({"fieldDeclaration"}) 091 public Average( Fields fieldDeclaration ) 092 { 093 super( 1, fieldDeclaration ); 094 095 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 096 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 097 098 if( fieldDeclaration.hasTypes() ) 099 this.type = fieldDeclaration.getType( 0 ); 100 101 this.canonical = Coercions.coercibleTypeFor( this.type ); 102 } 103 104 @Override 105 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 106 { 107 operationCall.setContext( new Context( canonical ) ); 108 } 109 110 @Override 111 public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 112 { 113 aggregatorCall.getContext().reset(); 114 } 115 116 @Override 117 public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 118 { 119 Context context = aggregatorCall.getContext(); 120 TupleEntry arguments = aggregatorCall.getArguments(); 121 122 context.sum += arguments.getDouble( 0 ); 123 context.count += 1L; 124 } 125 126 @Override 127 public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 128 { 129 aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) ); 130 } 131 132 private Tuple getResult( AggregatorCall<Context> aggregatorCall ) 133 { 134 return aggregatorCall.getContext().result(); 135 } 136 }