/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe.assembly;

import java.beans.ConstructorProperties;

import cascading.flow.FlowProcess;
import cascading.operation.aggregator.First;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping.
 * <p>
 * Without this SubAssembly, finding the first Tuple of each grouping typically requires a
 * {@link cascading.pipe.GroupBy} followed by a {@link cascading.operation.aggregator.First}
 * {@link cascading.operation.Aggregator} operation.
 * <p>
 * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used
 * as the GroupBy {@code sortFields}.
 * <p>
 * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials}
 * {@link cascading.pipe.assembly.AggregateBy.Functor}
 * to collect field values before the GroupBy operator to reduce IO over the network.
 * <p>
 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
 * in a much simpler mechanism.
 * <p>
 * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate
 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
 * bounded by the size of your map task JVM and the typical size of each group key.
 * <p>
 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
 *
 * @see AggregateBy
 */
public class FirstBy extends AggregateBy
  {
  /**
   * Class FirstPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that retains a single
   * candidate Tuple as its context.
   * <p>
   * The first set of argument values seen becomes the candidate. When the argument fields have comparators,
   * a later set of values replaces the candidate if the fields' comparison places the candidate after it.
   * Without comparators the first candidate is never replaced.
   *
   * @see cascading.pipe.assembly.FirstBy
   */
  public static class FirstPartials implements Functor
    {
    private final Fields declaredFields;
    /** Lazily resolved on the first call to aggregate; {@code null} until then. */
    private Boolean doComparison;

    /**
     * Constructor FirstPartials creates a new FirstPartials instance.
     *
     * @param declaredFields of type Fields
     */
    public FirstPartials( Fields declaredFields )
      {
      this.declaredFields = declaredFields;
      }

    @Override
    public Fields getDeclaredFields()
      {
      return declaredFields;
      }

    /**
     * Chooses between the currently retained Tuple and the incoming argument values.
     *
     * @param flowProcess of type FlowProcess, not used by this implementation
     * @param args        of type TupleEntry, the incoming argument values
     * @param context     of type Tuple, the currently retained Tuple, or {@code null} if none has been retained yet
     * @return a copy of the incoming Tuple when it becomes the new candidate, otherwise the given context
     */
    @Override
    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
      {
      // decided once, from the fields of the first entry seen, so the resolved fields are the ones consulted
      if( doComparison == null )
        doComparison = args.getFields().hasComparators();

      // nothing retained yet, the incoming values become the candidate
      if( context == null )
        return args.getTupleCopy();

      // no comparators to apply, the earliest candidate is kept
      if( !doComparison )
        return context;

      boolean retainedComparesGreater = args.getFields().compare( context, args.getTuple() ) > 0;

      return retainedComparesGreater ? args.getTupleCopy() : context;
      }

    /**
     * Returns the retained Tuple unchanged.
     *
     * @param flowProcess of type FlowProcess, not used by this implementation
     * @param context     of type Tuple, the retained Tuple
     * @return the given context
     */
    @Override
    public Tuple complete( FlowProcess flowProcess, Tuple context )
      {
      return context;
      }
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
   * instance.
   *
   * @param firstFields of type Fields
   */
  @ConstructorProperties({"firstFields"})
  public FirstBy( Fields firstFields )
    {
    super(
      firstFields,
      new FirstPartials( firstFields ),
      new First( firstFields )
    );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
   * instance.
   *
   * @param argumentFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"argumentFields", "firstFields"})
  public FirstBy( Fields argumentFields, Fields firstFields )
    {
    super(
      argumentFields,
      new FirstPartials( argumentFields ),
      new First( firstFields )
    );
    }

  ///////

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"pipe", "groupingFields", "firstFields"})
  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields )
    {
    this( (String) null, pipe, groupingFields, firstFields );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"pipe", "groupingFields", "firstFields", "threshold"})
  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
    {
    this( (String) null, pipe, groupingFields, firstFields, threshold );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields"})
  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields )
    {
    this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields", "threshold"})
  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
    {
    this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"pipes", "groupingFields", "firstFields"})
  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields )
    {
    this( (String) null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"pipes", "groupingFields", "firstFields", "threshold"})
  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
    {
    this( (String) null, pipes, groupingFields, firstFields, threshold );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields"})
  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields )
    {
    this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields", "threshold"})
  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
    {
    super(
      name,
      pipes,
      groupingFields,
      firstFields,
      new FirstPartials( firstFields ),
      new First( firstFields ),
      threshold
    );
    }
  }