/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap;

import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntrySchemeCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class BaseTemplateTap is a sink-only {@link Tap} that wraps a parent Tap and routes every
 * incoming tuple to a collector selected by a path String. The path is produced by formatting
 * values of the tuple with {@link #getPathTemplate()}.
 * <p/>
 * One collector is opened per distinct path through
 * {@link #createTupleEntrySchemeCollector(cascading.flow.FlowProcess, Tap, String)} and cached.
 * When the cache grows past {@link #getOpenTapsThreshold()}, roughly ten percent of the threshold
 * (at least one) of the least recently accessed collectors are closed and removed.
 * <p/>
 * Resource level operations (create, delete, exists, commit, rollback, modified time) are delegated
 * to the parent Tap.
 */
public abstract class BaseTemplateTap<Config, Output> extends SinkTap<Config, Output>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( BaseTemplateTap.class );
  /** Field OPEN_TAPS_THRESHOLD_DEFAULT */
  protected static final int OPEN_TAPS_THRESHOLD_DEFAULT = 300;

  /**
   * Class TemplateCollector receives every tuple written to the enclosing tap and forwards it to the
   * cached collector matching the path computed for that tuple.
   */
  private class TemplateCollector extends TupleEntryCollector
    {
    private final FlowProcess<Config> flowProcess;
    private final Config conf;
    private final Fields parentFields;
    private final Fields pathFields;

    /**
     * Constructor TemplateCollector creates a new collector bound to the given process.
     *
     * @param flowProcess the current process, used to open child collectors and to increment {@link Counters}
     */
    public TemplateCollector( FlowProcess<Config> flowProcess )
      {
      super( Fields.asDeclaration( getSinkFields() ) );
      this.flowProcess = flowProcess;
      this.conf = flowProcess.getConfigCopy();
      this.parentFields = parent.getSinkFields();
      this.pathFields = ( (TemplateScheme) getScheme() ).pathFields;
      }

    /**
     * Returns the cached collector for the given path, opening and caching a new one if none exists.
     *
     * @param path the formatted path identifying the collector
     * @return the collector to write to, never null
     * @throws TapException if the new collector cannot be opened
     */
    private TupleEntryCollector getCollector( String path )
      {
      TupleEntryCollector collector = collectors.get( path );

      if( collector != null )
        return collector;

      try
        {
        LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );

        collector = createTupleEntrySchemeCollector( flowProcess, parent, path );

        flowProcess.increment( Counters.Paths_Opened, 1 );
        }
      catch( IOException exception )
        {
        throw new TapException( "unable to open template path: " + path, exception );
        }

      // make room before caching the new collector so it cannot be chosen for eviction itself
      if( collectors.size() > openTapsThreshold )
        purgeCollectors();

      collectors.put( path, collector );

      if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
        LOG.info( "caching {} open Taps", collectors.size() );

      return collector;
      }

    /**
     * Closes and removes the collectors at the head of the cache. The cache is an access ordered
     * {@link LinkedHashMap}, so the head holds the least recently accessed entries. The number
     * removed is ten percent of the threshold, but never less than one.
     */
    private void purgeCollectors()
      {
      int numToClose = Math.max( 1, (int) ( openTapsThreshold * .10 ) );

      if( LOG.isInfoEnabled() )
        LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );

      // keys are gathered first, the map must not be modified while its key set is being iterated
      Set<String> removeKeys = new HashSet<String>();
      Set<String> keys = collectors.keySet();

      for( String key : keys )
        {
        if( numToClose-- == 0 )
          break;

        removeKeys.add( key );
        }

      for( String removeKey : removeKeys )
        closeCollector( collectors.remove( removeKey ) );

      flowProcess.increment( Counters.Path_Purges, 1 );
      }

    /** Closes this collector, then every cached child collector, and finally empties the cache. */
    @Override
    public void close()
      {
      super.close();

      try
        {
        for( TupleEntryCollector collector : collectors.values() )
          closeCollector( collector );
        }
      finally
        {
        collectors.clear();
        }
      }

    /**
     * Closes the given collector on a best-effort basis. A failure is logged and not rethrown so
     * that the remaining collectors can still be closed. {@link Counters#Paths_Closed} is only
     * incremented when the close call returns normally.
     *
     * @param collector the collector to close, may be null
     */
    private void closeCollector( TupleEntryCollector collector )
      {
      if( collector == null )
        return;

      try
        {
        collector.close();

        flowProcess.increment( Counters.Paths_Closed, 1 );
        }
      catch( Exception exception )
        {
        // intentionally not rethrown, but a failed close must not go unnoticed
        LOG.warn( "exception closing template path collector, ignoring", exception );
        }
      }

    /**
     * Formats the path for the given entry and forwards the entry to the matching collector.
     * <p/>
     * If path fields were declared, only those values are used to format the path and only the
     * parent sink fields are forwarded. Otherwise the whole tuple is used to format the path and the
     * entry is forwarded unchanged.
     *
     * @param tupleEntry the entry to write
     * @throws IOException declared by the overridden method
     */
    protected void collect( TupleEntry tupleEntry ) throws IOException
      {
      if( pathFields != null )
        {
        Tuple pathValues = tupleEntry.selectTuple( pathFields );
        String path = pathValues.format( pathTemplate );

        getCollector( path ).add( tupleEntry.selectTuple( parentFields ) );
        }
      else
        {
        String path = tupleEntry.getTuple().format( pathTemplate );

        getCollector( path ).add( tupleEntry );
        }
      }
    }

  /** Field parent */
  protected Tap parent;
  /** Field pathTemplate */
  protected String pathTemplate;
  /** Field keepParentOnDelete */
  protected boolean keepParentOnDelete = false;
  /** Field openTapsThreshold */
  protected int openTapsThreshold = OPEN_TAPS_THRESHOLD_DEFAULT;
  /** Field collectors, iterated in access order so the least recently accessed entries come first */
  private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true );

  /**
   * Method createTupleEntrySchemeCollector opens the collector used to write all tuples that
   * resolve to the given path.
   *
   * @param flowProcess the current process
   * @param parent      the parent Tap of this TemplateTap object
   * @param path        the formatted path the new collector is opened for
   * @return a new collector for the given path
   * @throws IOException if the collector cannot be opened
   */
  protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Tap parent, String path ) throws IOException;

  /**
   * Method getParent returns the parent Tap of this TemplateTap object.
   *
   * @return the parent (type Tap) of this TemplateTap object.
   */
  public Tap getParent()
    {
    return parent;
    }

  /**
   * Method getPathTemplate returns the pathTemplate {@link java.util.Formatter} format String of this TemplateTap object.
   *
   * @return the pathTemplate (type String) of this TemplateTap object.
   */
  public String getPathTemplate()
    {
    return pathTemplate;
    }

  /**
   * Method getIdentifier returns the identifier of the parent Tap.
   *
   * @return the identifier (type String) of the parent Tap.
   */
  @Override
  public String getIdentifier()
    {
    return parent.getIdentifier();
    }

  /**
   * Method getOpenTapsThreshold returns the openTapsThreshold of this TemplateTap object.
   *
   * @return the openTapsThreshold (type int) of this TemplateTap object.
   */
  public int getOpenTapsThreshold()
    {
    return openTapsThreshold;
    }

  /**
   * Method openForWrite returns a new collector that routes every tuple by its formatted path.
   * The given output is not used.
   *
   * @param flowProcess the current process
   * @param output      ignored
   * @return a new collector bound to the given process
   * @throws IOException declared by the overridden method
   */
  @Override
  public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
    {
    return new TemplateCollector( flowProcess );
    }

  /** @see cascading.tap.Tap#createResource(Object) */
  public boolean createResource( Config conf ) throws IOException
    {
    return parent.createResource( conf );
    }

  /**
   * Returns true without deleting anything if keepParentOnDelete is set, otherwise delegates to the parent Tap.
   *
   * @see cascading.tap.Tap#deleteResource(Object)
   */
  public boolean deleteResource( Config conf ) throws IOException
    {
    return keepParentOnDelete || parent.deleteResource( conf );
    }

  /** Delegates to the parent Tap. */
  @Override
  public boolean prepareResourceForRead( Config conf ) throws IOException
    {
    return parent.prepareResourceForRead( conf );
    }

  /** Delegates to the parent Tap. */
  @Override
  public boolean prepareResourceForWrite( Config conf ) throws IOException
    {
    return parent.prepareResourceForWrite( conf );
    }

  /** Delegates to the parent Tap. */
  @Override
  public boolean commitResource( Config conf ) throws IOException
    {
    return parent.commitResource( conf );
    }

  /** Delegates to the parent Tap. */
  @Override
  public boolean rollbackResource( Config conf ) throws IOException
    {
    return parent.rollbackResource( conf );
    }

  /** @see cascading.tap.Tap#resourceExists(Object) */
  public boolean resourceExists( Config conf ) throws IOException
    {
    return parent.resourceExists( conf );
    }

  /** @see cascading.tap.Tap#getModifiedTime(Object) */
  @Override
  public long getModifiedTime( Config conf ) throws IOException
    {
    return parent.getModifiedTime( conf );
    }

  /**
   * Two instances are equal if they are of the same class, the super class considers them equal,
   * and both parent and pathTemplate are equal.
   */
  @Override
  public boolean equals( Object object )
    {
    if( this == object )
      return true;
    if( object == null || getClass() != object.getClass() )
      return false;
    if( !super.equals( object ) )
      return false;

    BaseTemplateTap that = (BaseTemplateTap) object;

    if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
      return false;
    if( pathTemplate != null ? !pathTemplate.equals( that.pathTemplate ) : that.pathTemplate != null )
      return false;

    return true;
    }

  /** Combines the super class hash with the hashes of parent and pathTemplate. */
  @Override
  public int hashCode()
    {
    int result = super.hashCode();
    result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
    result = 31 * result + ( pathTemplate != null ? pathTemplate.hashCode() : 0 );
    return result;
    }

  @Override
  public String toString()
    {
    return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + pathTemplate + "\"]";
    }

  /** Enum Counters lists the counters incremented as collectors are opened, closed and purged. */
  public enum Counters
    {
      Paths_Opened, Paths_Closed, Path_Purges
    }

  /**
   * Constructor BaseTemplateTap creates a new instance using the given parent and pathTemplate.
   *
   * @param parent            the parent Tap
   * @param pathTemplate      the {@link java.util.Formatter} format String used to create paths
   * @param openTapsThreshold the number of cached collectors above which a purge is triggered
   */
  protected BaseTemplateTap( Tap parent, String pathTemplate, int openTapsThreshold )
    {
    super( new TemplateScheme( parent.getScheme() ) );
    this.parent = parent;
    this.pathTemplate = pathTemplate;
    this.openTapsThreshold = openTapsThreshold;
    }

  /**
   * Constructor BaseTemplateTap creates a new instance using the given parent and pathTemplate.
   *
   * @param parent       the parent Tap
   * @param pathTemplate the {@link java.util.Formatter} format String used to create paths
   * @param sinkMode     the SinkMode passed to the super class
   */
  protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode )
    {
    super( new TemplateScheme( parent.getScheme() ), sinkMode );
    this.parent = parent;
    this.pathTemplate = pathTemplate;
    }

  /**
   * Constructor BaseTemplateTap creates a new instance using the given parent and pathTemplate.
   *
   * @param parent             the parent Tap
   * @param pathTemplate       the {@link java.util.Formatter} format String used to create paths
   * @param sinkMode           the SinkMode passed to the super class
   * @param keepParentOnDelete when true, {@link #deleteResource(Object)} does not delete the parent
   * @param openTapsThreshold  the number of cached collectors above which a purge is triggered
   */
  protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold )
    {
    super( new TemplateScheme( parent.getScheme() ), sinkMode );
    this.parent = parent;
    this.pathTemplate = pathTemplate;
    this.keepParentOnDelete = keepParentOnDelete;
    this.openTapsThreshold = openTapsThreshold;
    }

  /**
   * Constructor BaseTemplateTap creates a new instance using the given parent and pathTemplate.
   *
   * @param parent            the parent Tap
   * @param pathTemplate      the {@link java.util.Formatter} format String used to create paths
   * @param pathFields        the fields whose values are used to format the path
   * @param openTapsThreshold the number of cached collectors above which a purge is triggered
   */
  protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, int openTapsThreshold )
    {
    super( new TemplateScheme( parent.getScheme(), pathFields ) );
    this.parent = parent;
    this.pathTemplate = pathTemplate;
    this.openTapsThreshold = openTapsThreshold;
    }

  /**
   * Constructor BaseTemplateTap creates a new instance using the given parent and pathTemplate.
   *
   * @param parent       the parent Tap
   * @param pathTemplate the {@link java.util.Formatter} format String used to create paths
   * @param pathFields   the fields whose values are used to format the path
   * @param sinkMode     the SinkMode passed to the super class
   */
  protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode )
    {
    super( new TemplateScheme( parent.getScheme(), pathFields ), sinkMode );
    this.parent = parent;
    this.pathTemplate = pathTemplate;
    }

  /**
   * Constructor BaseTemplateTap creates a new instance using the given parent and pathTemplate.
   *
   * @param parent             the parent Tap
   * @param pathTemplate       the {@link java.util.Formatter} format String used to create paths
   * @param pathFields         the fields whose values are used to format the path
   * @param sinkMode           the SinkMode passed to the super class
   * @param keepParentOnDelete when true, {@link #deleteResource(Object)} does not delete the parent
   * @param openTapsThreshold  the number of cached collectors above which a purge is triggered
   */
  protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold )
    {
    super( new TemplateScheme( parent.getScheme(), pathFields ), sinkMode );
    this.parent = parent;
    this.pathTemplate = pathTemplate;
    this.keepParentOnDelete = keepParentOnDelete;
    this.openTapsThreshold = openTapsThreshold;
    }

  /**
   * Constructor BaseTemplateTap creates a new instance from a scheme only. The parent and
   * pathTemplate fields are left unset by this constructor.
   *
   * @param scheme   the Scheme passed to the super class
   * @param sinkMode the SinkMode passed to the super class
   */
  protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme, SinkMode sinkMode )
    {
    super( scheme, sinkMode );
    }

  /**
   * Constructor BaseTemplateTap creates a new instance from a scheme only. The parent and
   * pathTemplate fields are left unset by this constructor.
   *
   * @param scheme the Scheme passed to the super class
   */
  protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme )
    {
    super( scheme );
    }

  /**
   * Class TemplateScheme wraps the Scheme of the parent Tap and delegates field accessors and
   * prepare/cleanup/confInit calls to it. It optionally carries the fields used to format the
   * path. Its own {@code source} and {@code sink} methods always throw.
   */
  public static class TemplateScheme<Config, Output> extends Scheme<Config, Void, Output, Void, Void>
    {
    private final Scheme scheme;
    private final Fields pathFields;

    /**
     * Constructor TemplateScheme wraps the given scheme without path fields.
     *
     * @param scheme the Scheme to delegate to
     */
    public TemplateScheme( Scheme scheme )
      {
      this.scheme = scheme;
      this.pathFields = null;
      }

    /**
     * Constructor TemplateScheme wraps the given scheme with the given path fields. A null value
     * or the ALL substitution is treated as "no path fields".
     *
     * @param scheme     the Scheme to delegate to
     * @param pathFields the fields used to format the path, may be null
     * @throws IllegalArgumentException if pathFields is neither null, ALL, nor defined
     */
    public TemplateScheme( Scheme scheme, Fields pathFields )
      {
      this.scheme = scheme;

      if( pathFields == null || pathFields.isAll() )
        this.pathFields = null;
      else if( pathFields.isDefined() )
        this.pathFields = pathFields;
      else
        throw new IllegalArgumentException( "pathFields must be defined or the ALL substitution, got: " + pathFields.printVerbose() );
      }

    /**
     * Returns the sink fields of the wrapped scheme, merged with the path fields if path fields
     * were given and the wrapped scheme does not sink ALL.
     */
    public Fields getSinkFields()
      {
      if( pathFields == null || scheme.getSinkFields().isAll() )
        return scheme.getSinkFields();

      return Fields.merge( scheme.getSinkFields(), pathFields );
      }

    /** Delegates to the wrapped scheme. */
    public void setSinkFields( Fields sinkFields )
      {
      scheme.setSinkFields( sinkFields );
      }

    /** Delegates to the wrapped scheme. */
    public Fields getSourceFields()
      {
      return scheme.getSourceFields();
      }

    /** Delegates to the wrapped scheme. */
    public void setSourceFields( Fields sourceFields )
      {
      scheme.setSourceFields( sourceFields );
      }

    /** Delegates to the wrapped scheme. */
    public int getNumSinkParts()
      {
      return scheme.getNumSinkParts();
      }

    /** Delegates to the wrapped scheme. */
    public void setNumSinkParts( int numSinkParts )
      {
      scheme.setNumSinkParts( numSinkParts );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf )
      {
      scheme.sourceConfInit( flowProcess, tap, conf );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
      {
      scheme.sourcePrepare( flowProcess, sourceCall );
      }

    /** Always throws, reading through this scheme is not supported. */
    @Override
    public boolean source( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
      {
      throw new UnsupportedOperationException( "not supported" );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
      {
      scheme.sourceCleanup( flowProcess, sourceCall );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf )
      {
      scheme.sinkConfInit( flowProcess, tap, conf );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
      {
      scheme.sinkPrepare( flowProcess, sinkCall );
      }

    /** Always throws, this scheme is not expected to be asked to sink directly. */
    @Override
    public void sink( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
      {
      throw new UnsupportedOperationException( "should never be called" );
      }

    /** Delegates to the wrapped scheme. */
    @Override
    public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
      {
      scheme.sinkCleanup( flowProcess, sinkCall );
      }
    }
  }