/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.element;

import java.util.Comparator;

import cascading.flow.FlowProcess;
import cascading.flow.FlowProps;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Grouping;
import cascading.flow.stream.duct.Window;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.util.SparseTupleComparator;
import cascading.pipe.Splice;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.Tuples;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.util.Resettable1;
import cascading.tuple.util.Resettable2;
import cascading.tuple.util.TupleBuilder;
import cascading.tuple.util.TupleHasher;
import cascading.util.NullSafeReverseComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.tuple.util.TupleViews.*;

/**
 * Base class for {@link SpliceGate} implementations that group incoming tuples by key.
 * <p>
 * {@link #initialize()} resolves, per incoming branch, the key, value and (when the splice is sorted) sort
 * field selectors along with the {@link TupleBuilder} instances that derive key, value and sort tuples from an
 * incoming tuple. {@link #initComparators()} builds the matching group and value comparators and the
 * {@link TupleHasher} used by {@link DelegatedTuple}.
 * <p>
 * When acting in the {@link IORole#sink} role only the reusable key/value tuples are created; otherwise the
 * {@link Grouping} handed downstream is created.
 */
public abstract class GroupingSpliceGate extends SpliceGate<TupleEntry, Grouping<TupleEntry, TupleEntryIterator>> implements Window
  {
  private static final Logger LOG = LoggerFactory.getLogger( GroupingSpliceGate.class );

  // per-branch field selectors, indexed by branch ordinal (or a single slot, see initialize())
  protected Fields[] keyFields;
  protected Fields[] sortFields; // remains null unless the splice is sorted
  protected Fields[] valuesFields;

  // populated by initComparators()
  protected Comparator<Tuple>[] groupComparators;
  protected Comparator<Tuple>[] valueComparators; // remains null unless the splice is sorted
  protected TupleHasher groupHasher;
  protected boolean nullsAreNotEqual;

  // per-branch builders, indexed like the field arrays above
  protected TupleBuilder[] keyBuilder;
  protected TupleBuilder[] valuesBuilder;
  protected TupleBuilder[] sortBuilder;

  // as sink
  protected Tuple keyTuple; // alias for groupTuple or groupSortTuple
  protected Resettable1<Tuple> groupTuple;
  protected Resettable2<Tuple, Tuple> groupSortTuple;
  protected Resettable1<Tuple> valueTuple;

  // as source
  protected Grouping<TupleEntry, TupleEntryIterator> grouping;
  protected TupleEntry keyEntry;
  protected TupleEntryChainIterator tupleEntryIterator;

  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice )
    {
    super( flowProcess, splice );
    }

  protected GroupingSpliceGate( FlowProcess flowProcess, Splice splice, IORole role )
    {
    super( flowProcess, splice, role );
    }

  /**
   * Creates a builder that narrows an incoming tuple down to the given fields.
   * <p>
   * The checks are applied in this order:
   * <ul>
   * <li>{@code narrowFields} is NONE: the builder always returns {@link Tuple#NULL}</li>
   * <li>{@code incomingFields} is UNKNOWN: the builder selects via {@code input.get( incomingFields, narrowFields )}</li>
   * <li>{@code narrowFields} is ALL: the builder returns the input tuple unchanged</li>
   * <li>otherwise: delegates to {@link #createDefaultNarrowBuilder(Fields, Fields)}</li>
   * </ul>
   *
   * @param incomingFields the fields declared by the incoming tuple
   * @param narrowFields   the fields to select from the incoming tuple
   * @return a builder producing the narrowed tuple for each input
   */
  protected TupleBuilder createNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
    {
    if( narrowFields.isNone() )
      return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return Tuple.NULL;
          }
        };

    if( incomingFields.isUnknown() )
      return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return input.get( incomingFields, narrowFields );
          }
        };

    if( narrowFields.isAll() ) // dubious this is ever reached
      return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return input;
          }
        };

    return createDefaultNarrowBuilder( incomingFields, narrowFields );
    }

  /**
   * Creates the general purpose narrowing builder used when none of the special cases in
   * {@link #createNarrowBuilder(Fields, Fields)} apply.
   * <p>
   * A single narrow tuple is created once from the positions of {@code narrowFields} within
   * {@code incomingFields}, and is reset against every input, so the same result instance is reused
   * across calls.
   *
   * @param incomingFields the fields declared by the incoming tuple
   * @param narrowFields   the fields to select from the incoming tuple
   * @return a builder producing the narrowed tuple for each input
   */
  protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
    {
    return new TupleBuilder()
      {
      Tuple result = createNarrow( incomingFields.getPos( narrowFields ) );

      @Override
      public Tuple makeResult( Tuple input, Tuple output )
        {
        return reset( result, input );
        }
      };
    }

  /**
   * Creates a builder that produces the values tuple for an input, with the key positions replaced.
   * <p>
   * The checks are applied in this order:
   * <ul>
   * <li>{@code incomingFields} is UNKNOWN: delegates to {@code Tuples.nulledCopy( incomingFields, input, keyField )}</li>
   * <li>{@code keyField} is NONE: the builder returns the input tuple unchanged</li>
   * <li>{@code keyField} is ALL: the builder always returns one shared tuple created via
   * {@code Tuple.size( incomingFields.size() )}</li>
   * <li>otherwise: an override tuple is reset with the input as base and a {@code Tuple.size( keyField.size() )}
   * tuple as the overriding values</li>
   * </ul>
   * NOTE(review): the tuples created by {@code Tuple.size(..)} are presumably null filled, as the
   * {@code nullTuple} naming suggests - confirm against {@link Tuple}.
   *
   * @param incomingFields the fields declared by the incoming tuple
   * @param keyField       the key fields whose positions are replaced in the result
   * @return a builder producing the values tuple for each input
   */
  protected TupleBuilder createNulledBuilder( final Fields incomingFields, final Fields keyField )
    {
    if( incomingFields.isUnknown() )
      return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return Tuples.nulledCopy( incomingFields, input, keyField );
          }
        };

    if( keyField.isNone() )
      return new TupleBuilder()
        {
        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return input;
          }
        };

    if( keyField.isAll() )
      return new TupleBuilder()
        {
        Tuple nullTuple = Tuple.size( incomingFields.size() );

        @Override
        public Tuple makeResult( Tuple input, Tuple output )
          {
          return nullTuple;
          }
        };

    return new TupleBuilder()
      {
      Tuple nullTuple = Tuple.size( keyField.size() );
      Tuple result = createOverride( incomingFields, keyField );

      @Override
      public Tuple makeResult( Tuple baseTuple, Tuple output )
        {
        return reset( result, baseTuple, nullTuple );
        }
      };
    }

  /**
   * Resolves the per-branch field selectors and tuple builders, then creates the role specific state.
   * <p>
   * The arrays are sized by {@link #getNumDeclaredIncomingBranches()}, except for a GroupBy in the
   * source role where a single slot is used. In the sink role the reusable key and value tuples are
   * created and the method returns; otherwise the key entry, chain iterator and {@link Grouping} are created.
   */
  @Override
  public void initialize()
    {
    super.initialize();

    int size = getNumDeclaredIncomingBranches(); // is the maximum ordinal value

    // this is a merge, all fields have the same declaration
    // filling out full array has implications on joiner/closure which should be resolved independently
    if( role == IORole.source && splice.isGroupBy() )
      size = 1;

    keyFields = new Fields[ size ];
    valuesFields = new Fields[ size ];

    keyBuilder = new TupleBuilder[ size ];
    valuesBuilder = new TupleBuilder[ size ];

    // sort arrays are only allocated for sorted splices, a null sortFields means "not sorted" below
    if( splice.isSorted() )
      {
      sortFields = new Fields[ size ];
      sortBuilder = new TupleBuilder[ size ];
      }

    Scope outgoingScope = outgoingScopes.get( 0 );

    // never iterate past the scopes actually present, nor past the allocated slots
    int numScopes = Math.min( size, incomingScopes.size() );
    for( int i = 0; i < numScopes; i++ )
      {
      Scope incomingScope = incomingScopes.get( i );

      // for GroupBy, incoming may have same name, but guaranteed to have same key/value/sort fields for merge
      // arrays may be size 1, then ordinal should always be zero.
      int ordinal = size == 1 ? 0 : incomingScope.getOrdinal();

      keyFields[ ordinal ] = outgoingScope.getKeySelectors().get( incomingScope.getName() );
      valuesFields[ ordinal ] = incomingScope.getIncomingSpliceFields();

      keyBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );
      valuesBuilder[ ordinal ] = createNulledBuilder( incomingScope.getIncomingSpliceFields(), keyFields[ ordinal ] );

      if( sortFields != null )
        {
        sortFields[ ordinal ] = outgoingScope.getSortingSelectors().get( incomingScope.getName() );
        sortBuilder[ ordinal ] = createNarrowBuilder( incomingScope.getIncomingSpliceFields(), sortFields[ ordinal ] );
        }

      if( LOG.isDebugEnabled() )
        {
        LOG.debug( "incomingScope: {}, in pos: {}", incomingScope.getName(), ordinal );
        LOG.debug( "keyFields: {}", printSafe( keyFields[ ordinal ] ) );
        LOG.debug( "valueFields: {}", printSafe( valuesFields[ ordinal ] ) );

        if( sortFields != null )
          LOG.debug( "sortFields: {}", printSafe( sortFields[ ordinal ] ) );
        }
      }

    if( role == IORole.sink )
      {
      if( sortFields == null )
        groupTuple = new KeyTuple();
      else
        groupSortTuple = new TuplePair();

      keyTuple = (Tuple) ( sortFields == null ? groupTuple : groupSortTuple );
      valueTuple = new ValueTuple();

      return;
      }

    keyEntry = new TupleEntry( outgoingScope.getOutGroupingFields(), true );
    tupleEntryIterator = new TupleEntryChainIterator( outgoingScope.getOutValuesFields() );

    grouping = new Grouping<>();
    grouping.key = keyEntry;
    grouping.joinIterator = tupleEntryIterator;
    }

  /**
   * Builds the group comparators, the value comparators (sorted splices only) and the group hasher.
   * <p>
   * Reads {@code sortFields} and {@code valuesFields}, which are populated by {@link #initialize()}.
   * A GroupBy only populates slot zero; otherwise each incoming scope is placed at the position the
   * splice reports for its name. Comparators are wrapped in a reversing comparator when the splice
   * reports a reversed sort.
   */
  protected void initComparators()
    {
    Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) );

    Fields[] compareFields = new Fields[ getNumDeclaredIncomingBranches() ];
    groupComparators = new Comparator[ getNumDeclaredIncomingBranches() ];

    if( splice.isSorted() )
      valueComparators = new Comparator[ getNumDeclaredIncomingBranches() ];

    int size = splice.isGroupBy() ? 1 : getNumDeclaredIncomingBranches();

    for( int i = 0; i < size; i++ )
      {
      Scope incomingScope = incomingScopes.get( i );

      int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );

      // we want the comparators
      Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() );

      compareFields[ pos ] = groupFields; // used for finding hashers

      // an empty selector is used directly as the comparator
      if( groupFields.size() == 0 )
        groupComparators[ pos ] = groupFields;
      else
        groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator );

      groupComparators[ pos ] = splice.isSortReversed() ? NullSafeReverseComparator.reverseOrder( groupComparators[ pos ] ) : groupComparators[ pos ];

      // tests the sortFields array field, non-null only for sorted splices
      if( sortFields != null )
        {
        // we want the comparators, so don't use sortFields array
        Fields sortSelector = splice.getSortingSelectors().get( incomingScope.getName() );
        valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortSelector, defaultComparator );

        if( splice.isSortReversed() )
          valueComparators[ pos ] = NullSafeReverseComparator.reverseOrder( valueComparators[ pos ] );
        }
      }

    nullsAreNotEqual = !areNullsEqual();

    if( nullsAreNotEqual )
      LOG.debug( "treating null values in Tuples at not equal during grouping" );

    Comparator[] hashers = TupleHasher.merge( compareFields );
    groupHasher = defaultComparator != null || !TupleHasher.isNull( hashers ) ? new TupleHasher( defaultComparator, hashers ) : null;
    }

  /**
   * Returns the comparator used to order keys.
   *
   * @return the first group comparator when one is set, otherwise a comparator that relies on the
   * natural ordering of its {@link Comparable} arguments
   */
  protected Comparator getKeyComparator()
    {
    if( groupComparators.length > 0 && groupComparators[ 0 ] != null )
      return groupComparators[ 0 ];

    return new Comparator<Comparable>()
      {
      @Override
      public int compare( Comparable lhs, Comparable rhs )
        {
        return lhs.compareTo( rhs );
        }
      };
    }

  /**
   * Cleans up this gate and, when there is no next duct, closes the trap collectors of the flow process.
   */
  @Override
  public void cleanup()
    {
    super.cleanup();

    // close if top of stack
    if( next == null )
      flowProcess.closeTrapCollectors();
    }

  /**
   * Probes the first group comparator with a single element tuple compared against itself.
   *
   * @return {@code true} if the comparator reports the tuple equal to itself, or if the probe fails
   * with any exception
   */
  private boolean areNullsEqual()
    {
    try
      {
      Tuple tupleWithNull = Tuple.size( 1 );

      return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0;
      }
    catch( Exception exception )
      {
      return true; // assume we have an npe or something and they don't expect to see nulls
      }
    }

  /**
   * @return the number of pipes declared as previous to the splice
   */
  protected int getNumDeclaredIncomingBranches()
    {
    return splice.getPrevious().length;
    }

  /**
   * This allows the tuple to honor the hasher and comparators, if any
   *
   * @param object the tuple to wrap
   * @return a DelegatedTuple instance, or the given tuple itself when no group hasher is set
   */
  protected final Tuple getDelegatedTuple( Tuple object )
    {
    if( groupHasher == null )
      return object;

    return new DelegatedTuple( object );
    }

  /**
   * @param fields the fields to print, may be {@code null}
   * @return the verbose form of the given fields, or an empty string for {@code null}
   */
  private String printSafe( Fields fields )
    {
    if( fields != null )
      return fields.printVerbose();

    return "";
    }

  /**
   * A {@link Tuple} whose equality, ordering and hash code are delegated to the first group comparator
   * and the group hasher of the enclosing gate.
   * <p>
   * Instances are only handed out by {@link #getDelegatedTuple(Tuple)}, which requires a non-null hasher.
   */
  protected class DelegatedTuple extends Tuple
    {
    public DelegatedTuple( Tuple wrapped )
      {
      // pass it in to prevent one being allocated
      super( Tuple.elements( wrapped ) );
      }

    /**
     * Compares via the group comparator.
     * <p>
     * {@code null} and non-Tuple arguments are never equal; this honors the {@link Object#equals(Object)}
     * contract instead of failing on the cast inside {@link #compareTo(Object)}.
     * <p>
     * There is deliberately no identity shortcut: the comparator alone decides equality, and
     * {@code areNullsEqual()} allows for a comparator that reports a tuple as not equal to itself.
     */
    @Override
    public boolean equals( Object object )
      {
      if( !( object instanceof Tuple ) )
        return false;

      return compareTo( object ) == 0;
      }

    @Override
    public int compareTo( Object other )
      {
      return groupComparators[ 0 ].compare( this, (Tuple) other );
      }

    @Override
    public int hashCode()
      {
      return groupHasher.hashCode( this );
      }
    }
  }