001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 025 import cascading.flow.FlowProcess; 026 import cascading.operation.aggregator.MinValue; 027 import cascading.pipe.Pipe; 028 import cascading.tuple.Fields; 029 import cascading.tuple.Tuple; 030 import cascading.tuple.TupleEntry; 031 032 /** 033 * Class MinBy is used to find the minimum value in a grouping. 034 * <p/> 035 * Typically finding the min value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a 036 * {@link cascading.operation.aggregator.MinValue} {@link cascading.operation.Aggregator} operation. 037 * <p/> 038 * This SubAssembly also uses the {@link cascading.pipe.assembly.MinBy.MinPartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 039 * to track the minimum value before the GroupBy operator to reduce IO over the network. 040 * <p/> 041 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 042 * in a much simpler mechanism. 043 * <p/> 044 * The {@code threshold} value tells the underlying MinPartials functions how many unique key sums to accumulate 045 * in the LRU cache, before emitting the least recently used entry. 046 * <p/> 047 * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD} 048 * will be used. 049 * 050 * @see cascading.pipe.assembly.AggregateBy 051 */ 052 public class MinBy extends AggregateBy 053 { 054 /** DEFAULT_THRESHOLD */ 055 @Deprecated 056 public static final int DEFAULT_THRESHOLD = 10000; 057 058 public static class MinPartials implements Functor 059 { 060 private final Fields declaredFields; 061 062 /** Constructor MinPartials creates a new MinPartials instance. */ 063 public MinPartials( Fields declaredFields ) 064 { 065 this.declaredFields = declaredFields; 066 067 if( declaredFields.size() != 1 ) 068 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 069 } 070 071 @Override 072 public Fields getDeclaredFields() 073 { 074 return declaredFields; 075 } 076 077 @Override 078 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 079 { 080 if( context == null ) 081 return args.getTupleCopy(); 082 else if( args.getObject( 0 ) == null ) 083 return context; 084 085 Comparable lhs = (Comparable) context.getObject( 0 ); 086 Comparable rhs = (Comparable) args.getObject( 0 ); 087 088 if( ( lhs == null ) || ( lhs.compareTo( rhs ) > 0 ) ) 089 context.set( 0, rhs ); 090 091 return context; 092 } 093 094 @Override 095 public Tuple complete( FlowProcess flowProcess, Tuple context ) 096 { 097 return context; 098 } 099 } 100 101 /** 102 * Constructor MinBy creates a new MinBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 103 * instance. 104 * 105 * @param valueField of type Fields 106 * @param minField of type Fields 107 */ 108 @ConstructorProperties({"valueField", "minField"}) 109 public MinBy( Fields valueField, Fields minField ) 110 { 111 super( valueField, new MinPartials( minField ), new MinValue( minField ) ); 112 } 113 114 ////////////// 115 116 /** 117 * Constructor MinBy creates a new MinBy instance. 118 * 119 * @param pipe of type Pipe 120 * @param groupingFields of type Fields 121 * @param valueField of type Fields 122 * @param minField of type Fields 123 */ 124 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField"}) 125 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 126 { 127 this( null, pipe, groupingFields, valueField, minField, 0 ); 128 } 129 130 /** 131 * Constructor MinBy creates a new MinBy instance. 132 * 133 * @param pipe of type Pipe 134 * @param groupingFields of type Fields 135 * @param valueField of type Fields 136 * @param minField of type Fields 137 * @param threshold of type int 138 */ 139 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField", "threshold"}) 140 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 141 { 142 this( null, pipe, groupingFields, valueField, minField, threshold ); 143 } 144 145 /** 146 * Constructor MinBy creates a new MinBy instance. 147 * 148 * @param name of type String 149 * @param pipe of type Pipe 150 * @param groupingFields of type Fields 151 * @param valueField of type Fields 152 * @param minField of type Fields 153 */ 154 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField"}) 155 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 156 { 157 this( name, pipe, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 158 } 159 160 /** 161 * Constructor MinBy creates a new MinBy instance. 162 * 163 * @param name of type String 164 * @param pipe of type Pipe 165 * @param groupingFields of type Fields 166 * @param valueField of type Fields 167 * @param minField of type Fields 168 * @param threshold of type int 169 */ 170 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField", "threshold"}) 171 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 172 { 173 this( name, Pipe.pipes( pipe ), groupingFields, valueField, minField, threshold ); 174 } 175 176 /** 177 * Constructor MinBy creates a new MinBy instance. 178 * 179 * @param pipes of type Pipe[] 180 * @param groupingFields of type Fields 181 * @param valueField of type Fields 182 * @param minField of type Fields 183 */ 184 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField"}) 185 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 186 { 187 this( null, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 188 } 189 190 /** 191 * Constructor MinBy creates a new MinBy instance. 192 * 193 * @param pipes of type Pipe[] 194 * @param groupingFields of type Fields 195 * @param valueField of type Fields 196 * @param minField of type Fields 197 * @param threshold of type int 198 */ 199 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField", "threshold"}) 200 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 201 { 202 this( null, pipes, groupingFields, valueField, minField, threshold ); 203 } 204 205 /** 206 * Constructor MinBy creates a new MinBy instance. 207 * 208 * @param name of type String 209 * @param pipes of type Pipe[] 210 * @param groupingFields of type Fields 211 * @param valueField of type Fields 212 * @param minField of type Fields 213 */ 214 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField"}) 215 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 216 { 217 this( name, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 218 } 219 220 /** 221 * Constructor MinBy creates a new MinBy instance. 222 * 223 * @param name of type String 224 * @param pipes of type Pipe[] 225 * @param groupingFields of type Fields 226 * @param valueField of type Fields 227 * @param minField of type Fields 228 * @param threshold of type int 229 */ 230 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField", "threshold"}) 231 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 232 { 233 super( name, pipes, groupingFields, valueField, new MinPartials( minField ), new MinValue( minField ), threshold ); 234 } 235 }