/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tap.local;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.io.TapFileOutputStream;
import cascading.tap.type.FileType;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;

/**
 * Class FileTap is a {@link Tap} sub-class that allows for direct local file access.
 * <p/>
 * FileTap must be used with the {@link cascading.flow.local.LocalFlowConnector} to create
 * {@link cascading.flow.Flow} instances that run in "local" mode.
 */
public class FileTap extends Tap<Properties, InputStream, OutputStream> implements FileType<Properties>
{
  /** The path of the underlying file, as normalized by {@link File#getPath()}. */
  private final String path;

  /**
   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme} and file {@code path}.
   * <p/>
   * The sink mode is {@link SinkMode#KEEP}.
   *
   * @param scheme of type LocalScheme
   * @param path   of type String
   */
  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path )
  {
    this( scheme, path, SinkMode.KEEP );
  }

  /**
   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme},
   * file {@code path}, and {@code SinkMode}.
   *
   * @param scheme   of type LocalScheme
   * @param path     of type String
   * @param sinkMode of type SinkMode
   */
  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path, SinkMode sinkMode )
  {
    super( scheme, sinkMode );
    this.path = new File( path ).getPath(); // cleans path information
  }

  /**
   * Returns the path this tap was created with, after normalization by {@link File#getPath()}.
   *
   * @return the file path
   */
  @Override
  public String getIdentifier()
  {
    return path;
  }

  /**
   * Returns the identifier of this tap as an absolute {@code file:} URI string.
   *
   * @param conf not used by this implementation
   * @return the fully qualified identifier
   */
  @Override
  public String getFullIdentifier( Properties conf )
  {
    return fullyQualifyIdentifier( getIdentifier() );
  }

  /**
   * Converts the given path into the string form of the URI of its absolute file.
   *
   * @param identifier a file path, relative or absolute
   * @return the URI string of the absolute form of {@code identifier}
   */
  private String fullyQualifyIdentifier( String identifier )
  {
    return new File( identifier ).getAbsoluteFile().toURI().toString();
  }

  /**
   * Opens this tap for reading.
   *
   * @param flowProcess the current flow process
   * @param input       the stream to read from, if {@code null} a {@link FileInputStream} is opened
   *                    on {@link #getIdentifier()}
   * @return an iterator over the tuples read through the tap's scheme
   * @throws IOException if the file cannot be opened
   */
  @Override
  public TupleEntryIterator openForRead( FlowProcess<Properties> flowProcess, InputStream input ) throws IOException
  {
    if( input == null )
      input = new FileInputStream( getIdentifier() );

    return new TupleEntrySchemeIterator<Properties, InputStream>( flowProcess, getScheme(), input, getIdentifier() );
  }

  /**
   * Opens this tap for writing.
   *
   * @param flowProcess the current flow process
   * @param output      the stream to write to, if {@code null} a {@link TapFileOutputStream} is opened
   *                    on {@link #getIdentifier()}
   * @return a collector writing tuples through the tap's scheme
   * @throws IOException if the file cannot be opened
   */
  @Override
  public TupleEntryCollector openForWrite( FlowProcess<Properties> flowProcess, OutputStream output ) throws IOException
  {
    if( output == null )
      output = new TapFileOutputStream( getIdentifier(), isUpdate() ); // append if we are in update mode

    return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, getScheme(), output, getIdentifier() );
  }

  /**
   * Returns the length of the underlying file, or 0 if the path denotes a directory.
   *
   * @param conf not used by this implementation
   * @return the size as reported by {@link File#length()}, or 0 for a directory
   * @throws IOException declared by the overridden method
   */
  @Override
  public long getSize( Properties conf ) throws IOException
  {
    File file = new File( getIdentifier() );

    if( file.isDirectory() )
      return 0;

    return file.length();
  }

  /**
   * Makes sure the parent directory of the underlying file exists, creating it and any missing
   * ancestors when necessary. The file itself is not created.
   *
   * @param conf not used by this implementation
   * @return {@code true} if the parent directory exists or was created, or if the path has no parent
   * @throws IOException declared by the overridden method
   */
  @Override
  public boolean createResource( Properties conf ) throws IOException
  {
    // Resolve to an absolute path first: File#getParentFile() returns null for a bare relative
    // name such as "output.txt", which previously caused a NullPointerException here.
    File parentFile = new File( getIdentifier() ).getAbsoluteFile().getParentFile();

    // an absolute path without a parent is a filesystem root, there is nothing to create
    if( parentFile == null )
      return true;

    return parentFile.exists() || parentFile.mkdirs();
  }

  /**
   * Deletes the underlying file.
   *
   * @param conf not used by this implementation
   * @return the result of {@link File#delete()}
   * @throws IOException declared by the overridden method
   */
  @Override
  public boolean deleteResource( Properties conf ) throws IOException
  {
    return new File( getIdentifier() ).delete();
  }

  /**
   * Does nothing for this tap.
   *
   * @param conf not used by this implementation
   * @return always {@code true}
   * @throws IOException declared by the overridden method
   */
  @Override
  public boolean commitResource( Properties conf ) throws IOException
  {
    return true;
  }

  /**
   * Tests whether the underlying path exists.
   *
   * @param conf not used by this implementation
   * @return the result of {@link File#exists()}
   * @throws IOException declared by the overridden method
   */
  @Override
  public boolean resourceExists( Properties conf ) throws IOException
  {
    return new File( getIdentifier() ).exists();
  }

  /**
   * Returns the last modified time of the underlying path.
   *
   * @param conf not used by this implementation
   * @return the result of {@link File#lastModified()}
   * @throws IOException declared by the overridden method
   */
  @Override
  public long getModifiedTime( Properties conf ) throws IOException
  {
    return new File( getIdentifier() ).lastModified();
  }

  /**
   * Tests whether the underlying path is a directory.
   *
   * @param conf not used by this implementation
   * @return the result of {@link File#isDirectory()}
   * @throws IOException declared by the overridden method
   */
  @Override
  public boolean isDirectory( Properties conf ) throws IOException
  {
    return new File( getIdentifier() ).isDirectory();
  }

  /**
   * Returns the child identifiers one level below this tap's path, not fully qualified.
   *
   * @param conf passed through to {@link #getChildIdentifiers(Properties, int, boolean)}
   * @return the child paths, empty if the resource does not exist
   * @throws IOException declared by the overridden method
   */
  @Override
  public String[] getChildIdentifiers( Properties conf ) throws IOException
  {
    return getChildIdentifiers( conf, 1, false );
  }

  /**
   * Returns the paths found by descending up to {@code depth} directory levels below this tap's path.
   *
   * @param conf           passed to {@link #resourceExists(Properties)}
   * @param depth          the number of directory levels to descend
   * @param fullyQualified if {@code true}, each path is converted to an absolute URI string
   * @return the collected paths in encounter order, empty if the resource does not exist
   * @throws IOException declared by the overridden method
   */
  @Override
  public String[] getChildIdentifiers( Properties conf, int depth, boolean fullyQualified ) throws IOException
  {
    if( !resourceExists( conf ) )
      return new String[ 0 ];

    // a LinkedHashSet keeps encounter order and drops duplicates
    Set<String> results = new LinkedHashSet<String>();

    getChildPaths( results, getIdentifier(), depth );

    String[] allPaths = results.toArray( new String[ results.size() ] );

    if( !fullyQualified )
      return allPaths;

    for( int i = 0; i < allPaths.length; i++ )
      allPaths[ i ] = fullyQualifyIdentifier( allPaths[ i ] );

    return allPaths;
  }

  /**
   * Recursively collects paths below {@code identifier} into {@code results}.
   * <p/>
   * A path is added as-is when it is a regular file or when {@code depth} has reached 0, otherwise
   * its directory entries are visited with {@code depth - 1}.
   *
   * @param results    the set receiving the collected paths
   * @param identifier the path to inspect
   * @param depth      the remaining number of directory levels to descend
   * @return {@code true} if at least one path was added, {@code false} otherwise
   */
  private boolean getChildPaths( Set<String> results, String identifier, int depth )
  {
    File file = new File( identifier );

    if( depth == 0 || file.isFile() )
    {
      results.add( identifier );
      return true;
    }

    String[] paths = file.list();

    // File#list() returns null when the path is not a directory or cannot be listed
    if( paths == null )
      return false;

    boolean result = false;

    for( String path : paths )
      result |= getChildPaths( results, new File( file, path ).getPath(), depth - 1 );

    return result;
  }
}