001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation.filter;
023
024import java.beans.ConstructorProperties;
025
026import cascading.flow.FlowProcess;
027import cascading.management.annotation.Property;
028import cascading.management.annotation.PropertyDescription;
029import cascading.management.annotation.Visibility;
030import cascading.operation.BaseOperation;
031import cascading.operation.Filter;
032import cascading.operation.FilterCall;
033import cascading.operation.OperationCall;
034
035/**
036 * Class Limit is a {@link Filter} that will limit the number of {@link cascading.tuple.Tuple} instances that it will
037 * allow to pass.
038 * <br>
039 * Note that the limit value is roughly a suggestion. It attempts to divide the limit number by the number of concurrent
040 * tasks, but knowing the number of tasks isn't easily available from configuration information provided to individual
041 * tasks. Further, the number of records/lines available to a task may be less than the limit amount.
042 * <br>
043 * More consistent results will be received from using {@link Sample}.
044 *
045 * @see Sample
046 */
047public class Limit extends BaseOperation<Limit.Context> implements Filter<Limit.Context>
048  {
049  private long limit = 0;
050
051  public static class Context
052    {
053    public long limit = 0;
054    public long count = 0;
055
056    public boolean increment()
057      {
058      if( limit == count )
059        return true;
060
061      count++;
062
063      return false;
064      }
065    }
066
067  /**
068   * Creates a new Limit class that only allows limit number of Tuple instances to pass.
069   *
070   * @param limit the number of tuples to let pass
071   */
072  @ConstructorProperties({"limit"})
073  public Limit( long limit )
074    {
075    this.limit = limit;
076    }
077
078  @Property(name = "limit", visibility = Visibility.PUBLIC)
079  @PropertyDescription("The upper limit.")
080  public long getLimit()
081    {
082    return limit;
083    }
084
085  @Override
086  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
087    {
088    super.prepare( flowProcess, operationCall );
089
090    Context context = new Context();
091
092    operationCall.setContext( context );
093
094    int numTasks = flowProcess.getNumProcessSlices();
095    int taskNum = flowProcess.getCurrentSliceNum();
096
097    context.limit = (long) Math.floor( (double) limit / (double) numTasks );
098
099    long remainingLimit = limit % numTasks;
100
101    // evenly divide limits across tasks
102    context.limit += taskNum < remainingLimit ? 1 : 0;
103    }
104
105  @Override
106  public boolean isRemove( FlowProcess flowProcess, FilterCall<Context> filterCall )
107    {
108    return filterCall.getContext().increment();
109    }
110
111  @Override
112  public boolean equals( Object object )
113    {
114    if( this == object )
115      return true;
116    if( !( object instanceof Limit ) )
117      return false;
118    if( !super.equals( object ) )
119      return false;
120
121    Limit limit1 = (Limit) object;
122
123    if( limit != limit1.limit )
124      return false;
125
126    return true;
127    }
128
129  @Override
130  public int hashCode()
131    {
132    int result = super.hashCode();
133    result = 31 * result + (int) ( limit ^ limit >>> 32 );
134    return result;
135    }
136  }