001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.buffer;
022
023import java.beans.ConstructorProperties;
024import java.util.Iterator;
025
026import cascading.flow.FlowProcess;
027import cascading.management.annotation.Property;
028import cascading.management.annotation.PropertyDescription;
029import cascading.management.annotation.Visibility;
030import cascading.operation.BaseOperation;
031import cascading.operation.Buffer;
032import cascading.operation.BufferCall;
033import cascading.tuple.Fields;
034import cascading.tuple.TupleEntry;
035
036/**
037 * Class FirstNBuffer will return the first N tuples seen in a given grouping. After the tuples
038 * are returned the Buffer stops iterating the arguments unlike the {@link cascading.operation.aggregator.First}
039 * {@link cascading.operation.Aggregator} which by contract sees all the values in the grouping.
040 * <p/>
041 * By default it returns one Tuple.
042 * <p/>
043 * Order can be controlled through the prior {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}
044 * pipes.
045 * <p/>
046 * This class is used by {@link cascading.pipe.assembly.Unique}.
047 */
048public class FirstNBuffer extends BaseOperation implements Buffer
049  {
050  private final int firstN;
051
052  /** Selects and returns the first argument Tuple encountered. */
053  public FirstNBuffer()
054    {
055    super( Fields.ARGS );
056
057    firstN = 1;
058    }
059
060  /**
061   * Selects and returns the first N argument Tuples encountered.
062   *
063   * @param firstN of type int
064   */
065  @ConstructorProperties({"firstN"})
066  public FirstNBuffer( int firstN )
067    {
068    super( Fields.ARGS );
069
070    this.firstN = firstN;
071    }
072
073  /**
074   * Selects and returns the first argument Tuple encountered.
075   *
076   * @param fieldDeclaration of type Fields
077   */
078  @ConstructorProperties({"fieldDeclaration"})
079  public FirstNBuffer( Fields fieldDeclaration )
080    {
081    super( fieldDeclaration.size(), fieldDeclaration );
082
083    this.firstN = 1;
084    }
085
086  /**
087   * Selects and returns the first N argument Tuples encountered.
088   *
089   * @param fieldDeclaration of type Fields
090   * @param firstN           of type int
091   */
092  @ConstructorProperties({"fieldDeclaration", "firstN"})
093  public FirstNBuffer( Fields fieldDeclaration, int firstN )
094    {
095    super( fieldDeclaration.size(), fieldDeclaration );
096
097    this.firstN = firstN;
098    }
099
100  @Property(name = "firstN", visibility = Visibility.PUBLIC)
101  @PropertyDescription("The number of tuples to return.")
102  public int getFirstN()
103    {
104    return firstN;
105    }
106
107  @Override
108  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
109    {
110    Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator();
111
112    int count = 0;
113
114    while( count < firstN && iterator.hasNext() )
115      {
116      bufferCall.getOutputCollector().add( iterator.next() );
117      count++;
118      }
119    }
120  }