001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    import java.util.Set;
026    
027    import cascading.flow.Flow;
028    import cascading.flow.FlowElement;
029    import cascading.flow.FlowProcess;
030    import cascading.flow.planner.Scope;
031    import cascading.property.ConfigDef;
032    import cascading.scheme.Scheme;
033    import cascading.tuple.Fields;
034    import cascading.tuple.TupleEntryCollector;
035    import cascading.tuple.TupleEntryIterator;
036    
037    /**
038     * Class DecoratorTap wraps a given {@link Tap} instance, delegating all calls to the original.
039     * <p/>
040     * It also provides an additional generic field that may hold any custom type, this allows implementations
041     * to attach any meta-info to the tap being decorated.
042     * <p/>
043     * Further, sub-classes of DecoratorTap can be used to decorate any Tap instances associated or created for
044     * use by the {@link cascading.pipe.Checkpoint} {@link cascading.pipe.Pipe}.
045     * <p/>
046     * Sub-classing this is optional if the aim is to simply attach relevant meta-info for use by a given application.
047     * <p/>
048     * In order to pass any meta-info to a management service via the {@link cascading.management.annotation.Property}
049     * annotation, a sub-class of DecoratorTap must be provided.
050     */
051    public class DecoratorTap<MetaInfo, Config, Input, Output> extends Tap<Config, Input, Output>
052      {
053      protected MetaInfo metaInfo;
054      protected Tap<Config, Input, Output> original;
055    
056      /**
057       * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
058       * the wrapped Tap instance.
059       *
060       * @param metaInfo meta-information about the current Tap
061       * @param original the decorated Tap instance
062       */
063      @ConstructorProperties({"metaInfo", "original"})
064      public DecoratorTap( MetaInfo metaInfo, Tap<Config, Input, Output> original )
065        {
066        setMetaInfo( metaInfo );
067        setOriginal( original );
068        }
069    
070      /**
071       * Creates a new Tap instance, wrapping the given Tap, and associates the given MetaInfo type with
072       * the wrapped Tap instance.
073       *
074       * @param original the decorated Tap instance
075       */
076      @ConstructorProperties({"original"})
077      public DecoratorTap( Tap<Config, Input, Output> original )
078        {
079        setOriginal( original );
080        }
081    
082      public MetaInfo getMetaInfo()
083        {
084        return metaInfo;
085        }
086    
087      public Tap<Config, Input, Output> getOriginal()
088        {
089        return original;
090        }
091    
092      protected void setOriginal( Tap<Config, Input, Output> original )
093        {
094        if( original == null )
095          throw new IllegalArgumentException( "wrapped tap value may not be null" );
096    
097        this.original = original;
098        }
099    
100      protected void setMetaInfo( MetaInfo metaInfo )
101        {
102        this.metaInfo = metaInfo;
103        }
104    
105      @Override
106      public Scheme<Config, Input, Output, ?, ?> getScheme()
107        {
108        return original.getScheme();
109        }
110    
111      @Override
112      public String getTrace()
113        {
114        return original.getTrace();
115        }
116    
117      @Override
118      public void flowConfInit( Flow<Config> flow )
119        {
120        original.flowConfInit( flow );
121        }
122    
123      @Override
124      public void sourceConfInit( FlowProcess<Config> flowProcess, Config conf )
125        {
126        original.sourceConfInit( flowProcess, conf );
127        }
128    
129      @Override
130      public void sinkConfInit( FlowProcess<Config> flowProcess, Config conf )
131        {
132        original.sinkConfInit( flowProcess, conf );
133        }
134    
135      @Override
136      public String getIdentifier()
137        {
138        return original.getIdentifier();
139        }
140    
141      @Override
142      public Fields getSourceFields()
143        {
144        return original.getSourceFields();
145        }
146    
147      @Override
148      public Fields getSinkFields()
149        {
150        return original.getSinkFields();
151        }
152    
153      @Override
154      public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException
155        {
156        return original.openForRead( flowProcess, input );
157        }
158    
159      @Override
160      public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess ) throws IOException
161        {
162        return original.openForRead( flowProcess );
163        }
164    
165      @Override
166      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
167        {
168        return original.openForWrite( flowProcess, output );
169        }
170    
171      @Override
172      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess ) throws IOException
173        {
174        return original.openForWrite( flowProcess );
175        }
176    
177      @Override
178      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
179        {
180        return original.outgoingScopeFor( incomingScopes );
181        }
182    
183      @Override
184      public Fields retrieveSourceFields( FlowProcess<Config> flowProcess )
185        {
186        return original.retrieveSourceFields( flowProcess );
187        }
188    
189      @Override
190      public void presentSourceFields( FlowProcess<Config> flowProcess, Fields fields )
191        {
192        original.presentSourceFields( flowProcess, fields );
193        }
194    
195      @Override
196      public Fields retrieveSinkFields( FlowProcess<Config> flowProcess )
197        {
198        return original.retrieveSinkFields( flowProcess );
199        }
200    
201      @Override
202      public void presentSinkFields( FlowProcess<Config> flowProcess, Fields fields )
203        {
204        original.presentSinkFields( flowProcess, fields );
205        }
206    
207      @Override
208      public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
209        {
210        return original.resolveIncomingOperationArgumentFields( incomingScope );
211        }
212    
213      @Override
214      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
215        {
216        return original.resolveIncomingOperationPassThroughFields( incomingScope );
217        }
218    
219      @Override
220      public String getFullIdentifier( FlowProcess<Config> flowProcess )
221        {
222        return original.getFullIdentifier( flowProcess );
223        }
224    
225      @Override
226      public String getFullIdentifier( Config conf )
227        {
228        return original.getFullIdentifier( conf );
229        }
230    
231      @Override
232      public boolean createResource( FlowProcess<Config> flowProcess ) throws IOException
233        {
234        return original.createResource( flowProcess );
235        }
236    
237      @Override
238      public boolean createResource( Config conf ) throws IOException
239        {
240        return original.createResource( conf );
241        }
242    
243      @Override
244      public boolean deleteResource( FlowProcess<Config> flowProcess ) throws IOException
245        {
246        return original.deleteResource( flowProcess );
247        }
248    
249      @Override
250      public boolean deleteResource( Config conf ) throws IOException
251        {
252        return original.deleteResource( conf );
253        }
254    
255      @Override
256      public boolean prepareResourceForRead( Config conf ) throws IOException
257        {
258        return original.prepareResourceForRead( conf );
259        }
260    
261      @Override
262      public boolean prepareResourceForWrite( Config conf ) throws IOException
263        {
264        return original.prepareResourceForWrite( conf );
265        }
266    
267      @Override
268      public boolean commitResource( Config conf ) throws IOException
269        {
270        return original.commitResource( conf );
271        }
272    
273      @Override
274      public boolean rollbackResource( Config conf ) throws IOException
275        {
276        return original.rollbackResource( conf );
277        }
278    
279      @Override
280      public boolean resourceExists( FlowProcess<Config> flowProcess ) throws IOException
281        {
282        return original.resourceExists( flowProcess );
283        }
284    
285      @Override
286      public boolean resourceExists( Config conf ) throws IOException
287        {
288        return original.resourceExists( conf );
289        }
290    
291      @Override
292      public long getModifiedTime( FlowProcess<Config> flowProcess ) throws IOException
293        {
294        return original.getModifiedTime( flowProcess );
295        }
296    
297      @Override
298      public long getModifiedTime( Config conf ) throws IOException
299        {
300        return original.getModifiedTime( conf );
301        }
302    
303      @Override
304      public SinkMode getSinkMode()
305        {
306        return original.getSinkMode();
307        }
308    
309      @Override
310      public boolean isKeep()
311        {
312        return original.isKeep();
313        }
314    
315      @Override
316      public boolean isReplace()
317        {
318        return original.isReplace();
319        }
320    
321      @Override
322      public boolean isUpdate()
323        {
324        return original.isUpdate();
325        }
326    
327      @Override
328      public boolean isSink()
329        {
330        return original.isSink();
331        }
332    
333      @Override
334      public boolean isSource()
335        {
336        return original.isSource();
337        }
338    
339      @Override
340      public boolean isTemporary()
341        {
342        return original.isTemporary();
343        }
344    
345      @Override
346      public ConfigDef getConfigDef()
347        {
348        return original.getConfigDef();
349        }
350    
351      @Override
352      public boolean hasConfigDef()
353        {
354        return original.hasConfigDef();
355        }
356    
357      @Override
358      public ConfigDef getStepConfigDef()
359        {
360        return original.getStepConfigDef();
361        }
362    
363      @Override
364      public boolean hasStepConfigDef()
365        {
366        return original.hasStepConfigDef();
367        }
368    
369      @Override
370      public boolean isEquivalentTo( FlowElement element )
371        {
372        return original.isEquivalentTo( element );
373        }
374    
375      @Override
376      public String toString()
377        {
378        return original.toString();
379        }
380      }