/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.type.FileType;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterableChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.tuple.util.TupleViews;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class BasePartitionTap is the base class for taps that delegate to a parent {@link Tap} while reading from
 * and writing to child resources ("partitions") whose identifiers are derived by a {@link Partition} instance.
 * <p>
 * When writing, every incoming tuple is split into two views: the partition fields, which are handed to
 * {@link Partition#toPartition(TupleEntry)} to compute the child path, and the parent sink fields, which are
 * written to the collector opened for that path. Opened collectors are cached; once the cache grows past
 * {@link #getOpenWritesThreshold()} a portion of the least recently accessed collectors is closed.
 * <p>
 * When reading, one iterator is created per child partition identifier, or a single iterator for the current
 * identifier if an input is supplied.
 * <p>
 * Resource life-cycle calls (create, delete, commit, rollback, ...) are forwarded to the parent tap.
 */
public abstract class BasePartitionTap<Config, Input, Output> extends Tap<Config, Input, Output>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( BasePartitionTap.class );
  /** Field OPEN_WRITES_THRESHOLD_DEFAULT */
  protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300;

  /**
   * Class PartitionIterator chains the iterators of all partitions to be read into a single iterator.
   */
  private class PartitionIterator extends TupleEntryIterableChainIterator
    {
    /**
     * Constructor PartitionIterator creates a new PartitionIterator instance.
     *
     * @param flowProcess of type FlowProcess
     * @param input       the input to read from; if {@code null}, all child partitions of the parent are read
     * @throws IOException when a child iterator cannot be created
     */
    public PartitionIterator( final FlowProcess<Config> flowProcess, Input input ) throws IOException
      {
      super( getSourceFields() );

      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();

      if( input != null )
        {
        // an input was handed in, only the partition currently being processed is read
        String identifier = parent.getFullIdentifier( flowProcess );
        iterators.add( createPartitionEntryIterator( flowProcess, input, identifier, getCurrentIdentifier( flowProcess ) ) );
        }
      else
        {
        String[] childIdentifiers = getChildPartitionIdentifiers( flowProcess, false );

        for( String childIdentifier : childIdentifiers )
          iterators.add( createPartitionEntryIterator( flowProcess, null, parent.getIdentifier(), childIdentifier ) );
        }

      reset( iterators );
      }

    /**
     * Method createPartitionEntryIterator wraps the scheme iterator of the given child in a
     * {@link PartitionTupleEntryIterator}.
     *
     * @param flowProcess      of type FlowProcess
     * @param input            the input to read from, may be {@code null}
     * @param parentIdentifier the identifier of the parent tap
     * @param childIdentifier  the identifier of the child partition
     * @return a new PartitionTupleEntryIterator
     * @throws IOException when the scheme iterator cannot be created
     */
    private PartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<Config> flowProcess, Input input, String parentIdentifier, String childIdentifier ) throws IOException
      {
      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, childIdentifier, input );

      return new PartitionTupleEntryIterator( getSourceFields(), partition, parentIdentifier, childIdentifier, schemeIterator );
      }
    }

  /**
   * Class PartitionCollector routes every collected tuple to the collector of the partition path computed
   * from the tuple's partition fields, opening (and caching) collectors on demand.
   */
  public class PartitionCollector extends TupleEntryCollector
    {
    private final FlowProcess<Config> flowProcess;
    private final Config conf;
    private final Fields parentFields;
    private final Fields partitionFields;
    private final TupleEntry partitionEntry;
    /** narrowed view of the current tuple holding only the partition fields */
    private final Tuple partitionTuple;
    /** narrowed view of the current tuple holding only the parent sink fields */
    private final Tuple parentTuple;

    /**
     * Constructor PartitionCollector creates a new PartitionCollector instance.
     *
     * @param flowProcess of type FlowProcess
     */
    public PartitionCollector( FlowProcess<Config> flowProcess )
      {
      super( Fields.asDeclaration( getSinkFields() ) );
      this.flowProcess = flowProcess;
      this.conf = flowProcess.getConfigCopy();
      this.parentFields = parent.getSinkFields();
      this.partitionFields = ( (PartitionScheme) getScheme() ).partitionFields;
      this.partitionEntry = new TupleEntry( this.partitionFields );

      this.partitionTuple = TupleViews.createNarrow( getSinkFields().getPos( this.partitionFields ) );
      this.parentTuple = TupleViews.createNarrow( getSinkFields().getPos( this.parentFields ) );

      this.partitionEntry.setTuple( partitionTuple );
      }

    /**
     * Method getCollector returns the cached collector for the given path, opening a new one if none is cached.
     * <p>
     * If the cache already holds more than {@code openWritesThreshold} collectors, some are purged before
     * the new collector is added.
     *
     * @param path the partition path
     * @return the collector writing to the given path
     * @throws TapException when the collector cannot be opened
     */
    TupleEntryCollector getCollector( String path )
      {
      TupleEntryCollector collector = collectors.get( path );

      if( collector != null )
        return collector;

      try
        {
        LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );

        collector = createTupleEntrySchemeCollector( flowProcess, parent, path, openedCollectors );

        openedCollectors++;
        flowProcess.increment( Counters.Paths_Opened, 1 );
        }
      catch( IOException exception )
        {
        throw new TapException( "unable to open partition path: " + path, exception );
        }

      if( collectors.size() > openWritesThreshold )
        purgeCollectors();

      collectors.put( path, collector );

      if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
        LOG.info( "caching {} open Taps", collectors.size() );

      return collector;
      }

    /**
     * Method purgeCollectors closes and evicts roughly 10% of {@code openWritesThreshold} collectors
     * (at least one), taking them from the head of the access ordered cache.
     */
    private void purgeCollectors()
      {
      int numToClose = Math.max( 1, (int) ( openWritesThreshold * .10 ) );

      if( LOG.isInfoEnabled() )
        LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );

      // the keys are copied first: closeCollector() calls collectors.get(), which re-orders an access
      // ordered LinkedHashMap and would invalidate a live iterator over it
      Set<String> removeKeys = new HashSet<String>();
      Set<String> keys = collectors.keySet();

      for( String key : keys )
        {
        if( numToClose-- == 0 )
          break;

        removeKeys.add( key );
        }

      for( String removeKey : removeKeys )
        {
        try
          {
          closeCollector( removeKey );
          }
        finally
          {
          // evict even if the close failed, otherwise the collector would be closed again later
          collectors.remove( removeKey );
          }
        }

      flowProcess.increment( Counters.Path_Purges, 1 );
      }

    /**
     * Method close closes every cached collector and empties the cache.
     * <p>
     * All collectors are attempted even if one of them fails to close; the first failure raised by
     * {@link #closeCollector(String)} is re-thrown once every collector was attempted.
     *
     * @throws TapException when a collector failed to close and {@link PartitionTapProps#FAIL_ON_CLOSE} is enabled
     */
    @Override
    public void close()
      {
      super.close();

      TapException failure = null;

      try
        {
        for( String path : new ArrayList<String>( collectors.keySet() ) )
          {
          try
            {
            closeCollector( path );
            }
          catch( TapException exception )
            {
            // keep going so the remaining collectors are not leaked, remember the first failure only
            if( failure == null )
              failure = exception;
            }
          }
        }
      finally
        {
        collectors.clear();
        }

      if( failure != null )
        throw failure;
      }

    /**
     * Method closeCollector closes the cached collector for the given path, if any. The collector is not
     * removed from the cache by this method.
     * <p>
     * A failure to close is logged. It is only propagated when the {@link PartitionTapProps#FAIL_ON_CLOSE}
     * property evaluates to {@code true}.
     *
     * @param path the partition path
     * @throws TapException when closing fails and failing on close is enabled
     */
    public void closeCollector( String path )
      {
      TupleEntryCollector collector = collectors.get( path );

      if( collector == null )
        return;

      try
        {
        collector.close();

        flowProcess.increment( Counters.Paths_Closed, 1 );
        }
      catch( Exception exception )
        {
        LOG.error( "exception while closing TupleEntryCollector {}", path, exception );

        boolean failOnError = false;
        Object failProperty = flowProcess.getProperty( PartitionTapProps.FAIL_ON_CLOSE );

        if( failProperty != null )
          failOnError = Boolean.parseBoolean( failProperty.toString() );

        if( failOnError )
          throw new TapException( exception );
        }
      }

    /**
     * Method collect computes the partition path of the given entry and adds the parent fields of the entry
     * to the collector of that path.
     *
     * @param tupleEntry the entry to write
     * @throws IOException declared by the overridden contract
     */
    protected void collect( TupleEntry tupleEntry ) throws IOException
      {
      // reset select views
      TupleViews.reset( partitionTuple, tupleEntry.getTuple() ); // partitionTuple is inside partitionEntry
      TupleViews.reset( parentTuple, tupleEntry.getTuple() );

      String path = partition.toPartition( partitionEntry );

      getCollector( path ).add( parentTuple );
      }
    }

  /** Field parent */
  protected Tap parent;
  /** Field partition */
  protected Partition partition;
  /** Field keepParentOnDelete */
  protected boolean keepParentOnDelete = false;
  /** Field openWritesThreshold */
  protected int openWritesThreshold = OPEN_WRITES_THRESHOLD_DEFAULT;

  /** Field openedCollectors, the number of collectors opened so far, passed on as the sequence of the next one */
  private long openedCollectors = 0;
  /** Field collectors, access ordered cache of open collectors keyed by partition path; held by the tap instance */
  private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true );

  /**
   * Method createTupleEntrySchemeCollector creates the collector used to write to the given partition path.
   *
   * @param flowProcess of type FlowProcess
   * @param parent      the parent tap
   * @param path        the partition path
   * @param sequence    the number of collectors opened before this one
   * @return a new TupleEntrySchemeCollector
   * @throws IOException when the collector cannot be created
   */
  protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Tap parent, String path, long sequence ) throws IOException;

  /**
   * Method createTupleEntrySchemeIterator creates the iterator used to read from the given partition path.
   *
   * @param flowProcess of type FlowProcess
   * @param parent      the parent tap
   * @param path        the partition path
   * @param input       the input to read from, may be {@code null}
   * @return a new TupleEntrySchemeIterator
   * @throws IOException when the iterator cannot be created
   */
  protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Tap parent, String path, Input input ) throws IOException;

  /** Enum Counters lists the counters incremented while opening, closing and purging partition collectors. */
  public enum Counters
    {
      Paths_Opened, Paths_Closed, Path_Purges
    }

  /**
   * Constructor BasePartitionTap creates a new instance using the {@link SinkMode} of the parent tap.
   *
   * @param parent              the parent tap
   * @param partition           the partition used to compute child identifiers
   * @param openWritesThreshold the number of cached open collectors above which collectors are purged
   */
  protected BasePartitionTap( Tap parent, Partition partition, int openWritesThreshold )
    {
    this( parent, partition, parent.getSinkMode(), false, openWritesThreshold );
    }

  /**
   * Constructor BasePartitionTap creates a new instance using {@link #OPEN_WRITES_THRESHOLD_DEFAULT}.
   *
   * @param parent    the parent tap
   * @param partition the partition used to compute child identifiers
   * @param sinkMode  the sink mode of this tap
   */
  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode )
    {
    this( parent, partition, sinkMode, false, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor BasePartitionTap creates a new instance.
   *
   * @param parent              the parent tap
   * @param partition           the partition used to compute child identifiers
   * @param sinkMode            the sink mode of this tap
   * @param keepParentOnDelete  when {@code true}, {@link #deleteResource(Object)} does not delete the parent
   * @param openWritesThreshold the number of cached open collectors above which collectors are purged
   */
  protected BasePartitionTap( Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
    {
    super( new PartitionScheme( parent.getScheme(), partition.getPartitionFields() ), sinkMode );
    this.parent = parent;
    this.partition = partition;
    this.keepParentOnDelete = keepParentOnDelete;
    this.openWritesThreshold = openWritesThreshold;
    }

  /**
   * Method getParent returns the parent Tap of this PartitionTap object.
   *
   * @return the parent (type Tap) of this PartitionTap object.
   */
  public Tap getParent()
    {
    return parent;
    }

  /**
   * Method getPartition returns the {@link Partition} instance used by this PartitionTap
   *
   * @return the partition instance
   */
  public Partition getPartition()
    {
    return partition;
    }

  /**
   * Method getChildPartitionIdentifiers returns an array of all identifiers for all available partitions.
   * <p>
   * This method is used internally to set all incoming paths, override to limit applicable partitions.
   * <p>
   * Note the returned array may be large.
   * <p>
   * The parent tap is cast to {@link FileType}; a parent not implementing it results in a
   * {@link ClassCastException}.
   *
   * @param flowProcess    of type FlowProcess
   * @param fullyQualified of type boolean
   * @return a String[] of partition identifiers
   * @throws IOException when the child identifiers cannot be retrieved
   */
  public String[] getChildPartitionIdentifiers( FlowProcess<Config> flowProcess, boolean fullyQualified ) throws IOException
    {
    return ( (FileType) parent ).getChildIdentifiers( flowProcess.getConfigCopy(), partition.getPathDepth(), fullyQualified );
    }

  /**
   * Method getIdentifier returns the identifier of the parent tap.
   *
   * @return the identifier of the parent tap
   */
  @Override
  public String getIdentifier()
    {
    return parent.getIdentifier();
    }

  /**
   * Method getCurrentIdentifier returns the identifier of the partition currently being read; it is used
   * as the child identifier when an input is handed to {@link #openForRead(FlowProcess, Object)}.
   *
   * @param flowProcess of type FlowProcess
   * @return the current child identifier
   */
  protected abstract String getCurrentIdentifier( FlowProcess<Config> flowProcess );

  /**
   * Method getOpenWritesThreshold returns the openWritesThreshold of this PartitionTap object.
   *
   * @return the openWritesThreshold (type int) of this PartitionTap object.
   */
  public int getOpenWritesThreshold()
    {
    return openWritesThreshold;
    }

  /**
   * Method openForWrite returns a new {@link PartitionCollector}; the given output is not used.
   */
  @Override
  public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
    {
    return new PartitionCollector( flowProcess );
    }

  /**
   * Method openForRead returns an iterator over the current partition if an input is given, otherwise over
   * all child partitions.
   */
  @Override
  public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException
    {
    return new PartitionIterator( flowProcess, input );
    }

  /** Delegates to the parent tap. */
  @Override
  public boolean createResource( Config conf ) throws IOException
    {
    return parent.createResource( conf );
    }

  /**
   * Deletes the parent resource unless {@code keepParentOnDelete} is set, in which case {@code true} is
   * returned without deleting anything.
   */
  @Override
  public boolean deleteResource( Config conf ) throws IOException
    {
    return keepParentOnDelete || parent.deleteResource( conf );
    }

  /** Delegates to the parent tap. */
  @Override
  public boolean prepareResourceForRead( Config conf ) throws IOException
    {
    return parent.prepareResourceForRead( conf );
    }

  /** Delegates to the parent tap. */
  @Override
  public boolean prepareResourceForWrite( Config conf ) throws IOException
    {
    return parent.prepareResourceForWrite( conf );
    }

  /** Delegates to the parent tap. */
  @Override
  public boolean commitResource( Config conf ) throws IOException
    {
    return parent.commitResource( conf );
    }

  /** Delegates to the parent tap. */
  @Override
  public boolean rollbackResource( Config conf ) throws IOException
    {
    return parent.rollbackResource( conf );
    }

  /** Delegates to the parent tap. */
  @Override
  public boolean resourceExists( Config conf ) throws IOException
    {
    return parent.resourceExists( conf );
    }

  /** Delegates to the parent tap. */
  @Override
  public long getModifiedTime( Config conf ) throws IOException
    {
    return parent.getModifiedTime( conf );
    }

  /**
   * Two instances are equal if they are of the same class, are equal as taps, and have equal parent and
   * partition. {@code keepParentOnDelete} and {@code openWritesThreshold} do not take part in the comparison.
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;
    if( !super.equals( object ) )
      return false;

    BasePartitionTap that = (BasePartitionTap) object;

    if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
      return false;
    if( partition != null ? !partition.equals( that.partition ) : that.partition != null )
      return false;

    return true;
    }

  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
    result = 31 * result + ( partition != null ? partition.hashCode() : 0 );
    return result;
    }

  @Override
  public String toString()
    {
    return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + partition + "\"]";
    }

  /**
   * Class PartitionScheme wraps the {@link Scheme} of the parent tap and extends its source and sink fields
   * with the partition fields. All configuration, prepare and cleanup calls are forwarded to the wrapped
   * scheme; {@code source} and {@code sink} themselves are never expected to be called on this class.
   */
  public static class PartitionScheme<Config, Input, Output> extends Scheme<Config, Input, Output, Void, Void>
    {
    private final Scheme scheme;
    /** the partition fields, {@code null} if none were given or {@link Fields#ALL} was given */
    private final Fields partitionFields;

    /**
     * Constructor PartitionScheme creates a new instance without partition fields.
     *
     * @param scheme the scheme to wrap
     */
    public PartitionScheme( Scheme scheme )
      {
      this.scheme = scheme;
      this.partitionFields = null;
      }

    /**
     * Constructor PartitionScheme creates a new instance.
     *
     * @param scheme          the scheme to wrap
     * @param partitionFields the partition fields, must be {@code null}, ALL or defined
     * @throws IllegalArgumentException when partitionFields is neither {@code null}, ALL nor defined
     */
    public PartitionScheme( Scheme scheme, Fields partitionFields )
      {
      this.scheme = scheme;

      if( partitionFields == null || partitionFields.isAll() )
        this.partitionFields = null;
      else if( partitionFields.isDefined() )
        this.partitionFields = partitionFields;
      else
        throw new IllegalArgumentException( "partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose() );
      }

    /**
     * Returns the sink fields of the wrapped scheme, merged with the partition fields unless there are no
     * partition fields or the wrapped scheme sinks ALL.
     */
    public Fields getSinkFields()
      {
      if( partitionFields == null || scheme.getSinkFields().isAll() )
        return scheme.getSinkFields();

      return Fields.merge( scheme.getSinkFields(), partitionFields );
      }

    /** Sets the sink fields on the wrapped scheme. */
    public void setSinkFields( Fields sinkFields )
      {
      scheme.setSinkFields( sinkFields );
      }

    /**
     * Returns the source fields of the wrapped scheme, merged with the partition fields unless there are no
     * partition fields or the wrapped scheme's source fields are UNKNOWN.
     */
    public Fields getSourceFields()
      {
      if( partitionFields == null || scheme.getSourceFields().isUnknown() )
        return scheme.getSourceFields();

      return Fields.merge( scheme.getSourceFields(), partitionFields );
      }

    /** Sets the source fields on the wrapped scheme. */
    public void setSourceFields( Fields sourceFields )
      {
      scheme.setSourceFields( sourceFields );
      }

    /** Returns the number of sink parts of the wrapped scheme. */
    public int getNumSinkParts()
      {
      return scheme.getNumSinkParts();
      }

    /** Sets the number of sink parts on the wrapped scheme. */
    public void setNumSinkParts( int numSinkParts )
      {
      scheme.setNumSinkParts( numSinkParts );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
      {
      scheme.sourceConfInit( flowProcess, tap, conf );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
      {
      scheme.sourcePrepare( flowProcess, sourceCall );
      }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean source( FlowProcess<Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
      {
      throw new UnsupportedOperationException( "should never be called" );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<Void, Input> sourceCall ) throws IOException
      {
      scheme.sourceCleanup( flowProcess, sourceCall );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Input, Output> tap, Config conf )
      {
      scheme.sinkConfInit( flowProcess, tap, conf );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
      {
      scheme.sinkPrepare( flowProcess, sinkCall );
      }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void sink( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
      {
      throw new UnsupportedOperationException( "should never be called" );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
      {
      scheme.sinkCleanup( flowProcess, sinkCall );
      }
    }
  }