001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap.partition; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.LinkedHashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import cascading.flow.Flow; 033import cascading.flow.FlowProcess; 034import cascading.operation.Filter; 035import cascading.scheme.Scheme; 036import cascading.scheme.SinkCall; 037import cascading.scheme.SourceCall; 038import cascading.tap.SinkMode; 039import cascading.tap.Tap; 040import cascading.tap.TapException; 041import cascading.tap.type.FileType; 042import cascading.tuple.Fields; 043import cascading.tuple.Tuple; 044import cascading.tuple.TupleEntry; 045import cascading.tuple.TupleEntryCollector; 046import cascading.tuple.TupleEntryIterableChainIterator; 047import cascading.tuple.TupleEntryIterator; 048import cascading.tuple.TupleEntrySchemeCollector; 049import cascading.tuple.TupleEntrySchemeIterator; 050import cascading.tuple.util.TupleViews; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * 056 */ 057public abstract class BasePartitionTap<Config, Input, Output> extends Tap<Config, Input, Output> 058 { 059 /** Field LOG */ 060 private static final Logger LOG = LoggerFactory.getLogger( BasePartitionTap.class ); 061 /** Field OPEN_FILES_THRESHOLD_DEFAULT */ 062 protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300; 063 064 private class PartitionIterator extends TupleEntryIterableChainIterator 065 { 066 public PartitionIterator( final FlowProcess<? extends Config> flowProcess, Input input ) throws IOException 067 { 068 super( getSourceFields() ); 069 070 List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>(); 071 072 if( input != null ) 073 { 074 String identifier = parent.getFullIdentifier( flowProcess ); 075 iterators.add( createPartitionEntryIterator( flowProcess, input, identifier, getCurrentIdentifier( flowProcess ) ) ); 076 } 077 else 078 { 079 String[] childIdentifiers = getChildPartitionIdentifiers( flowProcess, false ); 080 081 for( String childIdentifier : childIdentifiers ) 082 iterators.add( createPartitionEntryIterator( flowProcess, null, parent.getIdentifier(), childIdentifier ) ); 083 } 084 085 reset( iterators ); 086 } 087 088 private PartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Config> flowProcess, Input input, String parentIdentifier, String childIdentifier ) throws IOException 089 { 090 TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, childIdentifier, input ); 091 092 return new PartitionTupleEntryIterator( getSourceFields(), partition, parentIdentifier, childIdentifier, schemeIterator ); 093 } 094 } 095 096 public class PartitionCollector extends TupleEntryCollector 097 { 098 private final FlowProcess<? extends Config> flowProcess; 099 private final Config conf; 100 private final Fields parentFields; 101 private final Fields partitionFields; 102 private TupleEntry partitionEntry; 103 private final Tuple partitionTuple; 104 private final Tuple parentTuple; 105 106 public PartitionCollector( FlowProcess<? extends Config> flowProcess ) 107 { 108 super( Fields.asDeclaration( getSinkFields() ) ); 109 this.flowProcess = flowProcess; 110 this.conf = flowProcess.getConfigCopy(); 111 this.parentFields = parent.getSinkFields(); 112 this.partitionFields = ( (PartitionScheme) getScheme() ).partitionFields; 113 this.partitionEntry = new TupleEntry( this.partitionFields ); 114 115 this.partitionTuple = TupleViews.createNarrow( getSinkFields().getPos( this.partitionFields ) ); 116 this.parentTuple = TupleViews.createNarrow( getSinkFields().getPos( this.parentFields ) ); 117 118 this.partitionEntry.setTuple( partitionTuple ); 119 } 120 121 TupleEntryCollector getCollector( String path ) 122 { 123 TupleEntryCollector collector = collectors.get( path ); 124 125 if( collector != null ) 126 return collector; 127 128 try 129 { 130 LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path ); 131 132 collector = createTupleEntrySchemeCollector( flowProcess, parent, path, openedCollectors ); 133 134 openedCollectors++; 135 flowProcess.increment( Counters.Paths_Opened, 1 ); 136 } 137 catch( IOException exception ) 138 { 139 throw new TapException( "unable to open partition path: " + path, exception ); 140 } 141 142 if( collectors.size() > openWritesThreshold ) 143 purgeCollectors(); 144 145 collectors.put( path, collector ); 146 147 if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 ) 148 LOG.info( "caching {} open Taps", collectors.size() ); 149 150 return collector; 151 } 152 153 private void purgeCollectors() 154 { 155 int numToClose = Math.max( 1, (int) ( openWritesThreshold * .10 ) ); 156 157 if( LOG.isInfoEnabled() ) 158 LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() ); 159 160 Set<String> removeKeys = new HashSet<String>(); 161 Set<String> keys = collectors.keySet(); 162 163 for( String key : keys ) 164 { 165 if( numToClose-- == 0 ) 166 break; 167 168 removeKeys.add( key ); 169 } 170 171 for( String removeKey : removeKeys ) 172 { 173 closeCollector( removeKey ); 174 collectors.remove( removeKey ); 175 } 176 177 flowProcess.increment( Counters.Path_Purges, 1 ); 178 } 179 180 @Override 181 public void close() 182 { 183 super.close(); 184 185 try 186 { 187 for( String path : new ArrayList<String>( collectors.keySet() ) ) 188 closeCollector( path ); 189 } 190 finally 191 { 192 collectors.clear(); 193 } 194 } 195 196 public void closeCollector( String path ) 197 { 198 TupleEntryCollector collector = collectors.get( path ); 199 if( collector == null ) 200 return; 201 try 202 { 203 collector.close(); 204 205 flowProcess.increment( Counters.Paths_Closed, 1 ); 206 } 207 catch( Exception exception ) 208 { 209 LOG.error( "exception while closing TupleEntryCollector {}", path, exception ); 210 211 boolean failOnError = false; 212 Object failProperty = flowProcess.getProperty( PartitionTapProps.FAIL_ON_CLOSE ); 213 214 if( failProperty != null ) 215 failOnError = Boolean.parseBoolean( failProperty.toString() ); 216 217 if( failOnError ) 218 throw new TapException( exception ); 219 } 220 } 221 222 protected void collect( TupleEntry tupleEntry ) throws IOException 223 { 224 // reset select views 225 TupleViews.reset( partitionTuple, tupleEntry.getTuple() ); // partitionTuple is inside partitionEntry 226 TupleViews.reset( parentTuple, tupleEntry.getTuple() ); 227 228 String path = partition.toPartition( partitionEntry ); 229 230 getCollector( path ).add( parentTuple ); 231 } 232 } 233 234 /** Field parent */ 235 protected Tap parent; 236 /** Field partition */ 237 protected Partition partition; 238 /** Field sourcePartitionFilters */ 239 protected final List<PartitionTapFilter> sourcePartitionFilters = new ArrayList<>(); 240 /** Field keepParentOnDelete */ 241 protected boolean keepParentOnDelete = false; 242 /** Field openTapsThreshold */ 243 protected int openWritesThreshold = OPEN_WRITES_THRESHOLD_DEFAULT; 244 245 /** Field openedCollectors */ 246 private long openedCollectors = 0; 247 /** Field collectors */ 248 private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true ); 249 250 protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Config> flowProcess, Tap parent, String path, long sequence ) throws IOException; 251 252 protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap parent, String path, Input input ) throws IOException; 253 254 public enum Counters 255 { 256 Paths_Opened, Paths_Closed, Path_Purges 257 } 258 259 protected BasePartitionTap( Tap parent, Partition partition, int openWritesThreshold ) 260 { 261 super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), parent.getSinkMode() ); 262 this.parent = parent; 263 this.partition = partition; 264 this.openWritesThreshold = openWritesThreshold; 265 } 266 267 protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode ) 268 { 269 super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode ); 270 this.parent = parent; 271 this.partition = partition; 272 } 273 274 protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold ) 275 { 276 super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode ); 277 this.parent = parent; 278 this.partition = partition; 279 this.keepParentOnDelete = keepParentOnDelete; 280 this.openWritesThreshold = openWritesThreshold; 281 } 282 283 /** 284 * Method getParent returns the parent Tap of this PartitionTap object. 285 * 286 * @return the parent (type Tap) of this PartitionTap object. 287 */ 288 public Tap getParent() 289 { 290 return parent; 291 } 292 293 /** 294 * Method getPartition returns the {@link Partition} instance used by this PartitionTap 295 * 296 * @return the partition instance 297 */ 298 public Partition getPartition() 299 { 300 return partition; 301 } 302 303 /** 304 * Method getChildPartitionIdentifiers returns an array of all identifiers for all available partitions. 305 * <p/> 306 * This method is used internally to set all incoming paths, override to limit applicable partitions. 307 * <p/> 308 * Note the returns array may be large. 309 * 310 * @param flowProcess of type FlowProcess 311 * @param fullyQualified of type boolean 312 * @return a String[] of partition identifiers 313 * @throws IOException 314 */ 315 public String[] getChildPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, boolean fullyQualified ) throws IOException 316 { 317 String[] childIdentifiers = ( (FileType) parent ).getChildIdentifiers( 318 flowProcess.getConfig(), 319 partition.getPathDepth(), 320 fullyQualified 321 ); 322 323 if( sourcePartitionFilters.isEmpty() ) 324 return childIdentifiers; 325 326 return getFilteredPartitionIdentifiers( flowProcess, childIdentifiers ); 327 } 328 329 protected String[] getFilteredPartitionIdentifiers( FlowProcess<? extends Config> flowProcess, String[] childIdentifiers ) 330 { 331 Fields partitionFields = partition.getPartitionFields(); 332 TupleEntry partitionEntry = new TupleEntry( partitionFields, Tuple.size( partitionFields.size() ) ); 333 334 List<String> filteredIdentifiers = new ArrayList<>( childIdentifiers.length ); 335 336 for( PartitionTapFilter filter : sourcePartitionFilters ) 337 filter.prepare( flowProcess ); 338 339 for( String childIdentifier : childIdentifiers ) 340 { 341 partition.toTuple( childIdentifier.substring( parent.getFullIdentifier( flowProcess ).length() + 1 ), partitionEntry ); 342 343 boolean isRemove = false; 344 for( PartitionTapFilter filter : sourcePartitionFilters ) 345 { 346 if( filter.isRemove( flowProcess, partitionEntry ) ) 347 { 348 isRemove = true; 349 break; 350 } 351 } 352 353 if( !isRemove ) 354 filteredIdentifiers.add( childIdentifier ); 355 } 356 357 for( PartitionTapFilter filter : sourcePartitionFilters ) 358 filter.cleanup( flowProcess ); 359 360 return filteredIdentifiers.toArray( new String[ filteredIdentifiers.size() ] ); 361 } 362 363 /** 364 * Add a {@link Filter} with its associated argument selector when using this PartitionTap as a source. On read, each 365 * child identifier is converted to a {@link Tuple} using the provided {@link Partition}. Each {@link Filter} will be 366 * applied to the {@link Tuple} so that the input paths can be filtered to only accept those required for the 367 * {@link Flow}. 368 * 369 * @param argumentSelector field selector that selects Filter arguments from the input Tuple 370 * @param filter Filter to be applied to each input Tuple 371 */ 372 public void addSourcePartitionFilter( Fields argumentSelector, Filter filter ) 373 { 374 Fields argumentFields; 375 376 if( argumentSelector.isAll() ) 377 argumentFields = partition.getPartitionFields(); 378 else 379 argumentFields = partition.getPartitionFields().select( argumentSelector ); 380 381 sourcePartitionFilters.add( new PartitionTapFilter( argumentFields, filter ) ); 382 } 383 384 @Override 385 public String getIdentifier() 386 { 387 return parent.getIdentifier(); 388 } 389 390 protected abstract String getCurrentIdentifier( FlowProcess<? extends Config> flowProcess ); 391 392 /** 393 * Method getOpenWritesThreshold returns the openTapsThreshold of this PartitionTap object. 394 * 395 * @return the openTapsThreshold (type int) of this PartitionTap object. 396 */ 397 public int getOpenWritesThreshold() 398 { 399 return openWritesThreshold; 400 } 401 402 @Override 403 public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException 404 { 405 return new PartitionCollector( flowProcess ); 406 } 407 408 @Override 409 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException 410 { 411 return new PartitionIterator( flowProcess, input ); 412 } 413 414 @Override 415 public boolean createResource( Config conf ) throws IOException 416 { 417 return parent.createResource( conf ); 418 } 419 420 @Override 421 public boolean deleteResource( Config conf ) throws IOException 422 { 423 return keepParentOnDelete || parent.deleteResource( conf ); 424 } 425 426 @Override 427 public boolean prepareResourceForRead( Config conf ) throws IOException 428 { 429 return parent.prepareResourceForRead( conf ); 430 } 431 432 @Override 433 public boolean prepareResourceForWrite( Config conf ) throws IOException 434 { 435 return parent.prepareResourceForWrite( conf ); 436 } 437 438 @Override 439 public boolean commitResource( Config conf ) throws IOException 440 { 441 return parent.commitResource( conf ); 442 } 443 444 @Override 445 public boolean rollbackResource( Config conf ) throws IOException 446 { 447 return parent.rollbackResource( conf ); 448 } 449 450 @Override 451 public boolean resourceExists( Config conf ) throws IOException 452 { 453 return parent.resourceExists( conf ); 454 } 455 456 @Override 457 public long getModifiedTime( Config conf ) throws IOException 458 { 459 return parent.getModifiedTime( conf ); 460 } 461 462 @Override 463 public boolean equals( Object object ) 464 { 465 if( this == object ) 466 return true; 467 if( object == null || getClass() != object.getClass() ) 468 return false; 469 if( !super.equals( object ) ) 470 return false; 471 472 BasePartitionTap that = (BasePartitionTap) object; 473 474 if( parent != null ? !parent.equals( that.parent ) : that.parent != null ) 475 return false; 476 if( partition != null ? !partition.equals( that.partition ) : that.partition != null ) 477 return false; 478 if( partition != null ? !sourcePartitionFilters.equals( that.sourcePartitionFilters ) : that.sourcePartitionFilters != null ) 479 return false; 480 481 return true; 482 } 483 484 @Override 485 public int hashCode() 486 { 487 int result = super.hashCode(); 488 result = 31 * result + ( parent != null ? parent.hashCode() : 0 ); 489 result = 31 * result + ( partition != null ? partition.hashCode() : 0 ); 490 result = 31 * result + ( sourcePartitionFilters != null ? sourcePartitionFilters.hashCode() : 0 ); 491 return result; 492 } 493 494 @Override 495 public String toString() 496 { 497 return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + partition + "\"]" + "[\"" + sourcePartitionFilters + "\"]"; 498 } 499 500 public static class PartitionScheme<Config, Input, Output> extends Scheme<Config, Input, Output, Void, Void> 501 { 502 private final Scheme scheme; 503 private final Fields partitionFields; 504 505 public PartitionScheme( Scheme scheme ) 506 { 507 this.scheme = scheme; 508 this.partitionFields = null; 509 } 510 511 public PartitionScheme( Scheme scheme, Fields partitionFields ) 512 { 513 this.scheme = scheme; 514 515 if( partitionFields == null || partitionFields.isAll() ) 516 this.partitionFields = null; 517 else if( partitionFields.isDefined() ) 518 this.partitionFields = partitionFields; 519 else 520 throw new IllegalArgumentException( "partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose() ); 521 } 522 523 public Fields getSinkFields() 524 { 525 if( partitionFields == null || scheme.getSinkFields().isAll() ) 526 return scheme.getSinkFields(); 527 528 return Fields.merge( scheme.getSinkFields(), partitionFields ); 529 } 530 531 public void setSinkFields( Fields sinkFields ) 532 { 533 scheme.setSinkFields( sinkFields ); 534 } 535 536 public Fields getSourceFields() 537 { 538 if( partitionFields == null || scheme.getSourceFields().isUnknown() ) 539 return scheme.getSourceFields(); 540 541 return Fields.merge( scheme.getSourceFields(), partitionFields ); 542 } 543 544 public void setSourceFields( Fields sourceFields ) 545 { 546 scheme.setSourceFields( sourceFields ); 547 } 548 549 public int getNumSinkParts() 550 { 551 return scheme.getNumSinkParts(); 552 } 553 554 public void setNumSinkParts( int numSinkParts ) 555 { 556 scheme.setNumSinkParts( numSinkParts ); 557 } 558 559 @Override 560 public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf ) 561 { 562 scheme.sourceConfInit( flowProcess, tap, conf ); 563 } 564 565 @Override 566 public void sourcePrepare( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException 567 { 568 scheme.sourcePrepare( flowProcess, sourceCall ); 569 } 570 571 @Override 572 public boolean source( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException 573 { 574 throw new UnsupportedOperationException( "should never be called" ); 575 } 576 577 @Override 578 public void sourceCleanup( FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException 579 { 580 scheme.sourceCleanup( flowProcess, sourceCall ); 581 } 582 583 @Override 584 public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf ) 585 { 586 scheme.sinkConfInit( flowProcess, tap, conf ); 587 } 588 589 @Override 590 public void sinkPrepare( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 591 { 592 scheme.sinkPrepare( flowProcess, sinkCall ); 593 } 594 595 @Override 596 public void sink( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 597 { 598 throw new UnsupportedOperationException( "should never be called" ); 599 } 600 601 @Override 602 public void sinkCleanup( FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException 603 { 604 scheme.sinkCleanup( flowProcess, sinkCall ); 605 } 606 } 607 }