001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.operation.filter;
023
024import java.beans.ConstructorProperties;
025import java.util.Random;
026
027import cascading.flow.FlowProcess;
028import cascading.management.annotation.Property;
029import cascading.management.annotation.PropertyDescription;
030import cascading.management.annotation.Visibility;
031import cascading.operation.BaseOperation;
032import cascading.operation.Filter;
033import cascading.operation.FilterCall;
034import cascading.operation.OperationCall;
035
036/**
037 * Class Sample is a {@link Filter} that only allows the given fraction of {@link cascading.tuple.Tuple} instances to pass.
038 * <p>
039 * Where fraction is between 1 and zero, inclusive. Thus to sample {@code 50%} of the tuples in a stream, use the
040 * fraction {@code 0.5}.
041 * <p>
042 * By default, the seed is created at random on the constructor. This implies every branch using the Sample
043 * filter will return the same random stream based on that seed. So if this Sample instance is distributed
044 * into multiple systems against the same data, the result will be the same tuple stream. The alternative
045 * would be to make this Operation "not safe". See {@link cascading.operation.Operation#isSafe()}.
046 * <p>
047 * Conversely, if the same stream of random data is require across application executions, set the seed manually.
048 * <p>
049 * The seed is generated from the following code:
050 * <p>
051 * {@code System.identityHashCode(this) * 2654435761L ^ System.currentTimeMillis()}
052 * <p>
053 * Override {@link #makeSeed()} to customize.
054 */
055public class Sample extends BaseOperation<Random> implements Filter<Random>
056  {
057  private long seed = 0;
058  private double fraction = 1.0d;
059
060  /**
061   * Creates a new Sample that permits percent Tuples to pass.
062   *
063   * @param fraction of type double
064   */
065  @ConstructorProperties({"fraction"})
066  public Sample( double fraction )
067    {
068    this.seed = makeSeed();
069    this.fraction = fraction;
070    }
071
072  /**
073   * Creates a new Sample that permits percent Tuples to pass. The given seed value seeds the random number generator.
074   *
075   * @param seed     of type long
076   * @param fraction of type double
077   */
078  @ConstructorProperties({"seed", "fraction"})
079  public Sample( long seed, double fraction )
080    {
081    this.seed = seed;
082    this.fraction = fraction;
083    }
084
085  @Property(name = "seed", visibility = Visibility.PUBLIC)
086  @PropertyDescription("The randomization seed.")
087  public long getSeed()
088    {
089    return seed;
090    }
091
092  @Property(name = "fraction", visibility = Visibility.PUBLIC)
093  @PropertyDescription("The fraction of tuples to be returned.")
094  public double getFraction()
095    {
096    return fraction;
097    }
098
099  protected long makeSeed()
100    {
101    return System.identityHashCode( this ) * 2654435761L ^ System.currentTimeMillis();
102    }
103
104  @Override
105  public void prepare( FlowProcess flowProcess, OperationCall<Random> operationCall )
106    {
107    super.prepare( flowProcess, operationCall );
108
109    operationCall.setContext( new Random( seed ) );
110    }
111
112  @Override
113  public boolean isRemove( FlowProcess flowProcess, FilterCall<Random> filterCall )
114    {
115    return !( filterCall.getContext().nextDouble() < fraction );
116    }
117
118  @Override
119  public boolean equals( Object object )
120    {
121    if( this == object )
122      return true;
123    if( !( object instanceof Sample ) )
124      return false;
125    if( !super.equals( object ) )
126      return false;
127
128    Sample sample = (Sample) object;
129
130    if( Double.compare( sample.fraction, fraction ) != 0 )
131      return false;
132    if( seed != sample.seed )
133      return false;
134
135    return true;
136    }
137
138  @Override
139  public int hashCode()
140    {
141    int result = super.hashCode();
142    long temp;
143    result = 31 * result + (int) ( seed ^ seed >>> 32 );
144    temp = fraction != +0.0d ? Double.doubleToLongBits( fraction ) : 0L;
145    result = 31 * result + (int) ( temp ^ temp >>> 32 );
146    return result;
147    }
148  }