001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.operation.filter; 023 024import java.beans.ConstructorProperties; 025 026import cascading.flow.FlowProcess; 027import cascading.management.annotation.Property; 028import cascading.management.annotation.PropertyDescription; 029import cascading.management.annotation.Visibility; 030import cascading.operation.BaseOperation; 031import cascading.operation.Filter; 032import cascading.operation.FilterCall; 033import cascading.operation.OperationCall; 034 035/** 036 * Class Limit is a {@link Filter} that will limit the number of {@link cascading.tuple.Tuple} instances that it will 037 * allow to pass. 038 * <br> 039 * Note that the limit value is roughly a suggestion. It attempts to divide the limit number by the number of concurrent 040 * tasks, but knowing the number of tasks isn't easily available from configuration information provided to individual 041 * tasks. Further, the number of records/lines available to a task may be less than the limit amount. 042 * <br> 043 * More consistent results will be received from using {@link Sample}. 044 * 045 * @see Sample 046 */ 047public class Limit extends BaseOperation<Limit.Context> implements Filter<Limit.Context> 048 { 049 private long limit = 0; 050 051 public static class Context 052 { 053 public long limit = 0; 054 public long count = 0; 055 056 public boolean increment() 057 { 058 if( limit == count ) 059 return true; 060 061 count++; 062 063 return false; 064 } 065 } 066 067 /** 068 * Creates a new Limit class that only allows limit number of Tuple instances to pass. 069 * 070 * @param limit the number of tuples to let pass 071 */ 072 @ConstructorProperties({"limit"}) 073 public Limit( long limit ) 074 { 075 this.limit = limit; 076 } 077 078 @Property(name = "limit", visibility = Visibility.PUBLIC) 079 @PropertyDescription("The upper limit.") 080 public long getLimit() 081 { 082 return limit; 083 } 084 085 @Override 086 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 087 { 088 super.prepare( flowProcess, operationCall ); 089 090 Context context = new Context(); 091 092 operationCall.setContext( context ); 093 094 int numTasks = flowProcess.getNumProcessSlices(); 095 int taskNum = flowProcess.getCurrentSliceNum(); 096 097 context.limit = (long) Math.floor( (double) limit / (double) numTasks ); 098 099 long remainingLimit = limit % numTasks; 100 101 // evenly divide limits across tasks 102 context.limit += taskNum < remainingLimit ? 1 : 0; 103 } 104 105 @Override 106 public boolean isRemove( FlowProcess flowProcess, FilterCall<Context> filterCall ) 107 { 108 return filterCall.getContext().increment(); 109 } 110 111 @Override 112 public boolean equals( Object object ) 113 { 114 if( this == object ) 115 return true; 116 if( !( object instanceof Limit ) ) 117 return false; 118 if( !super.equals( object ) ) 119 return false; 120 121 Limit limit1 = (Limit) object; 122 123 if( limit != limit1.limit ) 124 return false; 125 126 return true; 127 } 128 129 @Override 130 public int hashCode() 131 { 132 int result = super.hashCode(); 133 result = 31 * result + (int) ( limit ^ limit >>> 32 ); 134 return result; 135 } 136 }