001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap; 022 023 import java.io.IOException; 024 import java.io.Serializable; 025 import java.util.Set; 026 027 import cascading.flow.Flow; 028 import cascading.flow.FlowElement; 029 import cascading.flow.FlowException; 030 import cascading.flow.FlowProcess; 031 import cascading.flow.planner.Scope; 032 import cascading.management.annotation.Property; 033 import cascading.management.annotation.PropertyDescription; 034 import cascading.management.annotation.PropertySanitizer; 035 import cascading.management.annotation.Visibility; 036 import cascading.pipe.Pipe; 037 import cascading.property.ConfigDef; 038 import cascading.scheme.Scheme; 039 import cascading.tuple.Fields; 040 import cascading.tuple.FieldsResolverException; 041 import cascading.tuple.Tuple; 042 import cascading.tuple.TupleEntryCollector; 043 import cascading.tuple.TupleEntryIterator; 044 import cascading.util.TraceUtil; 045 import cascading.util.Traceable; 046 import cascading.util.Util; 047 048 /** 049 * A Tap represents the physical data source or sink in a connected {@link cascading.flow.Flow}. 050 * </p> 051 * That is, a source Tap is the head end of a connected {@link Pipe} and {@link Tuple} stream, and 052 * a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk, 053 * distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts 054 * out the complexity of connecting to these types of data sources. 055 * <p/> 056 * A Tap takes a {@link Scheme} instance, which is used to identify the type of resource (text file, binary file, etc). 057 * A Tap is responsible for how the resource is reached. 058 * <p/> 059 * By default when planning a Flow, Tap equality is a function of the {@link #getIdentifier()} and {@link #getScheme()} 060 * values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source 061 * the same fields. 062 * <p/> 063 * Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the 064 * {@code where} clause in a SQL statement so two taps reading from the same SQL table aren't considered equal. 065 * <p/> 066 * Taps are also used to determine dependencies between two or more {@link Flow} instances when used with a 067 * {@link cascading.cascade.Cascade}. In that case the {@link #getFullIdentifier(Object)} value is used and the Scheme 068 * is ignored. 069 */ 070 public abstract class Tap<Config, Input, Output> implements FlowElement, Serializable, Traceable 071 { 072 /** Field scheme */ 073 private Scheme<Config, Input, Output, ?, ?> scheme; 074 075 /** Field mode */ 076 SinkMode sinkMode = SinkMode.KEEP; 077 078 private ConfigDef configDef; 079 080 private ConfigDef processConfigDef; 081 082 /** Field id */ 083 private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent 084 /** Field trace */ 085 private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override 086 087 /** 088 * Convenience function to make an array of Tap instances. 089 * 090 * @param taps of type Tap 091 * @return Tap array 092 */ 093 public static Tap[] taps( Tap... taps ) 094 { 095 return taps; 096 } 097 098 /** 099 * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify 100 * the Tap instance in properties files etc. 101 * <p/> 102 * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent. 103 * 104 * @param tap of type Tap 105 * @return of type String 106 */ 107 public static synchronized String id( Tap tap ) 108 { 109 if( tap instanceof DecoratorTap ) 110 return id( ( (DecoratorTap) tap ).getOriginal() ); 111 112 return tap.id; 113 } 114 115 protected Tap() 116 { 117 } 118 119 protected Tap( Scheme<Config, Input, Output, ?, ?> scheme ) 120 { 121 this.setScheme( scheme ); 122 } 123 124 protected Tap( Scheme<Config, Input, Output, ?, ?> scheme, SinkMode sinkMode ) 125 { 126 this.setScheme( scheme ); 127 this.sinkMode = sinkMode; 128 } 129 130 protected void setScheme( Scheme<Config, Input, Output, ?, ?> scheme ) 131 { 132 this.scheme = scheme; 133 } 134 135 /** 136 * Method getScheme returns the scheme of this Tap object. 137 * 138 * @return the scheme (type Scheme) of this Tap object. 139 */ 140 public Scheme<Config, Input, Output, ?, ?> getScheme() 141 { 142 return scheme; 143 } 144 145 @Override 146 public String getTrace() 147 { 148 return trace; 149 } 150 151 /** 152 * Method flowInit allows this Tap instance to initialize itself in context of the given {@link cascading.flow.Flow} instance. 153 * This method is guaranteed to be called before the Flow is started and the 154 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} event is fired. 155 * <p/> 156 * This method will be called once per Flow, and before {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and 157 * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods. 158 * 159 * @param flow of type Flow 160 */ 161 public void flowConfInit( Flow<Config> flow ) 162 { 163 164 } 165 166 /** 167 * Method sourceConfInit initializes this instance as a source. 168 * <p/> 169 * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow} 170 * instance or if it participates in multiple times in a given Flow or across different Flows in 171 * a {@link cascading.cascade.Cascade}. 172 * <p/> 173 * In the context of a Flow, it will be called after 174 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} 175 * <p/> 176 * Note that no resources or services should be modified by this method. 177 * 178 * @param flowProcess of type FlowProcess 179 * @param conf of type Config 180 */ 181 public void sourceConfInit( FlowProcess<Config> flowProcess, Config conf ) 182 { 183 getScheme().sourceConfInit( flowProcess, this, conf ); 184 } 185 186 /** 187 * Method sinkConfInit initializes this instance as a sink. 188 * <p/> 189 * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow} 190 * instance or if it participates in multiple times in a given Flow or across different Flows in 191 * a {@link cascading.cascade.Cascade}. 192 * <p/> 193 * Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'. 194 * <p/> 195 * In the context of a Flow, it will be called after 196 * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} 197 * <p/> 198 * Note that no resources or services should be modified by this method. If this Tap instance returns true for 199 * {@link #isReplace()}, then {@link #deleteResource(Object)} will be called by the parent Flow. 200 * 201 * @param flowProcess of type FlowProcess 202 * @param conf of type Config 203 */ 204 public void sinkConfInit( FlowProcess<Config> flowProcess, Config conf ) 205 { 206 getScheme().sinkConfInit( flowProcess, this, conf ); 207 } 208 209 /** 210 * Method getIdentifier returns a String representing the resource this Tap instance represents. 211 * <p/> 212 * Often, if the tap accesses a filesystem, the identifier is nothing more than the path to the file or directory. 213 * In other cases it may be a an URL or URI representing a connection string or remote resource. 214 * <p/> 215 * Any two Tap instances having the same value for the identifier are considered equal. 216 * 217 * @return String 218 */ 219 @Property(name = "identifier", visibility = Visibility.PUBLIC) 220 @PropertyDescription("The resource this Tap instance represents") 221 @PropertySanitizer("cascading.management.annotation.URISanitizer") 222 public abstract String getIdentifier(); 223 224 /** 225 * Method getSourceFields returns the sourceFields of this Tap object. 226 * 227 * @return the sourceFields (type Fields) of this Tap object. 228 */ 229 public Fields getSourceFields() 230 { 231 return getScheme().getSourceFields(); 232 } 233 234 /** 235 * Method getSinkFields returns the sinkFields of this Tap object. 236 * 237 * @return the sinkFields (type Fields) of this Tap object. 238 */ 239 public Fields getSinkFields() 240 { 241 return getScheme().getSinkFields(); 242 } 243 244 /** 245 * Method openForRead opens the resource represented by this Tap instance for reading. 246 * <p/> 247 * {@code input} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme} 248 * via {@link Scheme#sourceConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper 249 * input type and instantiate it before calling {@code super.openForRead()}. 250 * <p/> 251 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 252 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 253 * stored in a Collection. 254 * 255 * @param flowProcess of type FlowProcess 256 * @param input of type Input 257 * @return TupleEntryIterator 258 * @throws java.io.IOException when the resource cannot be opened 259 */ 260 public abstract TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException; 261 262 /** 263 * Method openForRead opens the resource represented by this Tap instance for reading. 264 * <p/> 265 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 266 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 267 * stored in a Collection. 268 * 269 * @param flowProcess of type FlowProcess 270 * @return TupleEntryIterator 271 * @throws java.io.IOException when the resource cannot be opened 272 */ 273 public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess ) throws IOException 274 { 275 return openForRead( flowProcess, null ); 276 } 277 278 /** 279 * Method openForWrite opens the resource represented by this Tap instance for writing. 280 * <p/> 281 * This method is used internally and does not honor the {@link SinkMode} setting. If SinkMode is 282 * {@link SinkMode#REPLACE}, this call may fail. See {@link #openForWrite(cascading.flow.FlowProcess)}. 283 * <p/> 284 * {@code output} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme} 285 * via {@link Scheme#sinkConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper 286 * output type and instantiate it before calling {@code super.openForWrite()}. 287 * 288 * @param flowProcess of type FlowProcess 289 * @param output of type Output 290 * @return TupleEntryCollector 291 * @throws java.io.IOException when the resource cannot be opened 292 */ 293 public abstract TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException; 294 295 /** 296 * Method openForWrite opens the resource represented by this Tap instance for writing. 297 * <p/> 298 * This method is for user application use and does honor the {@link SinkMode#REPLACE} settings. That is, if 299 * SinkMode is set to {@link SinkMode#REPLACE} the underlying resource will be deleted. 300 * <p/> 301 * Note if {@link SinkMode#UPDATE} is set, the resource will not be deleted. 302 * 303 * @param flowProcess of type FlowProcess 304 * @return TupleEntryCollector 305 * @throws java.io.IOException when the resource cannot be opened 306 */ 307 public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess ) throws IOException 308 { 309 if( isReplace() ) 310 deleteResource( flowProcess.getConfigCopy() ); 311 312 return openForWrite( flowProcess, null ); 313 } 314 315 @Override 316 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 317 { 318 // as a source Tap, we emit the scheme defined Fields 319 // as a sink Tap, we declare we emit the incoming Fields 320 // as a temp Tap, this method never gets called, but we emit what we consume 321 int count = 0; 322 for( Scope incomingScope : incomingScopes ) 323 { 324 Fields incomingFields = incomingScope.getIncomingTapFields(); 325 326 if( incomingFields != null ) 327 { 328 try 329 { 330 incomingFields.select( getSinkFields() ); 331 } 332 catch( FieldsResolverException exception ) 333 { 334 throw new TapException( this, exception.getSourceFields(), exception.getSelectorFields(), exception ); 335 } 336 337 count++; 338 } 339 } 340 341 if( count > 1 ) 342 throw new FlowException( "Tap may not have more than one incoming Scope" ); 343 344 // this allows the incoming to be passed through to the outgoing 345 Fields incomingFields = incomingScopes.size() == 0 ? null : incomingScopes.iterator().next().getIncomingTapFields(); 346 347 if( incomingFields != null && 348 ( isSource() && getSourceFields().equals( Fields.UNKNOWN ) || 349 isSink() && getSinkFields().equals( Fields.ALL ) ) ) 350 return new Scope( incomingFields ); 351 352 if( count == 1 ) 353 return new Scope( getSinkFields() ); 354 355 return new Scope( getSourceFields() ); 356 } 357 358 /** 359 * A hook for allowing a Scheme to lazily retrieve its source fields. 360 * 361 * @param flowProcess of type FlowProcess 362 * @return the found Fields 363 */ 364 public Fields retrieveSourceFields( FlowProcess<Config> flowProcess ) 365 { 366 return getScheme().retrieveSourceFields( flowProcess, this ); 367 } 368 369 public void presentSourceFields( FlowProcess<Config> flowProcess, Fields fields ) 370 { 371 getScheme().presentSourceFields( flowProcess, this, fields ); 372 } 373 374 /** 375 * A hook for allowing a Scheme to lazily retrieve its sink fields. 376 * 377 * @param flowProcess of type FlowProcess 378 * @return the found Fields 379 */ 380 public Fields retrieveSinkFields( FlowProcess<Config> flowProcess ) 381 { 382 return getScheme().retrieveSinkFields( flowProcess, this ); 383 } 384 385 public void presentSinkFields( FlowProcess<Config> flowProcess, Fields fields ) 386 { 387 getScheme().presentSinkFields( flowProcess, this, fields ); 388 } 389 390 @Override 391 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 392 { 393 return incomingScope.getIncomingTapFields(); 394 } 395 396 @Override 397 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 398 { 399 return incomingScope.getIncomingTapFields(); 400 } 401 402 /** 403 * Method getFullIdentifier returns a fully qualified resource identifier. 404 * 405 * @param flowProcess of type FlowProcess 406 * @return String 407 */ 408 public String getFullIdentifier( FlowProcess<Config> flowProcess ) 409 { 410 return getFullIdentifier( flowProcess.getConfigCopy() ); 411 } 412 413 /** 414 * Method getFullIdentifier returns a fully qualified resource identifier. 415 * 416 * @param conf of type Config 417 * @return String 418 */ 419 public String getFullIdentifier( Config conf ) 420 { 421 return getIdentifier(); 422 } 423 424 /** 425 * Method createResource creates the underlying resource. 426 * 427 * @param flowProcess of type FlowProcess 428 * @return boolean 429 * @throws IOException when there is an error making directories 430 */ 431 public boolean createResource( FlowProcess<Config> flowProcess ) throws IOException 432 { 433 return createResource( flowProcess.getConfigCopy() ); 434 } 435 436 /** 437 * Method createResource creates the underlying resource. 438 * 439 * @param conf of type Config 440 * @return boolean 441 * @throws IOException when there is an error making directories 442 */ 443 public abstract boolean createResource( Config conf ) throws IOException; 444 445 /** 446 * Method deleteResource deletes the resource represented by this instance. 447 * 448 * @param flowProcess of type FlowProcess 449 * @return boolean 450 * @throws IOException when the resource cannot be deleted 451 */ 452 public boolean deleteResource( FlowProcess<Config> flowProcess ) throws IOException 453 { 454 return deleteResource( flowProcess.getConfigCopy() ); 455 } 456 457 /** 458 * Method deleteResource deletes the resource represented by this instance. 459 * 460 * @param conf of type Config 461 * @return boolean 462 * @throws IOException when the resource cannot be deleted 463 */ 464 public abstract boolean deleteResource( Config conf ) throws IOException; 465 466 /** 467 * Method prepareResourceForRead allows the underlying resource to be notified when reading will begin. 468 * <p/> 469 * This method will be called client side so that any remote or external resources can be initialized. 470 * <p/> 471 * If this method returns {@code false}, an exception will be thrown halting the current Flow. 472 * <p/> 473 * In most cases, resource initialization should happen in the {@link #openForRead(FlowProcess, Object)} method. 474 * <p/> 475 * This allows for initialization of cluster side resources, like a JDBC driver used to read data from a database, 476 * that cannot be passed client to cluster. 477 * 478 * @param conf of type Config 479 * @return returns true if successful 480 * @throws IOException 481 */ 482 public boolean prepareResourceForRead( Config conf ) throws IOException 483 { 484 return true; 485 } 486 487 /** 488 * Method prepareResourceForWrite allows the underlying resource to be notified when writing will begin. 489 * <p/> 490 * This method will be called once client side so that any remote or external resources can be initialized. 491 * <p/> 492 * If this method returns {@code false}, an exception will be thrown halting the current Flow. 493 * <p/> 494 * In most cases, resource initialization should happen in the {@link #openForWrite(FlowProcess, Object)} method. 495 * <p/> 496 * This allows for initialization of cluster side resources, like a JDBC driver used to write data to a database, 497 * that cannot be passed client to cluster. 498 * <p/> 499 * In the above JDBC example, overriding this method will allow for testing for the existence of and/or creating 500 * a remote table used by all individual cluster side tasks. 501 * 502 * @param conf of type Config 503 * @return returns true if successful 504 * @throws IOException 505 */ 506 public boolean prepareResourceForWrite( Config conf ) throws IOException 507 { 508 return true; 509 } 510 511 /** 512 * Method commitResource allows the underlying resource to be notified when all write processing is 513 * successful so that any additional cleanup or processing may be completed. 514 * <p/> 515 * See {@link #rollbackResource(Object)} to handle cleanup in the face of failures. 516 * <p/> 517 * This method is invoked once client side and not in the cluster, if any. 518 * <p/> 519 * If other sink Tap instance in a given Flow fail on commitResource after called on this instance, 520 * rollbackResource will not be called. 521 * 522 * @param conf of type Config 523 * @return returns true if successful 524 * @throws IOException 525 */ 526 public boolean commitResource( Config conf ) throws IOException 527 { 528 return true; 529 } 530 531 /** 532 * Method rollbackResource allows the underlying resource to be notified when any write processing has failed or 533 * was stopped so that any cleanup may be started. 534 * <p/> 535 * See {@link #commitResource(Object)} to handle cleanup when the write has successfully completed. 536 * <p/> 537 * This method is invoked once client side and not in the cluster, if any. 538 * 539 * @param conf of type Config 540 * @return returns true if successful 541 * @throws IOException 542 */ 543 public boolean rollbackResource( Config conf ) throws IOException 544 { 545 return true; 546 } 547 548 /** 549 * Method resourceExists returns true if the path represented by this instance exists. 550 * 551 * @param flowProcess of type FlowProcess 552 * @return true if the underlying resource already exists 553 * @throws IOException when the status cannot be determined 554 */ 555 public boolean resourceExists( FlowProcess<Config> flowProcess ) throws IOException 556 { 557 return resourceExists( flowProcess.getConfigCopy() ); 558 } 559 560 /** 561 * Method resourceExists returns true if the path represented by this instance exists. 562 * 563 * @param conf of type Config 564 * @return true if the underlying resource already exists 565 * @throws IOException when the status cannot be determined 566 */ 567 public abstract boolean resourceExists( Config conf ) throws IOException; 568 569 /** 570 * Method getModifiedTime returns the date this resource was last modified. 571 * 572 * @param flowProcess of type FlowProcess 573 * @return The date this resource was last modified. 574 * @throws IOException 575 */ 576 public long getModifiedTime( FlowProcess<Config> flowProcess ) throws IOException 577 { 578 return getModifiedTime( flowProcess.getConfigCopy() ); 579 } 580 581 /** 582 * Method getModifiedTime returns the date this resource was last modified. 583 * 584 * @param conf of type Config 585 * @return The date this resource was last modified. 586 * @throws IOException 587 */ 588 public abstract long getModifiedTime( Config conf ) throws IOException; 589 590 /** 591 * Method getSinkMode returns the {@link SinkMode} }of this Tap object. 592 * 593 * @return the sinkMode (type SinkMode) of this Tap object. 594 */ 595 public SinkMode getSinkMode() 596 { 597 return sinkMode; 598 } 599 600 /** 601 * Method isKeep indicates whether the resource represented by this instance should be kept if it 602 * already exists when the Flow is started. 603 * 604 * @return boolean 605 */ 606 public boolean isKeep() 607 { 608 return sinkMode == SinkMode.KEEP; 609 } 610 611 /** 612 * Method isReplace indicates whether the resource represented by this instance should be deleted if it 613 * already exists when the Flow is started. 614 * 615 * @return boolean 616 */ 617 public boolean isReplace() 618 { 619 return sinkMode == SinkMode.REPLACE; 620 } 621 622 /** 623 * Method isUpdate indicates whether the resource represented by this instance should be updated if it already 624 * exists. Otherwise a new resource will be created, via {@link #createResource(Object)}, when the Flow is started. 625 * 626 * @return boolean 627 */ 628 public boolean isUpdate() 629 { 630 return sinkMode == SinkMode.UPDATE; 631 } 632 633 /** 634 * Method isSink returns true if this Tap instance can be used as a sink. 635 * 636 * @return boolean 637 */ 638 public boolean isSink() 639 { 640 return getScheme().isSink(); 641 } 642 643 /** 644 * Method isSource returns true if this Tap instance can be used as a source. 645 * 646 * @return boolean 647 */ 648 public boolean isSource() 649 { 650 return getScheme().isSource(); 651 } 652 653 /** 654 * Method isTemporary returns true if this Tap is temporary (used for intermediate results). 655 * 656 * @return the temporary (type boolean) of this Tap object. 657 */ 658 public boolean isTemporary() 659 { 660 return false; 661 } 662 663 /** 664 * Returns a {@link cascading.property.ConfigDef} instance that allows for local properties to be set and made available via 665 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 666 * <p/> 667 * Any properties set on the configDef will not show up in any {@link Flow} or {@link cascading.flow.FlowStep} process 668 * level configuration, but will override any of those values as seen by the current Tap instance method call where a 669 * FlowProcess is provided except for the {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and 670 * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods. 671 * <p/> 672 * That is, the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into 673 * a ConfigDef instance will not be visible to them. 674 * 675 * @return an instance of ConfigDef 676 */ 677 public ConfigDef getConfigDef() 678 { 679 if( configDef == null ) 680 configDef = new ConfigDef(); 681 682 return configDef; 683 } 684 685 /** 686 * Returns {@code true} if there are properties in the configDef instance. 687 * 688 * @return true if there are configDef properties 689 */ 690 public boolean hasConfigDef() 691 { 692 return configDef != null && !configDef.isEmpty(); 693 } 694 695 /** 696 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 697 * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked. 698 * <p/> 699 * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in 700 * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the 701 * stepConfigDef will be overridden by the tap local {@code #getConfigDef} instance. 702 * </p> 703 * Use this method to tweak properties in the process step this tap instance is planned into. 704 * <p/> 705 * Note the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into 706 * a ConfigDef instance will not be visible to them. 707 * 708 * @return an instance of ConfigDef 709 */ 710 @Override 711 public ConfigDef getStepConfigDef() 712 { 713 if( processConfigDef == null ) 714 processConfigDef = new ConfigDef(); 715 716 return processConfigDef; 717 } 718 719 /** 720 * Returns {@code true} if there are properties in the processConfigDef instance. 721 * 722 * @return true if there are processConfigDef properties 723 */ 724 @Override 725 public boolean hasStepConfigDef() 726 { 727 return processConfigDef != null && !processConfigDef.isEmpty(); 728 } 729 730 @Override 731 public boolean isEquivalentTo( FlowElement element ) 732 { 733 if( element == null ) 734 return false; 735 736 if( this == element ) 737 return true; 738 739 boolean compare = getClass() == element.getClass(); 740 741 if( !compare ) 742 return false; 743 744 return equals( element ); 745 } 746 747 @Override 748 public boolean equals( Object object ) 749 { 750 if( this == object ) 751 return true; 752 if( object == null || getClass() != object.getClass() ) 753 return false; 754 755 Tap tap = (Tap) object; 756 757 if( getIdentifier() != null ? !getIdentifier().equals( tap.getIdentifier() ) : tap.getIdentifier() != null ) 758 return false; 759 760 if( getScheme() != null ? !getScheme().equals( tap.getScheme() ) : tap.getScheme() != null ) 761 return false; 762 763 return true; 764 } 765 766 @Override 767 public int hashCode() 768 { 769 int result = getIdentifier() != null ? getIdentifier().hashCode() : 0; 770 771 result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 ); 772 773 return result; 774 } 775 776 @Override 777 public String toString() 778 { 779 if( getIdentifier() != null ) 780 return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[\"" + Util.sanitizeUrl( getIdentifier() ) + "\"]"; // sanitize 781 else 782 return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[not initialized]"; 783 } 784 }