/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.MultiInputSplit;
import cascading.tap.hadoop.io.TapOutputCollector;
import cascading.tap.partition.BasePartitionTap;
import cascading.tap.partition.Partition;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
 * current {@link cascading.tuple.Tuple} instance.
 * <p/>
 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition}
 * implementation. This allows Tuple values at given positions to be used as directory names during write
 * operations, and directory names as data during read operations.
 * <p/>
 * The key benefit here is that data values need not be duplicated in both the directory names and the
 * data files.
 * <p/>
 * That is, only fields declared by the parent Tap will be read from or written to the underlying file system
 * files, while fields declared by the {@link Partition} will only be read from or written to the directory names.
 * The PartitionTap instance will therefore sink or source the partition fields plus the parent Tap fields. The
 * partition fields and parent Tap fields do not need to have common field names.
 * <p/>
 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
 * <p/>
 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
 * <p/>
 * PartitionTap will populate a given {@code partition} without regard to the case of the values being used. Thus
 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files writing to the
 * same location. Forcing the case to be consistent with a custom Partition implementation or an upstream
 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
 * <p/>
 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, that does not
 * mean it is safe to do so. The same is true of the PartitionTap. Interleaving writes to a common parent (root)
 * directory across multiple flows will very likely lead to data loss.
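 * <p/>
 * For example, a minimal sketch, where the scheme, path, and field names are illustrative assumptions:
 * <pre>{@code
 * // the parent Tap declares the fields written into the "part-xxxxx" files
 * Hfs parent = new Hfs( new TextDelimited( new Fields( "value" ) ), "output/path" );
 *
 * // the Partition declares the fields used to build sub-directory names, e.g. "output/path/2012/"
 * Partition partition = new DelimitedPartition( new Fields( "year" ) );
 *
 * // this Tap will sink or source tuples with the fields "year" and "value"
 * Tap tap = new PartitionTap( parent, partition );
 * }</pre>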
 */
public class PartitionTap extends BasePartitionTap<JobConf, RecordReader, OutputCollector>
  {
  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   */
  @ConstructorProperties({"parent", "partition"})
  public PartitionTap( Hfs parent, Partition partition )
    {
    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code openWritesThreshold} limits the number of open files to be output to.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold )
    {
    super( parent, partition, openWritesThreshold );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   * @param sinkMode  of type SinkMode
   */
  @ConstructorProperties({"parent", "partition", "sinkMode"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode )
    {
    super( parent, partition, sinkMode );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   *
   * @param parent             of type Hfs
   * @param partition          of type Partition
   * @param sinkMode           of type SinkMode
   * @param keepParentOnDelete of type boolean
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
    {
    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p/>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   * <p/>
   * {@code openWritesThreshold} limits the number of open files to be output to.
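   * <p/>
   * For example, a sketch with illustrative argument values:
   * <pre>{@code
   * // replace existing output, keep the parent directory on delete, and allow up to 100 open files
   * Tap tap = new PartitionTap( parent, partition, SinkMode.REPLACE, true, 100 );
   * }</pre>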
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param sinkMode            of type SinkMode
   * @param keepParentOnDelete  of type boolean
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
    {
    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
    }

  @Override
  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<JobConf> flowProcess, Tap parent, String path, long sequence ) throws IOException
    {
    // open a collector that writes "part-xxxxx" files beneath the given partition path
    TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence );

    return new TupleEntrySchemeCollector<JobConf, OutputCollector>( flowProcess, parent, outputCollector );
    }

  @Override
  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<JobConf> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException
    {
    // read the partition directory through a child Hfs tap that reuses the parent's scheme
    return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader );
    }

  @Override
  protected String getCurrentIdentifier( FlowProcess<JobConf> flowProcess )
    {
    String identifier = flowProcess.getStringProperty( MultiInputSplit.CASCADING_SOURCE_PATH ); // set on current split

    if( identifier == null )
      return null;

    return new Path( identifier ).getParent().toString(); // drop part-xxxx
    }

  @Override
  public void sourceConfInit( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    try
      {
      // gather the current child partition directories so the parent can register them as input paths
      String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true );

      ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to retrieve child partitions", exception );
      }
    }
  }