001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.operation.aggregator; 022 023 import java.beans.ConstructorProperties; 024 import java.util.Collection; 025 import java.util.Collections; 026 import java.util.HashSet; 027 028 import cascading.flow.FlowProcess; 029 import cascading.operation.Aggregator; 030 import cascading.operation.AggregatorCall; 031 import cascading.operation.BaseOperation; 032 import cascading.operation.OperationCall; 033 import cascading.tuple.Fields; 034 import cascading.tuple.Tuple; 035 import cascading.tuple.TupleEntry; 036 037 /** 038 * Class ExtremaValueBase is the base class for MaxValue and MinValue where the values are expected to by 039 * {@link Comparable} types and the {@link Comparable#compareTo(Object)} result is use for max/min 040 * comparison. 041 */ 042 public abstract class ExtremaValueBase extends BaseOperation<ExtremaValueBase.Context> implements Aggregator<ExtremaValueBase.Context> 043 { 044 /** Field ignoreValues */ 045 protected final Collection ignoreValues; 046 047 protected static class Context 048 { 049 Tuple value = Tuple.size( 1 ); 050 051 public Context() 052 { 053 } 054 055 public Context reset() 056 { 057 this.value.set( 0, null ); 058 059 return this; 060 } 061 } 062 063 @ConstructorProperties({"fieldDeclaration"}) 064 public ExtremaValueBase( Fields fieldDeclaration ) 065 { 066 super( fieldDeclaration ); 067 ignoreValues = null; 068 } 069 070 @ConstructorProperties({"numArgs", "fieldDeclaration"}) 071 public ExtremaValueBase( int numArgs, Fields fieldDeclaration ) 072 { 073 super( numArgs, fieldDeclaration ); 074 ignoreValues = null; 075 076 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 077 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 078 } 079 080 @ConstructorProperties({"fieldDeclaration", "ignoreValues"}) 081 protected ExtremaValueBase( Fields fieldDeclaration, Object... ignoreValues ) 082 { 083 super( fieldDeclaration ); 084 this.ignoreValues = new HashSet(); 085 Collections.addAll( this.ignoreValues, ignoreValues ); 086 } 087 088 public Collection getIgnoreValues() 089 { 090 return Collections.unmodifiableCollection( ignoreValues ); 091 } 092 093 @Override 094 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 095 { 096 operationCall.setContext( new Context() ); 097 } 098 099 @Override 100 public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 101 { 102 aggregatorCall.getContext().reset(); 103 } 104 105 @Override 106 public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 107 { 108 TupleEntry entry = aggregatorCall.getArguments(); 109 Context context = aggregatorCall.getContext(); 110 111 Object arg = entry.getObject( 0 ); // returns canonical type 112 113 if( ignoreValues != null && ignoreValues.contains( arg ) ) 114 return; 115 116 Comparable lhs = (Comparable) context.value.getObject( 0 ); 117 Comparable rhs = (Comparable) arg; 118 119 if( lhs == null || ( rhs != null && compare( lhs, rhs ) ) ) 120 context.value.set( 0, rhs ); 121 } 122 123 /** 124 * Allows subclasses to provide own comparison method. 125 * 126 * @param lhs Comparable type 127 * @param rhs Comparable type 128 * @return true if the rhs should be retained as the result value 129 */ 130 protected abstract boolean compare( Comparable lhs, Comparable rhs ); 131 132 @Override 133 public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 134 { 135 aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) ); 136 } 137 138 protected Tuple getResult( AggregatorCall<Context> aggregatorCall ) 139 { 140 return aggregatorCall.getContext().value; 141 } 142 143 @Override 144 public boolean equals( Object object ) 145 { 146 if( this == object ) 147 return true; 148 if( !( object instanceof ExtremaValueBase ) ) 149 return false; 150 if( !super.equals( object ) ) 151 return false; 152 153 ExtremaValueBase that = (ExtremaValueBase) object; 154 155 if( ignoreValues != null ? !ignoreValues.equals( that.ignoreValues ) : that.ignoreValues != null ) 156 return false; 157 158 return true; 159 } 160 161 @Override 162 public int hashCode() 163 { 164 int result = super.hashCode(); 165 result = 31 * result + ( ignoreValues != null ? ignoreValues.hashCode() : 0 ); 166 return result; 167 } 168 }