/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.io.CombineInputPartitionTupleEntryIterator;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.TapOutputCollector;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.partition.BasePartitionTap;
import cascading.tap.partition.Partition;
import cascading.tap.type.FileType;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterableChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the
 * current {@link cascading.tuple.Tuple} instance.
 * <p>
 * The constructor takes a {@link cascading.tap.hadoop.Hfs} {@link cascading.tap.Tap} and a {@link Partition}
 * implementation. This allows Tuple values at given positions to be used as directory names during write
 * operations, and directory names as data during read operations.
 * <p>
 * The key benefit is that data values need not be duplicated both in the directory names and inside the data files.
 * <p>
 * Only fields declared by the parent Tap will be read from or written to the underlying file system files, while
 * fields declared by the {@link Partition} will only be read from or written to the directory names. That is, the
 * PartitionTap instance will sink or source the partition fields plus the parent Tap fields. The partition
 * fields and parent Tap fields do not need to have common field names.
 * <p>
 * Note that Hadoop can only sink to directories, and all files in those directories are "part-xxxxx" files.
 * <p>
 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files.
 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed.
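 * <p>
 * For example, a minimal sketch (assuming a {@code TextDelimited} parent scheme and the provided
 * {@link cascading.tap.partition.DelimitedPartition}; the field names and the {@code "hdfs:/logs/"} path below are
 * illustrative only) that stores an "entry" value in the part files while partitioning output by "year" and "month":
 * <pre>{@code
 * Scheme scheme = new TextDelimited( new Fields( "entry" ), "\t" );                   // fields stored inside each part file
 * Hfs parentTap = new Hfs( scheme, "hdfs:/logs/" );                                   // root directory receiving the partitions
 * Partition partition = new DelimitedPartition( new Fields( "year", "month" ), "/" ); // yields paths like 2012/June/
 * Tap logsTap = new PartitionTap( parentTap, partition, SinkMode.REPLACE );
 * }</pre>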
 * <p>
 * PartitionTap will populate a given {@code partition} without regard to the case of the values being used. Thus
 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files writing to the
 * same location. Forcing the case to be consistent with a custom Partition implementation or an upstream
 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}.
 * <p>
 * Though Hadoop has no mechanism to prevent simultaneous writes to a directory from multiple jobs, that does not mean
 * it is safe to do so. The same is true with the PartitionTap. Interleaving writes to a common parent (root) directory
 * across multiple flows will very likely lead to data loss.
 */
public class PartitionTap extends BasePartitionTap<Configuration, RecordReader, OutputCollector>
  {
  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   */
  @ConstructorProperties({"parent", "partition"})
  public PartitionTap( Hfs parent, Partition partition )
    {
    this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p>
   * {@code openWritesThreshold} limits the number of open files to be output to.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, int openWritesThreshold )
    {
    super( parent, partition, openWritesThreshold );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   *
   * @param parent    of type Hfs
   * @param partition of type Partition
   * @param sinkMode  of type SinkMode
   */
  @ConstructorProperties({"parent", "partition", "sinkMode"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode )
    {
    super( parent, partition, sinkMode );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   *
   * @param parent             of type Hfs
   * @param partition          of type Partition
   * @param sinkMode           of type SinkMode
   * @param keepParentOnDelete of type boolean
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete )
    {
    this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT );
    }

  /**
   * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.hadoop.Hfs} Tap as the
   * base path and default {@link cascading.scheme.Scheme}, and the partition.
   * <p>
   * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)}
   * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}.
   * <p>
   * {@code openWritesThreshold} limits the number of open files to be output to.
   *
   * @param parent              of type Hfs
   * @param partition           of type Partition
   * @param sinkMode            of type SinkMode
   * @param keepParentOnDelete  of type boolean
   * @param openWritesThreshold of type int
   */
  @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"})
  public PartitionTap( Hfs parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold )
    {
    super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold );
    }

  @Override
  protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, long sequence ) throws IOException
    {
    TapOutputCollector outputCollector = new TapOutputCollector( flowProcess, parent, path, sequence );

    return new TupleEntrySchemeCollector<Configuration, OutputCollector>( flowProcess, parent, outputCollector );
    }

  @Override
  protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<? extends Configuration> flowProcess, Tap parent, String path, RecordReader recordReader ) throws IOException
    {
    return new HadoopTupleEntrySchemeIterator( flowProcess, new Hfs( parent.getScheme(), path ), recordReader );
    }

  @Override
  protected String getCurrentIdentifier( FlowProcess<? extends Configuration> flowProcess )
    {
    String identifier = flowProcess.getStringProperty( FileType.CASCADING_SOURCE_PATH ); // set on current split

    if( identifier == null )
      {
      if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
        throw new TapException( "combined input format support, via '" + HfsProps.COMBINE_INPUT_FILES + "', may not be enabled for use with the PartitionTap" );

      throw new TapException( "unable to retrieve the current file being processed, '" + FileType.CASCADING_SOURCE_PATH + "' is not set" );
      }

    return new Path( identifier ).getParent().toString(); // drop part-xxxx
    }

  @Override
  public void sourceConfInit( FlowProcess<? extends Configuration> flowProcess, Configuration conf )
    {
    try
      {
      String[] childPartitions = getChildPartitionIdentifiers( flowProcess, true );

      ( (Hfs) getParent() ).applySourceConfInitIdentifiers( flowProcess, conf, childPartitions );
      }
    catch( IOException exception )
      {
      throw new TapException( "unable to retrieve child partitions", exception );
      }
    }

  @Override
  public TupleEntryIterator openForRead( FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
    {
    if( flowProcess.getBooleanProperty( HfsProps.COMBINE_INPUT_FILES, false ) )
      return new CombinePartitionIterator( flowProcess, input );

    return super.openForRead( flowProcess, input );
    }

  private class CombinePartitionIterator extends TupleEntryIterableChainIterator
    {
    public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
      {
      super( getSourceFields() );

      List<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();

      if( input == null )
        throw new IOException( "input cannot be null" );

      String identifier = parent.getFullIdentifier( flowProcess );

      iterators.add( createPartitionEntryIterator( flowProcess, input, identifier ) );

      reset( iterators );
      }

    private CombineInputPartitionTupleEntryIterator createPartitionEntryIterator( FlowProcess<? extends Configuration> flowProcess, RecordReader input, String parentIdentifier ) throws IOException
      {
      TupleEntrySchemeIterator schemeIterator = createTupleEntrySchemeIterator( flowProcess, parent, null, input );

      return new CombineInputPartitionTupleEntryIterator( flowProcess, getSourceFields(), partition, parentIdentifier, schemeIterator );
      }
    }

  @Override
  public boolean commitResource( Configuration conf ) throws IOException
    {
    Hadoop18TapUtil.writeSuccessMarker( conf, ( (Hfs) parent ).getPath() );

    return super.commitResource( conf );
    }
  }