001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap.hadoop; 022 023 import java.beans.ConstructorProperties; 024 import java.io.IOException; 025 026 import cascading.flow.FlowProcess; 027 import cascading.tap.SinkMode; 028 import cascading.tap.Tap; 029 import cascading.tap.TapException; 030 import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; 031 import cascading.tap.hadoop.io.MultiInputSplit; 032 import cascading.tap.hadoop.io.TapOutputCollector; 033 import cascading.tap.partition.BasePartitionTap; 034 import cascading.tap.partition.Partition; 035 import cascading.tuple.TupleEntrySchemeCollector; 036 import cascading.tuple.TupleEntrySchemeIterator; 037 import org.apache.hadoop.fs.Path; 038 import org.apache.hadoop.mapred.JobConf; 039 import org.apache.hadoop.mapred.OutputCollector; 040 import org.apache.hadoop.mapred.RecordReader; 041 042 /** 043 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the 044 * current {@link cascading.tuple.Tuple} instance. 045 * <p/> 046 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition} 047 * implementation. 
 * This allows Tuple values at given positions to be used as directory names during write
 * operations, and directory names as data during read operations.
 * <p/>
 * The key value here is that there is no need to duplicate data values in the directory names and inside
 * the data files.
 * <p/>
 * So only values declared in the parent Tap will be read or written to the underlying file system files. But
 * fields declared by the {@link Partition} will only be read or written to the directory names. That is, the
 * PartitionTap instance will sink or source the partition fields, plus the parent Tap fields. The partition
 * fields and parent Tap fields do not need to have common field names.
 * <p/>
 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
 * <p/>
 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
 * <p/>
 * PartitionTap will populate a given {@code partition} without regard to case of the values being used. Thus
 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files into the same
 * location. Forcing the case to be consistent with a custom Partition implementation or an upstream
 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
 * <p/>
 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, it doesn't mean
 * it's safe to do so. Same is true with the PartitionTap. Interleaving writes to a common parent (root) directory
 * across multiple flows will very likely lead to data loss.
071 */ 072 public class PartitionTap extends BasePartitionTap<JobConf, RecordReader, OutputCollector> 073 { 074 /** 075 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the 076 * base path and default {@link cascading.scheme.Scheme}, and the partition. 077 * 078 * @param parent of type Tap 079 * @param partition of type String 080 */ 081 @ConstructorProperties({"parent", "partition"}) 082 public PartitionTap( Hfs parent, Partition partition ) 083 { 084 this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT ); 085 } 086 087 /** 088 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the 089 * base path and default {@link cascading.scheme.Scheme}, and the partition. 090 * <p/> 091 * {@code openWritesThreshold} limits the number of open files to be output to. 092 * 093 * @param parent of type Hfs 094 * @param partition of type String 095 * @param openWritesThreshold of type int 096 */ 097 @ConstructorProperties({"parent", "partition", "openWritesThreshold"}) 098 public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold ) 099 { 100 super( parent, partition, openWritesThreshold ); 101 } 102 103 /** 104 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the 105 * base path and default {@link cascading.scheme.Scheme}, and the partition. 
106 * 107 * @param parent of type Tap 108 * @param partition of type String 109 * @param sinkMode of type SinkMode 110 */ 111 @ConstructorProperties({"parent", "partition", "sinkMode"}) 112 public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode ) 113 { 114 super( parent, partition, sinkMode ); 115 } 116 117 /** 118 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the 119 * base path and default {@link cascading.scheme.Scheme}, and the partition. 120 * <p/> 121 * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)} 122 * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}. 123 * 124 * @param parent of type Tap 125 * @param partition of type String 126 * @param sinkMode of type SinkMode 127 * @param keepParentOnDelete of type boolean 128 */ 129 @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"}) 130 public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete ) 131 { 132 this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT ); 133 } 134 135 /** 136 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the 137 * base path and default {@link cascading.scheme.Scheme}, and the partition. 138 * <p/> 139 * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)} 140 * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}. 141 * <p/> 142 * {@code openWritesThreshold} limits the number of open files to be output to. 
143 * 144 * @param parent of type Tap 145 * @param partition of type String 146 * @param sinkMode of type SinkMode 147 * @param keepParentOnDelete of type boolean 148 * @param openWritesThreshold of type int 149 */ 150 @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"}) 151 public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold ) 152 { 153 super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold ); 154 } 155 156 @Override 157 protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<JobConf> flowProcess, Tap parent, String path, long sequence ) throws IOException 158 { 159 TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence ); 160 161 return new TupleEntrySchemeCollector<JobConf, OutputCollector>( flowProcess, parent, outputCollector ); 162 } 163 164 @Override 165 protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<JobConf> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException 166 { 167 return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader ); 168 } 169 170 @Override 171 protected String getCurrentIdentifier( FlowProcess<JobConf> flowProcess ) 172 { 173 String identifier = flowProcess.getStringProperty( MultiInputSplit.CASCADING_SOURCE_PATH ); // set on current split 174 175 if( identifier == null ) 176 return null; 177 178 return new Path( identifier ).getParent().toString(); // drop part-xxxx 179 } 180 181 @Override 182 public void sourceConfInit( FlowProcess<JobConf> flowProcess, JobConf conf ) 183 { 184 try 185 { 186 String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true ); 187 188 ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions ); 189 } 190 catch( IOException exception ) 191 { 192 throw 
new TapException( "unable to retrieve child partitions", exception ); 193 } 194 } 195 }