/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.buffer;

import java.beans.ConstructorProperties;
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

/**
 * Class FirstNBuffer emits the first N argument tuples it encounters in a grouping and then
 * stops reading from the arguments iterator.
 * <p>
 * This differs from the {@link cascading.operation.aggregator.First}
 * {@link cascading.operation.Aggregator}, which by contract is handed every value in the grouping.
 * <p>
 * When no count is given, a single Tuple is returned per grouping.
 * <p>
 * Which tuples come "first" is determined by the ordering established by the preceding
 * {@link cascading.pipe.GroupBy} or {@link cascading.pipe.CoGroup} pipe.
 * <p>
 * This class is used by {@link cascading.pipe.assembly.Unique}.
 */
public class FirstNBuffer extends BaseOperation implements Buffer
  {
  /** Upper bound on the number of argument tuples forwarded per call to operate. */
  private final int firstN;

  /** Creates a buffer that returns only the first argument Tuple encountered, declaring {@link Fields#ARGS}. */
  public FirstNBuffer()
    {
    this( 1 );
    }

  /**
   * Creates a buffer that returns the first N argument Tuples encountered, declaring {@link Fields#ARGS}.
   *
   * @param firstN of type int, the maximum number of tuples to return
   */
  @ConstructorProperties({"firstN"})
  public FirstNBuffer( int firstN )
    {
    super( Fields.ARGS );

    this.firstN = firstN;
    }

  /**
   * Creates a buffer that returns only the first argument Tuple encountered.
   *
   * @param fieldDeclaration of type Fields, the declared result fields; its size is also passed
   *                         to the super constructor
   */
  @ConstructorProperties({"fieldDeclaration"})
  public FirstNBuffer( Fields fieldDeclaration )
    {
    this( fieldDeclaration, 1 );
    }

  /**
   * Creates a buffer that returns the first N argument Tuples encountered.
   *
   * @param fieldDeclaration of type Fields, the declared result fields; its size is also passed
   *                         to the super constructor
   * @param firstN           of type int, the maximum number of tuples to return
   */
  @ConstructorProperties({"fieldDeclaration", "firstN"})
  public FirstNBuffer( Fields fieldDeclaration, int firstN )
    {
    super( fieldDeclaration.size(), fieldDeclaration );

    this.firstN = firstN;
    }

  /**
   * Returns the configured limit on the number of tuples emitted.
   *
   * @return the firstN value given at construction, or 1 if none was given
   */
  @Property(name = "firstN", visibility = Visibility.PUBLIC)
  @PropertyDescription("The number of tuples to return.")
  public int getFirstN()
    {
    return firstN;
    }

  /**
   * Forwards argument entries to the output collector, in iteration order, until either
   * {@code firstN} entries have been added or the arguments iterator has no more entries.
   * Nothing is added when {@code firstN} is zero or negative.
   *
   * @param flowProcess of type FlowProcess, not used by this implementation
   * @param bufferCall  of type BufferCall, supplies the arguments iterator and the output collector
   */
  @Override
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    // the limit is tested before hasNext() so no argument is pulled once the quota is met
    for( int emitted = 0; emitted < firstN && arguments.hasNext(); emitted++ )
      {
      bufferCall.getOutputCollector().add( arguments.next() );
      }
    }
  }