001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.local;
022
023import java.beans.ConstructorProperties;
024import java.io.File;
025import java.io.FileInputStream;
026import java.io.FileNotFoundException;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.OutputStream;
030import java.util.Properties;
031
032import cascading.flow.FlowProcess;
033import cascading.tap.SinkMode;
034import cascading.tap.Tap;
035import cascading.tap.local.io.TapFileOutputStream;
036import cascading.tap.partition.BasePartitionTap;
037import cascading.tap.partition.Partition;
038import cascading.tuple.TupleEntrySchemeCollector;
039import cascading.tuple.TupleEntrySchemeIterator;
040
041/**
042 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
043 * current {@link cascading.tuple.Tuple} instance.
044 * <p/>
045 * The constructor takes a {@link cascading.tap.local.FileTap} {@link cascading.tap.Tap} and a {@link Partition}
046 * implementation. This allows Tuple values at given positions to be used as directory names during write
047 * operations, and directory names as data during read operations.
048 * <p/>
049 * The key value here is that there is no need to duplicate data values in the directory names and inside
050 * the data files.
051 * <p/>
052 * So only values declared in the parent Tap will be read or written to the underlying file system files. But
053 * fields declared by the {@link Partition} will only be read or written to the directory names. That is, the
054 * PartitionTap instance will sink or source the partition fields, plus the parent Tap fields. The partition
055 * fields and parent Tap fields do not need to have common field names.
056 * <p/>
057 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
058 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
059 * <p/>
060 * PartitionTap will populate a given {@code partition} without regard to case of the values being used. Thus
061 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files into the same
062 * location. Forcing the case to be consistent with a custom Partition implementation or an upstream
063 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
064 */
065public class PartitionTap extends BasePartitionTap<Properties, InputStream, OutputStream>
066  {
067  /**
068   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
069   * base path and default {@link cascading.scheme.Scheme}, and the partition.
070   *
071   * @param parent    of type Tap
072   * @param partition of type Partition
073   */
074  @ConstructorProperties({"parent", "partition"})
075  public PartitionTap( FileTap parent, Partition partition )
076    {
077    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
078    }
079
080  /**
081   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
082   * base path and default {@link cascading.scheme.Scheme}, and the partition.
083   * <p/>
084   * {@code openWritesThreshold} limits the number of open files to be output to.
085   *
086   * @param parent              of type Hfs
087   * @param partition           of type Partition
088   * @param openWritesThreshold of type int
089   */
090  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
091  public PartitionTap( FileTap parent, Partition partition, int openWritesThreshold )
092    {
093    super( parent, partition, openWritesThreshold );
094    }
095
096  /**
097   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
098   * base path and default {@link cascading.scheme.Scheme}, and the partition.
099   *
100   * @param parent    of type Tap
101   * @param partition of type Partition
102   * @param sinkMode  of type SinkMode
103   */
104  @ConstructorProperties({"parent", "partition", "sinkMode"})
105  public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode )
106    {
107    super( parent, partition, sinkMode );
108    }
109
110  /**
111   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
112   * base path and default {@link cascading.scheme.Scheme}, and the partition.
113   * <p/>
114   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
115   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
116   *
117   * @param parent             of type Tap
118   * @param partition          of type Partition
119   * @param sinkMode           of type SinkMode
120   * @param keepParentOnDelete of type boolean
121   */
122  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
123  public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
124    {
125    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
126    }
127
128  /**
129   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the
130   * base path and default {@link cascading.scheme.Scheme}, and the partition.
131   * <p/>
132   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
133   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
134   * <p/>
135   * {@code openWritesThreshold} limits the number of open files to be output to.
136   *
137   * @param parent              of type Tap
138   * @param partition           of type Partition
139   * @param sinkMode            of type SinkMode
140   * @param keepParentOnDelete  of type boolean
141   * @param openWritesThreshold of type int
142   */
143  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
144  public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
145    {
146    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
147    }
148
149  @Override
150  protected String getCurrentIdentifier( FlowProcess<? extends Properties> flowProcess )
151    {
152    return null;
153    }
154
155  @Override
156  public boolean deleteResource( Properties conf ) throws IOException
157    {
158    String[] childIdentifiers = ( (FileTap) parent ).getChildIdentifiers( conf, Integer.MAX_VALUE, false );
159
160    if( childIdentifiers.length == 0 )
161      return true;
162
163    boolean result = false;
164
165    for( String childIdentifier : childIdentifiers )
166      result |= new File( childIdentifier ).delete();
167
168    return result;
169    }
170
171  @Override
172  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Properties> flowProcess, Tap parent, String path, long sequence ) throws IOException
173    {
174    TapFileOutputStream output = new TapFileOutputStream( parent, path, true ); // always append
175
176    return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, parent, output );
177    }
178
179  @Override
180  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Properties> flowProcess, Tap parent, String path, InputStream input ) throws FileNotFoundException
181    {
182    if( input == null )
183      input = new FileInputStream( path );
184
185    return new TupleEntrySchemeIterator( flowProcess, parent.getScheme(), input, path );
186    }
187  }