001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap.local; 022 023 import java.beans.ConstructorProperties; 024 import java.io.File; 025 import java.io.FileInputStream; 026 import java.io.FileNotFoundException; 027 import java.io.IOException; 028 import java.io.InputStream; 029 import java.io.OutputStream; 030 import java.util.Properties; 031 032 import cascading.flow.FlowProcess; 033 import cascading.tap.SinkMode; 034 import cascading.tap.Tap; 035 import cascading.tap.local.io.TapFileOutputStream; 036 import cascading.tap.partition.BasePartitionTap; 037 import cascading.tap.partition.Partition; 038 import cascading.tuple.TupleEntrySchemeCollector; 039 import cascading.tuple.TupleEntrySchemeIterator; 040 041 /** 042 * Class PartitionTap can be used to write tuple streams out to files and sub-directories based on the values in the 043 * current {@link cascading.tuple.Tuple} instance. 044 * <p/> 045 * The constructor takes a {@link cascading.tap.local.FileTap} {@link cascading.tap.Tap} and a {@link Partition} 046 * implementation. This allows Tuple values at given positions to be used as directory names during write 047 * operations, and directory names as data during read operations. 048 * <p/> 049 * The key value here is that there is no need to duplicate data values in the directory names and inside 050 * the data files. 051 * <p/> 052 * So only values declared in the parent Tap will be read or written to the underlying file system files. But 053 * fields declared by the {@link Partition} will only be read or written to the directory names. That is, the 054 * PartitionTap instance will sink or source the partition fields, plus the parent Tap fields. The partition 055 * fields and parent Tap fields do not need to have common field names. 056 * <p/> 057 * {@code openWritesThreshold} limits the number of open files to be output to. This value defaults to 300 files. 058 * Each time the threshold is exceeded, 10% of the least recently used open files will be closed. 059 * <p/> 060 * PartitionTap will populate a given {@code partition} without regard to case of the values being used. Thus 061 * the resulting paths {@code 2012/June/} and {@code 2012/june/} will likely result in two open files into the same 062 * location. Forcing the case to be consistent with a custom Partition implementation or an upstream 063 * {@link cascading.operation.Function} is recommended, see {@link cascading.operation.expression.ExpressionFunction}. 064 */ 065 public class PartitionTap extends BasePartitionTap<Properties, InputStream, OutputStream> 066 { 067 /** 068 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the 069 * base path and default {@link cascading.scheme.Scheme}, and the partition. 070 * 071 * @param parent of type Tap 072 * @param partition of type String 073 */ 074 @ConstructorProperties({"parent", "partition"}) 075 public PartitionTap( FileTap parent, Partition partition ) 076 { 077 this( parent, partition, OPEN_WRITES_THRESHOLD_DEFAULT ); 078 } 079 080 /** 081 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the 082 * base path and default {@link cascading.scheme.Scheme}, and the partition. 083 * <p/> 084 * {@code openWritesThreshold} limits the number of open files to be output to. 085 * 086 * @param parent of type Hfs 087 * @param partition of type String 088 * @param openWritesThreshold of type int 089 */ 090 @ConstructorProperties({"parent", "partition", "openWritesThreshold"}) 091 public PartitionTap( FileTap parent, Partition partition, int openWritesThreshold ) 092 { 093 super( parent, partition, openWritesThreshold ); 094 } 095 096 /** 097 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the 098 * base path and default {@link cascading.scheme.Scheme}, and the partition. 099 * 100 * @param parent of type Tap 101 * @param partition of type String 102 * @param sinkMode of type SinkMode 103 */ 104 @ConstructorProperties({"parent", "partition", "sinkMode"}) 105 public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode ) 106 { 107 super( parent, partition, sinkMode ); 108 } 109 110 /** 111 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the 112 * base path and default {@link cascading.scheme.Scheme}, and the partition. 113 * <p/> 114 * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)} 115 * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}. 116 * 117 * @param parent of type Tap 118 * @param partition of type String 119 * @param sinkMode of type SinkMode 120 * @param keepParentOnDelete of type boolean 121 */ 122 @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete"}) 123 public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete ) 124 { 125 this( parent, partition, sinkMode, keepParentOnDelete, OPEN_WRITES_THRESHOLD_DEFAULT ); 126 } 127 128 /** 129 * Constructor PartitionTap creates a new PartitionTap instance using the given parent {@link cascading.tap.local.FileTap} Tap as the 130 * base path and default {@link cascading.scheme.Scheme}, and the partition. 131 * <p/> 132 * {@code keepParentOnDelete}, when set to true, prevents the parent Tap from being deleted when {@link #deleteResource(Object)} 133 * is called, typically an issue when used inside a {@link cascading.cascade.Cascade}. 134 * <p/> 135 * {@code openWritesThreshold} limits the number of open files to be output to. 136 * 137 * @param parent of type Tap 138 * @param partition of type String 139 * @param sinkMode of type SinkMode 140 * @param keepParentOnDelete of type boolean 141 * @param openWritesThreshold of type int 142 */ 143 @ConstructorProperties({"parent", "partition", "sinkMode", "keepParentOnDelete", "openWritesThreshold"}) 144 public PartitionTap( FileTap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold ) 145 { 146 super( parent, partition, sinkMode, keepParentOnDelete, openWritesThreshold ); 147 } 148 149 @Override 150 protected String getCurrentIdentifier( FlowProcess<Properties> flowProcess ) 151 { 152 return null; 153 } 154 155 @Override 156 public boolean deleteResource( Properties conf ) throws IOException 157 { 158 String[] childIdentifiers = ( (FileTap) parent ).getChildIdentifiers( conf, Integer.MAX_VALUE, false ); 159 160 if( childIdentifiers.length == 0 ) 161 return true; 162 163 boolean result = false; 164 165 for( String childIdentifier : childIdentifiers ) 166 result |= new File( childIdentifier ).delete(); 167 168 return result; 169 } 170 171 @Override 172 protected TupleEntrySchemeCollector createTupleEntrySchemeCollector( FlowProcess<Properties> flowProcess, Tap parent, String path, long sequence ) throws IOException 173 { 174 TapFileOutputStream output = new TapFileOutputStream( parent, path, true ); // always append 175 176 return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, parent, output ); 177 } 178 179 @Override 180 protected TupleEntrySchemeIterator createTupleEntrySchemeIterator( FlowProcess<Properties> flowProcess, Tap parent, String path, InputStream input ) throws FileNotFoundException 181 { 182 if( input == null ) 183 input = new FileInputStream( path ); 184 185 return new TupleEntrySchemeIterator( flowProcess, parent.getScheme(), input, path ); 186 } 187 }