001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.filter;
022
023import java.beans.ConstructorProperties;
024import java.util.Random;
025
026import cascading.flow.FlowProcess;
027import cascading.management.annotation.Property;
028import cascading.management.annotation.PropertyDescription;
029import cascading.management.annotation.Visibility;
030import cascading.operation.BaseOperation;
031import cascading.operation.Filter;
032import cascading.operation.FilterCall;
033import cascading.operation.OperationCall;
034
035/**
036 * Class Sample is a {@link Filter} that only allows the given fraction of {@link cascading.tuple.Tuple} instances to pass.
037 * <p/>
038 * Where fraction is between 1 and zero, inclusive. Thus to sample {@code 50%} of the tuples in a stream, use the
039 * fraction {@code 0.5}.
040 * <p/>
041 * By default, the seed is created at random on the constructor. This implies every branch using the Sample
042 * filter will return the same random stream based on that seed. So if this Sample instance is distributed
043 * into multiple systems against the same data, the result will be the same tuple stream. The alternative
044 * would be to make this Operation "not safe". See {@link cascading.operation.Operation#isSafe()}.
045 * <p/>
046 * Conversely, if the same stream of random data is require across application executions, set the seed manually.
047 * <p/>
048 * The seed is generated from the following code:
049 * <p/>
050 * {@code System.identityHashCode(this) * 2654435761L ^ System.currentTimeMillis()}
051 * <p/>
052 * Override {@link #makeSeed()} to customize.
053 */
054public class Sample extends BaseOperation<Random> implements Filter<Random>
055  {
056  private long seed = 0;
057  private double fraction = 1.0d;
058
059  /**
060   * Creates a new Sample that permits percent Tuples to pass.
061   *
062   * @param fraction of type double
063   */
064  @ConstructorProperties({"fraction"})
065  public Sample( double fraction )
066    {
067    this.seed = makeSeed();
068    this.fraction = fraction;
069    }
070
071  /**
072   * Creates a new Sample that permits percent Tuples to pass. The given seed value seeds the random number generator.
073   *
074   * @param seed     of type long
075   * @param fraction of type double
076   */
077  @ConstructorProperties({"seed", "fraction"})
078  public Sample( long seed, double fraction )
079    {
080    this.seed = seed;
081    this.fraction = fraction;
082    }
083
084  @Property(name = "seed", visibility = Visibility.PUBLIC)
085  @PropertyDescription("The randomization seed.")
086  public long getSeed()
087    {
088    return seed;
089    }
090
091  @Property(name = "fraction", visibility = Visibility.PUBLIC)
092  @PropertyDescription("The fraction of tuples to be returned.")
093  public double getFraction()
094    {
095    return fraction;
096    }
097
098  protected long makeSeed()
099    {
100    return System.identityHashCode( this ) * 2654435761L ^ System.currentTimeMillis();
101    }
102
103  @Override
104  public void prepare( FlowProcess flowProcess, OperationCall<Random> operationCall )
105    {
106    super.prepare( flowProcess, operationCall );
107
108    operationCall.setContext( new Random( seed ) );
109    }
110
111  @Override
112  public boolean isRemove( FlowProcess flowProcess, FilterCall<Random> filterCall )
113    {
114    return !( filterCall.getContext().nextDouble() < fraction );
115    }
116
117  @Override
118  public boolean equals( Object object )
119    {
120    if( this == object )
121      return true;
122    if( !( object instanceof Sample ) )
123      return false;
124    if( !super.equals( object ) )
125      return false;
126
127    Sample sample = (Sample) object;
128
129    if( Double.compare( sample.fraction, fraction ) != 0 )
130      return false;
131    if( seed != sample.seed )
132      return false;
133
134    return true;
135    }
136
137  @Override
138  public int hashCode()
139    {
140    int result = super.hashCode();
141    long temp;
142    result = 31 * result + (int) ( seed ^ seed >>> 32 );
143    temp = fraction != +0.0d ? Double.doubleToLongBits( fraction ) : 0L;
144    result = 31 * result + (int) ( temp ^ temp >>> 32 );
145    return result;
146    }
147  }