001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.local;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.util.LinkedHashSet;
029import java.util.Properties;
030import java.util.Set;
031
032import cascading.flow.FlowProcess;
033import cascading.scheme.Scheme;
034import cascading.tap.SinkMode;
035import cascading.tap.Tap;
036import cascading.tap.local.io.TapFileOutputStream;
037import cascading.tap.type.FileType;
038import cascading.tuple.TupleEntryCollector;
039import cascading.tuple.TupleEntryIterator;
040import cascading.tuple.TupleEntrySchemeCollector;
041import cascading.tuple.TupleEntrySchemeIterator;
042
043/**
044 * Class FileTap is a {@link Tap} sub-class that allows for direct local file access.
045 * <p/>
046 * FileTap must be used with the {@link cascading.flow.local.LocalFlowConnector} to create
047 * {@link cascading.flow.Flow} instances that run in "local" mode.
048 */
049public class FileTap extends Tap<Properties, InputStream, OutputStream> implements FileType<Properties>
050  {
051  private final String path;
052
053  /**
054   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme} and file {@code path}.
055   *
056   * @param scheme of type LocalScheme
057   * @param path   of type String
058   */
059  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path )
060    {
061    this( scheme, path, SinkMode.KEEP );
062    }
063
064  /**
065   * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme},
066   * file {@code path}, and {@code SinkMode}.
067   *
068   * @param scheme   of type LocalScheme
069   * @param path     of type String
070   * @param sinkMode of type SinkMode
071   */
072  public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path, SinkMode sinkMode )
073    {
074    super( scheme, sinkMode );
075    this.path = new File( path ).getPath(); // cleans path information
076    }
077
078  @Override
079  public String getIdentifier()
080    {
081    return path;
082    }
083
084  @Override
085  public String getFullIdentifier( Properties conf )
086    {
087    return fullyQualifyIdentifier( getIdentifier() );
088    }
089
090  private String fullyQualifyIdentifier( String identifier )
091    {
092    return new File( identifier ).getAbsoluteFile().toURI().toString();
093    }
094
095  @Override
096  public TupleEntryIterator openForRead( FlowProcess<? extends Properties> flowProcess, InputStream input ) throws IOException
097    {
098    if( input == null )
099      input = new FileInputStream( getIdentifier() );
100
101    return new TupleEntrySchemeIterator<Properties, InputStream>( flowProcess, getScheme(), input, getIdentifier() );
102    }
103
104  @Override
105  public TupleEntryCollector openForWrite( FlowProcess<? extends Properties> flowProcess, OutputStream output ) throws IOException
106    {
107    if( output == null )
108      output = new TapFileOutputStream( getIdentifier(), isUpdate() ); // append if we are in update mode
109
110    return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, getScheme(), output, getIdentifier() );
111    }
112
113  @Override
114  public boolean createResource( Properties conf ) throws IOException
115    {
116    File parentFile = new File( getIdentifier() ).getParentFile();
117
118    return parentFile.exists() || parentFile.mkdirs();
119    }
120
121  @Override
122  public boolean deleteResource( Properties conf ) throws IOException
123    {
124    return new File( getIdentifier() ).delete();
125    }
126
127  @Override
128  public boolean commitResource( Properties conf ) throws IOException
129    {
130    return true;
131    }
132
133  @Override
134  public boolean resourceExists( Properties conf ) throws IOException
135    {
136    return new File( getIdentifier() ).exists();
137    }
138
139  @Override
140  public long getModifiedTime( Properties conf ) throws IOException
141    {
142    return new File( getIdentifier() ).lastModified();
143    }
144
145  @Override
146  public boolean isDirectory( FlowProcess<? extends Properties> flowProcess ) throws IOException
147    {
148    return isDirectory( flowProcess.getConfig() );
149    }
150
151  @Override
152  public boolean isDirectory( Properties conf ) throws IOException
153    {
154    return new File( getIdentifier() ).isDirectory();
155    }
156
157  @Override
158  public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess ) throws IOException
159    {
160    return getChildIdentifiers( flowProcess.getConfig() );
161    }
162
163  @Override
164  public String[] getChildIdentifiers( Properties conf ) throws IOException
165    {
166    return getChildIdentifiers( conf, 1, false );
167    }
168
169  @Override
170  public String[] getChildIdentifiers( FlowProcess<? extends Properties> flowProcess, int depth, boolean fullyQualified ) throws IOException
171    {
172    return getChildIdentifiers( flowProcess.getConfig(), depth, fullyQualified );
173    }
174
175  @Override
176  public String[] getChildIdentifiers( Properties conf, int depth, boolean fullyQualified ) throws IOException
177    {
178    if( !resourceExists( conf ) )
179      return new String[ 0 ];
180
181    Set<String> results = new LinkedHashSet<String>();
182
183    getChildPaths( results, getIdentifier(), depth );
184
185    String[] allPaths = results.toArray( new String[ results.size() ] );
186
187    if( !fullyQualified )
188      return allPaths;
189
190    for( int i = 0; i < allPaths.length; i++ )
191      allPaths[ i ] = fullyQualifyIdentifier( allPaths[ i ] );
192
193    return allPaths;
194    }
195
196  @Override
197  public long getSize( FlowProcess<? extends Properties> flowProcess ) throws IOException
198    {
199    return getSize( flowProcess.getConfig() );
200    }
201
202  @Override
203  public long getSize( Properties conf ) throws IOException
204    {
205    File file = new File( getIdentifier() );
206
207    if( file.isDirectory() )
208      return 0;
209
210    return file.length();
211    }
212
213  private boolean getChildPaths( Set<String> results, String identifier, int depth )
214    {
215    File file = new File( identifier );
216
217    if( depth == 0 || file.isFile() )
218      {
219      results.add( identifier );
220      return true;
221      }
222
223    String[] paths = file.list();
224
225    if( paths == null )
226      return false;
227
228    boolean result = false;
229
230    for( String path : paths )
231      result |= getChildPaths( results, new File( file, path ).getPath(), depth - 1 );
232
233    return result;
234    }
235  }