001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 025 import cascading.flow.FlowProcess; 026 import cascading.operation.aggregator.MinValue; 027 import cascading.pipe.Pipe; 028 import cascading.tuple.Fields; 029 import cascading.tuple.Tuple; 030 import cascading.tuple.TupleEntry; 031 032 /** 033 * Class MinBy is used to find the minimum value in a grouping. 034 * <p/> 035 * Typically finding the min value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a 036 * {@link cascading.operation.aggregator.MinValue} {@link cascading.operation.Aggregator} operation. 037 * <p/> 038 * This SubAssembly also uses the {@link cascading.pipe.assembly.MinBy.MinPartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 039 * to track the minimum value before the GroupBy operator to reduce IO over the network. 040 * <p/> 041 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 042 * in a much simpler mechanism. 043 * <p/> 044 * The {@code threshold} value tells the underlying MinPartials functions how many unique key sums to accumulate 045 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 046 * bounded by the size of your map task JVM and the typical size of each group key. 047 * <p/> 048 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 049 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 050 * 051 * @see cascading.pipe.assembly.AggregateBy 052 */ 053 public class MinBy extends AggregateBy 054 { 055 /** DEFAULT_THRESHOLD */ 056 @Deprecated 057 public static final int DEFAULT_THRESHOLD = 10000; 058 059 public static class MinPartials implements Functor 060 { 061 private final Fields declaredFields; 062 063 /** Constructor MinPartials creates a new MinPartials instance. */ 064 public MinPartials( Fields declaredFields ) 065 { 066 this.declaredFields = declaredFields; 067 068 if( declaredFields.size() != 1 ) 069 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 070 } 071 072 @Override 073 public Fields getDeclaredFields() 074 { 075 return declaredFields; 076 } 077 078 @Override 079 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 080 { 081 if( context == null ) 082 return args.getTupleCopy(); 083 else if( args.getObject( 0 ) == null ) 084 return context; 085 086 Comparable lhs = (Comparable) context.getObject( 0 ); 087 Comparable rhs = (Comparable) args.getObject( 0 ); 088 089 if( ( lhs == null ) || ( lhs.compareTo( rhs ) > 0 ) ) 090 context.set( 0, rhs ); 091 092 return context; 093 } 094 095 @Override 096 public Tuple complete( FlowProcess flowProcess, Tuple context ) 097 { 098 return context; 099 } 100 } 101 102 /** 103 * Constructor MinBy creates a new MinBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 104 * instance. 105 * 106 * @param valueField of type Fields 107 * @param minField of type Fields 108 */ 109 @ConstructorProperties({"valueField", "minField"}) 110 public MinBy( Fields valueField, Fields minField ) 111 { 112 super( valueField, new MinPartials( minField ), new MinValue( minField ) ); 113 } 114 115 ////////////// 116 117 /** 118 * Constructor MinBy creates a new MinBy instance. 119 * 120 * @param pipe of type Pipe 121 * @param groupingFields of type Fields 122 * @param valueField of type Fields 123 * @param minField of type Fields 124 */ 125 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField"}) 126 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 127 { 128 this( null, pipe, groupingFields, valueField, minField, 0 ); 129 } 130 131 /** 132 * Constructor MinBy creates a new MinBy instance. 133 * 134 * @param pipe of type Pipe 135 * @param groupingFields of type Fields 136 * @param valueField of type Fields 137 * @param minField of type Fields 138 * @param threshold of type int 139 */ 140 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField", "threshold"}) 141 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 142 { 143 this( null, pipe, groupingFields, valueField, minField, threshold ); 144 } 145 146 /** 147 * Constructor MinBy creates a new MinBy instance. 148 * 149 * @param name of type String 150 * @param pipe of type Pipe 151 * @param groupingFields of type Fields 152 * @param valueField of type Fields 153 * @param minField of type Fields 154 */ 155 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField"}) 156 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 157 { 158 this( name, pipe, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 159 } 160 161 /** 162 * Constructor MinBy creates a new MinBy instance. 163 * 164 * @param name of type String 165 * @param pipe of type Pipe 166 * @param groupingFields of type Fields 167 * @param valueField of type Fields 168 * @param minField of type Fields 169 * @param threshold of type int 170 */ 171 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField", "threshold"}) 172 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 173 { 174 this( name, Pipe.pipes( pipe ), groupingFields, valueField, minField, threshold ); 175 } 176 177 /** 178 * Constructor MinBy creates a new MinBy instance. 179 * 180 * @param pipes of type Pipe[] 181 * @param groupingFields of type Fields 182 * @param valueField of type Fields 183 * @param minField of type Fields 184 */ 185 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField"}) 186 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 187 { 188 this( null, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 189 } 190 191 /** 192 * Constructor MinBy creates a new MinBy instance. 193 * 194 * @param pipes of type Pipe[] 195 * @param groupingFields of type Fields 196 * @param valueField of type Fields 197 * @param minField of type Fields 198 * @param threshold of type int 199 */ 200 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField", "threshold"}) 201 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 202 { 203 this( null, pipes, groupingFields, valueField, minField, threshold ); 204 } 205 206 /** 207 * Constructor MinBy creates a new MinBy instance. 208 * 209 * @param name of type String 210 * @param pipes of type Pipe[] 211 * @param groupingFields of type Fields 212 * @param valueField of type Fields 213 * @param minField of type Fields 214 */ 215 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField"}) 216 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 217 { 218 this( name, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 219 } 220 221 /** 222 * Constructor MinBy creates a new MinBy instance. 223 * 224 * @param name of type String 225 * @param pipes of type Pipe[] 226 * @param groupingFields of type Fields 227 * @param valueField of type Fields 228 * @param minField of type Fields 229 * @param threshold of type int 230 */ 231 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField", "threshold"}) 232 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 233 { 234 super( name, pipes, groupingFields, valueField, new MinPartials( minField ), new MinValue( minField ), threshold ); 235 } 236 }