001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap.local; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.util.LinkedHashSet; 029import java.util.Properties; 030import java.util.Set; 031 032import cascading.flow.FlowProcess; 033import cascading.scheme.Scheme; 034import cascading.tap.SinkMode; 035import cascading.tap.Tap; 036import cascading.tap.local.io.TapFileOutputStream; 037import cascading.tap.type.FileType; 038import cascading.tuple.TupleEntryCollector; 039import cascading.tuple.TupleEntryIterator; 040import cascading.tuple.TupleEntrySchemeCollector; 041import cascading.tuple.TupleEntrySchemeIterator; 042 043/** 044 * Class FileTap is a {@link Tap} sub-class that allows for direct local file access. 045 * <p/> 046 * FileTap must be used with the {@link cascading.flow.local.LocalFlowConnector} to create 047 * {@link cascading.flow.Flow} instances that run in "local" mode. 048 */ 049public class FileTap extends Tap<Properties, InputStream, OutputStream> implements FileType<Properties> 050 { 051 private final String path; 052 053 /** 054 * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme} and file {@code path}. 055 * 056 * @param scheme of type LocalScheme 057 * @param path of type String 058 */ 059 public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path ) 060 { 061 this( scheme, path, SinkMode.KEEP ); 062 } 063 064 /** 065 * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme}, 066 * file {@code path}, and {@code SinkMode}. 067 * 068 * @param scheme of type LocalScheme 069 * @param path of type String 070 * @param sinkMode of type SinkMode 071 */ 072 public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path, SinkMode sinkMode ) 073 { 074 super( scheme, sinkMode ); 075 this.path = new File( path ).getPath(); // cleans path information 076 } 077 078 @Override 079 public String getIdentifier() 080 { 081 return path; 082 } 083 084 @Override 085 public String getFullIdentifier( Properties conf ) 086 { 087 return fullyQualifyIdentifier( getIdentifier() ); 088 } 089 090 private String fullyQualifyIdentifier( String identifier ) 091 { 092 return new File( identifier ).getAbsoluteFile().toURI().toString(); 093 } 094 095 @Override 096 public TupleEntryIterator openForRead( FlowProcess<? extends Properties> flowProcess, InputStream input ) throws IOException 097 { 098 if( input == null ) 099 input = new FileInputStream( getIdentifier() ); 100 101 return new TupleEntrySchemeIterator<Properties, InputStream>( flowProcess, getScheme(), input, getIdentifier() ); 102 } 103 104 @Override 105 public TupleEntryCollector openForWrite( FlowProcess<? extends Properties> flowProcess, OutputStream output ) throws IOException 106 { 107 if( output == null ) 108 output = new TapFileOutputStream( getIdentifier(), isUpdate() ); // append if we are in update mode 109 110 return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, getScheme(), output, getIdentifier() ); 111 } 112 113 @Override 114 public boolean createResource( Properties conf ) throws IOException 115 { 116 File parentFile = new File( getIdentifier() ).getParentFile(); 117 118 return parentFile.exists() || parentFile.mkdirs(); 119 } 120 121 @Override 122 public boolean deleteResource( Properties conf ) throws IOException 123 { 124 return new File( getIdentifier() ).delete(); 125 } 126 127 @Override 128 public boolean commitResource( Properties conf ) throws IOException 129 { 130 return true; 131 } 132 133 @Override 134 public boolean resourceExists( Properties conf ) throws IOException 135 { 136 return new File( getIdentifier() ).exists(); 137 } 138 139 @Override 140 public long getModifiedTime( Properties conf ) throws IOException 141 { 142 return new File( getIdentifier() ).lastModified(); 143 } 144 145 @Override 146 public boolean isDirectory( FlowProcess<? extends Properties> flowProcess ) throws IOException 147 { 148 return isDirectory( flowProcess.getConfig() ); 149 } 150 151 @Override 152 public boolean isDirectory( Properties conf ) throws IOException 153 { 154 return new File( getIdentifier() ).isDirectory(); 155 } 156 157 @Override 158 public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess ) throws IOException 159 { 160 return getChildIdentifiers( flowProcess.getConfig() ); 161 } 162 163 @Override 164 public String[] getChildIdentifiers( Properties conf ) throws IOException 165 { 166 return getChildIdentifiers( conf, 1, false ); 167 } 168 169 @Override 170 public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess, int depth, boolean fullyQualified ) throws IOException 171 { 172 return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified ); 173 } 174 175 @Override 176 public String[] getChildIdentifiers( Properties conf, int depth, boolean fullyQualified ) throws IOException 177 { 178 if( !resourceExists( conf ) ) 179 return new String[ 0 ]; 180 181 Set<String> results = new LinkedHashSet<String>(); 182 183 getChildPaths( results, getIdentifier(), depth ); 184 185 String[] allPaths = results.toArray( new String[ results.size() ] ); 186 187 if( !fullyQualified ) 188 return allPaths; 189 190 for( int i = 0; i < allPaths.length; i++ ) 191 allPaths[ i ] = fullyQualifyIdentifier( allPaths[ i ] ); 192 193 return allPaths; 194 } 195 196 @Override 197 public long getSize( FlowProcess<? extends Properties> flowProcess ) throws IOException 198 { 199 return getSize( flowProcess.getConfig() ); 200 } 201 202 @Override 203 public long getSize( Properties conf ) throws IOException 204 { 205 File file = new File( getIdentifier() ); 206 207 if( file.isDirectory() ) 208 return 0; 209 210 return file.length(); 211 } 212 213 private boolean getChildPaths( Set<String> results, String identifier, int depth ) 214 { 215 File file = new File( identifier ); 216 217 if( depth == 0 || file.isFile() ) 218 { 219 results.add( identifier ); 220 return true; 221 } 222 223 String[] paths = file.list(); 224 225 if( paths == null ) 226 return false; 227 228 boolean result = false; 229 230 for( String path : paths ) 231 result |= getChildPaths( results, new File( file, path ).getPath(), depth - 1 ); 232 233 return result; 234 } 235 }