001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation.buffer;
023
024import java.beans.ConstructorProperties;
025import java.util.Iterator;
026
027import cascading.flow.FlowProcess;
028import cascading.management.annotation.Property;
029import cascading.management.annotation.PropertyDescription;
030import cascading.management.annotation.Visibility;
031import cascading.operation.BaseOperation;
032import cascading.operation.Buffer;
033import cascading.operation.BufferCall;
034import cascading.tuple.Fields;
035import cascading.tuple.TupleEntry;
036
037/**
038 * Class FirstNBuffer will return the first N tuples seen in a given grouping. After the tuples
039 * are returned the Buffer stops iterating the arguments unlike the {@link cascading.operation.aggregator.First}
040 * {@link cascading.operation.Aggregator} which by contract sees all the values in the grouping.
041 * <p>
042 * By default it returns one Tuple.
043 * <p>
044 * Order can be controlled through the prior {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup}
045 * pipes.
046 * <p>
047 * This class is used by {@link cascading.pipe.assembly.Unique}.
048 */
049public class FirstNBuffer extends BaseOperation implements Buffer
050  {
051  private final int firstN;
052
053  /** Selects and returns the first argument Tuple encountered. */
054  public FirstNBuffer()
055    {
056    super( Fields.ARGS );
057
058    firstN = 1;
059    }
060
061  /**
062   * Selects and returns the first N argument Tuples encountered.
063   *
064   * @param firstN of type int
065   */
066  @ConstructorProperties({"firstN"})
067  public FirstNBuffer( int firstN )
068    {
069    super( Fields.ARGS );
070
071    this.firstN = firstN;
072    }
073
074  /**
075   * Selects and returns the first argument Tuple encountered.
076   *
077   * @param fieldDeclaration of type Fields
078   */
079  @ConstructorProperties({"fieldDeclaration"})
080  public FirstNBuffer( Fields fieldDeclaration )
081    {
082    super( fieldDeclaration.size(), fieldDeclaration );
083
084    this.firstN = 1;
085    }
086
087  /**
088   * Selects and returns the first N argument Tuples encountered.
089   *
090   * @param fieldDeclaration of type Fields
091   * @param firstN           of type int
092   */
093  @ConstructorProperties({"fieldDeclaration", "firstN"})
094  public FirstNBuffer( Fields fieldDeclaration, int firstN )
095    {
096    super( fieldDeclaration.size(), fieldDeclaration );
097
098    this.firstN = firstN;
099    }
100
101  @Property(name = "firstN", visibility = Visibility.PUBLIC)
102  @PropertyDescription("The number of tuples to return.")
103  public int getFirstN()
104    {
105    return firstN;
106    }
107
108  @Override
109  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
110    {
111    Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator();
112
113    int count = 0;
114
115    while( count < firstN && iterator.hasNext() )
116      {
117      bufferCall.getOutputCollector().add( iterator.next() );
118      count++;
119      }
120    }
121  }