001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.filter;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Random;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.management.annotation.Property;
028    import cascading.management.annotation.PropertyDescription;
029    import cascading.management.annotation.Visibility;
030    import cascading.operation.BaseOperation;
031    import cascading.operation.Filter;
032    import cascading.operation.FilterCall;
033    import cascading.operation.OperationCall;
034    
035    /**
036     * Class Sample is a {@link Filter} that only allows the given fraction of {@link cascading.tuple.Tuple} instances to pass.
037     * <p/>
038     * Where fraction is between 1 and zero, inclusive. Thus to sample {@code 50%} of the tuples in a stream, use the
039     * fraction {@code 0.5}.
040     * <p/>
041     * By default, the seed is created at random on the constructor. This implies every branch using the Sample
042     * filter will return the same random stream based on that seed. So if this Sample instance is distributed
043     * into multiple systems against the same data, the result will be the same tuple stream. The alternative
044     * would be to make this Operation "not safe". See {@link cascading.operation.Operation#isSafe()}.
045     * <p/>
046     * Conversely, if the same stream of random data is require across application executions, set the seed manually.
047     * <p/>
048     * The seed is generated from the following code:
049     * <p/>
050     * {@code System.identityHashCode(this) * 2654435761L ^ System.currentTimeMillis()}
051     * <p/>
052     * Override {@link #makeSeed()} to customize.
053     */
054    public class Sample extends BaseOperation<Random> implements Filter<Random>
055      {
056      private long seed = 0;
057      private double fraction = 1.0d;
058    
059      /**
060       * Creates a new Sample that permits percent Tuples to pass.
061       *
062       * @param fraction of type double
063       */
064      @ConstructorProperties({"fraction"})
065      public Sample( double fraction )
066        {
067        this.seed = makeSeed();
068        this.fraction = fraction;
069        }
070    
071      /**
072       * Creates a new Sample that permits percent Tuples to pass. The given seed value seeds the random number generator.
073       *
074       * @param seed     of type long
075       * @param fraction of type double
076       */
077      @ConstructorProperties({"seed", "fraction"})
078      public Sample( long seed, double fraction )
079        {
080        this.seed = seed;
081        this.fraction = fraction;
082        }
083    
084      @Property(name = "seed", visibility = Visibility.PUBLIC)
085      @PropertyDescription("The randomization seed.")
086      public long getSeed()
087        {
088        return seed;
089        }
090    
091      @Property(name = "fraction", visibility = Visibility.PUBLIC)
092      @PropertyDescription("The fraction of tuples to be returned.")
093      public double getFraction()
094        {
095        return fraction;
096        }
097    
098      protected long makeSeed()
099        {
100        return System.identityHashCode( this ) * 2654435761L ^ System.currentTimeMillis();
101        }
102    
103      @Override
104      public void prepare( FlowProcess flowProcess, OperationCall<Random> operationCall )
105        {
106        super.prepare( flowProcess, operationCall );
107    
108        operationCall.setContext( new Random( seed ) );
109        }
110    
111      @Override
112      public boolean isRemove( FlowProcess flowProcess, FilterCall<Random> filterCall )
113        {
114        return !( filterCall.getContext().nextDouble() < fraction );
115        }
116    
117      @Override
118      public boolean equals( Object object )
119        {
120        if( this == object )
121          return true;
122        if( !( object instanceof Sample ) )
123          return false;
124        if( !super.equals( object ) )
125          return false;
126    
127        Sample sample = (Sample) object;
128    
129        if( Double.compare( sample.fraction, fraction ) != 0 )
130          return false;
131        if( seed != sample.seed )
132          return false;
133    
134        return true;
135        }
136    
137      @Override
138      public int hashCode()
139        {
140        int result = super.hashCode();
141        long temp;
142        result = 31 * result + (int) ( seed ^ seed >>> 32 );
143        temp = fraction != +0.0d ? Double.doubleToLongBits( fraction ) : 0L;
144        result = 31 * result + (int) ( temp ^ temp >>> 32 );
145        return result;
146        }
147      }