001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tap.partition; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.LinkedHashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032 033import cascading.flow.Flow; 034import cascading.flow.FlowProcess; 035import cascading.operation.Filter; 036import cascading.scheme.Scheme; 037import cascading.scheme.SinkCall; 038import cascading.scheme.SourceCall; 039import cascading.tap.SinkMode; 040import cascading.tap.Tap; 041import cascading.tap.TapException; 042import cascading.tap.type.FileType; 043import cascading.tuple.Fields; 044import cascading.tuple.Tuple; 045import cascading.tuple.TupleEntry; 046import cascading.tuple.TupleEntryCollector; 047import cascading.tuple.TupleEntryIterableChainIterator; 048import cascading.tuple.TupleEntryIterator; 049import cascading.tuple.TupleEntrySchemeCollector; 050import cascading.tuple.TupleEntrySchemeIterator; 051import cascading.tuple.util.TupleViews; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * 057 */ 058public abstract class BasePartitionTap<Config, Input, Output> extends Tap<Config, Input, Output> implements FileType<Config> 059 { 060 /** Field LOG */ 061 private static final Logger LOG = LoggerFactory.getLogger( BasePartitionTap.class ); 062 /** Field OPEN_FILES_THRESHOLD_DEFAULT */ 063 protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300; 064 065 private class PartitionIterator extends TupleEntryIterableChainIterator 066 { 067 public PartitionIterator( final FlowProcess<? extends Config> flowProcess, Input input ) throws IOException 068 { 069 super( getSourceFields() ); 070 071 List<Iterator<Tuple>> iterators = new ArrayList<>(); 072 073 if( input != null ) 074 { 075 String identifier = parent.getFullIdentifier( flowProcess ); 076 iterators.add( createPartitionEntryIterator( flowProcess, input, identifier, getCurrentIdentifier( flowProcess ) ) ); 077 } 078 else 079 { 080 String[] childIdentifiers = getChildPartitionIdentifiers( flowProcess, false ); 081 082 for( String childIdentifier : childIdentifiers ) 083 iterators.add( createPartitionEntryIterator( flowProcess, null, parent.getIdentifier(), childIdentifier ) ); 084 } 085 086 reset( iterators ); 087 } 088 089 private PartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Config> flowProcess, Input input, String parentIdentifier, String childIdentifier ) throws IOException 090 { 091 TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, childIdentifier, input ); 092 093 return new PartitionTupleEntryIterator( getSourceFields(), partition, parentIdentifier, childIdentifier, schemeIterator ); 094 } 095 } 096 097 public class PartitionCollector extends TupleEntryCollector 098 { 099 private final FlowProcess<? extends Config> flowProcess; 100 private final Config conf; 101 private final Fields parentFields; 102 private final Fields partitionFields; 103 private TupleEntry partitionEntry; 104 private final Tuple partitionTuple; 105 private final Tuple parentTuple; 106 107 public PartitionCollector( FlowProcess<? extends Config> flowProcess ) 108 { 109 super( Fields.asDeclaration( getSinkFields() ) ); 110 this.flowProcess = flowProcess; 111 this.conf = flowProcess.getConfigCopy(); 112 this.parentFields = parent.getSinkFields(); 113 this.partitionFields = ( (PartitionScheme) getScheme() ).partitionFields; 114 this.partitionEntry = new TupleEntry( this.partitionFields ); 115 116 this.partitionTuple = TupleViews.createNarrow( getSinkFields().getPos( this.partitionFields ) ); 117 this.parentTuple = TupleViews.createNarrow( getSinkFields().getPos( this.parentFields ) ); 118 119 this.partitionEntry.setTuple( partitionTuple ); 120 } 121 122 TupleEntryCollector getCollector( String path ) 123 { 124 TupleEntryCollector collector = collectors.get( path ); 125 126 if( collector != null ) 127 return collector; 128 129 try 130 { 131 if( LOG.isDebugEnabled() ) 132 LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path ); 133 134 collector = createTupleEntrySchemeCollector( flowProcess, parent, path, openedCollectors ); 135 136 openedCollectors++; 137 flowProcess.increment( Counters.Paths_Opened, 1 ); 138 } 139 catch( IOException exception ) 140 { 141 throw new TapException( "unable to open partition path: " + path, exception ); 142 } 143 144 if( collectors.size() > openWritesThreshold ) 145 purgeCollectors(); 146 147 collectors.put( path, collector ); 148 149 if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 ) 150 LOG.info( "caching {} open Taps", collectors.size() ); 151 152 return collector; 153 } 154 155 private void purgeCollectors() 156 { 157 int numToClose = Math.max( 1, (int) ( openWritesThreshold * .10 ) ); 158 159 if( LOG.isInfoEnabled() ) 160 LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() ); 161 162 Set<String> removeKeys = new HashSet<>(); 163 Set<String> keys = collectors.keySet(); 164 165 for( String key : keys ) 166 { 167 if( numToClose-- == 0 ) 168 break; 169 170 removeKeys.add( key ); 171 } 172 173 for( String removeKey : removeKeys ) 174 { 175 closeCollector( removeKey ); 176 collectors.remove( removeKey ); 177 } 178 179 flowProcess.increment( Counters.Path_Purges, 1 ); 180 } 181 182 @Override 183 public void close() 184 { 185 super.close(); 186 187 try 188 { 189 for( String path : new ArrayList<>( collectors.keySet() ) ) 190 closeCollector( path ); 191 } 192 finally 193 { 194 collectors.clear(); 195 } 196 } 197 198 public void closeCollector( String path ) 199 { 200 TupleEntryCollector collector = collectors.get( path ); 201 if( collector == null ) 202 return; 203 try 204 { 205 collector.close(); 206 207 flowProcess.increment( Counters.Paths_Closed, 1 ); 208 } 209 catch( Exception exception ) 210 { 211 LOG.error( "exception while closing TupleEntryCollector {}", path, exception ); 212 213 boolean failOnError = false; 214 Object failProperty = flowProcess.getProperty( PartitionTapProps.FAIL_ON_CLOSE ); 215 216 if( failProperty != null ) 217 failOnError = Boolean.parseBoolean( failProperty.toString() ); 218 219 if( failOnError ) 220 throw new TapException( exception ); 221 } 222 } 223 224 protected void collect( TupleEntry tupleEntry ) throws IOException 225 { 226 // reset select views 227 TupleViews.reset( partitionTuple, tupleEntry.getTuple() ); // partitionTuple is inside partitionEntry 228 TupleViews.reset( parentTuple, tupleEntry.getTuple() ); 229 230 String path = partition.toPartition( partitionEntry ); 231 232 getCollector( path ).add( parentTuple ); 233 } 234 } 235 236 /** Field parent */ 237 protected Tap parent; 238 /** Field partition */ 239 protected Partition partition; 240 /** Field sourcePartitionFilters */ 241 protected final List<PartitionTapFilter> sourcePartitionFilters = new ArrayList<>(); 242 /** Field keepParentOnDelete */ 243 protected boolean keepParentOnDelete = false; 244 /** Field openTapsThreshold */ 245 protected int openWritesThreshold = OPEN_WRITES_THRESHOLD_DEFAULT; 246 247 /** Field openedCollectors */ 248 private long openedCollectors = 0; 249 /** Field collectors */ 250 private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<>( 1000, .75f, true ); 251 252 protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap parent, String path, long sequence ) throws IOException; 253 254 protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap parent, String path, Input input ) throws IOException; 255 256 public enum Counters 257 { 258 Paths_Opened, Paths_Closed, Path_Purges 259 } 260 261 protected BasePartitionTap( Tap parent, Partition partition, int openWritesThreshold ) 262 { 263 super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), parent.getSinkMode() ); 264 this.parent = parent; 265 this.partition = partition; 266 this.openWritesThreshold = openWritesThreshold; 267 } 268 269 protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode ) 270 { 271 super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode ); 272 this.parent = parent; 273 this.partition = partition; 274 } 275 276 protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold ) 277 { 278 super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode ); 279 this.parent = parent; 280 this.partition = partition; 281 this.keepParentOnDelete = keepParentOnDelete; 282 this.openWritesThreshold = openWritesThreshold; 283 } 284 285 /** 286 * Method getParent returns the parent Tap of this PartitionTap object. 287 * 288 * @return the parent (type Tap) of this PartitionTap object. 289 */ 290 public Tap getParent() 291 { 292 return parent; 293 } 294 295 /** 296 * Method getPartition returns the {@link Partition} instance used by this PartitionTap 297 * 298 * @return the partition instance 299 */ 300 public Partition getPartition() 301 { 302 return partition; 303 } 304 305 /** 306 * Method getChildPartitionIdentifiers returns an array of all identifiers for all available partitions. 307 * <p> 308 * This method is used internally to set all incoming paths, override to limit applicable partitions. 309 * <p> 310 * Note the returns array may be large. 311 * 312 * @param flowProcess of type FlowProcess 313 * @param fullyQualified of type boolean 314 * @return a String[] of partition identifiers 315 * @throws IOException 316 */ 317 public String[] getChildPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, boolean fullyQualified ) throws IOException 318 { 319 String[] childIdentifiers = ( castFileType() ).getChildIdentifiers( 320 flowProcess.getConfig(), 321 partition.getPathDepth(), 322 fullyQualified 323 ); 324 325 if( sourcePartitionFilters.isEmpty() ) 326 return childIdentifiers; 327 328 return getFilteredPartitionIdentifiers( flowProcess, childIdentifiers ); 329 } 330 331 protected String[] getFilteredPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, String[] childIdentifiers ) 332 { 333 Fields partitionFields = partition.getPartitionFields(); 334 TupleEntry partitionEntry = new TupleEntry( partitionFields, Tuple.size( partitionFields.size() ) ); 335 336 List<String> filteredIdentifiers = new ArrayList<>( childIdentifiers.length ); 337 338 for( PartitionTapFilter filter : sourcePartitionFilters ) 339 filter.prepare( flowProcess ); 340 341 for( String childIdentifier : childIdentifiers ) 342 { 343 partition.toTuple( childIdentifier.substring( parent.getFullIdentifier( flowProcess ).length() + 1 ), partitionEntry ); 344 345 boolean isRemove = false; 346 for( PartitionTapFilter filter : sourcePartitionFilters ) 347 { 348 if( filter.isRemove( flowProcess, partitionEntry ) ) 349 { 350 isRemove = true; 351 break; 352 } 353 } 354 355 if( !isRemove ) 356 filteredIdentifiers.add( childIdentifier ); 357 } 358 359 for( PartitionTapFilter filter : sourcePartitionFilters ) 360 filter.cleanup( flowProcess ); 361 362 return filteredIdentifiers.toArray( new String[ filteredIdentifiers.size() ] ); 363 } 364 365 /** 366 * Add a {@link Filter} with its associated argument selector when using this PartitionTap as a source. On read, each 367 * child identifier is converted to a {@link Tuple} using the provided {@link Partition}. Each {@link Filter} will be 368 * applied to the {@link Tuple} so that the input paths can be filtered to only accept those required for the 369 * {@link Flow}. 370 * 371 * @param argumentSelector field selector that selects Filter arguments from the input Tuple 372 * @param filter Filter to be applied to each input Tuple 373 */ 374 public void addSourcePartitionFilter( Fields argumentSelector, Filter filter ) 375 { 376 Fields argumentFields; 377 378 if( argumentSelector.isAll() ) 379 argumentFields = partition.getPartitionFields(); 380 else 381 argumentFields = partition.getPartitionFields().select( argumentSelector ); 382 383 sourcePartitionFilters.add( new PartitionTapFilter( argumentFields, filter ) ); 384 } 385 386 @Override 387 public String getIdentifier() 388 { 389 return parent.getIdentifier(); 390 } 391 392 protected abstract String getCurrentIdentifier( FlowProcess<? extends Config> flowProcess ); 393 394 /** 395 * Method getOpenWritesThreshold returns the openTapsThreshold of this PartitionTap object. 396 * 397 * @return the openTapsThreshold (type int) of this PartitionTap object. 398 */ 399 public int getOpenWritesThreshold() 400 { 401 return openWritesThreshold; 402 } 403 404 @Override 405 public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException 406 { 407 return new PartitionCollector( flowProcess ); 408 } 409 410 @Override 411 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException 412 { 413 return new PartitionIterator( flowProcess, input ); 414 } 415 416 @Override 417 public boolean createResource( Config conf ) throws IOException 418 { 419 return parent.createResource( conf ); 420 } 421 422 @Override 423 public boolean deleteResource( Config conf ) throws IOException 424 { 425 return keepParentOnDelete || parent.deleteResource( conf ); 426 } 427 428 @Override 429 public boolean prepareResourceForRead( Config conf ) throws IOException 430 { 431 return parent.prepareResourceForRead( conf ); 432 } 433 434 @Override 435 public boolean prepareResourceForWrite( Config conf ) throws IOException 436 { 437 return parent.prepareResourceForWrite( conf ); 438 } 439 440 @Override 441 public boolean commitResource( Config conf ) throws IOException 442 { 443 return parent.commitResource( conf ); 444 } 445 446 @Override 447 public boolean rollbackResource( Config conf ) throws IOException 448 { 449 return parent.rollbackResource( conf ); 450 } 451 452 @Override 453 public boolean resourceExists( Config conf ) throws IOException 454 { 455 return parent.resourceExists( conf ); 456 } 457 458 @Override 459 public long getModifiedTime( Config conf ) throws IOException 460 { 461 return parent.getModifiedTime( conf ); 462 } 463 464 @Override 465 public boolean isDirectory( FlowProcess<? extends Config> flowProcess ) throws IOException 466 { 467 return castFileType().isDirectory( flowProcess ); 468 } 469 470 @Override 471 public boolean isDirectory( Config conf ) throws IOException 472 { 473 return castFileType().isDirectory( conf ); 474 } 475 476 @Override 477 public String[] getChildIdentifiers( FlowProcess<? extends Config> flowProcess ) throws IOException 478 { 479 return castFileType().getChildIdentifiers( flowProcess ); 480 } 481 482 @Override 483 public String[] getChildIdentifiers( Config conf ) throws IOException 484 { 485 return castFileType().getChildIdentifiers( conf ); 486 } 487 488 @Override 489 public String[] getChildIdentifiers( FlowProcess<? extends Config> flowProcess, int depth, boolean fullyQualified ) throws IOException 490 { 491 return castFileType().getChildIdentifiers( flowProcess, depth, fullyQualified ); 492 } 493 494 @Override 495 public String[] getChildIdentifiers( Config conf, int depth, boolean fullyQualified ) throws IOException 496 { 497 return castFileType().getChildIdentifiers( conf, depth, fullyQualified ); 498 } 499 500 @Override 501 public long getSize( FlowProcess<? extends Config> flowProcess ) throws IOException 502 { 503 return castFileType().getSize( flowProcess ); 504 } 505 506 @Override 507 public long getSize( Config conf ) throws IOException 508 { 509 return castFileType().getSize( conf ); 510 } 511 512 protected FileType<Config> castFileType() 513 { 514 if( parent instanceof FileType ) 515 return (FileType<Config>) parent; 516 517 throw new UnsupportedOperationException( "parent is not an implementation of " + FileType.class.getName() + ", is type: " + parent.getClass().getName() ); 518 } 519 520 @Override 521 public boolean equals( Object object ) 522 { 523 if( this == object ) 524 return true; 525 if( object == null || getClass() != object.getClass() ) 526 return false; 527 if( !super.equals( object ) ) 528 return false; 529 530 BasePartitionTap that = (BasePartitionTap) object; 531 532 if( parent != null ? !parent.equals( that.parent ) : that.parent != null ) 533 return false; 534 if( partition != null ? !partition.equals( that.partition ) : that.partition != null ) 535 return false; 536 if( partition != null ? !sourcePartitionFilters.equals( that.sourcePartitionFilters ) : that.sourcePartitionFilters != null ) 537 return false; 538 539 return true; 540 } 541 542 @Override 543 public int hashCode() 544 { 545 int result = super.hashCode(); 546 result = 31 * result + ( parent != null ? parent.hashCode() : 0 ); 547 result = 31 * result + ( partition != null ? partition.hashCode() : 0 ); 548 result = 31 * result + ( sourcePartitionFilters != null ? sourcePartitionFilters.hashCode() : 0 ); 549 return result; 550 } 551 552 @Override 553 public String toString() 554 { 555 return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + partition + "\"]" + "[\"" + sourcePartitionFilters + "\"]"; 556 } 557 558 public static class PartitionScheme<Config, Input, Output> extends Scheme<Config, Input, Output, Void, Void> 559 { 560 private final Scheme scheme; 561 private final Fields partitionFields; 562 563 public PartitionScheme( Scheme scheme ) 564 { 565 this.scheme = scheme; 566 this.partitionFields = null; 567 } 568 569 public PartitionScheme( Scheme scheme, Fields partitionFields ) 570 { 571 this.scheme = scheme; 572 573 if( partitionFields == null || partitionFields.isAll() ) 574 this.partitionFields = null; 575 else if( partitionFields.isDefined() ) 576 this.partitionFields = partitionFields; 577 else 578 throw new IllegalArgumentException( "partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose() ); 579 } 580 581 public Fields getSinkFields() 582 { 583 if( partitionFields == null || scheme.getSinkFields().isAll() ) 584 return scheme.getSinkFields(); 585 586 return Fields.merge( scheme.getSinkFields(), partitionFields ); 587 } 588 589 public void setSinkFields( Fields sinkFields ) 590 { 591 scheme.setSinkFields( sinkFields ); 592 } 593 594 @Override 595 public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess, Tap tap ) 596 { 597 return scheme.retrieveSourceFields( flowProcess, tap ); 598 } 599 600 @Override 601 public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess, Tap tap ) 602 { 603 return scheme.retrieveSinkFields( flowProcess, tap ); 604 } 605 606 public Fields getSourceFields() 607 { 608 if( partitionFields == null || scheme.getSourceFields().isUnknown() ) 609 return scheme.getSourceFields(); 610 611 return Fields.merge( scheme.getSourceFields(), partitionFields ); 612 } 613 614 public void setSourceFields( Fields sourceFields ) 615 { 616 scheme.setSourceFields( sourceFields ); 617 } 618 619 public int getNumSinkParts() 620 { 621 return scheme.getNumSinkParts(); 622 } 623 624 public void setNumSinkParts( int numSinkParts ) 625 { 626 scheme.setNumSinkParts( numSinkParts ); 627 } 628 629 @Override 630 public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf ) 631 { 632 scheme.sourceConfInit( flowProcess, tap, conf ); 633 } 634 635 @Override 636 public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException 637 { 638 scheme.sourcePrepare( flowProcess, sourceCall ); 639 } 640 641 @Override 642 public boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException 643 { 644 throw new UnsupportedOperationException( "should never be called" ); 645 } 646 647 @Override 648 public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException 649 { 650 scheme.sourceCleanup( flowProcess, sourceCall ); 651 } 652 653 @Override 654 public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf ) 655 { 656 scheme.sinkConfInit( flowProcess, tap, conf ); 657 } 658 659 @Override 660 public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 661 { 662 scheme.sinkPrepare( flowProcess, sinkCall ); 663 } 664 665 @Override 666 public void sink( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 667 { 668 throw new UnsupportedOperationException( "should never be called" ); 669 } 670 671 @Override 672 public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 673 { 674 scheme.sinkCleanup( flowProcess, sinkCall ); 675 } 676 } 677 }