001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.local;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.File;
025    import java.io.FileInputStream;
026    import java.io.FileNotFoundException;
027    import java.io.IOException;
028    import java.io.InputStream;
029    import java.io.OutputStream;
030    import java.util.Properties;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.tap.SinkMode;
034    import cascading.tap.Tap;
035    import cascading.tap.local.io.TapFileOutputStream;
036    import cascading.tap.partition.BasePartitionTap;
037    import cascading.tap.partition.Partition;
038    import cascading.tuple.TupleEntrySchemeCollector;
039    import cascading.tuple.TupleEntrySchemeIterator;
040    
041    /**
042     * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
043     * current {@link cascading.tuple.Tuple} instance.
044     * <p/>
045     * The constructor takes a {@link cascading.tap.local.FileTap} {@link cascading.tap.Tap} and a {@link Partition}
046     * implementation. This allows Tuple values at given positions to be used as directory names during write
047     * operations, and directory names as data during read operations.
048     * <p/>
049     * The key value here is that there is no need to duplicate data values in the directory names and inside
050     * the data files.
051     * <p/>
052     * So only values declared in the parent Tap will be read or written to the underlying file system files. But
053     * fields declared by the {@link Partition} will only be read or written to the directory names. That is, the
054     * PartitionTap instance will sink or source the partition fields, plus the parent Tap fields. The partition
055     * fields and parent Tap fields do not need to have common field names.
056     * <p/>
057     * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
058     * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
059     * <p/>
060     * PartitionTap will populate a given {@code partition} without regard to case of the values being used. Thus
061     * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files into the same
062     * location. Forcing the case to be consistent with a custom Partition implementation or an upstream
063     * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
064     */
065    public class PartitionTap extends BasePartitionTap<Properties, InputStream, OutputStream>
066      {
067      /**
068       * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
069       * base path and default {@link cascading.scheme.Scheme}, and the partition.
070       *
071       * @param parent    of type Tap
072       * @param partition of type String
073       */
074      @ConstructorProperties({"parent", "partition"})
075      public PartitionTap( FileTap parent, Partition partition )
076        {
077        this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
078        }
079    
080      /**
081       * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
082       * base path and default {@link cascading.scheme.Scheme}, and the partition.
083       * <p/>
084       * {@code openWritesThreshold} limits the number of open files to be output to.
085       *
086       * @param parent              of type Hfs
087       * @param partition           of type String
088       * @param openWritesThreshold of type int
089       */
090      @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
091      public PartitionTap( FileTap parent, Partition partition, int openWritesThreshold )
092        {
093        super( parent, partition, openWritesThreshold );
094        }
095    
096      /**
097       * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
098       * base path and default {@link cascading.scheme.Scheme}, and the partition.
099       *
100       * @param parent    of type Tap
101       * @param partition of type String
102       * @param sinkMode  of type SinkMode
103       */
104      @ConstructorProperties({"parent", "partition", "sinkMode"})
105      public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode )
106        {
107        super( parent, partition, sinkMode );
108        }
109    
110      /**
111       * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
112       * base path and default {@link cascading.scheme.Scheme}, and the partition.
113       * <p/>
114       * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
115       * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
116       *
117       * @param parent             of type Tap
118       * @param partition          of type String
119       * @param sinkMode           of type SinkMode
120       * @param keepParentOnDelete of type boolean
121       */
122      @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
123      public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
124        {
125        this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
126        }
127    
128      /**
129       * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
130       * base path and default {@link cascading.scheme.Scheme}, and the partition.
131       * <p/>
132       * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
133       * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
134       * <p/>
135       * {@code openWritesThreshold} limits the number of open files to be output to.
136       *
137       * @param parent              of type Tap
138       * @param partition           of type String
139       * @param sinkMode            of type SinkMode
140       * @param keepParentOnDelete  of type boolean
141       * @param openWritesThreshold of type int
142       */
143      @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
144      public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
145        {
146        super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
147        }
148    
149      @Override
150      protected String getCurrentIdentifier( FlowProcess<Properties> flowProcess )
151        {
152        return null;
153        }
154    
155      @Override
156      public boolean deleteResource( Properties conf ) throws IOException
157        {
158        String[] childIdentifiers = ( (FileTap) parent ).getChildIdentifiers( conf, Integer.MAX_VALUE, false );
159    
160        if( childIdentifiers.length == 0 )
161          return true;
162    
163        boolean result = false;
164    
165        for( String childIdentifier : childIdentifiers )
166          result |= new File( childIdentifier ).delete();
167    
168        return result;
169        }
170    
171      @Override
172      protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Properties> flowProcess, Tap parent, String path, long sequence ) throws IOException
173        {
174        TapFileOutputStream output = new TapFileOutputStream( parent, path, true ); // always append
175    
176        return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, parent, output );
177        }
178    
179      @Override
180      protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<Properties> flowProcess, Tap parent, String path, InputStream input ) throws FileNotFoundException
181        {
182        if( input == null )
183          input = new FileInputStream( path );
184    
185        return new TupleEntrySchemeIterator( flowProcess, parent.getScheme(), input, path );
186        }
187      }