/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import cascading.flow.Flow;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.planner.Scope;
import cascading.flow.planner.ScopedElement;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.PropertySanitizer;
import cascading.management.annotation.Visibility;
import cascading.pipe.Pipe;
import cascading.property.ConfigDef;
import cascading.scheme.Scheme;
import cascading.tuple.Fields;
import cascading.tuple.FieldsResolverException;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
052import cascading.tuple.TupleEntryIterator; 053import cascading.util.TraceUtil; 054import cascading.util.Traceable; 055import cascading.util.Util; 056 057/** 058 * A Tap represents the physical data source or sink in a connected {@link cascading.flow.Flow}. 059 * <p> 060 * That is, a source Tap is the head end of a connected {@link Pipe} and {@link Tuple} stream, and 061 * a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk, 062 * distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts 063 * out the complexity of connecting to these types of data sources. 064 * <p> 065 * A Tap takes a {@link Scheme} instance, which is used to identify the type of resource (text file, binary file, etc). 066 * A Tap is responsible for how the resource is reached. 067 * <p> 068 * By default when planning a Flow, Tap equality is a function of the {@link #getIdentifier()} and {@link #getScheme()} 069 * values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source 070 * the same fields. 071 * <p> 072 * Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the 073 * {@code where} clause in a SQL statement so two taps reading from the same SQL table aren't considered equal. 074 * <p> 075 * Taps are also used to determine dependencies between two or more {@link Flow} instances when used with a 076 * {@link cascading.cascade.Cascade}. In that case the {@link #getFullIdentifier(Object)} value is used and the Scheme 077 * is ignored. 
078 */ 079public abstract class Tap<Config, Input, Output> implements ScopedElement, FlowElement, Serializable, Traceable 080 { 081 /** Field scheme */ 082 private Scheme<Config, Input, Output, ?, ?> scheme; 083 084 /** Field mode */ 085 SinkMode sinkMode = SinkMode.KEEP; 086 087 private ConfigDef configDef; 088 private ConfigDef nodeConfigDef; 089 private ConfigDef stepConfigDef; 090 091 /** Field id */ 092 private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent 093 /** Field trace */ 094 private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override 095 096 /** 097 * Convenience function to make an array of Tap instances. 098 * 099 * @param taps of type Tap 100 * @return Tap array 101 */ 102 public static Tap[] taps( Tap... taps ) 103 { 104 return taps; 105 } 106 107 /** 108 * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify 109 * the Tap instance in properties files etc. 110 * <p> 111 * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent. 112 * 113 * @param tap of type Tap 114 * @return of type String 115 */ 116 public static synchronized String id( Tap tap ) 117 { 118 if( tap instanceof DecoratorTap ) 119 return id( ( (DecoratorTap) tap ).getOriginal() ); 120 121 return tap.id; 122 } 123 124 protected Tap() 125 { 126 } 127 128 protected Tap( Scheme<Config, Input, Output, ?, ?> scheme ) 129 { 130 this.setScheme( scheme ); 131 } 132 133 protected Tap( Scheme<Config, Input, Output, ?, ?> scheme, SinkMode sinkMode ) 134 { 135 this.setScheme( scheme ); 136 137 if( sinkMode != null ) 138 this.sinkMode = sinkMode; 139 } 140 141 protected void setScheme( Scheme<Config, Input, Output, ?, ?> scheme ) 142 { 143 this.scheme = scheme; 144 } 145 146 /** 147 * Method getScheme returns the scheme of this Tap object. 
148 * 149 * @return the scheme (type Scheme) of this Tap object. 150 */ 151 public Scheme<Config, Input, Output, ?, ?> getScheme() 152 { 153 return scheme; 154 } 155 156 @Override 157 public String getTrace() 158 { 159 return trace; 160 } 161 162 /** 163 * Method flowInit allows this Tap instance to initialize itself in context of the given {@link cascading.flow.Flow} instance. 164 * This method is guaranteed to be called before the Flow is started and the 165 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} event is fired. 166 * <p> 167 * This method will be called once per Flow, and before {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and 168 * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods. 169 * 170 * @param flow of type Flow 171 */ 172 public void flowConfInit( Flow<Config> flow ) 173 { 174 175 } 176 177 /** 178 * Method sourceConfInit initializes this instance as a source. 179 * <p> 180 * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow} 181 * instance or if it participates in multiple times in a given Flow or across different Flows in 182 * a {@link cascading.cascade.Cascade}. 183 * <p> 184 * In the context of a Flow, it will be called after 185 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} 186 * <p> 187 * Note that no resources or services should be modified by this method. 188 * 189 * @param flowProcess of type FlowProcess 190 * @param conf of type Config 191 */ 192 public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf ) 193 { 194 getScheme().sourceConfInit( flowProcess, this, conf ); 195 } 196 197 /** 198 * Method sinkConfInit initializes this instance as a sink. 
199 * <p> 200 * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow} 201 * instance or if it participates in multiple times in a given Flow or across different Flows in 202 * a {@link cascading.cascade.Cascade}. 203 * <p> 204 * Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'. 205 * <p> 206 * In the context of a Flow, it will be called after 207 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} 208 * <p> 209 * Note that no resources or services should be modified by this method. If this Tap instance returns true for 210 * {@link #isReplace()}, then {@link #deleteResource(Object)} will be called by the parent Flow. 211 * 212 * @param flowProcess of type FlowProcess 213 * @param conf of type Config 214 */ 215 public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf ) 216 { 217 getScheme().sinkConfInit( flowProcess, this, conf ); 218 } 219 220 /** 221 * Method getIdentifier returns a String representing the resource this Tap instance represents. 222 * <p> 223 * Often, if the tap accesses a filesystem, the identifier is nothing more than the path to the file or directory. 224 * In other cases it may be a an URL or URI representing a connection string or remote resource. 225 * <p> 226 * Any two Tap instances having the same value for the identifier are considered equal. 227 * 228 * @return String 229 */ 230 @Property(name = "identifier", visibility = Visibility.PUBLIC) 231 @PropertyDescription("The resource this instance represents") 232 @PropertySanitizer("cascading.management.annotation.URISanitizer") 233 public abstract String getIdentifier(); 234 235 /** 236 * Method getSourceFields returns the sourceFields of this Tap object. 237 * 238 * @return the sourceFields (type Fields) of this Tap object. 
239 */ 240 public Fields getSourceFields() 241 { 242 return getScheme().getSourceFields(); 243 } 244 245 /** 246 * Method getSinkFields returns the sinkFields of this Tap object. 247 * 248 * @return the sinkFields (type Fields) of this Tap object. 249 */ 250 public Fields getSinkFields() 251 { 252 return getScheme().getSinkFields(); 253 } 254 255 /** 256 * Method openForRead opens the resource represented by this Tap instance for reading. 257 * <p> 258 * {@code input} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme} 259 * via {@link Scheme#sourceConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper 260 * input type and instantiate it before calling {@code super.openForRead()}. 261 * <p> 262 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 263 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 264 * stored in a Collection. 265 * 266 * @param flowProcess of type FlowProcess 267 * @param input of type Input 268 * @return TupleEntryIterator 269 * @throws java.io.IOException when the resource cannot be opened 270 */ 271 public abstract TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException; 272 273 /** 274 * Method openForRead opens the resource represented by this Tap instance for reading. 275 * <p> 276 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 277 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 278 * stored in a Collection. 279 * 280 * @param flowProcess of type FlowProcess 281 * @return TupleEntryIterator 282 * @throws java.io.IOException when the resource cannot be opened 283 */ 284 public TupleEntryIterator openForRead( FlowProcess<? 
extends Config> flowProcess ) throws IOException 285 { 286 return openForRead( flowProcess, null ); 287 } 288 289 /** 290 * Method openForWrite opens the resource represented by this Tap instance for writing. 291 * <p> 292 * This method is used internally and does not honor the {@link SinkMode} setting. If SinkMode is 293 * {@link SinkMode#REPLACE}, this call may fail. See {@link #openForWrite(cascading.flow.FlowProcess)}. 294 * <p> 295 * {@code output} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme} 296 * via {@link Scheme#sinkConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper 297 * output type and instantiate it before calling {@code super.openForWrite()}. 298 * 299 * @param flowProcess of type FlowProcess 300 * @param output of type Output 301 * @return TupleEntryCollector 302 * @throws java.io.IOException when the resource cannot be opened 303 */ 304 public abstract TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException; 305 306 /** 307 * Method openForWrite opens the resource represented by this Tap instance for writing. 308 * <p> 309 * This method is for user application use and does honor the {@link SinkMode#REPLACE} settings. That is, if 310 * SinkMode is set to {@link SinkMode#REPLACE} the underlying resource will be deleted. 311 * <p> 312 * Note if {@link SinkMode#UPDATE} is set, the resource will not be deleted. 313 * 314 * @param flowProcess of type FlowProcess 315 * @return TupleEntryCollector 316 * @throws java.io.IOException when the resource cannot be opened 317 */ 318 public TupleEntryCollector openForWrite( FlowProcess<? 
extends Config> flowProcess ) throws IOException 319 { 320 if( isReplace() ) 321 deleteResource( flowProcess ); 322 323 return openForWrite( flowProcess, null ); 324 } 325 326 @Override 327 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 328 { 329 // as a source Tap, we emit the scheme defined Fields 330 // as a sink Tap, we declare we emit the incoming Fields 331 // as a temp Tap, this method never gets called, but we emit what we consume 332 int count = 0; 333 for( Scope incomingScope : incomingScopes ) 334 { 335 Fields incomingFields = incomingScope.getIncomingTapFields(); 336 337 if( incomingFields != null ) 338 { 339 try 340 { 341 incomingFields.select( getSinkFields() ); 342 } 343 catch( FieldsResolverException exception ) 344 { 345 throw new TapException( this, exception.getSourceFields(), exception.getSelectorFields(), exception ); 346 } 347 348 count++; 349 } 350 } 351 352 if( count > 1 ) 353 throw new FlowException( "Tap may not have more than one incoming Scope" ); 354 355 // this allows the incoming to be passed through to the outgoing 356 Fields incomingFields = incomingScopes.size() == 0 ? null : incomingScopes.iterator().next().getIncomingTapFields(); 357 358 if( incomingFields != null && 359 ( isSource() && getSourceFields().equals( Fields.UNKNOWN ) || 360 isSink() && getSinkFields().equals( Fields.ALL ) ) ) 361 return new Scope( incomingFields ); 362 363 if( count == 1 ) 364 return new Scope( getSinkFields() ); 365 366 return new Scope( getSourceFields() ); 367 } 368 369 /** 370 * A hook for allowing a Scheme to lazily retrieve its source fields. 371 * 372 * @param flowProcess of type FlowProcess 373 * @return the found Fields 374 */ 375 public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess ) 376 { 377 return getScheme().retrieveSourceFields( flowProcess, this ); 378 } 379 380 public void presentSourceFields( FlowProcess<? 
extends Config> flowProcess, Fields fields ) 381 { 382 getScheme().presentSourceFields( flowProcess, this, fields ); 383 } 384 385 /** 386 * A hook for allowing a Scheme to lazily retrieve its sink fields. 387 * 388 * @param flowProcess of type FlowProcess 389 * @return the found Fields 390 */ 391 public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess ) 392 { 393 return getScheme().retrieveSinkFields( flowProcess, this ); 394 } 395 396 public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields ) 397 { 398 getScheme().presentSinkFields( flowProcess, this, fields ); 399 } 400 401 @Override 402 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 403 { 404 return incomingScope.getIncomingTapFields(); 405 } 406 407 @Override 408 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 409 { 410 return incomingScope.getIncomingTapFields(); 411 } 412 413 /** 414 * Method getFullIdentifier returns a fully qualified resource identifier. 415 * 416 * @param flowProcess of type FlowProcess 417 * @return String 418 */ 419 public String getFullIdentifier( FlowProcess<? extends Config> flowProcess ) 420 { 421 return getFullIdentifier( flowProcess.getConfig() ); 422 } 423 424 /** 425 * Method getFullIdentifier returns a fully qualified resource identifier. 426 * 427 * @param conf of type Config 428 * @return String 429 */ 430 public String getFullIdentifier( Config conf ) 431 { 432 return getIdentifier(); 433 } 434 435 /** 436 * Method createResource creates the underlying resource. 437 * 438 * @param flowProcess of type FlowProcess 439 * @return boolean 440 * @throws IOException when there is an error making directories 441 */ 442 public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException 443 { 444 return createResource( flowProcess.getConfig() ); 445 } 446 447 /** 448 * Method createResource creates the underlying resource. 
449 * 450 * @param conf of type Config 451 * @return boolean 452 * @throws IOException when there is an error making directories 453 */ 454 public abstract boolean createResource( Config conf ) throws IOException; 455 456 /** 457 * Method deleteResource deletes the resource represented by this instance. 458 * 459 * @param flowProcess of type FlowProcess 460 * @return boolean 461 * @throws IOException when the resource cannot be deleted 462 */ 463 public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException 464 { 465 return deleteResource( flowProcess.getConfig() ); 466 } 467 468 /** 469 * Method deleteResource deletes the resource represented by this instance. 470 * 471 * @param conf of type Config 472 * @return boolean 473 * @throws IOException when the resource cannot be deleted 474 */ 475 public abstract boolean deleteResource( Config conf ) throws IOException; 476 477 /** 478 * Method prepareResourceForRead allows the underlying resource to be notified when reading will begin. 479 * <p> 480 * This method will be called client side so that any remote or external resources can be initialized. 481 * <p> 482 * If this method returns {@code false}, an exception will be thrown halting the current Flow. 483 * <p> 484 * In most cases, resource initialization should happen in the {@link #openForRead(FlowProcess, Object)} method. 485 * <p> 486 * This allows for initialization of cluster side resources, like a JDBC driver used to read data from a database, 487 * that cannot be passed client to cluster. 488 * 489 * @param conf of type Config 490 * @return returns true if successful 491 * @throws IOException 492 */ 493 public boolean prepareResourceForRead( Config conf ) throws IOException 494 { 495 return true; 496 } 497 498 /** 499 * Method prepareResourceForWrite allows the underlying resource to be notified when writing will begin. 
500 * <p> 501 * This method will be called once client side so that any remote or external resources can be initialized. 502 * <p> 503 * If this method returns {@code false}, an exception will be thrown halting the current Flow. 504 * <p> 505 * In most cases, resource initialization should happen in the {@link #openForWrite(FlowProcess, Object)} method. 506 * <p> 507 * This allows for initialization of cluster side resources, like a JDBC driver used to write data to a database, 508 * that cannot be passed client to cluster. 509 * <p> 510 * In the above JDBC example, overriding this method will allow for testing for the existence of and/or creating 511 * a remote table used by all individual cluster side tasks. 512 * 513 * @param conf of type Config 514 * @return returns true if successful 515 * @throws IOException 516 */ 517 public boolean prepareResourceForWrite( Config conf ) throws IOException 518 { 519 return true; 520 } 521 522 /** 523 * Method commitResource allows the underlying resource to be notified when all write processing is 524 * successful so that any additional cleanup or processing may be completed. 525 * <p> 526 * See {@link #rollbackResource(Object)} to handle cleanup in the face of failures. 527 * <p> 528 * This method is invoked once client side and not in the cluster, if any. 529 * <p> 530 * If other sink Tap instance in a given Flow fail on commitResource after called on this instance, 531 * rollbackResource will not be called. 532 * 533 * @param conf of type Config 534 * @return returns true if successful 535 * @throws IOException 536 */ 537 public boolean commitResource( Config conf ) throws IOException 538 { 539 return true; 540 } 541 542 /** 543 * Method rollbackResource allows the underlying resource to be notified when any write processing has failed or 544 * was stopped so that any cleanup may be started. 545 * <p> 546 * See {@link #commitResource(Object)} to handle cleanup when the write has successfully completed. 
547 * <p> 548 * This method is invoked once client side and not in the cluster, if any. 549 * 550 * @param conf of type Config 551 * @return returns true if successful 552 * @throws IOException 553 */ 554 public boolean rollbackResource( Config conf ) throws IOException 555 { 556 return true; 557 } 558 559 /** 560 * Method resourceExists returns true if the path represented by this instance exists. 561 * 562 * @param flowProcess of type FlowProcess 563 * @return true if the underlying resource already exists 564 * @throws IOException when the status cannot be determined 565 */ 566 public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException 567 { 568 return resourceExists( flowProcess.getConfig() ); 569 } 570 571 /** 572 * Method resourceExists returns true if the path represented by this instance exists. 573 * 574 * @param conf of type Config 575 * @return true if the underlying resource already exists 576 * @throws IOException when the status cannot be determined 577 */ 578 public abstract boolean resourceExists( Config conf ) throws IOException; 579 580 /** 581 * Method getModifiedTime returns the date this resource was last modified. 582 * <p> 583 * If the resource does not exist, returns zero (0). 584 * <p> 585 * If the resource is continuous, returns {@link Long#MAX_VALUE}. 586 * 587 * @param flowProcess of type FlowProcess 588 * @return The date this resource was last modified. 589 * @throws IOException 590 */ 591 public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException 592 { 593 return getModifiedTime( flowProcess.getConfig() ); 594 } 595 596 /** 597 * Method getModifiedTime returns the date this resource was last modified. 598 * <p> 599 * If the resource does not exist, returns zero (0). 600 * <p> 601 * If the resource is continuous, returns {@link Long#MAX_VALUE}. 602 * 603 * @param conf of type Config 604 * @return The date this resource was last modified. 
605 * @throws IOException 606 */ 607 public abstract long getModifiedTime( Config conf ) throws IOException; 608 609 /** 610 * Method getSinkMode returns the {@link SinkMode} }of this Tap object. 611 * 612 * @return the sinkMode (type SinkMode) of this Tap object. 613 */ 614 public SinkMode getSinkMode() 615 { 616 return sinkMode; 617 } 618 619 /** 620 * Method isKeep indicates whether the resource represented by this instance should be kept if it 621 * already exists when the Flow is started. 622 * 623 * @return boolean 624 */ 625 public boolean isKeep() 626 { 627 return sinkMode == SinkMode.KEEP; 628 } 629 630 /** 631 * Method isReplace indicates whether the resource represented by this instance should be deleted if it 632 * already exists when the Flow is started. 633 * 634 * @return boolean 635 */ 636 public boolean isReplace() 637 { 638 return sinkMode == SinkMode.REPLACE; 639 } 640 641 /** 642 * Method isUpdate indicates whether the resource represented by this instance should be updated if it already 643 * exists. Otherwise a new resource will be created, via {@link #createResource(Object)}, when the Flow is started. 644 * 645 * @return boolean 646 */ 647 public boolean isUpdate() 648 { 649 return sinkMode == SinkMode.UPDATE; 650 } 651 652 /** 653 * Method isSink returns true if this Tap instance can be used as a sink. 654 * 655 * @return boolean 656 */ 657 public boolean isSink() 658 { 659 return getScheme().isSink(); 660 } 661 662 /** 663 * Method isSource returns true if this Tap instance can be used as a source. 664 * 665 * @return boolean 666 */ 667 public boolean isSource() 668 { 669 return getScheme().isSource(); 670 } 671 672 /** 673 * Method isTemporary returns true if this Tap is temporary (used for intermediate results). 674 * 675 * @return the temporary (type boolean) of this Tap object. 
676 */ 677 public boolean isTemporary() 678 { 679 return false; 680 } 681 682 /** 683 * Returns a {@link cascading.property.ConfigDef} instance that allows for local properties to be set and made available via 684 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 685 * <p> 686 * Any properties set on the configDef will not show up in any {@link Flow} or {@link cascading.flow.FlowStep} process 687 * level configuration, but will override any of those values as seen by the current Tap instance method call where a 688 * FlowProcess is provided except for the {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and 689 * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods. 690 * <p> 691 * That is, the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into 692 * a ConfigDef instance will not be visible to them. 693 * 694 * @return an instance of ConfigDef 695 */ 696 public ConfigDef getConfigDef() 697 { 698 if( configDef == null ) 699 configDef = new ConfigDef(); 700 701 return configDef; 702 } 703 704 /** 705 * Returns {@code true} if there are properties in the configDef instance. 706 * 707 * @return true if there are configDef properties 708 */ 709 public boolean hasConfigDef() 710 { 711 return configDef != null && !configDef.isEmpty(); 712 } 713 714 /** 715 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 716 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 717 * <p> 718 * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in 719 * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the 720 * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 
721 * <p> 722 * Use this method to tweak properties in the process node this tap instance is planned into. 723 * 724 * @return an instance of ConfigDef 725 */ 726 @Override 727 public ConfigDef getNodeConfigDef() 728 { 729 if( nodeConfigDef == null ) 730 nodeConfigDef = new ConfigDef(); 731 732 return nodeConfigDef; 733 } 734 735 /** 736 * Returns {@code true} if there are properties in the nodeConfigDef instance. 737 * 738 * @return true if there are nodeConfigDef properties 739 */ 740 @Override 741 public boolean hasNodeConfigDef() 742 { 743 return nodeConfigDef != null && !nodeConfigDef.isEmpty(); 744 } 745 746 /** 747 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 748 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 749 * <p> 750 * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in 751 * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the 752 * stepConfigDef will be overridden by the tap local {@code #getConfigDef} instance. 753 * <p> 754 * Use this method to tweak properties in the process step this tap instance is planned into. 755 * <p> 756 * Note the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into 757 * a ConfigDef instance will not be visible to them. 758 * 759 * @return an instance of ConfigDef 760 */ 761 @Override 762 public ConfigDef getStepConfigDef() 763 { 764 if( stepConfigDef == null ) 765 stepConfigDef = new ConfigDef(); 766 767 return stepConfigDef; 768 } 769 770 /** 771 * Returns {@code true} if there are properties in the stepConfigDef instance. 
772 * 773 * @return true if there are stepConfigDef properties 774 */ 775 @Override 776 public boolean hasStepConfigDef() 777 { 778 return stepConfigDef != null && !stepConfigDef.isEmpty(); 779 } 780 781 public Spliterator<TupleEntry> spliterator( FlowProcess<? extends Config> flowProcess ) 782 { 783 return splititerator( openForReadUnchecked( flowProcess ) ); 784 } 785 786 protected TupleEntryIterator openForReadUnchecked( FlowProcess<? extends Config> flowProcess ) 787 { 788 try 789 { 790 return openForRead( flowProcess ); 791 } 792 catch( IOException exception ) 793 { 794 throw new UncheckedIOException( exception ); 795 } 796 } 797 798 protected Spliterator<TupleEntry> splititerator( TupleEntryIterator iterator ) 799 { 800 return Spliterators.spliteratorUnknownSize( iterator, 0 ); 801 } 802 803 /** 804 * Method entryStream returns a {@link Stream} of {@link TupleEntry} instances from the given 805 * Tap instance. 806 * <p> 807 * Also see {@link cascading.tuple.TupleEntryStream#entryStream(Tap, FlowProcess)}. 808 * <p> 809 * Note, the returned Stream instance must be closed in order to clean up underlying resources. This 810 * is simply accomplished with a try-with-resources statement. 811 * 812 * @param flowProcess represents the current platform configuration 813 * @return a Stream of TupleEntry instances 814 */ 815 public Stream<TupleEntry> entryStream( FlowProcess<? 
extends Config> flowProcess ) 816 { 817 TupleEntryIterator iterator = openForReadUnchecked( flowProcess ); 818 Spliterator<TupleEntry> spliterator = splititerator( iterator ); 819 820 try 821 { 822 return StreamSupport 823 .stream( spliterator, false ) 824 .onClose( asUncheckedRunnable( iterator ) ); 825 } 826 catch( Error | RuntimeException error ) 827 { 828 try 829 { 830 iterator.close(); 831 } 832 catch( IOException exception ) 833 { 834 try 835 { 836 error.addSuppressed( exception ); 837 } 838 catch( Throwable ignore ){} 839 } 840 841 throw error; 842 } 843 } 844 845 /** 846 * Method entryStreamCopy returns a {@link Stream} of {@link TupleEntry} instances from the given 847 * Tap instance. 848 * <p> 849 * This method returns an TupleEntry instance suitable for caching. 850 * <p> 851 * Also see {@link cascading.tuple.TupleEntryStream#entryStreamCopy(Tap, FlowProcess)}. 852 * <p> 853 * Note, the returned Stream instance must be closed in order to clean up underlying resources. This 854 * is simply accomplished with a try-with-resources statement. 855 * 856 * @param flowProcess represents the current platform configuration 857 * @return a Stream of TupleEntry instances 858 */ 859 public Stream<TupleEntry> entryStreamCopy( FlowProcess<? extends Config> flowProcess ) 860 { 861 return entryStream( flowProcess ).map( TupleEntry::new ); 862 } 863 864 /** 865 * Method entryStream returns a {@link Stream} of {@link TupleEntry} instances from the given 866 * Tap instance. 867 * <p> 868 * Also see {@link cascading.tuple.TupleEntryStream#entryStream(Tap, FlowProcess, Fields)}. 869 * <p> 870 * Note, the returned Stream instance must be closed in order to clean up underlying resources. This 871 * is simply accomplished with a try-with-resources statement. 
872 * 873 * @param flowProcess represents the current platform configuration 874 * @param selector the fields to select from the underlying TupleEntry 875 * @return a Stream of TupleEntry instances 876 */ 877 public Stream<TupleEntry> entryStream( FlowProcess<? extends Config> flowProcess, Fields selector ) 878 { 879 return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectEntry( selector ) ); 880 } 881 882 /** 883 * Method entryStreamCopy returns a {@link Stream} of {@link TupleEntry} instances from the given 884 * Tap instance. 885 * <p> 886 * Also see {@link cascading.tuple.TupleEntryStream#entryStreamCopy(Tap, FlowProcess)}. 887 * <p> 888 * Note, the returned Stream instance must be closed in order to clean up underlying resources. This 889 * is simply accomplished with a try-with-resources statement. 890 * 891 * @param flowProcess represents the current platform configuration 892 * @param selector the fields to select from the underlying TupleEntry 893 * @return a Stream of TupleEntry instances 894 */ 895 public Stream<TupleEntry> entryStreamCopy( FlowProcess<? extends Config> flowProcess, Fields selector ) 896 { 897 return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectEntryCopy( selector ) ); 898 } 899 900 /** 901 * Method tupleStream returns a {@link Stream} of {@link Tuple} instances from the given 902 * Tap instance. 903 * <p> 904 * Also see {@link cascading.tuple.TupleStream#tupleStream(Tap, FlowProcess)}. 905 * 906 * @param flowProcess represents the current platform configuration 907 * @return a Stream of Tuple instances 908 */ 909 public Stream<Tuple> tupleStream( FlowProcess<? extends Config> flowProcess ) 910 { 911 return entryStream( flowProcess ).map( TupleEntry::getTuple ); 912 } 913 914 /** 915 * Method tupleStreamCopy returns a {@link Stream} of {@link Tuple} instances from the given 916 * Tap instance. 917 * <p> 918 * This method returns an Tuple instance suitable for caching. 
919 * <p> 920 * Also see {@link cascading.tuple.TupleStream#tupleStreamCopy(Tap, FlowProcess)}. 921 * 922 * @param flowProcess represents the current platform configuration 923 * @return a Stream of Tuple instances 924 */ 925 public Stream<Tuple> tupleStreamCopy( FlowProcess<? extends Config> flowProcess ) 926 { 927 return entryStream( flowProcess ).map( TupleEntry::getTupleCopy ); 928 } 929 930 /** 931 * Method tupleStream returns a {@link Stream} of {@link Tuple} instances from the given 932 * Tap instance. 933 * <p> 934 * Also see {@link cascading.tuple.TupleStream#tupleStream(Tap, FlowProcess, Fields)}. 935 * 936 * @param flowProcess represents the current platform configuration 937 * @param selector the fields to select from the underlying Tuple 938 * @return a Stream of TupleE instances 939 */ 940 public Stream<Tuple> tupleStream( FlowProcess<? extends Config> flowProcess, Fields selector ) 941 { 942 return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectTuple( selector ) ); 943 } 944 945 /** 946 * Method tupleStreamCopy returns a {@link Stream} of {@link Tuple} instances from the given 947 * Tap instance. 948 * <p> 949 * This method returns an Tuple instance suitable for caching. 950 * <p> 951 * Also see {@link cascading.tuple.TupleStream#tupleStreamCopy(Tap, FlowProcess)}. 952 * 953 * @param flowProcess represents the current platform configuration 954 * @param selector the fields to select from the underlying Tuple 955 * @return a Stream of TupleE instances 956 */ 957 public Stream<Tuple> tupleStreamCopy( FlowProcess<? 
extends Config> flowProcess, Fields selector ) 958 { 959 return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectTupleCopy( selector ) ); 960 } 961 962 private static Runnable asUncheckedRunnable( Closeable closeable ) 963 { 964 return () -> 965 { 966 try 967 { 968 closeable.close(); 969 } 970 catch( IOException exception ) 971 { 972 throw new UncheckedIOException( exception ); 973 } 974 }; 975 } 976 977 @Override 978 public boolean equals( Object object ) 979 { 980 if( this == object ) 981 return true; 982 if( object == null || getClass() != object.getClass() ) 983 return false; 984 985 Tap tap = (Tap) object; 986 987 if( getIdentifier() != null ? !getIdentifier().equals( tap.getIdentifier() ) : tap.getIdentifier() != null ) 988 return false; 989 990 if( getScheme() != null ? !getScheme().equals( tap.getScheme() ) : tap.getScheme() != null ) 991 return false; 992 993 return true; 994 } 995 996 @Override 997 public int hashCode() 998 { 999 int result = getIdentifier() != null ? getIdentifier().hashCode() : 0; 1000 1001 result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 ); 1002 1003 return result; 1004 } 1005 1006 @Override 1007 public String toString() 1008 { 1009 if( getIdentifier() != null ) 1010 return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[\"" + Util.sanitizeUrl( getIdentifier() ) + "\"]"; // sanitize 1011 else 1012 return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[not initialized]"; 1013 } 1014 }