001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap.hadoop;
023
024import java.beans.ConstructorProperties;
025import java.io.IOException;
026import java.net.URI;
027
028import cascading.scheme.Scheme;
029import cascading.tap.SinkMode;
030import cascading.tap.TapException;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033
034/**
035 * Class Dfs is a {@link cascading.tap.Tap} class that provides access to the Hadoop Distributed File System.
036 * <p>
037 * Use the {@link URI} constructors to specify a different HDFS cluster than the default.
038 */
039public class Dfs extends Hfs
040  {
041  /**
042   * Constructor Dfs creates a new Dfs instance.
043   *
044   * @param scheme of type Scheme
045   * @param uri    of type URI
046   */
047  @ConstructorProperties({"scheme", "uri"})
048  public Dfs( Scheme scheme, URI uri )
049    {
050    super( scheme, uri.getPath() );
051
052    init( uri );
053    }
054
055  /**
056   * Constructor Dfs creates a new Dfs instance.
057   *
058   * @param scheme   of type Scheme
059   * @param uri      of type URI
060   * @param sinkMode of type SinkMode
061   */
062  @ConstructorProperties({"scheme", "uri", "sinkMode"})
063  public Dfs( Scheme scheme, URI uri, SinkMode sinkMode )
064    {
065    super( scheme, uri.getPath(), sinkMode );
066
067    init( uri );
068    }
069
070  /**
071   * Constructor Dfs creates a new Dfs instance.
072   *
073   * @param scheme     of type Scheme
074   * @param stringPath of type String
075   */
076  @ConstructorProperties({"scheme", "stringPath"})
077  public Dfs( Scheme scheme, String stringPath )
078    {
079    super( scheme, stringPath );
080    }
081
082  /**
083   * Constructor Dfs creates a new Dfs instance.
084   *
085   * @param scheme     of type Scheme
086   * @param stringPath of type String
087   * @param sinkMode   of type SinkMode
088   */
089  @ConstructorProperties({"scheme", "stringPath", "sinkMode"})
090  public Dfs( Scheme scheme, String stringPath, SinkMode sinkMode )
091    {
092    super( scheme, stringPath, sinkMode );
093    }
094
095  private void init( URI uri )
096    {
097    if( !uri.getScheme().equalsIgnoreCase( "hdfs" ) )
098      throw new IllegalArgumentException( "uri must use the hdfs scheme" );
099
100    setUriScheme( URI.create( uri.getScheme() + "://" + uri.getAuthority() ) );
101    }
102
103  protected void setStringPath( String stringPath )
104    {
105    if( stringPath.matches( ".*://.*" ) && !stringPath.startsWith( "hdfs://" ) )
106      throw new IllegalArgumentException( "uri must use the hdfs scheme" );
107
108    super.setStringPath( stringPath );
109    }
110
111  @Override
112  protected FileSystem getDefaultFileSystem( Configuration configuration )
113    {
114    String name = configuration.get( "fs.default.name", "hdfs://localhost:5001/" );
115
116    if( name.equals( "local" ) || name.matches( ".*://.*" ) && !name.startsWith( "hdfs://" ) )
117      name = "hdfs://localhost:5001/";
118    else if( name.indexOf( '/' ) == -1 )
119      name = "hdfs://" + name;
120
121    try
122      {
123      return FileSystem.get( URI.create( name ), configuration );
124      }
125    catch( IOException exception )
126      {
127      throw new TapException( "unable to get handle to get filesystem for: " + name, exception );
128      }
129    }
130  }