001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import java.util.Comparator;
025
026import cascading.flow.FlowProcess;
027import cascading.flow.FlowProps;
028import cascading.flow.planner.Scope;
029import cascading.flow.stream.duct.Grouping;
030import cascading.flow.stream.duct.Window;
031import cascading.flow.stream.graph.IORole;
032import cascading.flow.stream.util.SparseTupleComparator;
033import cascading.pipe.Splice;
034import cascading.tuple.Fields;
035import cascading.tuple.Tuple;
036import cascading.tuple.TupleEntry;
037import cascading.tuple.TupleEntryChainIterator;
038import cascading.tuple.TupleEntryIterator;
039import cascading.tuple.Tuples;
040import cascading.tuple.io.KeyTuple;
041import cascading.tuple.io.TuplePair;
042import cascading.tuple.io.ValueTuple;
043import cascading.tuple.util.Resettable1;
044import cascading.tuple.util.Resettable2;
045import cascading.tuple.util.TupleBuilder;
046import cascading.tuple.util.TupleHasher;
047import cascading.util.NullSafeReverseComparator;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import static cascading.tuple.util.TupleViews.*;
052
053/**
054 *
055 */
056public abstract class GroupingSpliceGate extends SpliceGate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements Window
057  {
058  private static final Logger LOG = LoggerFactory.getLogger( GroupingSpliceGate.class );
059
060  protected Fields[] keyFields;
061  protected Fields[] sortFields;
062  protected Fields[] valuesFields;
063
064  protected Comparator<Tuple>[] groupComparators;
065  protected Comparator<Tuple>[] valueComparators;
066  protected TupleHasher groupHasher;
067  protected boolean nullsAreNotEqual;
068
069  protected TupleBuilder[] keyBuilder;
070  protected TupleBuilder[] valuesBuilder;
071  protected TupleBuilder[] sortBuilder;
072
073  // as sink
074  protected Tuple keyTuple; // alias for groupTuple or groupSortTuple
075  protected Resettable1<Tuple> groupTuple;
076  protected Resettable2<Tuple, Tuple> groupSortTuple;
077  protected Resettable1<Tuple> valueTuple;
078
079  // as source
080  protected Grouping<TupleEntry, TupleEntryIterator> grouping;
081  protected TupleEntry keyEntry;
082  protected TupleEntryChainIterator tupleEntryIterator;
083
084  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice )
085    {
086    super( flowProcess, splice );
087    }
088
089  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice, IORole role )
090    {
091    super( flowProcess, splice, role );
092    }
093
094  protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
095    {
096    if( narrowFields.isNone() )
097      return new TupleBuilder()
098        {
099        @Override
100        public Tuple makeResult( Tuple input, Tuple output )
101          {
102          return Tuple.NULL;
103          }
104        };
105
106    if( incomingFields.isUnknown() )
107      return new TupleBuilder()
108        {
109        @Override
110        public Tuple makeResult( Tuple input, Tuple output )
111          {
112          return input.get( incomingFields, narrowFields );
113          }
114        };
115
116    if( narrowFields.isAll() ) // dubious this is ever reached
117      return new TupleBuilder()
118        {
119        @Override
120        public Tuple makeResult( Tuple input, Tuple output )
121          {
122          return input;
123          }
124        };
125
126    return createDefaultNarrowBuilder( incomingFields, narrowFields );
127    }
128
129  protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
130    {
131    return new TupleBuilder()
132      {
133      Tuple result = createNarrow( incomingFields.getPos( narrowFields ) );
134
135      @Override
136      public Tuple makeResult( Tuple input, Tuple output )
137        {
138        return reset( result, input );
139        }
140      };
141    }
142
143  protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField )
144    {
145    if( incomingFields.isUnknown() )
146      return new TupleBuilder()
147        {
148        @Override
149        public Tuple makeResult( Tuple input, Tuple output )
150          {
151          return Tuples.nulledCopy( incomingFields, input, keyField );
152          }
153        };
154
155    if( keyField.isNone() )
156      return new TupleBuilder()
157        {
158        @Override
159        public Tuple makeResult( Tuple input, Tuple output )
160          {
161          return input;
162          }
163        };
164
165    if( keyField.isAll() )
166      return new TupleBuilder()
167        {
168        Tuple nullTuple = Tuple.size( incomingFields.size() );
169
170        @Override
171        public Tuple makeResult( Tuple input, Tuple output )
172          {
173          return nullTuple;
174          }
175        };
176
177    return new TupleBuilder()
178      {
179      Tuple nullTuple = Tuple.size( keyField.size() );
180      Tuple result = createOverride( incomingFields, keyField );
181
182      @Override
183      public Tuple makeResult( Tuple baseTuple, Tuple output )
184        {
185        return reset( result, baseTuple, nullTuple );
186        }
187      };
188    }
189
190  @Override
191  public void initialize()
192    {
193    super.initialize();
194
195    int size = getNumDeclaredIncomingBranches(); // is the maximum ordinal value
196
197    // this is a merge, all fields have the same declaration
198    // filling out full array has implications on joiner/closure which should be resolved independently
199    if( role == IORole.source && splice.isGroupBy() )
200      size = 1;
201
202    keyFields = new Fields[ size ];
203    valuesFields = new Fields[ size ];
204
205    keyBuilder = new TupleBuilder[ size ];
206    valuesBuilder = new TupleBuilder[ size ];
207
208    if( splice.isSorted() )
209      {
210      sortFields = new Fields[ size ];
211      sortBuilder = new TupleBuilder[ size ];
212      }
213
214    Scope outgoingScope = outgoingScopes.get( 0 );
215
216    int numScopes = Math.min( size, incomingScopes.size() );
217    for( int i = 0; i < numScopes; i++ )
218      {
219      Scope incomingScope = incomingScopes.get( i );
220
221      // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge
222      // arrays may be size 1, then ordinal should always be zero.
223      int ordinal = size == 1 ? 0 : incomingScope.getOrdinal();
224
225      keyFields[ ordinal ] = outgoingScope.getKeySelectors().get( incomingScope.getName() );
226      valuesFields[ ordinal ] = incomingScope.getIncomingSpliceFields();
227
228      keyBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
229      valuesBuilder[ ordinal ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
230
231      if( sortFields != null )
232        {
233        sortFields[ ordinal ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() );
234        sortBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ ordinal ] );
235        }
236
237      if( LOG.isDebugEnabled() )
238        {
239        LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), ordinal );
240        LOG.debug( "keyFields: {}", printSafe( keyFields[ ordinal ] ) );
241        LOG.debug( "valueFields: {}", printSafe( valuesFields[ ordinal ] ) );
242
243        if( sortFields != null )
244          LOG.debug( "sortFields: {}", printSafe( sortFields[ ordinal ] ) );
245        }
246      }
247
248    if( role == IORole.sink )
249      {
250      if( sortFields == null )
251        groupTuple = new KeyTuple();
252      else
253        groupSortTuple = new TuplePair();
254
255      keyTuple = (Tuple) ( sortFields == null ? groupTuple : groupSortTuple );
256      valueTuple = new ValueTuple();
257
258      return;
259      }
260
261    keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true );
262    tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() );
263
264    grouping = new Grouping<>();
265    grouping.key = keyEntry;
266    grouping.joinIterator = tupleEntryIterator;
267    }
268
269  protected void initComparators()
270    {
271    Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) );
272
273    Fields[] compareFields = new Fields[ getNumDeclaredIncomingBranches() ];
274    groupComparators = new Comparator[ getNumDeclaredIncomingBranches() ];
275
276    if( splice.isSorted() )
277      valueComparators = new Comparator[ getNumDeclaredIncomingBranches() ];
278
279    int size = splice.isGroupBy() ? 1 : getNumDeclaredIncomingBranches();
280
281    for( int i = 0; i < size; i++ )
282      {
283      Scope incomingScope = incomingScopes.get( i );
284
285      int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );
286
287      // we want the comparators
288      Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() );
289
290      compareFields[ pos ] = groupFields; // used for finding hashers
291
292      if( groupFields.size() == 0 )
293        groupComparators[ pos ] = groupFields;
294      else
295        groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator );
296
297      groupComparators[ pos ] = splice.isSortReversed() ? NullSafeReverseComparator.reverseOrder( groupComparators[ pos ] ) : groupComparators[ pos ];
298
299      if( sortFields != null )
300        {
301        // we want the comparators, so don't use sortFields array
302        Fields sortFields = splice.getSortingSelectors().get( incomingScope.getName() );
303        valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortFields, defaultComparator );
304
305        if( splice.isSortReversed() )
306          valueComparators[ pos ] = NullSafeReverseComparator.reverseOrder( valueComparators[ pos ] );
307        }
308      }
309
310    nullsAreNotEqual = !areNullsEqual();
311
312    if( nullsAreNotEqual )
313      LOG.debug( "treating null values in Tuples at not equal during grouping" );
314
315    Comparator[] hashers = TupleHasher.merge( compareFields );
316    groupHasher = defaultComparator != null || !TupleHasher.isNull( hashers ) ? new TupleHasher( defaultComparator, hashers ) : null;
317    }
318
319  protected Comparator getKeyComparator()
320    {
321    if( groupComparators.length > 0 && groupComparators[ 0 ] != null )
322      return groupComparators[ 0 ];
323
324    return new Comparator<Comparable>()
325      {
326      @Override
327      public int compare( Comparable lhs, Comparable rhs )
328        {
329        return lhs.compareTo( rhs );
330        }
331      };
332    }
333
334  @Override
335  public void cleanup()
336    {
337    super.cleanup();
338
339    // close if top of stack
340    if( next == null )
341      flowProcess.closeTrapCollectors();
342    }
343
344  private boolean areNullsEqual()
345    {
346    try
347      {
348      Tuple tupleWithNull = Tuple.size( 1 );
349
350      return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0;
351      }
352    catch( Exception exception )
353      {
354      return true; // assume we have an npe or something and they don't expect to see nulls
355      }
356    }
357
358  protected int getNumDeclaredIncomingBranches()
359    {
360    return splice.getPrevious().length;
361    }
362
363  /**
364   * This allows the tuple to honor the hasher and comparators, if any
365   *
366   * @param object the tuple to wrap
367   * @return a DelegatedTuple instance
368   */
369  protected final Tuple getDelegatedTuple( Tuple object )
370    {
371    if( groupHasher == null )
372      return object;
373
374    return new DelegatedTuple( object );
375    }
376
377  private String printSafe( Fields fields )
378    {
379    if( fields != null )
380      return fields.printVerbose();
381
382    return "";
383    }
384
385  protected class DelegatedTuple extends Tuple
386    {
387    public DelegatedTuple( Tuple wrapped )
388      {
389      // pass it in to prevent one being allocated
390      super( Tuple.elements( wrapped ) );
391      }
392
393    @Override
394    public boolean equals( Object object )
395      {
396      return compareTo( object ) == 0;
397      }
398
399    @Override
400    public int compareTo( Object other )
401      {
402      return groupComparators[ 0 ].compare( this, (Tuple) other );
403      }
404
405    @Override
406    public int hashCode()
407      {
408      return groupHasher.hashCode( this );
409      }
410    }
411  }