/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineInputPartitionTupleEntryIterator;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.TapOutputCollector;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.partition.BasePartitionTap;
import cascading.tap.partition.Partition;
import cascading.tap.type.FileType;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterableChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
 * current {@link cascading.tuple.Tuple} instance.
 * <p>
 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition}
 * implementation. This allows Tuple values at given positions to be used as directory names during write
 * operations, and directory names as data during read operations.
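 * <p>
 * For example, the following sketch writes tab-delimited log entries partitioned by year and month, using a
 * {@link cascading.scheme.hadoop.TextDelimited} scheme and a {@link cascading.tap.partition.DelimitedPartition}
 * (the path and field names here are hypothetical):
 * <pre>{@code
 * // the parent Tap declares the fields written to the part files, and the base path
 * Hfs parentTap = new Hfs( new TextDelimited( new Fields( "entry" ), "\t" ), "hdfs:/logs/archive" );
 *
 * // the Partition declares the fields encoded in the directory names, e.g. "2012/june/part-00000"
 * Partition partition = new DelimitedPartition( new Fields( "year", "month" ) );
 *
 * Tap partitionTap = new PartitionTap( parentTap, partition );
 * }</pre>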
 * <p>
 * The key benefit here is that there is no need to duplicate data values in both the directory names and
 * the data files.
 * <p>
 * Only the values declared in the parent Tap will be read from or written to the underlying file system
 * files, while the fields declared by the {@link Partition} will only be read from or written to the directory
 * names. That is, the PartitionTap instance will sink or source the partition fields plus the parent Tap
 * fields. The partition fields and parent Tap fields do not need to have common field names.
 * <p>
 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
 * <p>
 * {@code openWritesThreshold} limits the number of open files being written to. This value defaults to 300 files.
 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
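 * <p>
 * For example (a sketch, continuing with the hypothetical taps above):
 * <pre>{@code
 * // close the least recently used open part files once more than 100 are open
 * Tap partitionTap = new PartitionTap( parentTap, partition, 100 );
 * }</pre>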
 * <p>
 * PartitionTap will populate a given {@code partition} without regard to the case of the values being used. Thus
 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files writing to the
 * same location. Forcing the case to be consistent with a custom Partition implementation or an upstream
 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
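 * <p>
 * For example, the partition values could be normalized with an upstream
 * {@link cascading.operation.expression.ExpressionFunction} (a sketch; the pipe and field names are hypothetical):
 * <pre>{@code
 * // replace the "month" value with its lower-case form before it reaches this tap
 * ExpressionFunction lowerCase = new ExpressionFunction( new Fields( "month" ), "month.toLowerCase()", String.class );
 * pipe = new Each( pipe, new Fields( "month" ), lowerCase, Fields.REPLACE );
 * }</pre>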
 * <p>
 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, that does not
 * mean it is safe to do so. The same is true of the PartitionTap. Interleaving writes to a common parent (root)
 * directory across multiple flows will very likely lead to data loss.
 */
public class PartitionTap extends BasePartitionTap<Configuration, RecordReader, OutputCollector>
  {
  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   */
  @ConstructorProperties({"parent", "partition"})
  public PartitionTap( Hfs parent, Partition partition )
    {
    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p>
   * {@code openWritesThreshold} limits the number of open files being written to.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold )
    {
    super( parent, partition, openWritesThreshold );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   * @param sinkMode  of type SinkMode
   */
  @ConstructorProperties({"parent", "partition", "sinkMode"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode )
    {
    super( parent, partition, sinkMode );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   *
   * @param parent             of type Hfs
   * @param partition          of type Partition
   * @param sinkMode           of type SinkMode
   * @param keepParentOnDelete of type boolean
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
    {
    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   * <p>
   * {@code openWritesThreshold} limits the number of open files being written to.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param sinkMode            of type SinkMode
   * @param keepParentOnDelete  of type boolean
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
    {
    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
    }

  @Override
  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, long sequence ) throws IOException
    {
    // collect output directly into the given partition sub-directory
    TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence );

    return new TupleEntrySchemeCollector<Configuration, OutputCollector>( flowProcess, parent, outputCollector );
    }

  @Override
  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException
    {
    // read the partition sub-directory using the parent Tap's scheme
    return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader );
    }

  @Override
  protected String getCurrentIdentifier( FlowProcess<? extends Configuration> flowProcess )
    {
    String identifier = flowProcess.getStringProperty( FileType.CASCADING_SOURCE_PATH ); // set on current split

    if( identifier == null )
      {
      if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
        throw new TapException( "combined input format support, via '" + HfsProps.COMBINE_INPUT_FILES + "', must not be enabled for use with the PartitionTap" );

      throw new TapException( "unable to retrieve the current file being processed, '" + FileType.CASCADING_SOURCE_PATH + "' is not set" );
      }

    return new Path( identifier ).getParent().toString(); // drop part-xxxx
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Configuration conf )
    {
    try
      {
      // register each child partition directory, not the parent root, as an input path
      String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true );

      ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to retrieve child partitions", exception );
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    // combined splits may span multiple partition directories, so the partition values
    // must be recovered from each source path while iterating
    if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
      return new CombinePartitionIterator( flowProcess, input );

    return super.openForRead( flowProcess, input );
    }

  private class CombinePartitionIterator extends TupleEntryIterableChainIterator
    {
    public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
      {
      super( getSourceFields() );

      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();

      if( input == null )
        throw new IOException( "input cannot be null" );

      String identifier = parent.getFullIdentifier( flowProcess );

      iterators.add( createPartitionEntryIterator( flowProcess, input, identifier ) );

      reset( iterators );
      }

    private CombineInputPartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Configuration> flowProcess, RecordReader input, String parentIdentifier ) throws IOException
      {
      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, null, input );

      return new CombineInputPartitionTupleEntryIterator( flowProcess, getSourceFields(), partition, parentIdentifier, schemeIterator );
      }
    }

  @Override
  public boolean commitResource( Configuration conf ) throws IOException
    {
    // touch the success marker, when enabled, in the parent (root) path before delegating
    Hadoop18TapUtil.writeSuccessMarker( conf, ( (Hfs) parent ).getPath() );

    return super.commitResource( conf );
    }
  }