001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.operation.aggregator; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Collection; 025 import java.util.Collections; 026 import java.util.HashSet; 027 028 import cascading.flow.FlowProcess; 029 import cascading.operation.Aggregator; 030 import cascading.operation.AggregatorCall; 031 import cascading.operation.BaseOperation; 032 import cascading.operation.OperationCall; 033 import cascading.tuple.Fields; 034 import cascading.tuple.Tuple; 035 import cascading.tuple.TupleEntry; 036 037 /** 038 * Class ExtremaBase is the base class for Max and Min. The unique thing about Max and Min are that they return the original, 039 * un-coerced, argument value, though a coerced version of the argument is used for the comparison. 040 */ 041 @Deprecated 042 public abstract class ExtremaBase extends BaseOperation<ExtremaBase.Context> implements Aggregator<ExtremaBase.Context> 043 { 044 /** Field ignoreValues */ 045 protected final Collection ignoreValues; 046 047 protected static class Context 048 { 049 Number extrema; 050 Tuple value = Tuple.size( 1 ); 051 052 public Context( Number extrema ) 053 { 054 this.extrema = extrema; 055 } 056 057 public Context reset( Number extrema ) 058 { 059 this.extrema = extrema; 060 this.value.set( 0, null ); 061 062 return this; 063 } 064 } 065 066 @ConstructorProperties({"fieldDeclaration"}) 067 public ExtremaBase( Fields fieldDeclaration ) 068 { 069 super( fieldDeclaration ); 070 ignoreValues = null; 071 } 072 073 @ConstructorProperties({"numArgs", "fieldDeclaration"}) 074 public ExtremaBase( int numArgs, Fields fieldDeclaration ) 075 { 076 super( numArgs, fieldDeclaration ); 077 ignoreValues = null; 078 079 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 080 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 081 } 082 083 @ConstructorProperties({"fieldDeclaration", "ignoreValues"}) 084 protected ExtremaBase( Fields fieldDeclaration, Object... ignoreValues ) 085 { 086 super( fieldDeclaration ); 087 this.ignoreValues = new HashSet(); 088 Collections.addAll( this.ignoreValues, ignoreValues ); 089 } 090 091 public Collection getIgnoreValues() 092 { 093 return Collections.unmodifiableCollection( ignoreValues ); 094 } 095 096 @Override 097 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 098 { 099 operationCall.setContext( new Context( getInitialValue() ) ); 100 } 101 102 @Override 103 public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 104 { 105 aggregatorCall.getContext().reset( getInitialValue() ); 106 } 107 108 protected abstract double getInitialValue(); 109 110 @Override 111 public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 112 { 113 TupleEntry entry = aggregatorCall.getArguments(); 114 Context context = aggregatorCall.getContext(); 115 116 Object arg = entry.getObject( 0 ); 117 118 if( ignoreValues != null && ignoreValues.contains( arg ) ) 119 return; 120 121 Number rhs; 122 123 if( arg instanceof Number ) 124 rhs = (Number) arg; 125 else 126 rhs = entry.getDouble( 0 ); 127 128 Number lhs = context.extrema; 129 130 if( compare( lhs, rhs ) ) 131 { 132 context.value.set( 0, arg ); // keep and return original value 133 context.extrema = rhs; 134 } 135 } 136 137 protected abstract boolean compare( Number lhs, Number rhs ); 138 139 @Override 140 public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 141 { 142 aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) ); 143 } 144 145 protected Tuple getResult( AggregatorCall<Context> aggregatorCall ) 146 { 147 return aggregatorCall.getContext().value; 148 } 149 150 @Override 151 public boolean equals( Object object ) 152 { 153 if( this == object ) 154 return true; 155 if( !( object instanceof ExtremaBase ) ) 156 return false; 157 if( !super.equals( object ) ) 158 return false; 159 160 ExtremaBase that = (ExtremaBase) object; 161 162 if( ignoreValues != null ? !ignoreValues.equals( that.ignoreValues ) : that.ignoreValues != null ) 163 return false; 164 165 return true; 166 } 167 168 @Override 169 public int hashCode() 170 { 171 int result = super.hashCode(); 172 result = 31 * result + ( ignoreValues != null ? ignoreValues.hashCode() : 0 ); 173 return result; 174 } 175 }