/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.element;

import java.util.Collections;
import java.util.Comparator;
import java.util.Map;

import cascading.flow.FlowProcess;
import cascading.flow.FlowProps;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.Grouping;
import cascading.flow.stream.duct.Window;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.flow.stream.util.SparseTupleComparator;
import cascading.pipe.Splice;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.Tuples;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.util.Resettable1;
import cascading.tuple.util.Resettable2;
import cascading.tuple.util.TupleBuilder;
import cascading.tuple.util.TupleHasher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.tuple.util.TupleViews.*;

/**
 * Base class for {@link SpliceGate}s whose {@link Splice} groups tuples on key fields.
 * <p>
 * During {@link #initialize()} this gate resolves, per incoming branch, the key, value and
 * (when the splice is sorted) sort fields, and creates {@link TupleBuilder}s that extract them.
 * In the {@link IORole#sink} role it allocates reusable key/value tuples. Otherwise it builds a
 * {@link Grouping} that pairs a key {@link TupleEntry} with a {@link TupleEntryChainIterator}
 * over the grouped values.
 * <p>
 * {@link #initComparators()} is a separate step that builds the grouping/sorting comparators
 * and the optional {@link TupleHasher}.
 */
public abstract class GroupingSpliceGate extends SpliceGate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements Window
  {
  private static final Logger LOG = LoggerFactory.getLogger( GroupingSpliceGate.class );

  // incoming ducts have a unique ordinal into the splice, so we cannot store the ordinal directly on the
  // previous duct
  protected Map<Duct, Integer> ordinalMap;

  // per-branch field declarations, indexed by incoming ordinal (populated in initialize())
  protected Fields[] keyFields;
  protected Fields[] sortFields; // only allocated when the splice is sorted
  protected Fields[] valuesFields;

  // per-branch comparators, indexed by pipe position (populated in initComparators())
  protected Comparator<Tuple>[] groupComparators;
  protected Comparator<Tuple>[] valueComparators; // only allocated when the splice is sorted
  protected TupleHasher groupHasher; // left null when there is no default comparator and no custom hasher
  protected boolean nullsAreNotEqual;

  protected TupleBuilder[] keyBuilder;
  protected TupleBuilder[] valuesBuilder;
  protected TupleBuilder[] sortBuilder;

  // as sink
  protected Tuple keyTuple; // alias for groupTuple or groupSortTuple
  protected Resettable1<Tuple> groupTuple;
  protected Resettable2<Tuple, Tuple> groupSortTuple;
  protected Resettable1<Tuple> valueTuple;

  // as source
  protected Grouping<TupleEntry, TupleEntryIterator> grouping;
  protected TupleEntry keyEntry;
  protected TupleEntryChainIterator tupleEntryIterator;

  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice )
    {
    super( flowProcess, splice );
    }

  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice, IORole role )
    {
    super( flowProcess, splice, role );
    }

  /**
   * Binds this gate into the given stream graph, then caches the graph's ordinal map for this gate.
   *
   * @param streamGraph the graph this gate belongs to
   */
  @Override
  public void bind( StreamGraph streamGraph )
    {
    super.bind( streamGraph );

    setOrdinalMap( streamGraph );
    }

  /**
   * Stores the incoming-duct to ordinal mapping that the stream graph holds for this gate.
   *
   * @param streamGraph the graph to read the ordinal map from
   */
  protected synchronized void setOrdinalMap( StreamGraph streamGraph )
    {
    ordinalMap = streamGraph.getOrdinalMap( this );
    }

  /**
   * Creates a builder that selects {@code narrowFields} out of tuples declared by {@code incomingFields}.
   * <ul>
   * <li>{@code narrowFields} is NONE: always returns {@link Tuple#NULL}.</li>
   * <li>{@code incomingFields} is UNKNOWN: selects by field on every call via {@link Tuple#get(Fields, Fields)}.</li>
   * <li>{@code narrowFields} is ALL: returns the input tuple unchanged.</li>
   * <li>otherwise: delegates to {@link #createDefaultNarrowBuilder(Fields, Fields)}.</li>
   * </ul>
   *
   * @param incomingFields the declaration of tuples handed to the builder
   * @param narrowFields   the fields to retain
   * @return a builder producing the narrowed tuple
   */
  protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
    {
    if( narrowFields.isNone() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return Tuple.NULL;
        }
      };

    if( incomingFields.isUnknown() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return input.get( incomingFields, narrowFields );
        }
      };

    if( narrowFields.isAll() ) // dubious this is ever reached
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return input;
        }
      };

    return createDefaultNarrowBuilder( incomingFields, narrowFields );
    }

  /**
   * Creates a builder that reuses a single narrowed tuple, created once from the positions of
   * {@code narrowFields} within {@code incomingFields} and reset onto each input.
   * <p>
   * NOTE(review): the same result instance is returned on every call, so it presumably reflects only
   * the most recent input and should not be retained by callers; confirm against TupleViews.
   *
   * @param incomingFields the declaration of tuples handed to the builder
   * @param narrowFields   the fields to retain
   * @return a builder producing the narrowed tuple
   */
  protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
    {
    return new TupleBuilder()
    {
    Tuple result = createNarrow( incomingFields.getPos( narrowFields ) );

    @Override
    public Tuple makeResult( Tuple input, Tuple output )
      {
      return reset( result, input );
      }
    };
    }

  /**
   * Creates a builder that yields the incoming tuple with the {@code keyField} values replaced by nulls.
   * <ul>
   * <li>{@code incomingFields} is UNKNOWN: builds a fresh copy per call via {@link Tuples#nulledCopy}.</li>
   * <li>{@code keyField} is NONE: returns the input tuple unchanged.</li>
   * <li>{@code keyField} is ALL: returns one fixed tuple sized to {@code incomingFields}
   * (from {@link Tuple#size(int)}).</li>
   * <li>otherwise: resets a reused override tuple so the key positions read from a null tuple.</li>
   * </ul>
   *
   * @param incomingFields the declaration of tuples handed to the builder
   * @param keyField       the fields to null out
   * @return a builder producing the nulled tuple
   */
  protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField )
    {
    if( incomingFields.isUnknown() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return Tuples.nulledCopy( incomingFields, input, keyField );
        }
      };

    if( keyField.isNone() )
      return new TupleBuilder()
      {
      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return input;
        }
      };

    if( keyField.isAll() )
      return new TupleBuilder()
      {
      Tuple nullTuple = Tuple.size( incomingFields.size() );

      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return nullTuple;
        }
      };

    return new TupleBuilder()
    {
    Tuple nullTuple = Tuple.size( keyField.size() );
    Tuple result = createOverride( incomingFields, keyField );

    @Override
    public Tuple makeResult( Tuple baseTuple, Tuple output )
      {
      return reset( result, baseTuple, nullTuple );
      }
    };
    }

  /**
   * Resolves the per-branch key/value/sort fields and their builders, then allocates either the
   * sink-side reusable tuples or the source-side {@link Grouping}.
   */
  @Override
  public void initialize()
    {
    super.initialize();

    int size = getNumDeclaredIncomingBranches(); // is the maximum ordinal value

    // this is a merge, all fields have the same declaration
    // filling out full array has implications on joiner/closure which should be resolved independently
    if( role == IORole.source && splice.isGroupBy() )
      size = 1;

    keyFields = new Fields[ size ];
    valuesFields = new Fields[ size ];

    keyBuilder = new TupleBuilder[ size ];
    valuesBuilder = new TupleBuilder[ size ];

    if( splice.isSorted() )
      {
      sortFields = new Fields[ size ];
      sortBuilder = new TupleBuilder[ size ];
      }

    Scope outgoingScope = outgoingScopes.get( 0 );

    int numScopes = Math.min( size, incomingScopes.size() );

    for( int i = 0; i < numScopes; i++ )
      {
      Scope incomingScope = incomingScopes.get( i );

      // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge
      // arrays may be size 1, then ordinal should always be zero.
      int ordinal = size == 1 ? 0 : incomingScope.getOrdinal();

      keyFields[ ordinal ] = outgoingScope.getKeySelectors().get( incomingScope.getName() );
      valuesFields[ ordinal ] = incomingScope.getIncomingSpliceFields();

      keyBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
      valuesBuilder[ ordinal ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );

      if( sortFields != null )
        {
        sortFields[ ordinal ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() );
        sortBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ ordinal ] );
        }

      if( LOG.isDebugEnabled() )
        {
        LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), ordinal );
        LOG.debug( "keyFields: {}", printSafe( keyFields[ ordinal ] ) );
        LOG.debug( "valueFields: {}", printSafe( valuesFields[ ordinal ] ) );

        if( sortFields != null )
          LOG.debug( "sortFields: {}", printSafe( sortFields[ ordinal ] ) );
        }
      }

    if( role == IORole.sink )
      {
      if( sortFields == null )
        groupTuple = new KeyTuple();
      else
        groupSortTuple = new TuplePair();

      keyTuple = (Tuple) ( sortFields == null ? groupTuple : groupSortTuple );
      valueTuple = new ValueTuple();

      return;
      }

    keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true );
    tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() );

    grouping = new Grouping<>();
    grouping.key = keyEntry;
    grouping.joinIterator = tupleEntryIterator;
    }

  /**
   * Builds the per-branch group comparators (and value comparators when sorted), decides whether
   * nulls compare as equal, and creates {@link #groupHasher} when any hashing customization exists.
   * <p>
   * Reads {@link #sortFields} and {@link #valuesFields}, which are populated by {@link #initialize()}.
   */
  protected void initComparators()
    {
    Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) );

    Fields[] compareFields = new Fields[ getNumDeclaredIncomingBranches() ];
    groupComparators = new Comparator[ getNumDeclaredIncomingBranches() ];

    if( splice.isSorted() )
      valueComparators = new Comparator[ getNumDeclaredIncomingBranches() ];

    // GroupBy merges identically declared branches, so a single comparator slot suffices
    int size = splice.isGroupBy() ? 1 : getNumDeclaredIncomingBranches();

    for( int i = 0; i < size; i++ )
      {
      Scope incomingScope = incomingScopes.get( i );

      int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );

      // we want the comparators
      Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() );

      compareFields[ pos ] = groupFields; // used for finding hashers

      if( groupFields.size() == 0 )
        groupComparators[ pos ] = groupFields;
      else
        groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator );

      groupComparators[ pos ] = splice.isSortReversed() ? Collections.reverseOrder( groupComparators[ pos ] ) : groupComparators[ pos ];

      if( sortFields != null )
        {
        // we want the comparators, so use the splice's sorting selector rather than the sortFields array
        // (named distinctly so it does not shadow the sortFields field tested above)
        Fields sortingSelector = splice.getSortingSelectors().get( incomingScope.getName() );
        valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortingSelector, defaultComparator );

        if( splice.isSortReversed() )
          valueComparators[ pos ] = Collections.reverseOrder( valueComparators[ pos ] );
        }
      }

    nullsAreNotEqual = !areNullsEqual();

    if( nullsAreNotEqual )
      LOG.debug( "treating null values in Tuples as not equal during grouping" );

    Comparator[] hashers = TupleHasher.merge( compareFields );
    groupHasher = defaultComparator != null || !TupleHasher.isNull( hashers ) ? new TupleHasher( defaultComparator, hashers ) : null;
    }

  /**
   * Returns the comparator for grouping keys: the first group comparator when one is set, otherwise
   * a comparator using the keys' natural {@link Comparable} ordering.
   *
   * @return the key comparator
   */
  protected Comparator getKeyComparator()
    {
    if( groupComparators.length > 0 && groupComparators[ 0 ] != null )
      return groupComparators[ 0 ];

    return new Comparator<Comparable>()
    {
    @Override
    public int compare( Comparable lhs, Comparable rhs )
      {
      return lhs.compareTo( rhs );
      }
    };
    }

  /**
   * Cleans up the parent gate; when this gate has no next duct, also closes the flow's trap collectors.
   */
  @Override
  public void cleanup()
    {
    super.cleanup();

    // close if top of stack
    if( next == null )
      flowProcess.closeTrapCollectors();
    }

  /**
   * Probes the first group comparator with a one-element tuple holding a null value.
   *
   * @return true when that tuple compares equal to itself, or when the comparator throws
   */
  private boolean areNullsEqual()
    {
    try
      {
      Tuple tupleWithNull = Tuple.size( 1 );

      return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0;
      }
    catch( Exception exception )
      {
      return true; // assume we have an npe or something and they don't expect to see nulls
      }
    }

  /**
   * @return the number of pipes feeding the splice
   */
  protected int getNumDeclaredIncomingBranches()
    {
    return splice.getPrevious().length;
    }

  /**
   * This allows the tuple to honor the hasher and comparators, if any
   *
   * @param object the tuple to wrap
   * @return a DelegatedTuple instance, or {@code object} itself when no group hasher is configured
   */
  protected final Tuple getDelegatedTuple( Tuple object )
    {
    if( groupHasher == null )
      return object;

    return new DelegatedTuple( object );
    }

  /**
   * @return the verbose form of {@code fields}, or an empty string when it is null
   */
  private String printSafe( Fields fields )
    {
    if( fields != null )
      return fields.printVerbose();

    return "";
    }

  /**
   * Two gates are equal when they wrap the same {@link Splice} instance (identity, not equals()).
   */
  @Override
  public final boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( !( object instanceof GroupingSpliceGate ) )
      return false;

    GroupingSpliceGate groupingSpliceGate = (GroupingSpliceGate) object;

    if( splice != null ? splice != groupingSpliceGate.splice : groupingSpliceGate.splice != null )
      return false;

    return true;
    }

  /**
   * Consistent with {@link #equals(Object)}: uses the identity hash of the wrapped splice.
   */
  @Override
  public final int hashCode()
    {
    return splice != null ? System.identityHashCode( splice ) : 0;
    }

  @Override
  public String toString()
    {
    final StringBuilder sb = new StringBuilder( "SpliceGate{" );
    sb.append( "splice=" ).append( splice );
    sb.append( ", role=" ).append( role );
    sb.append( '}' );
    return sb.toString();
    }

  /**
   * A tuple sharing the wrapped tuple's elements, whose equality, ordering and hash code come from
   * the enclosing gate's first group comparator and {@link #groupHasher}.
   */
  protected class DelegatedTuple extends Tuple
    {
    public DelegatedTuple( Tuple wrapped )
      {
      // pass it in to prevent one being allocated
      super( Tuple.elements( wrapped ) );
      }

    @Override
    public boolean equals( Object object )
      {
      // compareTo() casts to Tuple unconditionally; per the Object.equals contract a foreign type
      // (or null) must yield false rather than a ClassCastException or a null handed to the comparator
      if( !( object instanceof Tuple ) )
        return false;

      return compareTo( object ) == 0;
      }

    @Override
    public int compareTo( Object other )
      {
      return groupComparators[ 0 ].compare( this, (Tuple) other );
      }

    @Override
    public int hashCode()
      {
      return groupHasher.hashCode( this );
      }
    }
  }