001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Map;
026
027import cascading.flow.FlowProcess;
028import cascading.flow.FlowProps;
029import cascading.flow.planner.Scope;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.duct.Grouping;
032import cascading.flow.stream.duct.Window;
033import cascading.flow.stream.graph.IORole;
034import cascading.flow.stream.graph.StreamGraph;
035import cascading.flow.stream.util.SparseTupleComparator;
036import cascading.pipe.Splice;
037import cascading.tuple.Fields;
038import cascading.tuple.Tuple;
039import cascading.tuple.TupleEntry;
040import cascading.tuple.TupleEntryChainIterator;
041import cascading.tuple.TupleEntryIterator;
042import cascading.tuple.Tuples;
043import cascading.tuple.io.KeyTuple;
044import cascading.tuple.io.TuplePair;
045import cascading.tuple.io.ValueTuple;
046import cascading.tuple.util.Resettable1;
047import cascading.tuple.util.Resettable2;
048import cascading.tuple.util.TupleBuilder;
049import cascading.tuple.util.TupleHasher;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import static cascading.tuple.util.TupleViews.*;
054
055/**
056 *
057 */
058public abstract class GroupingSpliceGate extends SpliceGate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements Window
059  {
060  private static final Logger LOG = LoggerFactory.getLogger( GroupingSpliceGate.class );
061
062  // incoming ducts have a unique ordinal into the splice, so we cannot store the ordinal directly on the
063  // previous duct
064  protected Map<Duct, Integer> ordinalMap;
065
066  protected Fields[] keyFields;
067  protected Fields[] sortFields;
068  protected Fields[] valuesFields;
069
070  protected Comparator<Tuple>[] groupComparators;
071  protected Comparator<Tuple>[] valueComparators;
072  protected TupleHasher groupHasher;
073  protected boolean nullsAreNotEqual;
074
075  protected TupleBuilder[] keyBuilder;
076  protected TupleBuilder[] valuesBuilder;
077  protected TupleBuilder[] sortBuilder;
078
079  // as sink
080  protected Tuple keyTuple; // alias for groupTuple or groupSortTuple
081  protected Resettable1<Tuple> groupTuple;
082  protected Resettable2<Tuple, Tuple> groupSortTuple;
083  protected Resettable1<Tuple> valueTuple;
084
085  // as source
086  protected Grouping<TupleEntry, TupleEntryIterator> grouping;
087  protected TupleEntry keyEntry;
088  protected TupleEntryChainIterator tupleEntryIterator;
089
090  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice )
091    {
092    super( flowProcess, splice );
093    }
094
095  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice, IORole role )
096    {
097    super( flowProcess, splice, role );
098    }
099
100  @Override
101  public void bind( StreamGraph streamGraph )
102    {
103    super.bind( streamGraph );
104
105    setOrdinalMap( streamGraph );
106    }
107
108  protected synchronized void setOrdinalMap( StreamGraph streamGraph )
109    {
110    ordinalMap = streamGraph.getOrdinalMap( this );
111    }
112
113  protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
114    {
115    if( narrowFields.isNone() )
116      return new TupleBuilder()
117      {
118      @Override
119      public Tuple makeResult( Tuple input, Tuple output )
120        {
121        return Tuple.NULL;
122        }
123      };
124
125    if( incomingFields.isUnknown() )
126      return new TupleBuilder()
127      {
128      @Override
129      public Tuple makeResult( Tuple input, Tuple output )
130        {
131        return input.get( incomingFields, narrowFields );
132        }
133      };
134
135    if( narrowFields.isAll() ) // dubious this is ever reached
136      return new TupleBuilder()
137      {
138      @Override
139      public Tuple makeResult( Tuple input, Tuple output )
140        {
141        return input;
142        }
143      };
144
145    return createDefaultNarrowBuilder( incomingFields, narrowFields );
146    }
147
148  protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
149    {
150    return new TupleBuilder()
151    {
152    Tuple result = createNarrow( incomingFields.getPos( narrowFields ) );
153
154    @Override
155    public Tuple makeResult( Tuple input, Tuple output )
156      {
157      return reset( result, input );
158      }
159    };
160    }
161
162  protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField )
163    {
164    if( incomingFields.isUnknown() )
165      return new TupleBuilder()
166      {
167      @Override
168      public Tuple makeResult( Tuple input, Tuple output )
169        {
170        return Tuples.nulledCopy( incomingFields, input, keyField );
171        }
172      };
173
174    if( keyField.isNone() )
175      return new TupleBuilder()
176      {
177      @Override
178      public Tuple makeResult( Tuple input, Tuple output )
179        {
180        return input;
181        }
182      };
183
184    if( keyField.isAll() )
185      return new TupleBuilder()
186      {
187      Tuple nullTuple = Tuple.size( incomingFields.size() );
188
189      @Override
190      public Tuple makeResult( Tuple input, Tuple output )
191        {
192        return nullTuple;
193        }
194      };
195
196    return new TupleBuilder()
197    {
198    Tuple nullTuple = Tuple.size( keyField.size() );
199    Tuple result = createOverride( incomingFields, keyField );
200
201    @Override
202    public Tuple makeResult( Tuple baseTuple, Tuple output )
203      {
204      return reset( result, baseTuple, nullTuple );
205      }
206    };
207    }
208
209  @Override
210  public void initialize()
211    {
212    super.initialize();
213
214    int size = getNumDeclaredIncomingBranches(); // is the maximum ordinal value
215
216    // this is a merge, all fields have the same declaration
217    // filling out full array has implications on joiner/closure which should be resolved independently
218    if( role == IORole.source && splice.isGroupBy() )
219      size = 1;
220
221    keyFields = new Fields[ size ];
222    valuesFields = new Fields[ size ];
223
224    keyBuilder = new TupleBuilder[ size ];
225    valuesBuilder = new TupleBuilder[ size ];
226
227    if( splice.isSorted() )
228      {
229      sortFields = new Fields[ size ];
230      sortBuilder = new TupleBuilder[ size ];
231      }
232
233    Scope outgoingScope = outgoingScopes.get( 0 );
234
235    int numScopes = Math.min( size, incomingScopes.size() );
236    for( int i = 0; i < numScopes; i++ )
237      {
238      Scope incomingScope = incomingScopes.get( i );
239
240      // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge
241      // arrays may be size 1, then ordinal should always be zero.
242      int ordinal = size == 1 ? 0 : incomingScope.getOrdinal();
243
244      keyFields[ ordinal ] = outgoingScope.getKeySelectors().get( incomingScope.getName() );
245      valuesFields[ ordinal ] = incomingScope.getIncomingSpliceFields();
246
247      keyBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
248      valuesBuilder[ ordinal ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
249
250      if( sortFields != null )
251        {
252        sortFields[ ordinal ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() );
253        sortBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ ordinal ] );
254        }
255
256      if( LOG.isDebugEnabled() )
257        {
258        LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), ordinal );
259        LOG.debug( "keyFields: {}", printSafe( keyFields[ ordinal ] ) );
260        LOG.debug( "valueFields: {}", printSafe( valuesFields[ ordinal ] ) );
261
262        if( sortFields != null )
263          LOG.debug( "sortFields: {}", printSafe( sortFields[ ordinal ] ) );
264        }
265      }
266
267    if( role == IORole.sink )
268      {
269      if( sortFields == null )
270        groupTuple = new KeyTuple();
271      else
272        groupSortTuple = new TuplePair();
273
274      keyTuple = (Tuple) ( sortFields == null ? groupTuple : groupSortTuple );
275      valueTuple = new ValueTuple();
276
277      return;
278      }
279
280    keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true );
281    tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() );
282
283    grouping = new Grouping<>();
284    grouping.key = keyEntry;
285    grouping.joinIterator = tupleEntryIterator;
286    }
287
288  protected void initComparators()
289    {
290    Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) );
291
292    Fields[] compareFields = new Fields[ getNumDeclaredIncomingBranches() ];
293    groupComparators = new Comparator[ getNumDeclaredIncomingBranches() ];
294
295    if( splice.isSorted() )
296      valueComparators = new Comparator[ getNumDeclaredIncomingBranches() ];
297
298    int size = splice.isGroupBy() ? 1 : getNumDeclaredIncomingBranches();
299
300    for( int i = 0; i < size; i++ )
301      {
302      Scope incomingScope = incomingScopes.get( i );
303
304      int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );
305
306      // we want the comparators
307      Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() );
308
309      compareFields[ pos ] = groupFields; // used for finding hashers
310
311      if( groupFields.size() == 0 )
312        groupComparators[ pos ] = groupFields;
313      else
314        groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator );
315
316      groupComparators[ pos ] = splice.isSortReversed() ? Collections.reverseOrder( groupComparators[ pos ] ) : groupComparators[ pos ];
317
318      if( sortFields != null )
319        {
320        // we want the comparators, so don't use sortFields array
321        Fields sortFields = splice.getSortingSelectors().get( incomingScope.getName() );
322        valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortFields, defaultComparator );
323
324        if( splice.isSortReversed() )
325          valueComparators[ pos ] = Collections.reverseOrder( valueComparators[ pos ] );
326        }
327      }
328
329    nullsAreNotEqual = !areNullsEqual();
330
331    if( nullsAreNotEqual )
332      LOG.debug( "treating null values in Tuples at not equal during grouping" );
333
334    Comparator[] hashers = TupleHasher.merge( compareFields );
335    groupHasher = defaultComparator != null || !TupleHasher.isNull( hashers ) ? new TupleHasher( defaultComparator, hashers ) : null;
336    }
337
338  protected Comparator getKeyComparator()
339    {
340    if( groupComparators.length > 0 && groupComparators[ 0 ] != null )
341      return groupComparators[ 0 ];
342
343    return new Comparator<Comparable>()
344    {
345    @Override
346    public int compare( Comparable lhs, Comparable rhs )
347      {
348      return lhs.compareTo( rhs );
349      }
350    };
351    }
352
353  @Override
354  public void cleanup()
355    {
356    super.cleanup();
357
358    // close if top of stack
359    if( next == null )
360      flowProcess.closeTrapCollectors();
361    }
362
363  private boolean areNullsEqual()
364    {
365    try
366      {
367      Tuple tupleWithNull = Tuple.size( 1 );
368
369      return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0;
370      }
371    catch( Exception exception )
372      {
373      return true; // assume we have an npe or something and they don't expect to see nulls
374      }
375    }
376
377  protected int getNumDeclaredIncomingBranches()
378    {
379    return splice.getPrevious().length;
380    }
381
382  /**
383   * This allows the tuple to honor the hasher and comparators, if any
384   *
385   * @param object the tuple to wrap
386   * @return a DelegatedTuple instance
387   */
388  protected final Tuple getDelegatedTuple( Tuple object )
389    {
390    if( groupHasher == null )
391      return object;
392
393    return new DelegatedTuple( object );
394    }
395
396  private String printSafe( Fields fields )
397    {
398    if( fields != null )
399      return fields.printVerbose();
400
401    return "";
402    }
403
404  @Override
405  public final boolean equals( Object object )
406    {
407    if( this == object )
408      return true;
409    if( !( object instanceof GroupingSpliceGate ) )
410      return false;
411
412    GroupingSpliceGate groupingSpliceGate = (GroupingSpliceGate) object;
413
414    if( splice != null ? splice != groupingSpliceGate.splice : groupingSpliceGate.splice != null )
415      return false;
416
417    return true;
418    }
419
420  @Override
421  public final int hashCode()
422    {
423    return splice != null ? System.identityHashCode( splice ) : 0;
424    }
425
426  @Override
427  public String toString()
428    {
429    final StringBuilder sb = new StringBuilder( "SpliceGate{" );
430    sb.append( "splice=" ).append( splice );
431    sb.append( ", role=" ).append( role );
432    sb.append( '}' );
433    return sb.toString();
434    }
435
436  protected class DelegatedTuple extends Tuple
437    {
438    public DelegatedTuple( Tuple wrapped )
439      {
440      // pass it in to prevent one being allocated
441      super( Tuple.elements( wrapped ) );
442      }
443
444    @Override
445    public boolean equals( Object object )
446      {
447      return compareTo( object ) == 0;
448      }
449
450    @Override
451    public int compareTo( Object other )
452      {
453      return groupComparators[ 0 ].compare( this, (Tuple) other );
454      }
455
456    @Override
457    public int hashCode()
458      {
459      return groupHasher.hashCode( this );
460      }
461    }
462  }