001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap;
022    
023    import java.io.IOException;
024    import java.util.HashSet;
025    import java.util.LinkedHashMap;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import cascading.flow.FlowProcess;
030    import cascading.scheme.Scheme;
031    import cascading.scheme.SinkCall;
032    import cascading.scheme.SourceCall;
033    import cascading.tuple.Fields;
034    import cascading.tuple.Tuple;
035    import cascading.tuple.TupleEntry;
036    import cascading.tuple.TupleEntryCollector;
037    import cascading.tuple.TupleEntrySchemeCollector;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
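042     * Class BaseTemplateTap is the abstract base for sink Taps that write each tuple to a child path of a parent Tap, where the path is created by formatting tuple values with a path template.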
043     */
044    public abstract class BaseTemplateTap<Config, Output> extends SinkTap<Config, Output>
045      {
046      /** Field LOG */
047      private static final Logger LOG = LoggerFactory.getLogger( BaseTemplateTap.class );
048      /** Field OPEN_TAPS_THRESHOLD_DEFAULT */
049      protected static final int OPEN_TAPS_THRESHOLD_DEFAULT = 300;
050    
051      private class TemplateCollector extends TupleEntryCollector
052        {
053        private final FlowProcess<Config> flowProcess;
054        private final Config conf;
055        private final Fields parentFields;
056        private final Fields pathFields;
057    
058        public TemplateCollector( FlowProcess<Config> flowProcess )
059          {
060          super( Fields.asDeclaration( getSinkFields() ) );
061          this.flowProcess = flowProcess;
062          this.conf = flowProcess.getConfigCopy();
063          this.parentFields = parent.getSinkFields();
064          this.pathFields = ( (TemplateScheme) getScheme() ).pathFields;
065          }
066    
067        private TupleEntryCollector getCollector( String path )
068          {
069          TupleEntryCollector collector = collectors.get( path );
070    
071          if( collector != null )
072            return collector;
073    
074          try
075            {
076            LOG.debug( "creating collector for parent: {}, path: {}", parent.getFullIdentifier( conf ), path );
077    
078            collector = createTupleEntrySchemeCollector( flowProcess, parent, path );
079    
080            flowProcess.increment( Counters.Paths_Opened, 1 );
081            }
082          catch( IOException exception )
083            {
084            throw new TapException( "unable to open template path: " + path, exception );
085            }
086    
087          if( collectors.size() > openTapsThreshold )
088            purgeCollectors();
089    
090          collectors.put( path, collector );
091    
092          if( LOG.isInfoEnabled() && collectors.size() % 100 == 0 )
093            LOG.info( "caching {} open Taps", collectors.size() );
094    
095          return collector;
096          }
097    
098        private void purgeCollectors()
099          {
100          int numToClose = Math.max( 1, (int) ( openTapsThreshold * .10 ) );
101    
102          if( LOG.isInfoEnabled() )
103            LOG.info( "removing {} open Taps from cache of size {}", numToClose, collectors.size() );
104    
105          Set<String> removeKeys = new HashSet<String>();
106          Set<String> keys = collectors.keySet();
107    
108          for( String key : keys )
109            {
110            if( numToClose-- == 0 )
111              break;
112    
113            removeKeys.add( key );
114            }
115    
116          for( String removeKey : removeKeys )
117            closeCollector( collectors.remove( removeKey ) );
118    
119          flowProcess.increment( Counters.Path_Purges, 1 );
120          }
121    
122        @Override
123        public void close()
124          {
125          super.close();
126    
127          try
128            {
129            for( TupleEntryCollector collector : collectors.values() )
130              closeCollector( collector );
131            }
132          finally
133            {
134            collectors.clear();
135            }
136          }
137    
138        private void closeCollector( TupleEntryCollector collector )
139          {
140          if( collector == null )
141            return;
142    
143          try
144            {
145            collector.close();
146    
147            flowProcess.increment( Counters.Paths_Closed, 1 );
148            }
149          catch( Exception exception )
150            {
151            // do nothing
152            }
153          }
154    
155        protected void collect( TupleEntry tupleEntry ) throws IOException
156          {
157          if( pathFields != null )
158            {
159            Tuple pathValues = tupleEntry.selectTuple( pathFields );
160            String path = pathValues.format( pathTemplate );
161    
162            getCollector( path ).add( tupleEntry.selectTuple( parentFields ) );
163            }
164          else
165            {
166            String path = tupleEntry.getTuple().format( pathTemplate );
167    
168            getCollector( path ).add( tupleEntry );
169            }
170          }
171        }
172    
173      /** Field parent */
174      protected Tap parent;
175      /** Field pathTemplate */
176      protected String pathTemplate;
177      /** Field keepParentOnDelete */
178      protected boolean keepParentOnDelete = false;
179      /** Field openTapsThreshold */
180      protected int openTapsThreshold = OPEN_TAPS_THRESHOLD_DEFAULT;
181      /** Field collectors */
182      private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>( 1000, .75f, true );
183    
/**
 * Method createTupleEntrySchemeCollector creates the collector that writes to the given path.
 * It is invoked by the internal collector when no cached collector exists for the path, an
 * IOException thrown here is rethrown by the caller as a {@link TapException}.
 *
 * @param flowProcess of type FlowProcess
 * @param parent      of type Tap, the parent Tap of this instance
 * @param path        of type String, the path created by formatting the pathTemplate
 * @return the collector for the given path
 * @throws IOException when the collector cannot be created
 */
184      protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Config> flowProcess, Tap parent, String path ) throws IOException;
185    
186      /**
187       * Method getParent returns the parent Tap of this TemplateTap object.
188       *
189       * @return the parent (type Tap) of this TemplateTap object.
190       */
191      public Tap getParent()
192        {
193        return parent;
194        }
195    
196      /**
197       * Method getPathTemplate returns the pathTemplate {@link java.util.Formatter} format String of this TemplateTap object.
198       *
199       * @return the pathTemplate (type String) of this TemplateTap object.
200       */
201      public String getPathTemplate()
202        {
203        return pathTemplate;
204        }
205    
206      @Override
207      public String getIdentifier()
208        {
209        return parent.getIdentifier();
210        }
211    
212      /**
213       * Method getOpenTapsThreshold returns the openTapsThreshold of this TemplateTap object.
214       *
215       * @return the openTapsThreshold (type int) of this TemplateTap object.
216       */
217      public int getOpenTapsThreshold()
218        {
219        return openTapsThreshold;
220        }
221    
222      @Override
223      public TupleEntryCollector openForWrite( FlowProcess<Config> flowProcess, Output output ) throws IOException
224        {
225        return new TemplateCollector( flowProcess );
226        }
227    
228      /** @see cascading.tap.Tap#createResource(Object) */
229      public boolean createResource( Config conf ) throws IOException
230        {
231        return parent.createResource( conf );
232        }
233    
234      /** @see cascading.tap.Tap#deleteResource(Object) */
235      public boolean deleteResource( Config conf ) throws IOException
236        {
237        return keepParentOnDelete || parent.deleteResource( conf );
238        }
239    
240      @Override
241      public boolean prepareResourceForRead( Config conf ) throws IOException
242        {
243        return parent.prepareResourceForRead( conf );
244        }
245    
246      @Override
247      public boolean prepareResourceForWrite( Config conf ) throws IOException
248        {
249        return parent.prepareResourceForWrite( conf );
250        }
251    
252      @Override
253      public boolean commitResource( Config conf ) throws IOException
254        {
255        return parent.commitResource( conf );
256        }
257    
258      @Override
259      public boolean rollbackResource( Config conf ) throws IOException
260        {
261        return parent.rollbackResource( conf );
262        }
263    
264      /** @see cascading.tap.Tap#resourceExists(Object) */
265      public boolean resourceExists( Config conf ) throws IOException
266        {
267        return parent.resourceExists( conf );
268        }
269    
270      /** @see cascading.tap.Tap#getModifiedTime(Object) */
271      @Override
272      public long getModifiedTime( Config conf ) throws IOException
273        {
274        return parent.getModifiedTime( conf );
275        }
276    
277      @Override
278      public boolean equals( Object object )
279        {
280        if( this == object )
281          return true;
282        if( object == null || getClass() != object.getClass() )
283          return false;
284        if( !super.equals( object ) )
285          return false;
286    
287        BaseTemplateTap that = (BaseTemplateTap) object;
288    
289        if( parent != null ? !parent.equals( that.parent ) : that.parent != null )
290          return false;
291        if( pathTemplate != null ? !pathTemplate.equals( that.pathTemplate ) : that.pathTemplate != null )
292          return false;
293    
294        return true;
295        }
296    
297      @Override
298      public int hashCode()
299        {
300        int result = super.hashCode();
301        result = 31 * result + ( parent != null ? parent.hashCode() : 0 );
302        result = 31 * result + ( pathTemplate != null ? pathTemplate.hashCode() : 0 );
303        return result;
304        }
305    
306      @Override
307      public String toString()
308        {
309        return getClass().getSimpleName() + "[\"" + parent + "\"]" + "[\"" + pathTemplate + "\"]";
310        }
311    
/** Enum Counters lists the counters incremented while child collectors are opened, closed and purged. */
public enum Counters
  {
    /** incremented once for every child collector created */
    Paths_Opened,
    /** incremented once for every child collector closed without an exception */
    Paths_Closed,
    /** incremented once for every purge of the collector cache */
    Path_Purges
  }
316    
/**
 * Constructor BaseTemplateTap creates a new instance wrapping the scheme of the given parent Tap.
 * The whole tuple is used to populate the pathTemplate.
 *
 * @param parent            of type Tap
 * @param pathTemplate      of type String
 * @param openTapsThreshold of type int
 */
protected BaseTemplateTap( Tap parent, String pathTemplate, int openTapsThreshold )
  {
  this( new TemplateScheme( parent.getScheme() ) );
  this.openTapsThreshold = openTapsThreshold;
  this.pathTemplate = pathTemplate;
  this.parent = parent;
  }
324    
/**
 * Constructor BaseTemplateTap creates a new instance wrapping the scheme of the given parent Tap.
 * The parent is not kept on delete and the default openTapsThreshold is used.
 *
 * @param parent       of type Tap
 * @param pathTemplate of type String
 * @param sinkMode     of type SinkMode
 */
protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode )
  {
  // same as the full constructor called with the field defaults
  this( parent, pathTemplate, sinkMode, false, OPEN_TAPS_THRESHOLD_DEFAULT );
  }
331    
/**
 * Constructor BaseTemplateTap creates a new instance wrapping the scheme of the given parent Tap.
 * The whole tuple is used to populate the pathTemplate.
 *
 * @param parent             of type Tap
 * @param pathTemplate       of type String
 * @param sinkMode           of type SinkMode
 * @param keepParentOnDelete of type boolean, when true deleteResource leaves the parent untouched
 * @param openTapsThreshold  of type int
 */
protected BaseTemplateTap( Tap parent, String pathTemplate, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold )
  {
  super( new TemplateScheme( parent.getScheme() ), sinkMode );
  this.openTapsThreshold = openTapsThreshold;
  this.keepParentOnDelete = keepParentOnDelete;
  this.pathTemplate = pathTemplate;
  this.parent = parent;
  }
340    
/**
 * Constructor BaseTemplateTap creates a new instance wrapping the scheme of the given parent Tap.
 * The given pathFields select the values used to populate the pathTemplate.
 *
 * @param parent            of type Tap
 * @param pathTemplate      of type String
 * @param pathFields        of type Fields
 * @param openTapsThreshold of type int
 */
protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, int openTapsThreshold )
  {
  super( new TemplateScheme( parent.getScheme(), pathFields ) );
  this.openTapsThreshold = openTapsThreshold;
  this.pathTemplate = pathTemplate;
  this.parent = parent;
  }
348    
/**
 * Constructor BaseTemplateTap creates a new instance wrapping the scheme of the given parent Tap.
 * The given pathFields select the values used to populate the pathTemplate, the parent is not
 * kept on delete and the default openTapsThreshold is used.
 *
 * @param parent       of type Tap
 * @param pathTemplate of type String
 * @param pathFields   of type Fields
 * @param sinkMode     of type SinkMode
 */
protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode )
  {
  // same as the full constructor called with the field defaults
  this( parent, pathTemplate, pathFields, sinkMode, false, OPEN_TAPS_THRESHOLD_DEFAULT );
  }
355    
/**
 * Constructor BaseTemplateTap creates a new instance wrapping the scheme of the given parent Tap.
 * The given pathFields select the values used to populate the pathTemplate.
 *
 * @param parent             of type Tap
 * @param pathTemplate       of type String
 * @param pathFields         of type Fields
 * @param sinkMode           of type SinkMode
 * @param keepParentOnDelete of type boolean, when true deleteResource leaves the parent untouched
 * @param openTapsThreshold  of type int
 */
protected BaseTemplateTap( Tap parent, String pathTemplate, Fields pathFields, SinkMode sinkMode, boolean keepParentOnDelete, int openTapsThreshold )
  {
  super( new TemplateScheme( parent.getScheme(), pathFields ), sinkMode );
  this.openTapsThreshold = openTapsThreshold;
  this.keepParentOnDelete = keepParentOnDelete;
  this.pathTemplate = pathTemplate;
  this.parent = parent;
  }
364    
/**
 * Constructor BaseTemplateTap creates a new instance from the given scheme and sinkMode, both are
 * handed to the superclass constructor. The parent and pathTemplate fields are not assigned here.
 *
 * @param scheme   of type Scheme
 * @param sinkMode of type SinkMode
 */
365      protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme, SinkMode sinkMode )
366        {
367        super( scheme, sinkMode );
368        }
369    
/**
 * Constructor BaseTemplateTap creates a new instance from the given scheme, which is handed to the
 * superclass constructor. The parent and pathTemplate fields are not assigned here.
 *
 * @param scheme of type Scheme
 */
370      protected BaseTemplateTap( Scheme<Config, ?, Output, ?, ?> scheme )
371        {
372        super( scheme );
373        }
374    
375      public static class TemplateScheme<Config, Output> extends Scheme<Config, Void, Output, Void, Void>
376        {
377        private final Scheme scheme;
378        private final Fields pathFields;
379    
380        public TemplateScheme( Scheme scheme )
381          {
382          this.scheme = scheme;
383          this.pathFields = null;
384          }
385    
386        public TemplateScheme( Scheme scheme, Fields pathFields )
387          {
388          this.scheme = scheme;
389    
390          if( pathFields == null || pathFields.isAll() )
391            this.pathFields = null;
392          else if( pathFields.isDefined() )
393            this.pathFields = pathFields;
394          else
395            throw new IllegalArgumentException( "pathFields must be defined or the ALL substitution, got: " + pathFields.printVerbose() );
396          }
397    
398        public Fields getSinkFields()
399          {
400          if( pathFields == null || scheme.getSinkFields().isAll() )
401            return scheme.getSinkFields();
402    
403          return Fields.merge( scheme.getSinkFields(), pathFields );
404          }
405    
406        public void setSinkFields( Fields sinkFields )
407          {
408          scheme.setSinkFields( sinkFields );
409          }
410    
411        public Fields getSourceFields()
412          {
413          return scheme.getSourceFields();
414          }
415    
416        public void setSourceFields( Fields sourceFields )
417          {
418          scheme.setSourceFields( sourceFields );
419          }
420    
421        public int getNumSinkParts()
422          {
423          return scheme.getNumSinkParts();
424          }
425    
426        public void setNumSinkParts( int numSinkParts )
427          {
428          scheme.setNumSinkParts( numSinkParts );
429          }
430    
431        @Override
432        public void sourceConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf )
433          {
434          scheme.sourceConfInit( flowProcess, tap, conf );
435          }
436    
437        @Override
438        public void sourcePrepare( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
439          {
440          scheme.sourcePrepare( flowProcess, sourceCall );
441          }
442    
443        @Override
444        public boolean source( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
445          {
446          throw new UnsupportedOperationException( "not supported" );
447          }
448    
449        @Override
450        public void sourceCleanup( FlowProcess<Config> flowProcess, SourceCall<Void, Void> sourceCall ) throws IOException
451          {
452          scheme.sourceCleanup( flowProcess, sourceCall );
453          }
454    
455        @Override
456        public void sinkConfInit( FlowProcess<Config> flowProcess, Tap<Config, Void, Output> tap, Config conf )
457          {
458          scheme.sinkConfInit( flowProcess, tap, conf );
459          }
460    
461        @Override
462        public void sinkPrepare( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
463          {
464          scheme.sinkPrepare( flowProcess, sinkCall );
465          }
466    
467        @Override
468        public void sink( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
469          {
470          throw new UnsupportedOperationException( "should never be called" );
471          }
472    
473        @Override
474        public void sinkCleanup( FlowProcess<Config> flowProcess, SinkCall<Void, Output> sinkCall ) throws IOException
475          {
476          scheme.sinkCleanup( flowProcess, sinkCall );
477          }
478        }
479      }