001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.operation.filter; 022 023import java.beans.ConstructorProperties; 024 025import cascading.flow.FlowProcess; 026import cascading.management.annotation.Property; 027import cascading.management.annotation.PropertyDescription; 028import cascading.management.annotation.Visibility; 029import cascading.operation.BaseOperation; 030import cascading.operation.Filter; 031import cascading.operation.FilterCall; 032import cascading.operation.OperationCall; 033 034/** 035 * Class Limit is a {@link Filter} that will limit the number of {@link cascading.tuple.Tuple} instances that it will 036 * allow to pass. 037 * <br/> 038 * Note that the limit value is roughly a suggestion. It attempts to divide the limit number by the number of concurrent 039 * tasks, but knowing the number of tasks isn't easily available from configuration information provided to individual 040 * tasks. Further, the number of records/lines available to a task may be less than the limit amount. 041 * <br/> 042 * More consistent results will be received from using {@link Sample}. 043 * 044 * @see Sample 045 */ 046public class Limit extends BaseOperation<Limit.Context> implements Filter<Limit.Context> 047 { 048 private long limit = 0; 049 050 public static class Context 051 { 052 public long limit = 0; 053 public long count = 0; 054 055 public boolean increment() 056 { 057 if( limit == count ) 058 return true; 059 060 count++; 061 062 return false; 063 } 064 } 065 066 /** 067 * Creates a new Limit class that only allows limit number of Tuple instances to pass. 068 * 069 * @param limit the number of tuples to let pass 070 */ 071 @ConstructorProperties({"limit"}) 072 public Limit( long limit ) 073 { 074 this.limit = limit; 075 } 076 077 @Property(name = "limit", visibility = Visibility.PUBLIC) 078 @PropertyDescription("The upper limit.") 079 public long getLimit() 080 { 081 return limit; 082 } 083 084 @Override 085 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 086 { 087 super.prepare( flowProcess, operationCall ); 088 089 Context context = new Context(); 090 091 operationCall.setContext( context ); 092 093 int numTasks = flowProcess.getNumProcessSlices(); 094 int taskNum = flowProcess.getCurrentSliceNum(); 095 096 context.limit = (long) Math.floor( (double) limit / (double) numTasks ); 097 098 long remainingLimit = limit % numTasks; 099 100 // evenly divide limits across tasks 101 context.limit += taskNum < remainingLimit ? 1 : 0; 102 } 103 104 @Override 105 public boolean isRemove( FlowProcess flowProcess, FilterCall<Context> filterCall ) 106 { 107 return filterCall.getContext().increment(); 108 } 109 110 @Override 111 public boolean equals( Object object ) 112 { 113 if( this == object ) 114 return true; 115 if( !( object instanceof Limit ) ) 116 return false; 117 if( !super.equals( object ) ) 118 return false; 119 120 Limit limit1 = (Limit) object; 121 122 if( limit != limit1.limit ) 123 return false; 124 125 return true; 126 } 127 128 @Override 129 public int hashCode() 130 { 131 int result = super.hashCode(); 132 result = 31 * result + (int) ( limit ^ limit >>> 32 ); 133 return result; 134 } 135 }