001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    import java.util.Set;
026    
027    import cascading.flow.Flow;
028    import cascading.flow.FlowElement;
029    import cascading.flow.FlowException;
030    import cascading.flow.FlowProcess;
031    import cascading.flow.planner.Scope;
032    import cascading.management.annotation.Property;
033    import cascading.management.annotation.PropertyDescription;
034    import cascading.management.annotation.PropertySanitizer;
035    import cascading.management.annotation.Visibility;
036    import cascading.pipe.Pipe;
037    import cascading.property.ConfigDef;
038    import cascading.scheme.Scheme;
039    import cascading.tuple.Fields;
040    import cascading.tuple.FieldsResolverException;
041    import cascading.tuple.Tuple;
042    import cascading.tuple.TupleEntryCollector;
043    import cascading.tuple.TupleEntryIterator;
044    import cascading.util.TraceUtil;
045    import cascading.util.Traceable;
046    import cascading.util.Util;
047    
048    /**
049     * A Tap represents the physical data source or sink in a connected {@link cascading.flow.Flow}.
050     * </p>
051     * That is, a source Tap is the head end of a connected {@link Pipe} and {@link Tuple} stream, and
052     * a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk,
053     * distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts
054     * out the complexity of connecting to these types of data sources.
055     * <p/>
056     * A Tap takes a {@link Scheme} instance, which is used to identify the type of resource (text file, binary file, etc).
057     * A Tap is responsible for how the resource is reached.
058     * <p/>
059     * By default when planning a Flow, Tap equality is a function of the {@link #getIdentifier()} and {@link #getScheme()}
060     * values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source
061     * the same fields.
062     * <p/>
063     * Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the
064     * {@code where} clause in a SQL statement so two taps reading from the same SQL table aren't considered equal.
065     * <p/>
066     * Taps are also used to determine dependencies between two or more {@link Flow} instances when used with a
067     * {@link cascading.cascade.Cascade}. In that case the {@link #getFullIdentifier(Object)} value is used and the Scheme
068     * is ignored.
069     */
070    public abstract class Tap<Config, Input, Output> implements FlowElement, Serializable, Traceable
071      {
072      /** Field scheme */
073      private Scheme<Config, Input, Output, ?, ?> scheme;
074    
075      /** Field mode */
076      SinkMode sinkMode = SinkMode.KEEP;
077    
078      private ConfigDef configDef;
079    
080      private ConfigDef processConfigDef;
081    
082      /** Field id */
083      private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
084      /** Field trace */
085      private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
086    
087      /**
088       * Convenience function to make an array of Tap instances.
089       *
090       * @param taps of type Tap
091       * @return Tap array
092       */
093      public static Tap[] taps( Tap... taps )
094        {
095        return taps;
096        }
097    
098      /**
099       * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify
100       * the Tap instance in properties files etc.
101       * <p/>
102       * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent.
103       *
104       * @param tap of type Tap
105       * @return of type String
106       */
107      public static synchronized String id( Tap tap )
108        {
109        if( tap instanceof DecoratorTap )
110          return id( ( (DecoratorTap) tap ).getOriginal() );
111    
112        return tap.id;
113        }
114    
115      protected Tap()
116        {
117        }
118    
119      protected Tap( Scheme<Config, Input, Output, ?, ?> scheme )
120        {
121        this.setScheme( scheme );
122        }
123    
124      protected Tap( Scheme<Config, Input, Output, ?, ?> scheme, SinkMode sinkMode )
125        {
126        this.setScheme( scheme );
127        this.sinkMode = sinkMode;
128        }
129    
130      protected void setScheme( Scheme<Config, Input, Output, ?, ?> scheme )
131        {
132        this.scheme = scheme;
133        }
134    
135      /**
136       * Method getScheme returns the scheme of this Tap object.
137       *
138       * @return the scheme (type Scheme) of this Tap object.
139       */
140      public Scheme<Config, Input, Output, ?, ?> getScheme()
141        {
142        return scheme;
143        }
144    
145      @Override
146      public String getTrace()
147        {
148        return trace;
149        }
150    
151      /**
152       * Method flowInit allows this Tap instance to initialize itself in context of the given {@link cascading.flow.Flow} instance.
153       * This method is guaranteed to be called before the Flow is started and the
154       * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} event is fired.
155       * <p/>
156       * This method will be called once per Flow, and before {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
157       * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
158       *
159       * @param flow of type Flow
160       */
161      public void flowConfInit( Flow<Config> flow )
162        {
163    
164        }
165    
166      /**
167       * Method sourceConfInit initializes this instance as a source.
168       * <p/>
169       * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
170       * instance or if it participates in multiple times in a given Flow or across different Flows in
171       * a {@link cascading.cascade.Cascade}.
172       * <p/>
173       * In the context of a Flow, it will be called after
174       * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
175       * <p/>
176       * Note that no resources or services should be modified by this method.
177       *
178       * @param flowProcess of type FlowProcess
179       * @param conf        of type Config
180       */
181      public void sourceConfInit( FlowProcess<Config> flowProcess, Config conf )
182        {
183        getScheme().sourceConfInit( flowProcess, this, conf );
184        }
185    
186      /**
187       * Method sinkConfInit initializes this instance as a sink.
188       * <p/>
189       * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
190       * instance or if it participates in multiple times in a given Flow or across different Flows in
191       * a {@link cascading.cascade.Cascade}.
192       * <p/>
193       * Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'.
194       * <p/>
195       * In the context of a Flow, it will be called after
196       * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
197       * <p/>
198       * Note that no resources or services should be modified by this method. If this Tap instance returns true for
199       * {@link #isReplace()}, then {@link #deleteResource(Object)} will be called by the parent Flow.
200       *
201       * @param flowProcess of type FlowProcess
202       * @param conf        of type Config
203       */
204      public void sinkConfInit( FlowProcess<Config> flowProcess, Config conf )
205        {
206        getScheme().sinkConfInit( flowProcess, this, conf );
207        }
208    
209      /**
210       * Method getIdentifier returns a String representing the resource this Tap instance represents.
211       * <p/>
212       * Often, if the tap accesses a filesystem, the identifier is nothing more than the path to the file or directory.
213       * In other cases it may be a an URL or URI representing a connection string or remote resource.
214       * <p/>
215       * Any two Tap instances having the same value for the identifier are considered equal.
216       *
217       * @return String
218       */
219      @Property(name = "identifier", visibility = Visibility.PUBLIC)
220      @PropertyDescription("The resource this Tap instance represents")
221      @PropertySanitizer("cascading.management.annotation.URISanitizer")
222      public abstract String getIdentifier();
223    
224      /**
225       * Method getSourceFields returns the sourceFields of this Tap object.
226       *
227       * @return the sourceFields (type Fields) of this Tap object.
228       */
229      public Fields getSourceFields()
230        {
231        return getScheme().getSourceFields();
232        }
233    
234      /**
235       * Method getSinkFields returns the sinkFields of this Tap object.
236       *
237       * @return the sinkFields (type Fields) of this Tap object.
238       */
239      public Fields getSinkFields()
240        {
241        return getScheme().getSinkFields();
242        }
243    
244      /**
245       * Method openForRead opens the resource represented by this Tap instance for reading.
246       * <p/>
247       * {@code input} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
248       * via {@link Scheme#sourceConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
249       * input type and instantiate it before calling {@code super.openForRead()}.
250       * <p/>
251       * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
252       * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
253       * stored in a Collection.
254       *
255       * @param flowProcess of type FlowProcess
256       * @param input       of type Input
257       * @return TupleEntryIterator
258       * @throws java.io.IOException when the resource cannot be opened
259       */
260      public abstract TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException;
261    
262      /**
263       * Method openForRead opens the resource represented by this Tap instance for reading.
264       * <p/>
265       * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
266       * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
267       * stored in a Collection.
268       *
269       * @param flowProcess of type FlowProcess
270       * @return TupleEntryIterator
271       * @throws java.io.IOException when the resource cannot be opened
272       */
273      public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess ) throws IOException
274        {
275        return openForRead( flowProcess, null );
276        }
277    
278      /**
279       * Method openForWrite opens the resource represented by this Tap instance for writing.
280       * <p/>
281       * This method is used internally and does not honor the {@link SinkMode} setting. If SinkMode is
282       * {@link SinkMode#REPLACE}, this call may fail. See {@link #openForWrite(cascading.flow.FlowProcess)}.
283       * <p/>
284       * {@code output} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
285       * via {@link Scheme#sinkConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
286       * output type and instantiate it before calling {@code super.openForWrite()}.
287       *
288       * @param flowProcess of type FlowProcess
289       * @param output      of type Output
290       * @return TupleEntryCollector
291       * @throws java.io.IOException when the resource cannot be opened
292       */
293      public abstract TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException;
294    
295      /**
296       * Method openForWrite opens the resource represented by this Tap instance for writing.
297       * <p/>
298       * This method is for user application use and does honor the {@link SinkMode#REPLACE} settings. That is, if
299       * SinkMode is set to {@link SinkMode#REPLACE} the underlying resource will be deleted.
300       * <p/>
301       * Note if {@link SinkMode#UPDATE} is set, the resource will not be deleted.
302       *
303       * @param flowProcess of type FlowProcess
304       * @return TupleEntryCollector
305       * @throws java.io.IOException when the resource cannot be opened
306       */
307      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess ) throws IOException
308        {
309        if( isReplace() )
310          deleteResource( flowProcess.getConfigCopy() );
311    
312        return openForWrite( flowProcess, null );
313        }
314    
315      @Override
316      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
317        {
318        // as a source Tap, we emit the scheme defined Fields
319        // as a sink Tap, we declare we emit the incoming Fields
320        // as a temp Tap, this method never gets called, but we emit what we consume
321        int count = 0;
322        for( Scope incomingScope : incomingScopes )
323          {
324          Fields incomingFields = incomingScope.getIncomingTapFields();
325    
326          if( incomingFields != null )
327            {
328            try
329              {
330              incomingFields.select( getSinkFields() );
331              }
332            catch( FieldsResolverException exception )
333              {
334              throw new TapException( this, exception.getSourceFields(), exception.getSelectorFields(), exception );
335              }
336    
337            count++;
338            }
339          }
340    
341        if( count > 1 )
342          throw new FlowException( "Tap may not have more than one incoming Scope" );
343    
344        // this allows the incoming to be passed through to the outgoing
345        Fields incomingFields = incomingScopes.size() == 0 ? null : incomingScopes.iterator().next().getIncomingTapFields();
346    
347        if( incomingFields != null &&
348          ( isSource() && getSourceFields().equals( Fields.UNKNOWN ) ||
349            isSink() && getSinkFields().equals( Fields.ALL ) ) )
350          return new Scope( incomingFields );
351    
352        if( count == 1 )
353          return new Scope( getSinkFields() );
354    
355        return new Scope( getSourceFields() );
356        }
357    
358      /**
359       * A hook for allowing a Scheme to lazily retrieve its source fields.
360       *
361       * @param flowProcess of type FlowProcess
362       * @return the found Fields
363       */
364      public Fields retrieveSourceFields( FlowProcess<Config> flowProcess )
365        {
366        return getScheme().retrieveSourceFields( flowProcess, this );
367        }
368    
369      public void presentSourceFields( FlowProcess<Config> flowProcess, Fields fields )
370        {
371        getScheme().presentSourceFields( flowProcess, this, fields );
372        }
373    
374      /**
375       * A hook for allowing a Scheme to lazily retrieve its sink fields.
376       *
377       * @param flowProcess of type FlowProcess
378       * @return the found Fields
379       */
380      public Fields retrieveSinkFields( FlowProcess<Config> flowProcess )
381        {
382        return getScheme().retrieveSinkFields( flowProcess, this );
383        }
384    
385      public void presentSinkFields( FlowProcess<Config> flowProcess, Fields fields )
386        {
387        getScheme().presentSinkFields( flowProcess, this, fields );
388        }
389    
390      @Override
391      public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
392        {
393        return incomingScope.getIncomingTapFields();
394        }
395    
396      @Override
397      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
398        {
399        return incomingScope.getIncomingTapFields();
400        }
401    
402      /**
403       * Method getFullIdentifier returns a fully qualified resource identifier.
404       *
405       * @param flowProcess of type FlowProcess
406       * @return String
407       */
408      public String getFullIdentifier( FlowProcess<Config> flowProcess )
409        {
410        return getFullIdentifier( flowProcess.getConfigCopy() );
411        }
412    
413      /**
414       * Method getFullIdentifier returns a fully qualified resource identifier.
415       *
416       * @param conf of type Config
417       * @return String
418       */
419      public String getFullIdentifier( Config conf )
420        {
421        return getIdentifier();
422        }
423    
424      /**
425       * Method createResource creates the underlying resource.
426       *
427       * @param flowProcess of type FlowProcess
428       * @return boolean
429       * @throws IOException when there is an error making directories
430       */
431      public boolean createResource( FlowProcess<Config> flowProcess ) throws IOException
432        {
433        return createResource( flowProcess.getConfigCopy() );
434        }
435    
436      /**
437       * Method createResource creates the underlying resource.
438       *
439       * @param conf of type Config
440       * @return boolean
441       * @throws IOException when there is an error making directories
442       */
443      public abstract boolean createResource( Config conf ) throws IOException;
444    
445      /**
446       * Method deleteResource deletes the resource represented by this instance.
447       *
448       * @param flowProcess of type FlowProcess
449       * @return boolean
450       * @throws IOException when the resource cannot be deleted
451       */
452      public boolean deleteResource( FlowProcess<Config> flowProcess ) throws IOException
453        {
454        return deleteResource( flowProcess.getConfigCopy() );
455        }
456    
457      /**
458       * Method deleteResource deletes the resource represented by this instance.
459       *
460       * @param conf of type Config
461       * @return boolean
462       * @throws IOException when the resource cannot be deleted
463       */
464      public abstract boolean deleteResource( Config conf ) throws IOException;
465    
466      /**
467       * Method prepareResourceForRead allows the underlying resource to be notified when reading will begin.
468       * <p/>
469       * This method will be called client side so that any remote or external resources can be initialized.
470       * <p/>
471       * If this method returns {@code false}, an exception will be thrown halting the current Flow.
472       * <p/>
473       * In most cases, resource initialization should happen in the {@link #openForRead(FlowProcess, Object)}  method.
474       * <p/>
475       * This allows for initialization of cluster side resources, like a JDBC driver used to read data from a database,
476       * that cannot be passed client to cluster.
477       *
478       * @param conf of type Config
479       * @return returns true if successful
480       * @throws IOException
481       */
482      public boolean prepareResourceForRead( Config conf ) throws IOException
483        {
484        return true;
485        }
486    
487      /**
488       * Method prepareResourceForWrite allows the underlying resource to be notified when writing will begin.
489       * <p/>
490       * This method will be called once client side so that any remote or external resources can be initialized.
491       * <p/>
492       * If this method returns {@code false}, an exception will be thrown halting the current Flow.
493       * <p/>
494       * In most cases, resource initialization should happen in the {@link #openForWrite(FlowProcess, Object)} method.
495       * <p/>
496       * This allows for initialization of cluster side resources, like a JDBC driver used to write data to a database,
497       * that cannot be passed client to cluster.
498       * <p/>
499       * In the above JDBC example, overriding this method will allow for testing for the existence of and/or creating
500       * a remote table used by all individual cluster side tasks.
501       *
502       * @param conf of type Config
503       * @return returns true if successful
504       * @throws IOException
505       */
506      public boolean prepareResourceForWrite( Config conf ) throws IOException
507        {
508        return true;
509        }
510    
511      /**
512       * Method commitResource allows the underlying resource to be notified when all write processing is
513       * successful so that any additional cleanup or processing may be completed.
514       * <p/>
515       * See {@link #rollbackResource(Object)} to handle cleanup in the face of failures.
516       * <p/>
517       * This method is invoked once client side and not in the cluster, if any.
518       * <p/>
519       * If other sink Tap instance in a given Flow fail on commitResource after called on this instance,
520       * rollbackResource will not be called.
521       *
522       * @param conf of type Config
523       * @return returns true if successful
524       * @throws IOException
525       */
526      public boolean commitResource( Config conf ) throws IOException
527        {
528        return true;
529        }
530    
531      /**
532       * Method rollbackResource allows the underlying resource to be notified when any write processing has failed or
533       * was stopped so that any cleanup may be started.
534       * <p/>
535       * See {@link #commitResource(Object)} to handle cleanup when the write has successfully completed.
536       * <p/>
537       * This method is invoked once client side and not in the cluster, if any.
538       *
539       * @param conf of type Config
540       * @return returns true if successful
541       * @throws IOException
542       */
543      public boolean rollbackResource( Config conf ) throws IOException
544        {
545        return true;
546        }
547    
548      /**
549       * Method resourceExists returns true if the path represented by this instance exists.
550       *
551       * @param flowProcess of type FlowProcess
552       * @return true if the underlying resource already exists
553       * @throws IOException when the status cannot be determined
554       */
555      public boolean resourceExists( FlowProcess<Config> flowProcess ) throws IOException
556        {
557        return resourceExists( flowProcess.getConfigCopy() );
558        }
559    
560      /**
561       * Method resourceExists returns true if the path represented by this instance exists.
562       *
563       * @param conf of type Config
564       * @return true if the underlying resource already exists
565       * @throws IOException when the status cannot be determined
566       */
567      public abstract boolean resourceExists( Config conf ) throws IOException;
568    
569      /**
570       * Method getModifiedTime returns the date this resource was last modified.
571       *
572       * @param flowProcess of type FlowProcess
573       * @return The date this resource was last modified.
574       * @throws IOException
575       */
576      public long getModifiedTime( FlowProcess<Config> flowProcess ) throws IOException
577        {
578        return getModifiedTime( flowProcess.getConfigCopy() );
579        }
580    
581      /**
582       * Method getModifiedTime returns the date this resource was last modified.
583       *
584       * @param conf of type Config
585       * @return The date this resource was last modified.
586       * @throws IOException
587       */
588      public abstract long getModifiedTime( Config conf ) throws IOException;
589    
590      /**
591       * Method getSinkMode returns the {@link SinkMode} }of this Tap object.
592       *
593       * @return the sinkMode (type SinkMode) of this Tap object.
594       */
595      public SinkMode getSinkMode()
596        {
597        return sinkMode;
598        }
599    
600      /**
601       * Method isKeep indicates whether the resource represented by this instance should be kept if it
602       * already exists when the Flow is started.
603       *
604       * @return boolean
605       */
606      public boolean isKeep()
607        {
608        return sinkMode == SinkMode.KEEP;
609        }
610    
611      /**
612       * Method isReplace indicates whether the resource represented by this instance should be deleted if it
613       * already exists when the Flow is started.
614       *
615       * @return boolean
616       */
617      public boolean isReplace()
618        {
619        return sinkMode == SinkMode.REPLACE;
620        }
621    
622      /**
623       * Method isUpdate indicates whether the resource represented by this instance should be updated if it already
624       * exists. Otherwise a new resource will be created, via {@link #createResource(Object)}, when the Flow is started.
625       *
626       * @return boolean
627       */
628      public boolean isUpdate()
629        {
630        return sinkMode == SinkMode.UPDATE;
631        }
632    
633      /**
634       * Method isSink returns true if this Tap instance can be used as a sink.
635       *
636       * @return boolean
637       */
638      public boolean isSink()
639        {
640        return getScheme().isSink();
641        }
642    
643      /**
644       * Method isSource returns true if this Tap instance can be used as a source.
645       *
646       * @return boolean
647       */
648      public boolean isSource()
649        {
650        return getScheme().isSource();
651        }
652    
653      /**
654       * Method isTemporary returns true if this Tap is temporary (used for intermediate results).
655       *
656       * @return the temporary (type boolean) of this Tap object.
657       */
658      public boolean isTemporary()
659        {
660        return false;
661        }
662    
663      /**
664       * Returns a {@link cascading.property.ConfigDef} instance that allows for local properties to be set and made available via
665       * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
666       * <p/>
667       * Any properties set on the configDef will not show up in any {@link Flow} or {@link cascading.flow.FlowStep} process
668       * level configuration, but will override any of those values as seen by the current Tap instance method call where a
669       * FlowProcess is provided except for the {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
670       * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
671       * <p/>
672       * That is, the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
673       * a ConfigDef instance will not be visible to them.
674       *
675       * @return an instance of ConfigDef
676       */
677      public ConfigDef getConfigDef()
678        {
679        if( configDef == null )
680          configDef = new ConfigDef();
681    
682        return configDef;
683        }
684    
685      /**
686       * Returns {@code true} if there are properties in the configDef instance.
687       *
688       * @return true if there are configDef properties
689       */
690      public boolean hasConfigDef()
691        {
692        return configDef != null && !configDef.isEmpty();
693        }
694    
695      /**
696       * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
697       * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
698       * <p/>
699       * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
700       * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
701       * stepConfigDef will be overridden by the tap local {@code #getConfigDef} instance.
702       * </p>
703       * Use this method to tweak properties in the process step this tap instance is planned into.
704       * <p/>
705       * Note the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
706       * a ConfigDef instance will not be visible to them.
707       *
708       * @return an instance of ConfigDef
709       */
710      @Override
711      public ConfigDef getStepConfigDef()
712        {
713        if( processConfigDef == null )
714          processConfigDef = new ConfigDef();
715    
716        return processConfigDef;
717        }
718    
719      /**
720       * Returns {@code true} if there are properties in the processConfigDef instance.
721       *
722       * @return true if there are processConfigDef properties
723       */
724      @Override
725      public boolean hasStepConfigDef()
726        {
727        return processConfigDef != null && !processConfigDef.isEmpty();
728        }
729    
730      @Override
731      public boolean isEquivalentTo( FlowElement element )
732        {
733        if( element == null )
734          return false;
735    
736        if( this == element )
737          return true;
738    
739        boolean compare = getClass() == element.getClass();
740    
741        if( !compare )
742          return false;
743    
744        return equals( element );
745        }
746    
747      @Override
748      public boolean equals( Object object )
749        {
750        if( this == object )
751          return true;
752        if( object == null || getClass() != object.getClass() )
753          return false;
754    
755        Tap tap = (Tap) object;
756    
757        if( getIdentifier() != null ? !getIdentifier().equals( tap.getIdentifier() ) : tap.getIdentifier() != null )
758          return false;
759    
760        if( getScheme() != null ? !getScheme().equals( tap.getScheme() ) : tap.getScheme() != null )
761          return false;
762    
763        return true;
764        }
765    
766      @Override
767      public int hashCode()
768        {
769        int result = getIdentifier() != null ? getIdentifier().hashCode() : 0;
770    
771        result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 );
772    
773        return result;
774        }
775    
776      @Override
777      public String toString()
778        {
779        if( getIdentifier() != null )
780          return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[\"" + Util.sanitizeUrl( getIdentifier() ) + "\"]"; // sanitize
781        else
782          return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[not initialized]";
783        }
784      }