001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation.aggregator;
023
024import java.beans.ConstructorProperties;
025
026import cascading.flow.FlowProcess;
027import cascading.management.annotation.Property;
028import cascading.management.annotation.PropertyDescription;
029import cascading.management.annotation.Visibility;
030import cascading.operation.Aggregator;
031import cascading.operation.AggregatorCall;
032import cascading.tuple.Fields;
033import cascading.tuple.Tuple;
034import cascading.tuple.TupleEntry;
035
036/**
037 * Class First is an {@link Aggregator} that returns the first {@link Tuple} encountered in a grouping.
038 * <p>
039 * By default, it returns the first Tuple of {@link Fields#ARGS} found.
040 * <p>
041 * If {@code firstN} is given, Tuples with each of the first N number of Tuples encountered are returned. That is,
042 * this Aggregator will return at maximum N tuples per grouping.
043 * <p>
044 * Be sure to set the {@link cascading.pipe.GroupBy} {@code sortFields} to control which Tuples are seen first.
045 */
046public class First extends ExtentBase
047  {
048  private final int firstN;
049
050  /** Selects and returns the first argument Tuple encountered. */
051  public First()
052    {
053    super( Fields.ARGS );
054
055    this.firstN = 1;
056    }
057
058  /**
059   * Selects and returns the first N argument Tuples encountered.
060   *
061   * @param firstN of type int
062   */
063  @ConstructorProperties({"firstN"})
064  public First( int firstN )
065    {
066    super( Fields.ARGS );
067
068    this.firstN = firstN;
069    }
070
071  /**
072   * Selects and returns the first argument Tuple encountered.
073   *
074   * @param fieldDeclaration of type Fields
075   */
076  @ConstructorProperties({"fieldDeclaration"})
077  public First( Fields fieldDeclaration )
078    {
079    super( fieldDeclaration.size(), fieldDeclaration );
080
081    this.firstN = 1;
082    }
083
084  /**
085   * Selects and returns the first N argument Tuples encountered.
086   *
087   * @param fieldDeclaration of type Fields
088   * @param firstN           of type int
089   */
090  @ConstructorProperties({"fieldDeclaration", "firstN"})
091  public First( Fields fieldDeclaration, int firstN )
092    {
093    super( fieldDeclaration.size(), fieldDeclaration );
094
095    this.firstN = firstN;
096    }
097
098  /**
099   * Selects and returns the first argument Tuple encountered, unless the Tuple
100   * is a member of the set ignoreTuples.
101   *
102   * @param fieldDeclaration of type Fields
103   * @param ignoreTuples     of type Tuple...
104   */
105  @ConstructorProperties({"fieldDeclaration", "ignoreTuples"})
106  public First( Fields fieldDeclaration, Tuple... ignoreTuples )
107    {
108    super( fieldDeclaration, ignoreTuples );
109
110    this.firstN = 1;
111    }
112
113  @Property(name = "firstN", visibility = Visibility.PUBLIC)
114  @PropertyDescription("The number of tuples to return.")
115  public int getFirstN()
116    {
117    return firstN;
118    }
119
120  protected void performOperation( Tuple[] context, TupleEntry entry )
121    {
122    if( context[ 0 ] == null )
123      context[ 0 ] = new Tuple();
124
125    if( context[ 0 ].size() < firstN )
126      context[ 0 ].add( entry.getTupleCopy() );
127    }
128
129  @Override
130  public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
131    {
132    Tuple context = aggregatorCall.getContext()[ 0 ];
133
134    if( context == null )
135      return;
136
137    for( Object tuple : context )
138      aggregatorCall.getOutputCollector().add( (Tuple) tuple );
139    }
140  }