001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tap.hadoop; 023 024import java.beans.ConstructorProperties; 025import java.io.IOException; 026import java.net.URI; 027 028import cascading.scheme.Scheme; 029import cascading.tap.SinkMode; 030import cascading.tap.TapException; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033 034/** 035 * Class Dfs is a {@link cascading.tap.Tap} class that provides access to the Hadoop Distributed File System. 036 * <p> 037 * Use the {@link URI} constructors to specify a different HDFS cluster than the default. 038 */ 039public class Dfs extends Hfs 040 { 041 /** 042 * Constructor Dfs creates a new Dfs instance. 043 * 044 * @param scheme of type Scheme 045 * @param uri of type URI 046 */ 047 @ConstructorProperties({"scheme", "uri"}) 048 public Dfs( Scheme scheme, URI uri ) 049 { 050 super( scheme, uri.getPath() ); 051 052 init( uri ); 053 } 054 055 /** 056 * Constructor Dfs creates a new Dfs instance. 057 * 058 * @param scheme of type Scheme 059 * @param uri of type URI 060 * @param sinkMode of type SinkMode 061 */ 062 @ConstructorProperties({"scheme", "uri", "sinkMode"}) 063 public Dfs( Scheme scheme, URI uri, SinkMode sinkMode ) 064 { 065 super( scheme, uri.getPath(), sinkMode ); 066 067 init( uri ); 068 } 069 070 /** 071 * Constructor Dfs creates a new Dfs instance. 072 * 073 * @param scheme of type Scheme 074 * @param stringPath of type String 075 */ 076 @ConstructorProperties({"scheme", "stringPath"}) 077 public Dfs( Scheme scheme, String stringPath ) 078 { 079 super( scheme, stringPath ); 080 } 081 082 /** 083 * Constructor Dfs creates a new Dfs instance. 084 * 085 * @param scheme of type Scheme 086 * @param stringPath of type String 087 * @param sinkMode of type SinkMode 088 */ 089 @ConstructorProperties({"scheme", "stringPath", "sinkMode"}) 090 public Dfs( Scheme scheme, String stringPath, SinkMode sinkMode ) 091 { 092 super( scheme, stringPath, sinkMode ); 093 } 094 095 private void init( URI uri ) 096 { 097 if( !uri.getScheme().equalsIgnoreCase( "hdfs" ) ) 098 throw new IllegalArgumentException( "uri must use the hdfs scheme" ); 099 100 setUriScheme( URI.create( uri.getScheme() + "://" + uri.getAuthority() ) ); 101 } 102 103 protected void setStringPath( String stringPath ) 104 { 105 if( stringPath.matches( ".*://.*" ) && !stringPath.startsWith( "hdfs://" ) ) 106 throw new IllegalArgumentException( "uri must use the hdfs scheme" ); 107 108 super.setStringPath( stringPath ); 109 } 110 111 @Override 112 protected FileSystem getDefaultFileSystem( Configuration configuration ) 113 { 114 String name = configuration.get( "fs.default.name", "hdfs://localhost:5001/" ); 115 116 if( name.equals( "local" ) || name.matches( ".*://.*" ) && !name.startsWith( "hdfs://" ) ) 117 name = "hdfs://localhost:5001/"; 118 else if( name.indexOf( '/' ) == -1 ) 119 name = "hdfs://" + name; 120 121 try 122 { 123 return FileSystem.get( URI.create( name ), configuration ); 124 } 125 catch( IOException exception ) 126 { 127 throw new TapException( "unable to get handle to get filesystem for: " + name, exception ); 128 } 129 } 130 }