001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.local;
022    
023    import java.io.File;
024    import java.io.FileInputStream;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.OutputStream;
028    import java.util.LinkedHashSet;
029    import java.util.Properties;
030    import java.util.Set;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.scheme.Scheme;
034    import cascading.tap.SinkMode;
035    import cascading.tap.Tap;
036    import cascading.tap.local.io.TapFileOutputStream;
037    import cascading.tap.type.FileType;
038    import cascading.tuple.TupleEntryCollector;
039    import cascading.tuple.TupleEntryIterator;
040    import cascading.tuple.TupleEntrySchemeCollector;
041    import cascading.tuple.TupleEntrySchemeIterator;
042    
043    /**
044     * Class FileTap is a {@link Tap} sub-class that allows for direct local file access.
045     * <p/>
046     * FileTap must be used with the {@link cascading.flow.local.LocalFlowConnector} to create
047     * {@link cascading.flow.Flow} instances that run in "local" mode.
048     */
049    public class FileTap extends Tap<Properties, InputStream, OutputStream> implements FileType<Properties>
050      {
051      private final String path;
052    
053      /**
054       * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme} and file {@code path}.
055       *
056       * @param scheme of type LocalScheme
057       * @param path   of type String
058       */
059      public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path )
060        {
061        this( scheme, path, SinkMode.KEEP );
062        }
063    
064      /**
065       * Constructor FileTap creates a new FileTap instance using the given {@link cascading.scheme.Scheme},
066       * file {@code path}, and {@code SinkMode}.
067       *
068       * @param scheme   of type LocalScheme
069       * @param path     of type String
070       * @param sinkMode of type SinkMode
071       */
072      public FileTap( Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String path, SinkMode sinkMode )
073        {
074        super( scheme, sinkMode );
075        this.path = new File( path ).getPath(); // cleans path information
076        }
077    
078      @Override
079      public String getIdentifier()
080        {
081        return path;
082        }
083    
084      @Override
085      public String getFullIdentifier( Properties conf )
086        {
087        return fullyQualifyIdentifier( getIdentifier() );
088        }
089    
090      private String fullyQualifyIdentifier( String identifier )
091        {
092        return new File( identifier ).getAbsoluteFile().toURI().toString();
093        }
094    
095      @Override
096      public TupleEntryIterator openForRead( FlowProcess<Properties> flowProcess, InputStream input ) throws IOException
097        {
098        if( input == null )
099          input = new FileInputStream( getIdentifier() );
100    
101        return new TupleEntrySchemeIterator<Properties, InputStream>( flowProcess, getScheme(), input, getIdentifier() );
102        }
103    
104      @Override
105      public TupleEntryCollector openForWrite( FlowProcess<Properties> flowProcess, OutputStream output ) throws IOException
106        {
107        if( output == null )
108          output = new TapFileOutputStream( getIdentifier(), isUpdate() ); // append if we are in update mode
109    
110        return new TupleEntrySchemeCollector<Properties, OutputStream>( flowProcess, getScheme(), output, getIdentifier() );
111        }
112    
113      @Override
114      public long getSize( Properties conf ) throws IOException
115        {
116        File file = new File( getIdentifier() );
117    
118        if( file.isDirectory() )
119          return 0;
120    
121        return file.length();
122        }
123    
124      @Override
125      public boolean createResource( Properties conf ) throws IOException
126        {
127        File parentFile = new File( getIdentifier() ).getParentFile();
128    
129        return parentFile.exists() || parentFile.mkdirs();
130        }
131    
132      @Override
133      public boolean deleteResource( Properties conf ) throws IOException
134        {
135        return new File( getIdentifier() ).delete();
136        }
137    
138      @Override
139      public boolean commitResource( Properties conf ) throws IOException
140        {
141        return true;
142        }
143    
144      @Override
145      public boolean resourceExists( Properties conf ) throws IOException
146        {
147        return new File( getIdentifier() ).exists();
148        }
149    
150      @Override
151      public long getModifiedTime( Properties conf ) throws IOException
152        {
153        return new File( getIdentifier() ).lastModified();
154        }
155    
156      @Override
157      public boolean isDirectory( Properties conf ) throws IOException
158        {
159        return new File( getIdentifier() ).isDirectory();
160        }
161    
162      @Override
163      public String[] getChildIdentifiers( Properties conf ) throws IOException
164        {
165        return getChildIdentifiers( conf, 1, false );
166        }
167    
168      @Override
169      public String[] getChildIdentifiers( Properties conf, int depth, boolean fullyQualified ) throws IOException
170        {
171        if( !resourceExists( conf ) )
172          return new String[ 0 ];
173    
174        Set<String> results = new LinkedHashSet<String>();
175    
176        getChildPaths( results, getIdentifier(), depth );
177    
178        String[] allPaths = results.toArray( new String[ results.size() ] );
179    
180        if( !fullyQualified )
181          return allPaths;
182    
183        for( int i = 0; i < allPaths.length; i++ )
184          allPaths[ i ] = fullyQualifyIdentifier( allPaths[ i ] );
185    
186        return allPaths;
187        }
188    
189      private boolean getChildPaths( Set<String> results, String identifier, int depth )
190        {
191        File file = new File( identifier );
192    
193        if( depth == 0 || file.isFile() )
194          {
195          results.add( identifier );
196          return true;
197          }
198    
199        String[] paths = file.list();
200    
201        if( paths == null )
202          return false;
203    
204        boolean result = false;
205    
206        for( String path : paths )
207          result |= getChildPaths( results, new File( file, path ).getPath(), depth - 1 );
208    
209        return result;
210        }
211      }