/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.operation.aggregator;

import java.beans.ConstructorProperties;

import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Class First is an {@link Aggregator} that returns the first {@link Tuple} encountered in a grouping.
 * <p>
 * By default, it returns the first Tuple of {@link Fields#ARGS} found.
 * <p>
 * If {@code firstN} is given, one result Tuple is returned for each of the first N argument Tuples
 * encountered. That is, this Aggregator will return at maximum N tuples per grouping.
 * <p>
 * Be sure to set the {@link cascading.pipe.GroupBy} {@code sortFields} to control which Tuples are seen first.
 */
public class First extends ExtentBase
  {
  /** Maximum number of argument Tuples retained, and therefore emitted, per grouping. */
  private final int firstN;

  /** Selects and returns the first argument Tuple encountered. */
  public First()
    {
    this( 1 );
    }

  /**
   * Selects and returns the first N argument Tuples encountered.
   * <p>
   * No validation is applied to {@code firstN}; a value less than one means the size guard in
   * {@link #performOperation(Tuple[], TupleEntry)} never admits a Tuple, so nothing is emitted.
   *
   * @param firstN of type int
   */
  @ConstructorProperties({"firstN"})
  public First( int firstN )
    {
    super( Fields.ARGS );

    this.firstN = firstN;
    }

  /**
   * Selects and returns the first argument Tuple encountered.
   *
   * @param fieldDeclaration of type Fields
   */
  @ConstructorProperties({"fieldDeclaration"})
  public First( Fields fieldDeclaration )
    {
    this( fieldDeclaration, 1 );
    }

  /**
   * Selects and returns the first N argument Tuples encountered.
   *
   * @param fieldDeclaration of type Fields
   * @param firstN           of type int
   */
  @ConstructorProperties({"fieldDeclaration", "firstN"})
  public First( Fields fieldDeclaration, int firstN )
    {
    super( fieldDeclaration.size(), fieldDeclaration );

    this.firstN = firstN;
    }

  /**
   * Selects and returns the first argument Tuple encountered, unless the Tuple
   * is a member of the set ignoreTuples.
   *
   * @param fieldDeclaration of type Fields
   * @param ignoreTuples     of type Tuple...
   */
  @ConstructorProperties({"fieldDeclaration", "ignoreTuples"})
  public First( Fields fieldDeclaration, Tuple... ignoreTuples )
    {
    super( fieldDeclaration, ignoreTuples );

    this.firstN = 1;
    }

  /**
   * Returns the maximum number of Tuples this Aggregator emits per grouping.
   *
   * @return the firstN value given at construction, or 1 when none was given
   */
  @Property(name = "firstN", visibility = Visibility.PUBLIC)
  @PropertyDescription("The number of tuples to return.")
  public int getFirstN()
    {
    return firstN;
    }

  /**
   * Stores a copy of the current argument Tuple in the context until {@code firstN} copies are held;
   * later Tuples are dropped.
   * <p>
   * NOTE(review): presumably called by {@link ExtentBase} once per non-ignored argument Tuple in the
   * grouping -- confirm against the superclass.
   *
   * @param context single-slot holder; slot 0 is lazily set to a Tuple whose elements are the collected copies
   * @param entry   the current argument TupleEntry
   */
  @Override
  protected void performOperation( Tuple[] context, TupleEntry entry )
    {
    if( context[ 0 ] == null )
      context[ 0 ] = new Tuple();

    // each element of the holder Tuple is itself a copied argument Tuple
    if( context[ 0 ].size() < firstN )
      context[ 0 ].add( entry.getTupleCopy() );
    }

  /**
   * Emits every collected Tuple as its own result Tuple, in the order collected.
   * <p>
   * Nothing is emitted when context slot 0 is {@code null}.
   *
   * @param flowProcess    the current FlowProcess (unused here)
   * @param aggregatorCall supplies the context and the output collector
   */
  @Override
  public void complete( FlowProcess flowProcess, AggregatorCall<Tuple[]> aggregatorCall )
    {
    Tuple context = aggregatorCall.getContext()[ 0 ];

    if( context == null )
      return;

    // elements were added as Tuple instances in performOperation, so the cast is safe
    for( Object tuple : context )
      aggregatorCall.getOutputCollector().add( (Tuple) tuple );
    }
  }