001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.util.Set;
026
027import cascading.flow.Flow;
028import cascading.flow.FlowElement;
029import cascading.flow.FlowException;
030import cascading.flow.FlowProcess;
031import cascading.flow.planner.Scope;
032import cascading.flow.planner.ScopedElement;
033import cascading.management.annotation.Property;
034import cascading.management.annotation.PropertyDescription;
035import cascading.management.annotation.PropertySanitizer;
036import cascading.management.annotation.Visibility;
037import cascading.pipe.Pipe;
038import cascading.property.ConfigDef;
039import cascading.scheme.Scheme;
040import cascading.tuple.Fields;
041import cascading.tuple.FieldsResolverException;
042import cascading.tuple.Tuple;
043import cascading.tuple.TupleEntryCollector;
044import cascading.tuple.TupleEntryIterator;
045import cascading.util.TraceUtil;
046import cascading.util.Traceable;
047import cascading.util.Util;
048
049/**
050 * A Tap represents the physical data source or sink in a connected {@link cascading.flow.Flow}.
051 * </p>
052 * That is, a source Tap is the head end of a connected {@link Pipe} and {@link Tuple} stream, and
053 * a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk,
054 * distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts
055 * out the complexity of connecting to these types of data sources.
056 * <p/>
057 * A Tap takes a {@link Scheme} instance, which is used to identify the type of resource (text file, binary file, etc).
058 * A Tap is responsible for how the resource is reached.
059 * <p/>
060 * By default when planning a Flow, Tap equality is a function of the {@link #getIdentifier()} and {@link #getScheme()}
061 * values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source
062 * the same fields.
063 * <p/>
064 * Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the
065 * {@code where} clause in a SQL statement so two taps reading from the same SQL table aren't considered equal.
066 * <p/>
067 * Taps are also used to determine dependencies between two or more {@link Flow} instances when used with a
068 * {@link cascading.cascade.Cascade}. In that case the {@link #getFullIdentifier(Object)} value is used and the Scheme
069 * is ignored.
070 */
071public abstract class Tap<Config, Input, Output> implements ScopedElement, FlowElement, Serializable, Traceable
072  {
073  /** Field scheme */
074  private Scheme<Config, Input, Output, ?, ?> scheme;
075
076  /** Field mode */
077  SinkMode sinkMode = SinkMode.KEEP;
078
079  private ConfigDef configDef;
080  private ConfigDef nodeConfigDef;
081  private ConfigDef stepConfigDef;
082
083  /** Field id */
084  private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
085  /** Field trace */
086  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
087
088  /**
089   * Convenience function to make an array of Tap instances.
090   *
091   * @param taps of type Tap
092   * @return Tap array
093   */
094  public static Tap[] taps( Tap... taps )
095    {
096    return taps;
097    }
098
099  /**
100   * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify
101   * the Tap instance in properties files etc.
102   * <p/>
103   * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent.
104   *
105   * @param tap of type Tap
106   * @return of type String
107   */
108  public static synchronized String id( Tap tap )
109    {
110    if( tap instanceof DecoratorTap )
111      return id( ( (DecoratorTap) tap ).getOriginal() );
112
113    return tap.id;
114    }
115
116  protected Tap()
117    {
118    }
119
120  protected Tap( Scheme<Config, Input, Output, ?, ?> scheme )
121    {
122    this.setScheme( scheme );
123    }
124
125  protected Tap( Scheme<Config, Input, Output, ?, ?> scheme, SinkMode sinkMode )
126    {
127    this.setScheme( scheme );
128    this.sinkMode = sinkMode;
129    }
130
131  protected void setScheme( Scheme<Config, Input, Output, ?, ?> scheme )
132    {
133    this.scheme = scheme;
134    }
135
136  /**
137   * Method getScheme returns the scheme of this Tap object.
138   *
139   * @return the scheme (type Scheme) of this Tap object.
140   */
141  public Scheme<Config, Input, Output, ?, ?> getScheme()
142    {
143    return scheme;
144    }
145
146  @Override
147  public String getTrace()
148    {
149    return trace;
150    }
151
152  /**
153   * Method flowInit allows this Tap instance to initialize itself in context of the given {@link cascading.flow.Flow} instance.
154   * This method is guaranteed to be called before the Flow is started and the
155   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} event is fired.
156   * <p/>
157   * This method will be called once per Flow, and before {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
158   * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
159   *
160   * @param flow of type Flow
161   */
162  public void flowConfInit( Flow<Config> flow )
163    {
164
165    }
166
167  /**
168   * Method sourceConfInit initializes this instance as a source.
169   * <p/>
170   * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
171   * instance or if it participates in multiple times in a given Flow or across different Flows in
172   * a {@link cascading.cascade.Cascade}.
173   * <p/>
174   * In the context of a Flow, it will be called after
175   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
176   * <p/>
177   * Note that no resources or services should be modified by this method.
178   *
179   * @param flowProcess of type FlowProcess
180   * @param conf        of type Config
181   */
182  public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
183    {
184    getScheme().sourceConfInit( flowProcess, this, conf );
185    }
186
187  /**
188   * Method sinkConfInit initializes this instance as a sink.
189   * <p/>
190   * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
191   * instance or if it participates in multiple times in a given Flow or across different Flows in
192   * a {@link cascading.cascade.Cascade}.
193   * <p/>
194   * Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'.
195   * <p/>
196   * In the context of a Flow, it will be called after
197   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
198   * <p/>
199   * Note that no resources or services should be modified by this method. If this Tap instance returns true for
200   * {@link #isReplace()}, then {@link #deleteResource(Object)} will be called by the parent Flow.
201   *
202   * @param flowProcess of type FlowProcess
203   * @param conf        of type Config
204   */
205  public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
206    {
207    getScheme().sinkConfInit( flowProcess, this, conf );
208    }
209
210  /**
211   * Method getIdentifier returns a String representing the resource this Tap instance represents.
212   * <p/>
213   * Often, if the tap accesses a filesystem, the identifier is nothing more than the path to the file or directory.
214   * In other cases it may be a an URL or URI representing a connection string or remote resource.
215   * <p/>
216   * Any two Tap instances having the same value for the identifier are considered equal.
217   *
218   * @return String
219   */
220  @Property(name = "identifier", visibility = Visibility.PUBLIC)
221  @PropertyDescription("The resource this instance represents")
222  @PropertySanitizer("cascading.management.annotation.URISanitizer")
223  public abstract String getIdentifier();
224
225  /**
226   * Method getSourceFields returns the sourceFields of this Tap object.
227   *
228   * @return the sourceFields (type Fields) of this Tap object.
229   */
230  public Fields getSourceFields()
231    {
232    return getScheme().getSourceFields();
233    }
234
235  /**
236   * Method getSinkFields returns the sinkFields of this Tap object.
237   *
238   * @return the sinkFields (type Fields) of this Tap object.
239   */
240  public Fields getSinkFields()
241    {
242    return getScheme().getSinkFields();
243    }
244
245  /**
246   * Method openForRead opens the resource represented by this Tap instance for reading.
247   * <p/>
248   * {@code input} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
249   * via {@link Scheme#sourceConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
250   * input type and instantiate it before calling {@code super.openForRead()}.
251   * <p/>
252   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
253   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
254   * stored in a Collection.
255   *
256   * @param flowProcess of type FlowProcess
257   * @param input       of type Input
258   * @return TupleEntryIterator
259   * @throws java.io.IOException when the resource cannot be opened
260   */
261  public abstract TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException;
262
263  /**
264   * Method openForRead opens the resource represented by this Tap instance for reading.
265   * <p/>
266   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
267   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
268   * stored in a Collection.
269   *
270   * @param flowProcess of type FlowProcess
271   * @return TupleEntryIterator
272   * @throws java.io.IOException when the resource cannot be opened
273   */
274  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess ) throws IOException
275    {
276    return openForRead( flowProcess, null );
277    }
278
279  /**
280   * Method openForWrite opens the resource represented by this Tap instance for writing.
281   * <p/>
282   * This method is used internally and does not honor the {@link SinkMode} setting. If SinkMode is
283   * {@link SinkMode#REPLACE}, this call may fail. See {@link #openForWrite(cascading.flow.FlowProcess)}.
284   * <p/>
285   * {@code output} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
286   * via {@link Scheme#sinkConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
287   * output type and instantiate it before calling {@code super.openForWrite()}.
288   *
289   * @param flowProcess of type FlowProcess
290   * @param output      of type Output
291   * @return TupleEntryCollector
292   * @throws java.io.IOException when the resource cannot be opened
293   */
294  public abstract TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException;
295
296  /**
297   * Method openForWrite opens the resource represented by this Tap instance for writing.
298   * <p/>
299   * This method is for user application use and does honor the {@link SinkMode#REPLACE} settings. That is, if
300   * SinkMode is set to {@link SinkMode#REPLACE} the underlying resource will be deleted.
301   * <p/>
302   * Note if {@link SinkMode#UPDATE} is set, the resource will not be deleted.
303   *
304   * @param flowProcess of type FlowProcess
305   * @return TupleEntryCollector
306   * @throws java.io.IOException when the resource cannot be opened
307   */
308  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess ) throws IOException
309    {
310    if( isReplace() )
311      deleteResource( flowProcess );
312
313    return openForWrite( flowProcess, null );
314    }
315
316  @Override
317  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
318    {
319    // as a source Tap, we emit the scheme defined Fields
320    // as a sink Tap, we declare we emit the incoming Fields
321    // as a temp Tap, this method never gets called, but we emit what we consume
322    int count = 0;
323    for( Scope incomingScope : incomingScopes )
324      {
325      Fields incomingFields = incomingScope.getIncomingTapFields();
326
327      if( incomingFields != null )
328        {
329        try
330          {
331          incomingFields.select( getSinkFields() );
332          }
333        catch( FieldsResolverException exception )
334          {
335          throw new TapException( this, exception.getSourceFields(), exception.getSelectorFields(), exception );
336          }
337
338        count++;
339        }
340      }
341
342    if( count > 1 )
343      throw new FlowException( "Tap may not have more than one incoming Scope" );
344
345    // this allows the incoming to be passed through to the outgoing
346    Fields incomingFields = incomingScopes.size() == 0 ? null : incomingScopes.iterator().next().getIncomingTapFields();
347
348    if( incomingFields != null &&
349      ( isSource() && getSourceFields().equals( Fields.UNKNOWN ) ||
350        isSink() && getSinkFields().equals( Fields.ALL ) ) )
351      return new Scope( incomingFields );
352
353    if( count == 1 )
354      return new Scope( getSinkFields() );
355
356    return new Scope( getSourceFields() );
357    }
358
359  /**
360   * A hook for allowing a Scheme to lazily retrieve its source fields.
361   *
362   * @param flowProcess of type FlowProcess
363   * @return the found Fields
364   */
365  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess )
366    {
367    return getScheme().retrieveSourceFields( flowProcess, this );
368    }
369
370  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Fields fields )
371    {
372    getScheme().presentSourceFields( flowProcess, this, fields );
373    }
374
375  /**
376   * A hook for allowing a Scheme to lazily retrieve its sink fields.
377   *
378   * @param flowProcess of type FlowProcess
379   * @return the found Fields
380   */
381  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess )
382    {
383    return getScheme().retrieveSinkFields( flowProcess, this );
384    }
385
386  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields )
387    {
388    getScheme().presentSinkFields( flowProcess, this, fields );
389    }
390
391  @Override
392  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
393    {
394    return incomingScope.getIncomingTapFields();
395    }
396
397  @Override
398  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
399    {
400    return incomingScope.getIncomingTapFields();
401    }
402
403  /**
404   * Method getFullIdentifier returns a fully qualified resource identifier.
405   *
406   * @param flowProcess of type FlowProcess
407   * @return String
408   */
409  public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
410    {
411    return getFullIdentifier( flowProcess.getConfig() );
412    }
413
414  /**
415   * Method getFullIdentifier returns a fully qualified resource identifier.
416   *
417   * @param conf of type Config
418   * @return String
419   */
420  public String getFullIdentifier( Config conf )
421    {
422    return getIdentifier();
423    }
424
425  /**
426   * Method createResource creates the underlying resource.
427   *
428   * @param flowProcess of type FlowProcess
429   * @return boolean
430   * @throws IOException when there is an error making directories
431   */
432  public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException
433    {
434    return createResource( flowProcess.getConfig() );
435    }
436
437  /**
438   * Method createResource creates the underlying resource.
439   *
440   * @param conf of type Config
441   * @return boolean
442   * @throws IOException when there is an error making directories
443   */
444  public abstract boolean createResource( Config conf ) throws IOException;
445
446  /**
447   * Method deleteResource deletes the resource represented by this instance.
448   *
449   * @param flowProcess of type FlowProcess
450   * @return boolean
451   * @throws IOException when the resource cannot be deleted
452   */
453  public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException
454    {
455    return deleteResource( flowProcess.getConfig() );
456    }
457
458  /**
459   * Method deleteResource deletes the resource represented by this instance.
460   *
461   * @param conf of type Config
462   * @return boolean
463   * @throws IOException when the resource cannot be deleted
464   */
465  public abstract boolean deleteResource( Config conf ) throws IOException;
466
467  /**
468   * Method prepareResourceForRead allows the underlying resource to be notified when reading will begin.
469   * <p/>
470   * This method will be called client side so that any remote or external resources can be initialized.
471   * <p/>
472   * If this method returns {@code false}, an exception will be thrown halting the current Flow.
473   * <p/>
474   * In most cases, resource initialization should happen in the {@link #openForRead(FlowProcess, Object)}  method.
475   * <p/>
476   * This allows for initialization of cluster side resources, like a JDBC driver used to read data from a database,
477   * that cannot be passed client to cluster.
478   *
479   * @param conf of type Config
480   * @return returns true if successful
481   * @throws IOException
482   */
483  public boolean prepareResourceForRead( Config conf ) throws IOException
484    {
485    return true;
486    }
487
488  /**
489   * Method prepareResourceForWrite allows the underlying resource to be notified when writing will begin.
490   * <p/>
491   * This method will be called once client side so that any remote or external resources can be initialized.
492   * <p/>
493   * If this method returns {@code false}, an exception will be thrown halting the current Flow.
494   * <p/>
495   * In most cases, resource initialization should happen in the {@link #openForWrite(FlowProcess, Object)} method.
496   * <p/>
497   * This allows for initialization of cluster side resources, like a JDBC driver used to write data to a database,
498   * that cannot be passed client to cluster.
499   * <p/>
500   * In the above JDBC example, overriding this method will allow for testing for the existence of and/or creating
501   * a remote table used by all individual cluster side tasks.
502   *
503   * @param conf of type Config
504   * @return returns true if successful
505   * @throws IOException
506   */
507  public boolean prepareResourceForWrite( Config conf ) throws IOException
508    {
509    return true;
510    }
511
512  /**
513   * Method commitResource allows the underlying resource to be notified when all write processing is
514   * successful so that any additional cleanup or processing may be completed.
515   * <p/>
516   * See {@link #rollbackResource(Object)} to handle cleanup in the face of failures.
517   * <p/>
518   * This method is invoked once client side and not in the cluster, if any.
519   * <p/>
520   * If other sink Tap instance in a given Flow fail on commitResource after called on this instance,
521   * rollbackResource will not be called.
522   *
523   * @param conf of type Config
524   * @return returns true if successful
525   * @throws IOException
526   */
527  public boolean commitResource( Config conf ) throws IOException
528    {
529    return true;
530    }
531
532  /**
533   * Method rollbackResource allows the underlying resource to be notified when any write processing has failed or
534   * was stopped so that any cleanup may be started.
535   * <p/>
536   * See {@link #commitResource(Object)} to handle cleanup when the write has successfully completed.
537   * <p/>
538   * This method is invoked once client side and not in the cluster, if any.
539   *
540   * @param conf of type Config
541   * @return returns true if successful
542   * @throws IOException
543   */
544  public boolean rollbackResource( Config conf ) throws IOException
545    {
546    return true;
547    }
548
549  /**
550   * Method resourceExists returns true if the path represented by this instance exists.
551   *
552   * @param flowProcess of type FlowProcess
553   * @return true if the underlying resource already exists
554   * @throws IOException when the status cannot be determined
555   */
556  public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException
557    {
558    return resourceExists( flowProcess.getConfig() );
559    }
560
561  /**
562   * Method resourceExists returns true if the path represented by this instance exists.
563   *
564   * @param conf of type Config
565   * @return true if the underlying resource already exists
566   * @throws IOException when the status cannot be determined
567   */
568  public abstract boolean resourceExists( Config conf ) throws IOException;
569
570  /**
571   * Method getModifiedTime returns the date this resource was last modified.
572   *
573   * @param flowProcess of type FlowProcess
574   * @return The date this resource was last modified.
575   * @throws IOException
576   */
577  public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException
578    {
579    return getModifiedTime( flowProcess.getConfig() );
580    }
581
582  /**
583   * Method getModifiedTime returns the date this resource was last modified.
584   *
585   * @param conf of type Config
586   * @return The date this resource was last modified.
587   * @throws IOException
588   */
589  public abstract long getModifiedTime( Config conf ) throws IOException;
590
591  /**
592   * Method getSinkMode returns the {@link SinkMode} }of this Tap object.
593   *
594   * @return the sinkMode (type SinkMode) of this Tap object.
595   */
596  public SinkMode getSinkMode()
597    {
598    return sinkMode;
599    }
600
601  /**
602   * Method isKeep indicates whether the resource represented by this instance should be kept if it
603   * already exists when the Flow is started.
604   *
605   * @return boolean
606   */
607  public boolean isKeep()
608    {
609    return sinkMode == SinkMode.KEEP;
610    }
611
612  /**
613   * Method isReplace indicates whether the resource represented by this instance should be deleted if it
614   * already exists when the Flow is started.
615   *
616   * @return boolean
617   */
618  public boolean isReplace()
619    {
620    return sinkMode == SinkMode.REPLACE;
621    }
622
623  /**
624   * Method isUpdate indicates whether the resource represented by this instance should be updated if it already
625   * exists. Otherwise a new resource will be created, via {@link #createResource(Object)}, when the Flow is started.
626   *
627   * @return boolean
628   */
629  public boolean isUpdate()
630    {
631    return sinkMode == SinkMode.UPDATE;
632    }
633
634  /**
635   * Method isSink returns true if this Tap instance can be used as a sink.
636   *
637   * @return boolean
638   */
639  public boolean isSink()
640    {
641    return getScheme().isSink();
642    }
643
644  /**
645   * Method isSource returns true if this Tap instance can be used as a source.
646   *
647   * @return boolean
648   */
649  public boolean isSource()
650    {
651    return getScheme().isSource();
652    }
653
654  /**
655   * Method isTemporary returns true if this Tap is temporary (used for intermediate results).
656   *
657   * @return the temporary (type boolean) of this Tap object.
658   */
659  public boolean isTemporary()
660    {
661    return false;
662    }
663
664  /**
665   * Returns a {@link cascading.property.ConfigDef} instance that allows for local properties to be set and made available via
666   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
667   * <p/>
668   * Any properties set on the configDef will not show up in any {@link Flow} or {@link cascading.flow.FlowStep} process
669   * level configuration, but will override any of those values as seen by the current Tap instance method call where a
670   * FlowProcess is provided except for the {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
671   * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
672   * <p/>
673   * That is, the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
674   * a ConfigDef instance will not be visible to them.
675   *
676   * @return an instance of ConfigDef
677   */
678  public ConfigDef getConfigDef()
679    {
680    if( configDef == null )
681      configDef = new ConfigDef();
682
683    return configDef;
684    }
685
686  /**
687   * Returns {@code true} if there are properties in the configDef instance.
688   *
689   * @return true if there are configDef properties
690   */
691  public boolean hasConfigDef()
692    {
693    return configDef != null && !configDef.isEmpty();
694    }
695
696  /**
697   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
698   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
699   * <p/>
700   * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in
701   * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the
702   * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
703   * </p>
704   * Use this method to tweak properties in the process node this tap instance is planned into.
705   *
706   * @return an instance of ConfigDef
707   */
708  @Override
709  public ConfigDef getNodeConfigDef()
710    {
711    if( nodeConfigDef == null )
712      nodeConfigDef = new ConfigDef();
713
714    return nodeConfigDef;
715    }
716
717  /**
718   * Returns {@code true} if there are properties in the nodeConfigDef instance.
719   *
720   * @return true if there are nodeConfigDef properties
721   */
722  @Override
723  public boolean hasNodeConfigDef()
724    {
725    return nodeConfigDef != null && !nodeConfigDef.isEmpty();
726    }
727
728  /**
729   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
730   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
731   * <p/>
732   * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
733   * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
734   * stepConfigDef will be overridden by the tap local {@code #getConfigDef} instance.
735   * </p>
736   * Use this method to tweak properties in the process step this tap instance is planned into.
737   * <p/>
738   * Note the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
739   * a ConfigDef instance will not be visible to them.
740   *
741   * @return an instance of ConfigDef
742   */
743  @Override
744  public ConfigDef getStepConfigDef()
745    {
746    if( stepConfigDef == null )
747      stepConfigDef = new ConfigDef();
748
749    return stepConfigDef;
750    }
751
752  /**
753   * Returns {@code true} if there are properties in the stepConfigDef instance.
754   *
755   * @return true if there are stepConfigDef properties
756   */
757  @Override
758  public boolean hasStepConfigDef()
759    {
760    return stepConfigDef != null && !stepConfigDef.isEmpty();
761    }
762
763  @Override
764  public boolean equals( Object object )
765    {
766    if( this == object )
767      return true;
768    if( object == null || getClass() != object.getClass() )
769      return false;
770
771    Tap tap = (Tap) object;
772
773    if( getIdentifier() != null ? !getIdentifier().equals( tap.getIdentifier() ) : tap.getIdentifier() != null )
774      return false;
775
776    if( getScheme() != null ? !getScheme().equals( tap.getScheme() ) : tap.getScheme() != null )
777      return false;
778
779    return true;
780    }
781
782  @Override
783  public int hashCode()
784    {
785    int result = getIdentifier() != null ? getIdentifier().hashCode() : 0;
786
787    result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 );
788
789    return result;
790    }
791
792  @Override
793  public String toString()
794    {
795    if( getIdentifier() != null )
796      return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[\"" + Util.sanitizeUrl( getIdentifier() ) + "\"]"; // sanitize
797    else
798      return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[not initialized]";
799    }
800  }