001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.operation.aggregator; 023 024import java.beans.ConstructorProperties; 025 026import cascading.flow.FlowProcess; 027import cascading.operation.Aggregator; 028import cascading.operation.AggregatorCall; 029import cascading.operation.BaseOperation; 030import cascading.operation.OperationCall; 031import cascading.tuple.Fields; 032import cascading.tuple.Tuple; 033import cascading.util.Pair; 034 035/** 036 * Class Count is an {@link Aggregator} that calculates the number of items in the current group. 037 * <p> 038 * Note the resulting value for count is always a long. So any comparisons should be against a long value. 039 */ 040public class Count extends BaseOperation<Pair<Long[], Tuple>> implements Aggregator<Pair<Long[], Tuple>> 041 { 042 /** Field COUNT */ 043 public static final String FIELD_NAME = "count"; 044 045 /** Constructor Count creates a new Count instance using the default field declaration of name 'count'. */ 046 public Count() 047 { 048 super( new Fields( FIELD_NAME, Long.class ) ); 049 } 050 051 /** 052 * Constructor Count creates a new Count instance and returns a field with the given fieldDeclaration name. 053 * 054 * @param fieldDeclaration of type Fields 055 */ 056 @ConstructorProperties({"fieldDeclaration"}) 057 public Count( Fields fieldDeclaration ) 058 { 059 super( fieldDeclaration ); // allow ANY number of arguments 060 } 061 062 @Override 063 public void prepare( FlowProcess flowProcess, OperationCall<Pair<Long[], Tuple>> operationCall ) 064 { 065 operationCall.setContext( new Pair<Long[], Tuple>( new Long[]{0L}, Tuple.size( 1 ) ) ); 066 } 067 068 @Override 069 public void start( FlowProcess flowProcess, AggregatorCall<Pair<Long[], Tuple>> aggregatorCall ) 070 { 071 aggregatorCall.getContext().getLhs()[ 0 ] = 0L; 072 } 073 074 @Override 075 public void aggregate( FlowProcess flowProcess, AggregatorCall<Pair<Long[], Tuple>> aggregatorCall ) 076 { 077 aggregatorCall.getContext().getLhs()[ 0 ] += 1L; 078 } 079 080 @Override 081 public void complete( FlowProcess flowProcess, AggregatorCall<Pair<Long[], Tuple>> aggregatorCall ) 082 { 083 aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) ); 084 } 085 086 protected Tuple getResult( AggregatorCall<Pair<Long[], Tuple>> aggregatorCall ) 087 { 088 aggregatorCall.getContext().getRhs().set( 0, aggregatorCall.getContext().getLhs()[ 0 ] ); 089 090 return aggregatorCall.getContext().getRhs(); 091 } 092 }