001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop.io;
022    
023    import java.io.FileNotFoundException;
024    import java.io.IOException;
025    import java.net.HttpURLConnection;
026    import java.net.URI;
027    import java.net.URISyntaxException;
028    import java.net.URL;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FSDataInputStream;
032    import org.apache.hadoop.fs.FileStatus;
033    import org.apache.hadoop.fs.FileSystem;
034    import org.apache.hadoop.fs.Path;
035    import org.apache.hadoop.fs.PathFilter;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    /**
040     * Class HttpFileSystem provides a basic read-only {@link FileSystem} for accessing remote HTTP and HTTPS data.
041     * <p/>
042     * To use this FileSystem, just use regular http:// or https:// URLs.
043     */
044    public class HttpFileSystem extends StreamedFileSystem
045      {
046      /** Field LOG */
047      private static final Logger LOG = LoggerFactory.getLogger( HttpFileSystem.class );
048    
049      /** Field HTTP_SCHEME */
050      public static final String HTTP_SCHEME = "http";
051      /** Field HTTPS_SCHEME */
052      public static final String HTTPS_SCHEME = "https";
053    
054      static
055        {
056        HttpURLConnection.setFollowRedirects( true );
057        }
058    
059      /** Field scheme */
060      private String scheme;
061      /** Field authority */
062      private String authority;
063    
064      @Override
065      public void initialize( URI uri, Configuration configuration ) throws IOException
066        {
067        setConf( configuration );
068    
069        scheme = uri.getScheme();
070        authority = uri.getAuthority();
071        }
072    
073      @Override
074      public URI getUri()
075        {
076        try
077          {
078          return new URI( scheme, authority, null, null, null );
079          }
080        catch( URISyntaxException exception )
081          {
082          throw new RuntimeException( "failed parsing uri", exception );
083          }
084        }
085    
086      @Override
087      public FileStatus[] globStatus( Path path, PathFilter pathFilter ) throws IOException
088        {
089        FileStatus fileStatus = getFileStatus( path );
090    
091        if( fileStatus == null )
092          return null;
093    
094        return new FileStatus[]{fileStatus};
095        }
096    
097      @Override
098      public FSDataInputStream open( Path path, int i ) throws IOException
099        {
100        URL url = makeUrl( path );
101    
102        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
103        connection.setRequestMethod( "GET" );
104        connection.connect();
105    
106        debugConnection( connection );
107    
108        return new FSDataInputStream( new FSDigestInputStream( connection.getInputStream(), getMD5SumFor( getConf(), path ) ) );
109        }
110    
111      @Override
112      public boolean exists( Path path ) throws IOException
113        {
114        URL url = makeUrl( path );
115    
116        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
117        connection.setRequestMethod( "HEAD" );
118        connection.connect();
119    
120        debugConnection( connection );
121    
122        return connection.getResponseCode() == 200;
123        }
124    
125      @Override
126      public FileStatus getFileStatus( Path path ) throws IOException
127        {
128        URL url = makeUrl( path );
129    
130        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
131        connection.setRequestMethod( "HEAD" );
132        connection.connect();
133    
134        debugConnection( connection );
135    
136        if( connection.getResponseCode() != 200 )
137          throw new FileNotFoundException( "could not find file: " + path );
138    
139        long length = connection.getHeaderFieldInt( "Content-Length", 0 );
140    
141        length = length < 0 ? 0 : length; // queries may return -1
142    
143        long modified = connection.getHeaderFieldDate( "Last-Modified", System.currentTimeMillis() );
144    
145        return new FileStatus( length, false, 1, getDefaultBlockSize(), modified, path );
146        }
147    
148      private void debugConnection( HttpURLConnection connection ) throws IOException
149        {
150        if( LOG.isDebugEnabled() )
151          {
152          LOG.debug( "connection.getURL() = {}", connection.getURL() );
153          LOG.debug( "connection.getRequestMethod() = {}", connection.getRequestMethod() );
154          LOG.debug( "connection.getResponseCode() = {}", connection.getResponseCode() );
155          LOG.debug( "connection.getResponseMessage() = {}", connection.getResponseMessage() );
156          LOG.debug( "connection.getContentLength() = {}", connection.getContentLength() );
157          }
158        }
159    
160      private URL makeUrl( Path path ) throws IOException
161        {
162        if( path.toString().startsWith( scheme ) )
163          return URI.create( path.toString() ).toURL();
164    
165        try
166          {
167          return new URI( scheme, authority, path.toString(), null, null ).toURL();
168          }
169        catch( URISyntaxException exception )
170          {
171          throw new IOException( exception.getMessage() );
172          }
173        }
174      }