001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.pipe.assembly; 023 024import java.beans.ConstructorProperties; 025 026import cascading.flow.FlowProcess; 027import cascading.operation.aggregator.MinValue; 028import cascading.pipe.Pipe; 029import cascading.tuple.Fields; 030import cascading.tuple.Tuple; 031import cascading.tuple.TupleEntry; 032 033/** 034 * Class MinBy is used to find the minimum value in a grouping. 035 * <p> 036 * Typically finding the min value of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a 037 * {@link cascading.operation.aggregator.MinValue} {@link cascading.operation.Aggregator} operation. 038 * <p> 039 * This SubAssembly also uses the {@link cascading.pipe.assembly.MinBy.MinPartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 040 * to track the minimum value before the GroupBy operator to reduce IO over the network. 041 * <p> 042 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 043 * in a much simpler mechanism. 044 * <p> 045 * The {@code threshold} value tells the underlying MinPartials functions how many unique key sums to accumulate 046 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 047 * bounded by the size of your map task JVM and the typical size of each group key. 048 * <p> 049 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 050 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 051 * 052 * @see cascading.pipe.assembly.AggregateBy 053 */ 054public class MinBy extends AggregateBy 055 { 056 public static class MinPartials implements Functor 057 { 058 private final Fields declaredFields; 059 060 /** Constructor MinPartials creates a new MinPartials instance. */ 061 public MinPartials( Fields declaredFields ) 062 { 063 this.declaredFields = declaredFields; 064 065 if( declaredFields.size() != 1 ) 066 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 067 } 068 069 @Override 070 public Fields getDeclaredFields() 071 { 072 return declaredFields; 073 } 074 075 @Override 076 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 077 { 078 if( context == null ) 079 return args.getTupleCopy(); 080 else if( args.getObject( 0 ) == null ) 081 return context; 082 083 Comparable lhs = (Comparable) context.getObject( 0 ); 084 Comparable rhs = (Comparable) args.getObject( 0 ); 085 086 if( ( lhs == null ) || ( lhs.compareTo( rhs ) > 0 ) ) 087 context.set( 0, rhs ); 088 089 return context; 090 } 091 092 @Override 093 public Tuple complete( FlowProcess flowProcess, Tuple context ) 094 { 095 return context; 096 } 097 } 098 099 /** 100 * Constructor MinBy creates a new MinBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 101 * instance. 102 * 103 * @param valueField of type Fields 104 * @param minField of type Fields 105 */ 106 @ConstructorProperties({"valueField", "minField"}) 107 public MinBy( Fields valueField, Fields minField ) 108 { 109 super( valueField, new MinPartials( minField ), new MinValue( minField ) ); 110 } 111 112 ////////////// 113 114 /** 115 * Constructor MinBy creates a new MinBy instance. 116 * 117 * @param pipe of type Pipe 118 * @param groupingFields of type Fields 119 * @param valueField of type Fields 120 * @param minField of type Fields 121 */ 122 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField"}) 123 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 124 { 125 this( null, pipe, groupingFields, valueField, minField, 0 ); 126 } 127 128 /** 129 * Constructor MinBy creates a new MinBy instance. 130 * 131 * @param pipe of type Pipe 132 * @param groupingFields of type Fields 133 * @param valueField of type Fields 134 * @param minField of type Fields 135 * @param threshold of type int 136 */ 137 @ConstructorProperties({"pipe", "groupingFields", "valueField", "minField", "threshold"}) 138 public MinBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 139 { 140 this( null, pipe, groupingFields, valueField, minField, threshold ); 141 } 142 143 /** 144 * Constructor MinBy creates a new MinBy instance. 145 * 146 * @param name of type String 147 * @param pipe of type Pipe 148 * @param groupingFields of type Fields 149 * @param valueField of type Fields 150 * @param minField of type Fields 151 */ 152 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField"}) 153 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField ) 154 { 155 this( name, pipe, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 156 } 157 158 /** 159 * Constructor MinBy creates a new MinBy instance. 160 * 161 * @param name of type String 162 * @param pipe of type Pipe 163 * @param groupingFields of type Fields 164 * @param valueField of type Fields 165 * @param minField of type Fields 166 * @param threshold of type int 167 */ 168 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "minField", "threshold"}) 169 public MinBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 170 { 171 this( name, Pipe.pipes( pipe ), groupingFields, valueField, minField, threshold ); 172 } 173 174 /** 175 * Constructor MinBy creates a new MinBy instance. 176 * 177 * @param pipes of type Pipe[] 178 * @param groupingFields of type Fields 179 * @param valueField of type Fields 180 * @param minField of type Fields 181 */ 182 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField"}) 183 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 184 { 185 this( null, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 186 } 187 188 /** 189 * Constructor MinBy creates a new MinBy instance. 190 * 191 * @param pipes of type Pipe[] 192 * @param groupingFields of type Fields 193 * @param valueField of type Fields 194 * @param minField of type Fields 195 * @param threshold of type int 196 */ 197 @ConstructorProperties({"pipes", "groupingFields", "valueField", "minField", "threshold"}) 198 public MinBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 199 { 200 this( null, pipes, groupingFields, valueField, minField, threshold ); 201 } 202 203 /** 204 * Constructor MinBy creates a new MinBy instance. 205 * 206 * @param name of type String 207 * @param pipes of type Pipe[] 208 * @param groupingFields of type Fields 209 * @param valueField of type Fields 210 * @param minField of type Fields 211 */ 212 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField"}) 213 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField ) 214 { 215 this( name, pipes, groupingFields, valueField, minField, USE_DEFAULT_THRESHOLD ); 216 } 217 218 /** 219 * Constructor MinBy creates a new MinBy instance. 220 * 221 * @param name of type String 222 * @param pipes of type Pipe[] 223 * @param groupingFields of type Fields 224 * @param valueField of type Fields 225 * @param minField of type Fields 226 * @param threshold of type int 227 */ 228 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "minField", "threshold"}) 229 public MinBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields minField, int threshold ) 230 { 231 super( name, pipes, groupingFields, valueField, new MinPartials( minField ), new MinValue( minField ), threshold ); 232 } 233 }