001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.aggregator;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.HashSet;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.operation.Aggregator;
030    import cascading.operation.AggregatorCall;
031    import cascading.operation.BaseOperation;
032    import cascading.operation.OperationCall;
033    import cascading.tuple.Fields;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    
037    /**
038     * Class ExtremaBase is the base class for Max and Min. The unique thing about Max and Min are that they return the original,
039     * un-coerced, argument value, though a coerced version of the argument is used for the comparison.
040     */
041    @Deprecated
042    public abstract class ExtremaBase extends BaseOperation<ExtremaBase.Context> implements Aggregator<ExtremaBase.Context>
043      {
044      /** Field ignoreValues */
045      protected final Collection ignoreValues;
046    
047      protected static class Context
048        {
049        Number extrema;
050        Tuple value = Tuple.size( 1 );
051    
052        public Context( Number extrema )
053          {
054          this.extrema = extrema;
055          }
056    
057        public Context reset( Number extrema )
058          {
059          this.extrema = extrema;
060          this.value.set( 0, null );
061    
062          return this;
063          }
064        }
065    
066      @ConstructorProperties({"fieldDeclaration"})
067      public ExtremaBase( Fields fieldDeclaration )
068        {
069        super( fieldDeclaration );
070        ignoreValues = null;
071        }
072    
073      @ConstructorProperties({"numArgs", "fieldDeclaration"})
074      public ExtremaBase( int numArgs, Fields fieldDeclaration )
075        {
076        super( numArgs, fieldDeclaration );
077        ignoreValues = null;
078    
079        if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
080          throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
081        }
082    
083      @ConstructorProperties({"fieldDeclaration", "ignoreValues"})
084      protected ExtremaBase( Fields fieldDeclaration, Object... ignoreValues )
085        {
086        super( fieldDeclaration );
087        this.ignoreValues = new HashSet();
088        Collections.addAll( this.ignoreValues, ignoreValues );
089        }
090    
091      public Collection getIgnoreValues()
092        {
093        return Collections.unmodifiableCollection( ignoreValues );
094        }
095    
096      @Override
097      public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
098        {
099        operationCall.setContext( new Context( getInitialValue() ) );
100        }
101    
102      @Override
103      public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
104        {
105        aggregatorCall.getContext().reset( getInitialValue() );
106        }
107    
108      protected abstract double getInitialValue();
109    
110      @Override
111      public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
112        {
113        TupleEntry entry = aggregatorCall.getArguments();
114        Context context = aggregatorCall.getContext();
115    
116        Object arg = entry.getObject( 0 );
117    
118        if( ignoreValues != null && ignoreValues.contains( arg ) )
119          return;
120    
121        Number rhs;
122    
123        if( arg instanceof Number )
124          rhs = (Number) arg;
125        else
126          rhs = entry.getDouble( 0 );
127    
128        Number lhs = context.extrema;
129    
130        if( compare( lhs, rhs ) )
131          {
132          context.value.set( 0, arg ); // keep and return original value
133          context.extrema = rhs;
134          }
135        }
136    
137      protected abstract boolean compare( Number lhs, Number rhs );
138    
139      @Override
140      public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
141        {
142        aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
143        }
144    
145      protected Tuple getResult( AggregatorCall<Context> aggregatorCall )
146        {
147        return aggregatorCall.getContext().value;
148        }
149    
150      @Override
151      public boolean equals( Object object )
152        {
153        if( this == object )
154          return true;
155        if( !( object instanceof ExtremaBase ) )
156          return false;
157        if( !super.equals( object ) )
158          return false;
159    
160        ExtremaBase that = (ExtremaBase) object;
161    
162        if( ignoreValues != null ? !ignoreValues.equals( that.ignoreValues ) : that.ignoreValues != null )
163          return false;
164    
165        return true;
166        }
167    
168      @Override
169      public int hashCode()
170        {
171        int result = super.hashCode();
172        result = 31 * result + ( ignoreValues != null ? ignoreValues.hashCode() : 0 );
173        return result;
174        }
175      }