001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.aggregator;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.HashSet;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.operation.Aggregator;
030    import cascading.operation.AggregatorCall;
031    import cascading.operation.BaseOperation;
032    import cascading.operation.OperationCall;
033    import cascading.tuple.Fields;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    
037    /**
038     * Class ExtremaValueBase is the base class for MaxValue and MinValue where the values are expected to by
039     * {@link Comparable} types and the {@link Comparable#compareTo(Object)} result is use for max/min
040     * comparison.
041     */
042    public abstract class ExtremaValueBase extends BaseOperation<ExtremaValueBase.Context> implements Aggregator<ExtremaValueBase.Context>
043      {
044      /** Field ignoreValues */
045      protected final Collection ignoreValues;
046    
047      protected static class Context
048        {
049        Tuple value = Tuple.size( 1 );
050    
051        public Context()
052          {
053          }
054    
055        public Context reset()
056          {
057          this.value.set( 0, null );
058    
059          return this;
060          }
061        }
062    
063      @ConstructorProperties({"fieldDeclaration"})
064      public ExtremaValueBase( Fields fieldDeclaration )
065        {
066        super( fieldDeclaration );
067        ignoreValues = null;
068        }
069    
070      @ConstructorProperties({"numArgs", "fieldDeclaration"})
071      public ExtremaValueBase( int numArgs, Fields fieldDeclaration )
072        {
073        super( numArgs, fieldDeclaration );
074        ignoreValues = null;
075    
076        if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
077          throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
078        }
079    
080      @ConstructorProperties({"fieldDeclaration", "ignoreValues"})
081      protected ExtremaValueBase( Fields fieldDeclaration, Object... ignoreValues )
082        {
083        super( fieldDeclaration );
084        this.ignoreValues = new HashSet();
085        Collections.addAll( this.ignoreValues, ignoreValues );
086        }
087    
088      public Collection getIgnoreValues()
089        {
090        return Collections.unmodifiableCollection( ignoreValues );
091        }
092    
093      @Override
094      public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
095        {
096        operationCall.setContext( new Context() );
097        }
098    
099      @Override
100      public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
101        {
102        aggregatorCall.getContext().reset();
103        }
104    
105      @Override
106      public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
107        {
108        TupleEntry entry = aggregatorCall.getArguments();
109        Context context = aggregatorCall.getContext();
110    
111        Object arg = entry.getObject( 0 ); // returns canonical type
112    
113        if( ignoreValues != null && ignoreValues.contains( arg ) )
114          return;
115    
116        Comparable lhs = (Comparable) context.value.getObject( 0 );
117        Comparable rhs = (Comparable) arg;
118    
119        if( lhs == null || ( rhs != null && compare( lhs, rhs ) ) )
120          context.value.set( 0, rhs );
121        }
122    
123      /**
124       * Allows subclasses to provide own comparison method.
125       *
126       * @param lhs Comparable type
127       * @param rhs Comparable type
128       * @return true if the rhs should be retained as the result value
129       */
130      protected abstract boolean compare( Comparable lhs, Comparable rhs );
131    
132      @Override
133      public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
134        {
135        aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
136        }
137    
138      protected Tuple getResult( AggregatorCall<Context> aggregatorCall )
139        {
140        return aggregatorCall.getContext().value;
141        }
142    
143      @Override
144      public boolean equals( Object object )
145        {
146        if( this == object )
147          return true;
148        if( !( object instanceof ExtremaValueBase ) )
149          return false;
150        if( !super.equals( object ) )
151          return false;
152    
153        ExtremaValueBase that = (ExtremaValueBase) object;
154    
155        if( ignoreValues != null ? !ignoreValues.equals( that.ignoreValues ) : that.ignoreValues != null )
156          return false;
157    
158        return true;
159        }
160    
161      @Override
162      public int hashCode()
163        {
164        int result = super.hashCode();
165        result = 31 * result + ( ignoreValues != null ? ignoreValues.hashCode() : 0 );
166        return result;
167        }
168      }