001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation.aggregator;
023
024import java.beans.ConstructorProperties;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.Aggregator;
028import cascading.operation.AggregatorCall;
029import cascading.operation.BaseOperation;
030import cascading.operation.OperationCall;
031import cascading.tuple.Fields;
032import cascading.tuple.Tuple;
033import cascading.util.Pair;
034
035/**
036 * Class Count is an {@link Aggregator} that calculates the number of items in the current group.
037 * <p>
038 * Note the resulting value for count is always a long. So any comparisons should be against a long value.
039 */
040public class Count extends BaseOperation<Pair<Long[], Tuple>> implements Aggregator<Pair<Long[], Tuple>>
041  {
042  /** Field COUNT */
043  public static final String FIELD_NAME = "count";
044
045  /** Constructor Count creates a new Count instance using the default field declaration of name 'count'. */
046  public Count()
047    {
048    super( new Fields( FIELD_NAME, Long.class ) );
049    }
050
051  /**
052   * Constructor Count creates a new Count instance and returns a field with the given fieldDeclaration name.
053   *
054   * @param fieldDeclaration of type Fields
055   */
056  @ConstructorProperties({"fieldDeclaration"})
057  public Count( Fields fieldDeclaration )
058    {
059    super( fieldDeclaration ); // allow ANY number of arguments
060    }
061
062  @Override
063  public void prepare( FlowProcess flowProcess, OperationCall<Pair<Long[], Tuple>> operationCall )
064    {
065    operationCall.setContext( new Pair<Long[], Tuple>( new Long[]{0L}, Tuple.size( 1 ) ) );
066    }
067
068  @Override
069  public void start( FlowProcess flowProcess, AggregatorCall<Pair<Long[], Tuple>> aggregatorCall )
070    {
071    aggregatorCall.getContext().getLhs()[ 0 ] = 0L;
072    }
073
074  @Override
075  public void aggregate( FlowProcess flowProcess, AggregatorCall<Pair<Long[], Tuple>> aggregatorCall )
076    {
077    aggregatorCall.getContext().getLhs()[ 0 ] += 1L;
078    }
079
080  @Override
081  public void complete( FlowProcess flowProcess, AggregatorCall<Pair<Long[], Tuple>> aggregatorCall )
082    {
083    aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
084    }
085
086  protected Tuple getResult( AggregatorCall<Pair<Long[], Tuple>> aggregatorCall )
087    {
088    aggregatorCall.getContext().getRhs().set( 0, aggregatorCall.getContext().getLhs()[ 0 ] );
089
090    return aggregatorCall.getContext().getRhs();
091    }
092  }