/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.platform.TestPlatform;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.scheme.util.DelimitedParser;
import cascading.scheme.util.FieldTypeResolver;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.PartitionTap;
import cascading.tap.hadoop.TemplateTap;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.partition.Partition;
import cascading.tuple.Fields;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Hadoop-based {@link TestPlatform} implementations, providing
 * shared file system utilities and factory methods for HDFS-backed
 * {@link Tap} and {@link Scheme} instances used by the platform tests.
 */
public abstract class BaseHadoopPlatform extends TestPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopPlatform.class );

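  /*
   * Shared static state for the duration of the test run. A concrete subclass
   * is expected to initialize these (for example, when standing up a
   * mini-cluster) before the file utilities below are used.
   */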
  public static transient FileSystem fileSys;
  public static transient Configuration jobConf;
  public static transient Map<Object, Object> properties = new HashMap<Object, Object>();

  public int numMapTasks = 4;
  public int numReduceTasks = 1;
  protected String logger;

  public BaseHadoopPlatform()
    {
    logger = System.getProperty( "log4j.logger" );
    }

  @Override
  public boolean isMapReduce()
    {
    return true;
    }

  public void setNumMapTasks( int numMapTasks )
    {
    if( numMapTasks > 0 )
      this.numMapTasks = numMapTasks;
    }

  public void setNumReduceTasks( int numReduceTasks )
    {
    if( numReduceTasks > 0 )
      this.numReduceTasks = numReduceTasks;
    }

  @Override
  public Map<Object, Object> getProperties()
    {
    return new HashMap<Object, Object>( properties );
    }

  @Override
  public void tearDown()
    {
    }

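  /**
   * Returns a copy of the shared job configuration, so callers may freely
   * modify the returned {@link JobConf} without affecting other tests.
   */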
  public JobConf getJobConf()
    {
    return new JobConf( jobConf );
    }

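  /**
   * Probes for HDFS by attempting to resolve a {@link FileSystem} for the
   * "hdfs" scheme, returning false if no implementation is registered.
   */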
  public boolean isHDFSAvailable()
    {
    try
      {
      FileSystem fileSystem = FileSystem.get( new URI( "hdfs:", null, null ), getJobConf() );

      return fileSystem != null;
      }
    catch( IOException exception ) // if HDFS is unavailable, a "No FileSystem for scheme" IOException is thrown
      {
      LOG.warn( "unable to get hdfs filesystem", exception );
      }
    catch( URISyntaxException exception )
      {
      throw new RuntimeException( "internal failure", exception );
      }

    return false;
    }

  @Override
  public FlowProcess getFlowProcess()
    {
    return new HadoopFlowProcess( FlowSession.NULL, getJobConf(), true );
    }

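  /**
   * Copies the given local file up to the cluster file system, if a cluster
   * is in use and the file is not already present there.
   */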
  @Override
  public void copyFromLocal( String inputFile ) throws IOException
    {
    if( !new File( inputFile ).exists() )
      throw new FileNotFoundException( "data file not found: " + inputFile );

    if( !isUseCluster() )
      return;

    Path path = new Path( safeFileName( inputFile ) );

    if( !fileSys.exists( path ) )
      FileUtil.copy( new File( inputFile ), fileSys, path, false, jobConf );
    }

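  /**
   * Copies the given path from the cluster file system to the local disk,
   * replacing any existing local copy. Directories are copied entry by
   * entry, skipping names prefixed with an underscore (temporary and log
   * directories).
   */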
  @Override
  public void copyToLocal( String outputFile ) throws IOException
    {
    if( !isUseCluster() )
      return;

    Path path = new Path( safeFileName( outputFile ) );

    if( !fileSys.exists( path ) )
      throw new FileNotFoundException( "data file not found: " + outputFile );

    File file = new File( outputFile );

    if( file.exists() )
      file.delete();

    if( fileSys.isFile( path ) )
      {
      // it's a file, so just copy it over
      FileUtil.copy( fileSys, path, file, false, jobConf );
      return;
      }

    // it's a directory
    file.mkdirs();

    FileStatus[] contents = fileSys.listStatus( path );

    for( FileStatus fileStatus : contents )
      {
      Path currentPath = fileStatus.getPath();

      if( currentPath.getName().startsWith( "_" ) ) // filter out temp and log dirs
        continue;

      FileUtil.copy( fileSys, currentPath, new File( file, currentPath.getName() ), false, jobConf );
      }
    }

  @Override
  public boolean remoteExists( String outputFile ) throws IOException
    {
    return fileSys.exists( new Path( safeFileName( outputFile ) ) );
    }

  @Override
  public boolean remoteRemove( String outputFile, boolean recursive ) throws IOException
    {
    return fileSys.delete( new Path( safeFileName( outputFile ) ), recursive );
    }

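  /*
   * The factory methods below return HDFS-backed Tap and Scheme
   * implementations; file names are passed through safeFileName() so paths
   * containing characters HDFS rejects remain usable in tests.
   */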
  @Override
  public Tap getTap( Scheme scheme, String filename, SinkMode mode )
    {
    return new Hfs( scheme, safeFileName( filename ), mode );
    }

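  /**
   * A minimal usage sketch (the path and field names below are illustrative):
   * <pre>
   * Tap tap = platform.getTextFile( new Fields( "offset", "line" ), Fields.ALL, "some/path", SinkMode.KEEP );
   * </pre>
   */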
  @Override
  public Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode )
    {
    if( sourceFields == null )
      return new Hfs( new TextLine(), safeFileName( filename ), mode );

    return new Hfs( new TextLine( sourceFields, sinkFields ), safeFileName( filename ), mode );
    }

  @Override
  public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
    {
    return new Hfs( new TextDelimited( fields, hasHeader, delimiter, quote, types ), safeFileName( filename ), mode );
    }

  @Override
  public Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
    {
    return new Hfs( new TextDelimited( fields, skipHeader, writeHeader, delimiter, quote, types ), safeFileName( filename ), mode );
    }

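  /**
   * A minimal usage sketch (delimiter, quote, and resolver are illustrative);
   * fields and types are inferred from the header via the given
   * {@link FieldTypeResolver}:
   * <pre>
   * Tap tap = platform.getDelimitedFile( ",", "\"", resolver, "some/path.csv", SinkMode.KEEP );
   * </pre>
   */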
  @Override
  public Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode )
    {
    return new Hfs( new TextDelimited( true, new DelimitedParser( delimiter, quote, fieldTypeResolver ) ), safeFileName( filename ), mode );
    }

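  /*
   * The template/partition tap factories below assume the given sink is an
   * Hfs tap and cast unconditionally; any other Tap type will fail with a
   * ClassCastException.
   */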
  @Override
  public Tap getTemplateTap( Tap sink, String pathTemplate, int openThreshold )
    {
    return new TemplateTap( (Hfs) sink, pathTemplate, openThreshold );
    }

  @Override
  public Tap getTemplateTap( Tap sink, String pathTemplate, Fields fields, int openThreshold )
    {
    return new TemplateTap( (Hfs) sink, pathTemplate, fields, openThreshold );
    }

  @Override
  public Tap getPartitionTap( Tap sink, Partition partition, int openThreshold )
    {
    return new PartitionTap( (Hfs) sink, partition, openThreshold );
    }

  @Override
  public Scheme getTestConfigDefScheme()
    {
    return new HadoopConfigDefScheme( new Fields( "line" ) );
    }

  @Override
  public Scheme getTestFailScheme()
    {
    return new HadoopFailScheme( new Fields( "line" ) );
    }

  @Override
  public Comparator getLongComparator( boolean reverseSort )
    {
    return new TestLongComparator( reverseSort );
    }

  @Override
  public Comparator getStringComparator( boolean reverseSort )
    {
    return new TestStringComparator( reverseSort );
    }

  @Override
  public String getHiddenTemporaryPath()
    {
    return Hadoop18TapUtil.TEMPORARY_PATH;
    }

  /**
   * Replaces characters that are not allowed by HDFS (currently ':') with an
   * underscore ("_").
   *
   * @param filename The filename to make safe
   * @return The filename with all unsupported characters replaced
   */
  protected String safeFileName( String filename )
    {
    return filename.replace( ":", "_" ); // not using Util.cleansePathName as it removes /
    }
  }