/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe.assembly;

import java.beans.ConstructorProperties;

import cascading.flow.FlowProcess;
import cascading.operation.aggregator.First;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping.
 * <p/>
 * Finding the first Tuple in a grouping is typically done with a {@link cascading.pipe.GroupBy} followed by a
 * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation.
 * <p/>
 * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used
 * as the GroupBy {@code sortFields}.
 * <p/>
 * This SubAssembly additionally applies the {@link cascading.pipe.assembly.FirstBy.FirstPartials}
 * {@link cascading.pipe.assembly.AggregateBy.Functor} ahead of the GroupBy operator so that fewer values have to
 * travel over the network.
 * <p/>
 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
 * in a much simpler mechanism.
 * <p/>
 * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate
 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
 * bounded by the size of your map task JVM and the typical size of each group key.
 * <p/>
 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
 *
 * @see AggregateBy
 */
public class FirstBy extends AggregateBy
  {
  /**
   * Class FirstPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that retains a single Tuple per
   * grouping while values are being accumulated.
   * <p/>
   * The first Tuple seen is kept. When the argument fields have comparators, a later Tuple replaces the retained
   * one only if the retained one compares greater than it.
   * <p/>
   * Within {@link cascading.pipe.assembly.FirstBy} this class is paired with a
   * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator}.
   *
   * @see cascading.pipe.assembly.FirstBy
   */
  public static class FirstPartials implements Functor
    {
    private final Fields declaredFields;
    /** Whether incoming tuples must be compared; {@code null} until the first call to aggregate. */
    private Boolean doComparison;

    /**
     * Constructor FirstPartials creates a new FirstPartials instance.
     *
     * @param declaredFields of type Fields
     */
    public FirstPartials( Fields declaredFields )
      {
      this.declaredFields = declaredFields;
      }

    @Override
    public Fields getDeclaredFields()
      {
      return declaredFields;
      }

    /**
     * Method aggregate chooses between the retained Tuple and the current argument Tuple.
     *
     * @param flowProcess of type FlowProcess, not used by this implementation
     * @param args        of type TupleEntry, the current argument values
     * @param context     of type Tuple, the Tuple retained so far, or {@code null} if none
     * @return a copy of the current Tuple when it replaces the retained one, otherwise {@code context}
     */
    @Override
    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
      {
      // decided once, on the first call, from the incoming entry so the resolved fields are consulted
      if( doComparison == null )
        doComparison = args.getFields().hasComparators();

      // nothing retained yet, the current tuple becomes the candidate
      if( context == null )
        return args.getTupleCopy();

      // no comparators, the tuple retained earlier always wins
      if( !doComparison )
        return context;

      boolean retainedIsGreater = args.getFields().compare( context, args.getTuple() ) > 0;

      return retainedIsGreater ? args.getTupleCopy() : context;
      }

    /**
     * Method complete returns the retained Tuple unchanged.
     *
     * @param flowProcess of type FlowProcess, not used by this implementation
     * @param context     of type Tuple, the Tuple retained by aggregate
     * @return the given {@code context}
     */
    @Override
    public Tuple complete( FlowProcess flowProcess, Tuple context )
      {
      return context;
      }
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
   * instance.
   *
   * @param firstFields of type Fields
   */
  @ConstructorProperties({"firstFields"})
  public FirstBy( Fields firstFields )
    {
    super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
   * instance.
   *
   * @param argumentFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"argumentFields", "firstFields"})
  public FirstBy( Fields argumentFields, Fields firstFields )
    {
    super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) );
    }

  // stand-alone SubAssembly constructors

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"pipe", "groupingFields", "firstFields"})
  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields )
    {
    this( (String) null, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"pipe", "groupingFields", "firstFields", "threshold"})
  public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
    {
    this( (String) null, pipe, groupingFields, firstFields, threshold );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields"})
  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields )
    {
    this( name, Pipe.pipes( pipe ), groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields", "threshold"})
  public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
    {
    this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"pipes", "groupingFields", "firstFields"})
  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields )
    {
    this( (String) null, pipes, groupingFields, firstFields );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"pipes", "groupingFields", "firstFields", "threshold"})
  public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
    {
    this( (String) null, pipes, groupingFields, firstFields, threshold );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   */
  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields"})
  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields )
    {
    this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
    }

  /**
   * Constructor FirstBy creates a new FirstBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param firstFields    of type Fields
   * @param threshold      of type int
   */
  @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields", "threshold"})
  public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
    {
    super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold );
    }
  }