001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop;
022
023import java.beans.ConstructorProperties;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028
029import cascading.flow.FlowProcess;
030import cascading.tap.SinkMode;
031import cascading.tap.Tap;
032import cascading.tap.TapException;
033import cascading.tap.hadoop.io.CombineInputPartitionTupleEntryIterator;
034import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
035import cascading.tap.hadoop.io.MultiInputSplit;
036import cascading.tap.hadoop.io.TapOutputCollector;
037import cascading.tap.partition.BasePartitionTap;
038import cascading.tap.partition.Partition;
039import cascading.tuple.Tuple;
040import cascading.tuple.TupleEntryIterableChainIterator;
041import cascading.tuple.TupleEntryIterator;
042import cascading.tuple.TupleEntrySchemeCollector;
043import cascading.tuple.TupleEntrySchemeIterator;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.mapred.OutputCollector;
047import org.apache.hadoop.mapred.RecordReader;
048
049/**
050 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
051 * current {@link cascading.tuple.Tuple} instance.
052 * <p/>
053 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition}
054 * implementation. This allows Tuple values at given positions to be used as directory names during write
055 * operations, and directory names as data during read operations.
056 * <p/>
057 * The key value here is that there is no need to duplicate data values in the directory names and inside
058 * the data files.
059 * <p/>
060 * So only values declared in the parent Tap will be read or written to the underlying file system files. But
061 * fields declared by the {@link Partition} will only be read or written to the directory names. That is, the
062 * PartitionTap instance will sink or source the partition fields, plus the parent Tap fields. The partition
063 * fields and parent Tap fields do not need to have common field names.
064 * <p/>
065 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
066 * <p/>
067 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
068 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
069 * <p/>
070 * PartitionTap will populate a given {@code partition} without regard to case of the values being used. Thus
071 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files into the same
072 * location. Forcing the case to be consistent with a custom Partition implementation or an upstream
073 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
074 * <p/>
075 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, it doesn't mean
076 * its safe to do so. Same is true with the PartitionTap. Interleaving writes to a common parent (root) directory
077 * across multiple flows will very likely lead to data loss.
078 */
079public class PartitionTap extends BasePartitionTap<Configuration, RecordReader, OutputCollector>
080  {
081  /**
082   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
083   * base path and default {@link cascading.scheme.Scheme}, and the partition.
084   *
085   * @param parent    of type Tap
086   * @param partition of type Partition
087   */
088  @ConstructorProperties({"parent", "partition"})
089  public PartitionTap( Hfs parent, Partition partition )
090    {
091    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
092    }
093
094  /**
095   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
096   * base path and default {@link cascading.scheme.Scheme}, and the partition.
097   * <p/>
098   * {@code openWritesThreshold} limits the number of open files to be output to.
099   *
100   * @param parent              of type Hfs
101   * @param partition           of type Partition
102   * @param openWritesThreshold of type int
103   */
104  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
105  public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold )
106    {
107    super( parent, partition, openWritesThreshold );
108    }
109
110  /**
111   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
112   * base path and default {@link cascading.scheme.Scheme}, and the partition.
113   *
114   * @param parent    of type Tap
115   * @param partition of type String
116   * @param sinkMode  of type SinkMode
117   */
118  @ConstructorProperties({"parent", "partition", "sinkMode"})
119  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode )
120    {
121    super( parent, partition, sinkMode );
122    }
123
124  /**
125   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
126   * base path and default {@link cascading.scheme.Scheme}, and the partition.
127   * <p/>
128   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
129   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
130   *
131   * @param parent             of type Tap
132   * @param partition          of type Partition
133   * @param sinkMode           of type SinkMode
134   * @param keepParentOnDelete of type boolean
135   */
136  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
137  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
138    {
139    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
140    }
141
142  /**
143   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
144   * base path and default {@link cascading.scheme.Scheme}, and the partition.
145   * <p/>
146   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
147   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
148   * <p/>
149   * {@code openWritesThreshold} limits the number of open files to be output to.
150   *
151   * @param parent              of type Tap
152   * @param partition           of type Partition
153   * @param sinkMode            of type SinkMode
154   * @param keepParentOnDelete  of type boolean
155   * @param openWritesThreshold of type int
156   */
157  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
158  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
159    {
160    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
161    }
162
163  @Override
164  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, long sequence ) throws IOException
165    {
166    TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence );
167
168    return new TupleEntrySchemeCollector<Configuration, OutputCollector>( flowProcess, parent, outputCollector );
169    }
170
171  @Override
172  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException
173    {
174    return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader );
175    }
176
177  @Override
178  protected String getCurrentIdentifier( FlowProcess<? extends Configuration> flowProcess )
179    {
180    String identifier = flowProcess.getStringProperty( MultiInputSplit.CASCADING_SOURCE_PATH ); // set on current split
181
182    if( identifier == null )
183      {
184      if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
185        throw new TapException( "combined input format support, via '" + HfsProps.COMBINE_INPUT_FILES + "', may not be enabled for use with the PartitionTap" );
186
187      throw new TapException( "unable to retrieve the current file being processed, '" + MultiInputSplit.CASCADING_SOURCE_PATH + "' is not set" );
188      }
189
190    return new Path( identifier ).getParent().toString(); // drop part-xxxx
191    }
192
193  @Override
194  public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Configuration conf )
195    {
196    try
197      {
198      String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true );
199
200      ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions );
201      }
202    catch( IOException exception )
203      {
204      throw new TapException( "unable to retrieve child partitions", exception );
205      }
206    }
207
208  @Override
209  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
210    {
211    if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
212      return new CombinePartitionIterator( flowProcess, input );
213
214    return super.openForRead( flowProcess, input );
215    }
216
217  private class CombinePartitionIterator extends TupleEntryIterableChainIterator
218    {
219    public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
220      {
221      super( getSourceFields() );
222
223      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();
224
225      if( input == null )
226        throw new IOException( "input cannot be null" );
227
228      String identifier = parent.getFullIdentifier( flowProcess );
229
230      iterators.add( createPartitionEntryIterator( flowProcess, input, identifier ) );
231
232      reset( iterators );
233      }
234
235    private CombineInputPartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Configuration> flowProcess, RecordReader input, String parentIdentifier ) throws IOException
236      {
237      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, null, input );
238
239      return new CombineInputPartitionTupleEntryIterator( flowProcess, getSourceFields(), partition, parentIdentifier, schemeIterator );
240      }
241    }
242  }