001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.io.Serializable;
027import java.io.UncheckedIOException;
028import java.util.Set;
029import java.util.Spliterator;
030import java.util.Spliterators;
031import java.util.stream.Stream;
032import java.util.stream.StreamSupport;
033
034import cascading.flow.Flow;
035import cascading.flow.FlowElement;
036import cascading.flow.FlowException;
037import cascading.flow.FlowProcess;
038import cascading.flow.planner.Scope;
039import cascading.flow.planner.ScopedElement;
040import cascading.management.annotation.Property;
041import cascading.management.annotation.PropertyDescription;
042import cascading.management.annotation.PropertySanitizer;
043import cascading.management.annotation.Visibility;
044import cascading.pipe.Pipe;
045import cascading.property.ConfigDef;
046import cascading.scheme.Scheme;
047import cascading.tuple.Fields;
048import cascading.tuple.FieldsResolverException;
049import cascading.tuple.Tuple;
050import cascading.tuple.TupleEntry;
051import cascading.tuple.TupleEntryCollector;
052import cascading.tuple.TupleEntryIterator;
053import cascading.util.TraceUtil;
054import cascading.util.Traceable;
055import cascading.util.Util;
056
057/**
058 * A Tap represents the physical data source or sink in a connected {@link cascading.flow.Flow}.
059 * <p>
060 * That is, a source Tap is the head end of a connected {@link Pipe} and {@link Tuple} stream, and
 * a sink Tap is the tail end. Different kinds of Tap are used to manage files on a local disk,
 * a distributed filesystem, remote storage like Amazon S3, or via FTP. A Tap simply abstracts
 * away the complexity of connecting to these types of data sources.
064 * <p>
065 * A Tap takes a {@link Scheme} instance, which is used to identify the type of resource (text file, binary file, etc).
066 * A Tap is responsible for how the resource is reached.
067 * <p>
068 * By default when planning a Flow, Tap equality is a function of the {@link #getIdentifier()} and {@link #getScheme()}
069 * values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source
070 * the same fields.
071 * <p>
072 * Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the
073 * {@code where} clause in a SQL statement so two taps reading from the same SQL table aren't considered equal.
074 * <p>
075 * Taps are also used to determine dependencies between two or more {@link Flow} instances when used with a
076 * {@link cascading.cascade.Cascade}. In that case the {@link #getFullIdentifier(Object)} value is used and the Scheme
077 * is ignored.
078 */
079public abstract class Tap<Config, Input, Output> implements ScopedElement, FlowElement, Serializable, Traceable
080  {
  /** Field scheme, the Scheme used with this Tap; assigned via {@link #setScheme(Scheme)} */
  private Scheme<Config, Input, Output, ?, ?> scheme;

  /** Field mode, the SinkMode of this Tap; defaults to {@link SinkMode#KEEP} */
  SinkMode sinkMode = SinkMode.KEEP;

  // each ConfigDef below stays null until first requested via getConfigDef(), getNodeConfigDef(), getStepConfigDef()
  private ConfigDef configDef;
  private ConfigDef nodeConfigDef;
  private ConfigDef stepConfigDef;

  /** Field id, assigned once at construction and returned by {@link #id(Tap)} */
  private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
  /** Field trace, captured at construction and returned by {@link #getTrace()} */
  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
095
096  /**
097   * Convenience function to make an array of Tap instances.
098   *
099   * @param taps of type Tap
100   * @return Tap array
101   */
102  public static Tap[] taps( Tap... taps )
103    {
104    return taps;
105    }
106
107  /**
108   * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify
109   * the Tap instance in properties files etc.
110   * <p>
111   * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent.
112   *
113   * @param tap of type Tap
114   * @return of type String
115   */
116  public static synchronized String id( Tap tap )
117    {
118    if( tap instanceof DecoratorTap )
119      return id( ( (DecoratorTap) tap ).getOriginal() );
120
121    return tap.id;
122    }
123
  /**
   * Creates a Tap with no Scheme. The Scheme remains {@code null} until {@link #setScheme(Scheme)} is called,
   * and the SinkMode remains the default, {@link SinkMode#KEEP}.
   */
  protected Tap()
    {
    }
127
128  protected Tap( Scheme<Config, Input, Output, ?, ?> scheme )
129    {
130    this.setScheme( scheme );
131    }
132
133  protected Tap( Scheme<Config, Input, Output, ?, ?> scheme, SinkMode sinkMode )
134    {
135    this.setScheme( scheme );
136
137    if( sinkMode != null )
138      this.sinkMode = sinkMode;
139    }
140
141  protected void setScheme( Scheme<Config, Input, Output, ?, ?> scheme )
142    {
143    this.scheme = scheme;
144    }
145
146  /**
147   * Method getScheme returns the scheme of this Tap object.
148   *
149   * @return the scheme (type Scheme) of this Tap object.
150   */
151  public Scheme<Config, Input, Output, ?, ?> getScheme()
152    {
153    return scheme;
154    }
155
156  @Override
157  public String getTrace()
158    {
159    return trace;
160    }
161
162  /**
163   * Method flowInit allows this Tap instance to initialize itself in context of the given {@link cascading.flow.Flow} instance.
164   * This method is guaranteed to be called before the Flow is started and the
165   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)} event is fired.
166   * <p>
167   * This method will be called once per Flow, and before {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
168   * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
169   *
170   * @param flow of type Flow
171   */
172  public void flowConfInit( Flow<Config> flow )
173    {
174
175    }
176
177  /**
178   * Method sourceConfInit initializes this instance as a source.
179   * <p>
180   * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
181   * instance or if it participates in multiple times in a given Flow or across different Flows in
182   * a {@link cascading.cascade.Cascade}.
183   * <p>
184   * In the context of a Flow, it will be called after
185   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
186   * <p>
187   * Note that no resources or services should be modified by this method.
188   *
189   * @param flowProcess of type FlowProcess
190   * @param conf        of type Config
191   */
192  public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
193    {
194    getScheme().sourceConfInit( flowProcess, this, conf );
195    }
196
197  /**
198   * Method sinkConfInit initializes this instance as a sink.
199   * <p>
200   * This method maybe called more than once if this Tap instance is used outside the scope of a {@link cascading.flow.Flow}
201   * instance or if it participates in multiple times in a given Flow or across different Flows in
202   * a {@link cascading.cascade.Cascade}.
203   * <p>
204   * Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'.
205   * <p>
206   * In the context of a Flow, it will be called after
207   * {@link cascading.flow.FlowListener#onStarting(cascading.flow.Flow)}
208   * <p>
209   * Note that no resources or services should be modified by this method. If this Tap instance returns true for
210   * {@link #isReplace()}, then {@link #deleteResource(Object)} will be called by the parent Flow.
211   *
212   * @param flowProcess of type FlowProcess
213   * @param conf        of type Config
214   */
215  public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
216    {
217    getScheme().sinkConfInit( flowProcess, this, conf );
218    }
219
220  /**
221   * Method getIdentifier returns a String representing the resource this Tap instance represents.
222   * <p>
223   * Often, if the tap accesses a filesystem, the identifier is nothing more than the path to the file or directory.
224   * In other cases it may be a an URL or URI representing a connection string or remote resource.
225   * <p>
226   * Any two Tap instances having the same value for the identifier are considered equal.
227   *
228   * @return String
229   */
230  @Property(name = "identifier", visibility = Visibility.PUBLIC)
231  @PropertyDescription("The resource this instance represents")
232  @PropertySanitizer("cascading.management.annotation.URISanitizer")
233  public abstract String getIdentifier();
234
235  /**
236   * Method getSourceFields returns the sourceFields of this Tap object.
237   *
238   * @return the sourceFields (type Fields) of this Tap object.
239   */
240  public Fields getSourceFields()
241    {
242    return getScheme().getSourceFields();
243    }
244
245  /**
246   * Method getSinkFields returns the sinkFields of this Tap object.
247   *
248   * @return the sinkFields (type Fields) of this Tap object.
249   */
250  public Fields getSinkFields()
251    {
252    return getScheme().getSinkFields();
253    }
254
255  /**
256   * Method openForRead opens the resource represented by this Tap instance for reading.
257   * <p>
258   * {@code input} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
259   * via {@link Scheme#sourceConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
260   * input type and instantiate it before calling {@code super.openForRead()}.
261   * <p>
262   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
263   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
264   * stored in a Collection.
265   *
266   * @param flowProcess of type FlowProcess
267   * @param input       of type Input
268   * @return TupleEntryIterator
269   * @throws java.io.IOException when the resource cannot be opened
270   */
271  public abstract TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException;
272
273  /**
274   * Method openForRead opens the resource represented by this Tap instance for reading.
275   * <p>
276   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
277   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
278   * stored in a Collection.
279   *
280   * @param flowProcess of type FlowProcess
281   * @return TupleEntryIterator
282   * @throws java.io.IOException when the resource cannot be opened
283   */
284  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess ) throws IOException
285    {
286    return openForRead( flowProcess, null );
287    }
288
289  /**
290   * Method openForWrite opens the resource represented by this Tap instance for writing.
291   * <p>
292   * This method is used internally and does not honor the {@link SinkMode} setting. If SinkMode is
293   * {@link SinkMode#REPLACE}, this call may fail. See {@link #openForWrite(cascading.flow.FlowProcess)}.
294   * <p>
295   * {@code output} value may be null, if so, sub-classes must inquire with the underlying {@link Scheme}
296   * via {@link Scheme#sinkConfInit(cascading.flow.FlowProcess, Tap, Object)} to get the proper
297   * output type and instantiate it before calling {@code super.openForWrite()}.
298   *
299   * @param flowProcess of type FlowProcess
300   * @param output      of type Output
301   * @return TupleEntryCollector
302   * @throws java.io.IOException when the resource cannot be opened
303   */
304  public abstract TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException;
305
306  /**
307   * Method openForWrite opens the resource represented by this Tap instance for writing.
308   * <p>
309   * This method is for user application use and does honor the {@link SinkMode#REPLACE} settings. That is, if
310   * SinkMode is set to {@link SinkMode#REPLACE} the underlying resource will be deleted.
311   * <p>
312   * Note if {@link SinkMode#UPDATE} is set, the resource will not be deleted.
313   *
314   * @param flowProcess of type FlowProcess
315   * @return TupleEntryCollector
316   * @throws java.io.IOException when the resource cannot be opened
317   */
318  public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess ) throws IOException
319    {
320    if( isReplace() )
321      deleteResource( flowProcess );
322
323    return openForWrite( flowProcess, null );
324    }
325
326  @Override
327  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
328    {
329    // as a source Tap, we emit the scheme defined Fields
330    // as a sink Tap, we declare we emit the incoming Fields
331    // as a temp Tap, this method never gets called, but we emit what we consume
332    int count = 0;
333    for( Scope incomingScope : incomingScopes )
334      {
335      Fields incomingFields = incomingScope.getIncomingTapFields();
336
337      if( incomingFields != null )
338        {
339        try
340          {
341          incomingFields.select( getSinkFields() );
342          }
343        catch( FieldsResolverException exception )
344          {
345          throw new TapException( this, exception.getSourceFields(), exception.getSelectorFields(), exception );
346          }
347
348        count++;
349        }
350      }
351
352    if( count > 1 )
353      throw new FlowException( "Tap may not have more than one incoming Scope" );
354
355    // this allows the incoming to be passed through to the outgoing
356    Fields incomingFields = incomingScopes.size() == 0 ? null : incomingScopes.iterator().next().getIncomingTapFields();
357
358    if( incomingFields != null &&
359      ( isSource() && getSourceFields().equals( Fields.UNKNOWN ) ||
360        isSink() && getSinkFields().equals( Fields.ALL ) ) )
361      return new Scope( incomingFields );
362
363    if( count == 1 )
364      return new Scope( getSinkFields() );
365
366    return new Scope( getSourceFields() );
367    }
368
369  /**
370   * A hook for allowing a Scheme to lazily retrieve its source fields.
371   *
372   * @param flowProcess of type FlowProcess
373   * @return the found Fields
374   */
375  public Fields retrieveSourceFields( FlowProcess<? extends Config> flowProcess )
376    {
377    return getScheme().retrieveSourceFields( flowProcess, this );
378    }
379
380  public void presentSourceFields( FlowProcess<? extends Config> flowProcess, Fields fields )
381    {
382    getScheme().presentSourceFields( flowProcess, this, fields );
383    }
384
385  /**
386   * A hook for allowing a Scheme to lazily retrieve its sink fields.
387   *
388   * @param flowProcess of type FlowProcess
389   * @return the found Fields
390   */
391  public Fields retrieveSinkFields( FlowProcess<? extends Config> flowProcess )
392    {
393    return getScheme().retrieveSinkFields( flowProcess, this );
394    }
395
396  public void presentSinkFields( FlowProcess<? extends Config> flowProcess, Fields fields )
397    {
398    getScheme().presentSinkFields( flowProcess, this, fields );
399    }
400
401  @Override
402  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
403    {
404    return incomingScope.getIncomingTapFields();
405    }
406
407  @Override
408  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
409    {
410    return incomingScope.getIncomingTapFields();
411    }
412
413  /**
414   * Method getFullIdentifier returns a fully qualified resource identifier.
415   *
416   * @param flowProcess of type FlowProcess
417   * @return String
418   */
419  public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
420    {
421    return getFullIdentifier( flowProcess.getConfig() );
422    }
423
424  /**
425   * Method getFullIdentifier returns a fully qualified resource identifier.
426   *
427   * @param conf of type Config
428   * @return String
429   */
430  public String getFullIdentifier( Config conf )
431    {
432    return getIdentifier();
433    }
434
435  /**
436   * Method createResource creates the underlying resource.
437   *
438   * @param flowProcess of type FlowProcess
439   * @return boolean
440   * @throws IOException when there is an error making directories
441   */
442  public boolean createResource( FlowProcess<? extends Config> flowProcess ) throws IOException
443    {
444    return createResource( flowProcess.getConfig() );
445    }
446
447  /**
448   * Method createResource creates the underlying resource.
449   *
450   * @param conf of type Config
451   * @return boolean
452   * @throws IOException when there is an error making directories
453   */
454  public abstract boolean createResource( Config conf ) throws IOException;
455
456  /**
457   * Method deleteResource deletes the resource represented by this instance.
458   *
459   * @param flowProcess of type FlowProcess
460   * @return boolean
461   * @throws IOException when the resource cannot be deleted
462   */
463  public boolean deleteResource( FlowProcess<? extends Config> flowProcess ) throws IOException
464    {
465    return deleteResource( flowProcess.getConfig() );
466    }
467
468  /**
469   * Method deleteResource deletes the resource represented by this instance.
470   *
471   * @param conf of type Config
472   * @return boolean
473   * @throws IOException when the resource cannot be deleted
474   */
475  public abstract boolean deleteResource( Config conf ) throws IOException;
476
477  /**
478   * Method prepareResourceForRead allows the underlying resource to be notified when reading will begin.
479   * <p>
480   * This method will be called client side so that any remote or external resources can be initialized.
481   * <p>
482   * If this method returns {@code false}, an exception will be thrown halting the current Flow.
483   * <p>
484   * In most cases, resource initialization should happen in the {@link #openForRead(FlowProcess, Object)}  method.
485   * <p>
486   * This allows for initialization of cluster side resources, like a JDBC driver used to read data from a database,
487   * that cannot be passed client to cluster.
488   *
489   * @param conf of type Config
490   * @return returns true if successful
491   * @throws IOException
492   */
493  public boolean prepareResourceForRead( Config conf ) throws IOException
494    {
495    return true;
496    }
497
498  /**
499   * Method prepareResourceForWrite allows the underlying resource to be notified when writing will begin.
500   * <p>
501   * This method will be called once client side so that any remote or external resources can be initialized.
502   * <p>
503   * If this method returns {@code false}, an exception will be thrown halting the current Flow.
504   * <p>
505   * In most cases, resource initialization should happen in the {@link #openForWrite(FlowProcess, Object)} method.
506   * <p>
507   * This allows for initialization of cluster side resources, like a JDBC driver used to write data to a database,
508   * that cannot be passed client to cluster.
509   * <p>
510   * In the above JDBC example, overriding this method will allow for testing for the existence of and/or creating
511   * a remote table used by all individual cluster side tasks.
512   *
513   * @param conf of type Config
514   * @return returns true if successful
515   * @throws IOException
516   */
517  public boolean prepareResourceForWrite( Config conf ) throws IOException
518    {
519    return true;
520    }
521
522  /**
523   * Method commitResource allows the underlying resource to be notified when all write processing is
524   * successful so that any additional cleanup or processing may be completed.
525   * <p>
526   * See {@link #rollbackResource(Object)} to handle cleanup in the face of failures.
527   * <p>
528   * This method is invoked once client side and not in the cluster, if any.
529   * <p>
530   * If other sink Tap instance in a given Flow fail on commitResource after called on this instance,
531   * rollbackResource will not be called.
532   *
533   * @param conf of type Config
534   * @return returns true if successful
535   * @throws IOException
536   */
537  public boolean commitResource( Config conf ) throws IOException
538    {
539    return true;
540    }
541
542  /**
543   * Method rollbackResource allows the underlying resource to be notified when any write processing has failed or
544   * was stopped so that any cleanup may be started.
545   * <p>
546   * See {@link #commitResource(Object)} to handle cleanup when the write has successfully completed.
547   * <p>
548   * This method is invoked once client side and not in the cluster, if any.
549   *
550   * @param conf of type Config
551   * @return returns true if successful
552   * @throws IOException
553   */
554  public boolean rollbackResource( Config conf ) throws IOException
555    {
556    return true;
557    }
558
559  /**
560   * Method resourceExists returns true if the path represented by this instance exists.
561   *
562   * @param flowProcess of type FlowProcess
563   * @return true if the underlying resource already exists
564   * @throws IOException when the status cannot be determined
565   */
566  public boolean resourceExists( FlowProcess<? extends Config> flowProcess ) throws IOException
567    {
568    return resourceExists( flowProcess.getConfig() );
569    }
570
571  /**
572   * Method resourceExists returns true if the path represented by this instance exists.
573   *
574   * @param conf of type Config
575   * @return true if the underlying resource already exists
576   * @throws IOException when the status cannot be determined
577   */
578  public abstract boolean resourceExists( Config conf ) throws IOException;
579
580  /**
581   * Method getModifiedTime returns the date this resource was last modified.
582   * <p>
583   * If the resource does not exist, returns zero (0).
584   * <p>
585   * If the resource is continuous, returns {@link Long#MAX_VALUE}.
586   *
587   * @param flowProcess of type FlowProcess
588   * @return The date this resource was last modified.
589   * @throws IOException
590   */
591  public long getModifiedTime( FlowProcess<? extends Config> flowProcess ) throws IOException
592    {
593    return getModifiedTime( flowProcess.getConfig() );
594    }
595
596  /**
597   * Method getModifiedTime returns the date this resource was last modified.
598   * <p>
599   * If the resource does not exist, returns zero (0).
600   * <p>
601   * If the resource is continuous, returns {@link Long#MAX_VALUE}.
602   *
603   * @param conf of type Config
604   * @return The date this resource was last modified.
605   * @throws IOException
606   */
607  public abstract long getModifiedTime( Config conf ) throws IOException;
608
609  /**
610   * Method getSinkMode returns the {@link SinkMode} }of this Tap object.
611   *
612   * @return the sinkMode (type SinkMode) of this Tap object.
613   */
614  public SinkMode getSinkMode()
615    {
616    return sinkMode;
617    }
618
619  /**
620   * Method isKeep indicates whether the resource represented by this instance should be kept if it
621   * already exists when the Flow is started.
622   *
623   * @return boolean
624   */
625  public boolean isKeep()
626    {
627    return sinkMode == SinkMode.KEEP;
628    }
629
630  /**
631   * Method isReplace indicates whether the resource represented by this instance should be deleted if it
632   * already exists when the Flow is started.
633   *
634   * @return boolean
635   */
636  public boolean isReplace()
637    {
638    return sinkMode == SinkMode.REPLACE;
639    }
640
641  /**
642   * Method isUpdate indicates whether the resource represented by this instance should be updated if it already
643   * exists. Otherwise a new resource will be created, via {@link #createResource(Object)}, when the Flow is started.
644   *
645   * @return boolean
646   */
647  public boolean isUpdate()
648    {
649    return sinkMode == SinkMode.UPDATE;
650    }
651
652  /**
653   * Method isSink returns true if this Tap instance can be used as a sink.
654   *
655   * @return boolean
656   */
657  public boolean isSink()
658    {
659    return getScheme().isSink();
660    }
661
662  /**
663   * Method isSource returns true if this Tap instance can be used as a source.
664   *
665   * @return boolean
666   */
667  public boolean isSource()
668    {
669    return getScheme().isSource();
670    }
671
672  /**
673   * Method isTemporary returns true if this Tap is temporary (used for intermediate results).
674   *
675   * @return the temporary (type boolean) of this Tap object.
676   */
677  public boolean isTemporary()
678    {
679    return false;
680    }
681
682  /**
683   * Returns a {@link cascading.property.ConfigDef} instance that allows for local properties to be set and made available via
684   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
685   * <p>
686   * Any properties set on the configDef will not show up in any {@link Flow} or {@link cascading.flow.FlowStep} process
687   * level configuration, but will override any of those values as seen by the current Tap instance method call where a
688   * FlowProcess is provided except for the {@link #sourceConfInit(cascading.flow.FlowProcess, Object)} and
689   * {@link #sinkConfInit(cascading.flow.FlowProcess, Object)} methods.
690   * <p>
691   * That is, the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
692   * a ConfigDef instance will not be visible to them.
693   *
694   * @return an instance of ConfigDef
695   */
696  public ConfigDef getConfigDef()
697    {
698    if( configDef == null )
699      configDef = new ConfigDef();
700
701    return configDef;
702    }
703
704  /**
705   * Returns {@code true} if there are properties in the configDef instance.
706   *
707   * @return true if there are configDef properties
708   */
709  public boolean hasConfigDef()
710    {
711    return configDef != null && !configDef.isEmpty();
712    }
713
714  /**
715   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
716   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
717   * <p>
718   * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in
719   * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the
720   * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
721   * <p>
722   * Use this method to tweak properties in the process node this tap instance is planned into.
723   *
724   * @return an instance of ConfigDef
725   */
726  @Override
727  public ConfigDef getNodeConfigDef()
728    {
729    if( nodeConfigDef == null )
730      nodeConfigDef = new ConfigDef();
731
732    return nodeConfigDef;
733    }
734
735  /**
736   * Returns {@code true} if there are properties in the nodeConfigDef instance.
737   *
738   * @return true if there are nodeConfigDef properties
739   */
740  @Override
741  public boolean hasNodeConfigDef()
742    {
743    return nodeConfigDef != null && !nodeConfigDef.isEmpty();
744    }
745
746  /**
747   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
748   * a resulting {@link cascading.flow.FlowProcess} instance when the tap is invoked.
749   * <p>
750   * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
751   * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
752   * stepConfigDef will be overridden by the tap local {@code #getConfigDef} instance.
753   * <p>
754   * Use this method to tweak properties in the process step this tap instance is planned into.
755   * <p>
756   * Note the {@code *confInit} methods are called before any ConfigDef is applied, so any values placed into
757   * a ConfigDef instance will not be visible to them.
758   *
759   * @return an instance of ConfigDef
760   */
761  @Override
762  public ConfigDef getStepConfigDef()
763    {
764    if( stepConfigDef == null )
765      stepConfigDef = new ConfigDef();
766
767    return stepConfigDef;
768    }
769
770  /**
771   * Returns {@code true} if there are properties in the stepConfigDef instance.
772   *
773   * @return true if there are stepConfigDef properties
774   */
775  @Override
776  public boolean hasStepConfigDef()
777    {
778    return stepConfigDef != null && !stepConfigDef.isEmpty();
779    }
780
781  public Spliterator<TupleEntry> spliterator( FlowProcess<? extends Config> flowProcess )
782    {
783    return splititerator( openForReadUnchecked( flowProcess ) );
784    }
785
786  protected TupleEntryIterator openForReadUnchecked( FlowProcess<? extends Config> flowProcess )
787    {
788    try
789      {
790      return openForRead( flowProcess );
791      }
792    catch( IOException exception )
793      {
794      throw new UncheckedIOException( exception );
795      }
796    }
797
798  protected Spliterator<TupleEntry> splititerator( TupleEntryIterator iterator )
799    {
800    return Spliterators.spliteratorUnknownSize( iterator, 0 );
801    }
802
803  /**
804   * Method entryStream returns a {@link Stream} of {@link TupleEntry} instances from the given
805   * Tap instance.
806   * <p>
807   * Also see {@link cascading.tuple.TupleEntryStream#entryStream(Tap, FlowProcess)}.
808   * <p>
809   * Note, the returned Stream instance must be closed in order to clean up underlying resources. This
810   * is simply accomplished with a try-with-resources statement.
811   *
812   * @param flowProcess represents the current platform configuration
813   * @return a Stream of TupleEntry instances
814   */
815  public Stream<TupleEntry> entryStream( FlowProcess<? extends Config> flowProcess )
816    {
817    TupleEntryIterator iterator = openForReadUnchecked( flowProcess );
818    Spliterator<TupleEntry> spliterator = splititerator( iterator );
819
820    try
821      {
822      return StreamSupport
823        .stream( spliterator, false )
824        .onClose( asUncheckedRunnable( iterator ) );
825      }
826    catch( Error | RuntimeException error )
827      {
828      try
829        {
830        iterator.close();
831        }
832      catch( IOException exception )
833        {
834        try
835          {
836          error.addSuppressed( exception );
837          }
838        catch( Throwable ignore ){}
839        }
840
841      throw error;
842      }
843    }
844
845  /**
846   * Method entryStreamCopy returns a {@link Stream} of {@link TupleEntry} instances from the given
847   * Tap instance.
848   * <p>
849   * This method returns an TupleEntry instance suitable for caching.
850   * <p>
851   * Also see {@link cascading.tuple.TupleEntryStream#entryStreamCopy(Tap, FlowProcess)}.
852   * <p>
853   * Note, the returned Stream instance must be closed in order to clean up underlying resources. This
854   * is simply accomplished with a try-with-resources statement.
855   *
856   * @param flowProcess represents the current platform configuration
857   * @return a Stream of TupleEntry instances
858   */
859  public Stream<TupleEntry> entryStreamCopy( FlowProcess<? extends Config> flowProcess )
860    {
861    return entryStream( flowProcess ).map( TupleEntry::new );
862    }
863
864  /**
865   * Method entryStream returns a {@link Stream} of {@link TupleEntry} instances from the given
866   * Tap instance.
867   * <p>
868   * Also see {@link cascading.tuple.TupleEntryStream#entryStream(Tap, FlowProcess, Fields)}.
869   * <p>
870   * Note, the returned Stream instance must be closed in order to clean up underlying resources. This
871   * is simply accomplished with a try-with-resources statement.
872   *
873   * @param flowProcess represents the current platform configuration
874   * @param selector    the fields to select from the underlying TupleEntry
875   * @return a Stream of TupleEntry instances
876   */
877  public Stream<TupleEntry> entryStream( FlowProcess<? extends Config> flowProcess, Fields selector )
878    {
879    return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectEntry( selector ) );
880    }
881
882  /**
883   * Method entryStreamCopy returns a {@link Stream} of {@link TupleEntry} instances from the given
884   * Tap instance.
885   * <p>
886   * Also see {@link cascading.tuple.TupleEntryStream#entryStreamCopy(Tap, FlowProcess)}.
887   * <p>
888   * Note, the returned Stream instance must be closed in order to clean up underlying resources. This
889   * is simply accomplished with a try-with-resources statement.
890   *
891   * @param flowProcess represents the current platform configuration
892   * @param selector    the fields to select from the underlying TupleEntry
893   * @return a Stream of TupleEntry instances
894   */
895  public Stream<TupleEntry> entryStreamCopy( FlowProcess<? extends Config> flowProcess, Fields selector )
896    {
897    return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectEntryCopy( selector ) );
898    }
899
900  /**
901   * Method tupleStream returns a {@link Stream} of {@link Tuple} instances from the given
902   * Tap instance.
903   * <p>
904   * Also see {@link cascading.tuple.TupleStream#tupleStream(Tap, FlowProcess)}.
905   *
906   * @param flowProcess represents the current platform configuration
907   * @return a Stream of Tuple instances
908   */
909  public Stream<Tuple> tupleStream( FlowProcess<? extends Config> flowProcess )
910    {
911    return entryStream( flowProcess ).map( TupleEntry::getTuple );
912    }
913
914  /**
915   * Method tupleStreamCopy returns a {@link Stream} of {@link Tuple} instances from the given
916   * Tap instance.
917   * <p>
918   * This method returns an Tuple instance suitable for caching.
919   * <p>
920   * Also see {@link cascading.tuple.TupleStream#tupleStreamCopy(Tap, FlowProcess)}.
921   *
922   * @param flowProcess represents the current platform configuration
923   * @return a Stream of Tuple instances
924   */
925  public Stream<Tuple> tupleStreamCopy( FlowProcess<? extends Config> flowProcess )
926    {
927    return entryStream( flowProcess ).map( TupleEntry::getTupleCopy );
928    }
929
930  /**
931   * Method tupleStream returns a {@link Stream} of {@link Tuple} instances from the given
932   * Tap instance.
933   * <p>
934   * Also see {@link cascading.tuple.TupleStream#tupleStream(Tap, FlowProcess, Fields)}.
935   *
936   * @param flowProcess represents the current platform configuration
937   * @param selector    the fields to select from the underlying Tuple
938   * @return a Stream of TupleE instances
939   */
940  public Stream<Tuple> tupleStream( FlowProcess<? extends Config> flowProcess, Fields selector )
941    {
942    return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectTuple( selector ) );
943    }
944
945  /**
946   * Method tupleStreamCopy returns a {@link Stream} of {@link Tuple} instances from the given
947   * Tap instance.
948   * <p>
949   * This method returns an Tuple instance suitable for caching.
950   * <p>
951   * Also see {@link cascading.tuple.TupleStream#tupleStreamCopy(Tap, FlowProcess)}.
952   *
953   * @param flowProcess represents the current platform configuration
954   * @param selector    the fields to select from the underlying Tuple
955   * @return a Stream of TupleE instances
956   */
957  public Stream<Tuple> tupleStreamCopy( FlowProcess<? extends Config> flowProcess, Fields selector )
958    {
959    return entryStream( flowProcess ).map( tupleEntry -> tupleEntry.selectTupleCopy( selector ) );
960    }
961
962  private static Runnable asUncheckedRunnable( Closeable closeable )
963    {
964    return () ->
965    {
966    try
967      {
968      closeable.close();
969      }
970    catch( IOException exception )
971      {
972      throw new UncheckedIOException( exception );
973      }
974    };
975    }
976
977  @Override
978  public boolean equals( Object object )
979    {
980    if( this == object )
981      return true;
982    if( object == null || getClass() != object.getClass() )
983      return false;
984
985    Tap tap = (Tap) object;
986
987    if( getIdentifier() != null ? !getIdentifier().equals( tap.getIdentifier() ) : tap.getIdentifier() != null )
988      return false;
989
990    if( getScheme() != null ? !getScheme().equals( tap.getScheme() ) : tap.getScheme() != null )
991      return false;
992
993    return true;
994    }
995
996  @Override
997  public int hashCode()
998    {
999    int result = getIdentifier() != null ? getIdentifier().hashCode() : 0;
1000
1001    result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 );
1002
1003    return result;
1004    }
1005
1006  @Override
1007  public String toString()
1008    {
1009    if( getIdentifier() != null )
1010      return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[\"" + Util.sanitizeUrl( getIdentifier() ) + "\"]"; // sanitize
1011    else
1012      return getClass().getSimpleName() + "[\"" + getScheme() + "\"]" + "[not initialized]";
1013    }
1014  }