001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.operation.aggregator.First;
027    import cascading.pipe.Pipe;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Tuple;
030    import cascading.tuple.TupleEntry;
031    
032    /**
033     * Class FirstBy is used to return the first encountered Tuple in a tuple stream grouping.
034     * <p/>
035     * Typically finding the first Tuple in a tuple stream grouping relies on a {@link cascading.pipe.GroupBy} and a
036     * {@link cascading.operation.aggregator.First} {@link cascading.operation.Aggregator} operation.
037     * <p/>
038     * If the {@code firstFields} argument has custom {@link java.util.Comparator} instances, they will be used
039     * as the GroupBy {@code sortFields}.
040     * <p/>
041     * This SubAssembly also uses the {@link cascading.pipe.assembly.FirstBy.FirstPartials}
042     * {@link cascading.pipe.assembly.AggregateBy.Functor}
043     * to collect field values before the GroupBy operator to reduce IO over the network.
044     * <p/>
045     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
046     * in a much simpler mechanism.
047     * <p/>
048     * The {@code threshold} value tells the underlying FirstPartials functions how many unique key counts to accumulate
049     * in the LRU cache, before emitting the least recently used entry.  This accumulation happens map-side, and thus is
050     * bounded by the size of your map task JVM and the typical size of each group key.
051     * <p/>
052     * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
053     * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
054     *
055     * @see AggregateBy
056     */
057    public class FirstBy extends AggregateBy
058      {
059      /**
060       * Class CountPartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count observed duplicates from the tuple stream.
061       * <p/>
062       * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
063       * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
064       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
065       *
066       * @see cascading.pipe.assembly.FirstBy
067       */
068      public static class FirstPartials implements Functor
069        {
070        private final Fields declaredFields;
071        private Boolean doComparison;
072    
073        /**
074         * Constructor FirstPartials creates a new FirstPartials instance.
075         *
076         * @param declaredFields of type Fields
077         */
078        public FirstPartials( Fields declaredFields )
079          {
080          this.declaredFields = declaredFields;
081          }
082    
083        @Override
084        public Fields getDeclaredFields()
085          {
086          return declaredFields;
087          }
088    
089        @Override
090        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
091          {
092          if( doComparison == null )
093            doComparison = args.getFields().hasComparators(); // ensure we use resolved fields
094    
095          if( context == null || ( doComparison && args.getFields().compare( context, args.getTuple() ) > 0 ) )
096            return args.getTupleCopy();
097    
098          return context;
099          }
100    
101        @Override
102        public Tuple complete( FlowProcess flowProcess, Tuple context )
103          {
104          return context;
105          }
106        }
107    
108      /**
109       * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
110       * instance.
111       *
112       * @param firstFields of type Fields
113       */
114      @ConstructorProperties({"firstFields"})
115      public FirstBy( Fields firstFields )
116        {
117        super( firstFields, new FirstPartials( firstFields ), new First( firstFields ) );
118        }
119    
120      /**
121       * Constructor FirstBy creates a new FirstBy instance. Use this constructor when used with a {@link AggregateBy}
122       * instance.
123       *
124       * @param firstFields of type Fields
125       */
126      @ConstructorProperties({"argumentFields", "firstFields"})
127      public FirstBy( Fields argumentFields, Fields firstFields )
128        {
129        super( argumentFields, new FirstPartials( argumentFields ), new First( firstFields ) );
130        }
131    
132      ///////
133    
134      /**
135       * Constructor FirstBy creates a new FirstBy instance.
136       *
137       * @param pipe           of type Pipe
138       * @param groupingFields of type Fields
139       * @param firstFields    of type Fields
140       */
141      @ConstructorProperties({"pipe", "groupingFields", "firstFields"})
142      public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields )
143        {
144        this( null, pipe, groupingFields, firstFields );
145        }
146    
147      /**
148       * Constructor FirstBy creates a new FirstBy instance.
149       *
150       * @param pipe           of type Pipe
151       * @param groupingFields of type Fields
152       * @param firstFields    fo type Fields
153       * @param threshold      of type int
154       */
155      @ConstructorProperties({"pipe", "groupingFields", "firstFields", "threshold"})
156      public FirstBy( Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
157        {
158        this( null, pipe, groupingFields, firstFields, threshold );
159        }
160    
161      /**
162       * Constructor FirstBy creates a new FirstBy instance.
163       *
164       * @param name           of type String
165       * @param pipe           of type Pipe
166       * @param groupingFields of type Fields
167       * @param firstFields    of type Fields
168       */
169      @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields"})
170      public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields )
171        {
172        this( name, pipe, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
173        }
174    
175      /**
176       * Constructor FirstBy creates a new FirstBy instance.
177       *
178       * @param name           of type String
179       * @param pipe           of type Pipe
180       * @param groupingFields of type Fields
181       * @param firstFields    of type Fields
182       * @param threshold      of type int
183       */
184      @ConstructorProperties({"name", "pipe", "groupingFields", "firstFields", "threshold"})
185      public FirstBy( String name, Pipe pipe, Fields groupingFields, Fields firstFields, int threshold )
186        {
187        this( name, Pipe.pipes( pipe ), groupingFields, firstFields, threshold );
188        }
189    
190      /**
191       * Constructor FirstBy creates a new FirstBy instance.
192       *
193       * @param pipes          of type Pipe[]
194       * @param groupingFields of type Fields
195       * @param firstFields    of type Fields
196       */
197      @ConstructorProperties({"pipes", "groupingFields", "firstFields"})
198      public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields )
199        {
200        this( null, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
201        }
202    
203      /**
204       * Constructor FirstBy creates a new FirstBy instance.
205       *
206       * @param pipes          of type Pipe[]
207       * @param groupingFields of type Fields
208       * @param firstFields    of type Fields
209       * @param threshold      of type int
210       */
211      @ConstructorProperties({"pipes", "groupingFields", "firstFields", "threshold"})
212      public FirstBy( Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
213        {
214        this( null, pipes, groupingFields, firstFields, threshold );
215        }
216    
217      /**
218       * Constructor FirstBy creates a new FirstBy instance.
219       *
220       * @param name           of type String
221       * @param pipes          of type Pipe[]
222       * @param groupingFields of type Fields
223       * @param firstFields    of type Fields
224       */
225      @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields"})
226      public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields )
227        {
228        this( name, pipes, groupingFields, firstFields, USE_DEFAULT_THRESHOLD );
229        }
230    
231      /**
232       * Constructor FirstBy creates a new FirstBy instance.
233       *
234       * @param name           of type String
235       * @param pipes          of type Pipe[]
236       * @param groupingFields of type Fields
237       * @param firstFields    of type Fields
238       * @param threshold      of type int
239       */
240      @ConstructorProperties({"name", "pipes", "groupingFields", "firstFields", "threshold"})
241      public FirstBy( String name, Pipe[] pipes, Fields groupingFields, Fields firstFields, int threshold )
242        {
243        super( name, pipes, groupingFields, firstFields, new FirstPartials( firstFields ), new First( firstFields ), threshold );
244        }
245      }