001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.platform.hadoop;
022
023import java.io.File;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.util.Comparator;
029import java.util.HashMap;
030import java.util.Map;
031
032import cascading.platform.TestPlatform;
033import cascading.scheme.Scheme;
034import cascading.scheme.hadoop.TextDelimited;
035import cascading.scheme.hadoop.TextLine;
036import cascading.scheme.util.DelimitedParser;
037import cascading.scheme.util.FieldTypeResolver;
038import cascading.tap.SinkMode;
039import cascading.tap.Tap;
040import cascading.tap.hadoop.Hfs;
041import cascading.tap.hadoop.PartitionTap;
042import cascading.tap.hadoop.util.Hadoop18TapUtil;
043import cascading.tap.partition.Partition;
044import cascading.tuple.Fields;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FileStatus;
047import org.apache.hadoop.fs.FileSystem;
048import org.apache.hadoop.fs.FileUtil;
049import org.apache.hadoop.fs.Path;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 *
055 */
056public abstract class BaseHadoopPlatform<Config extends Configuration> extends TestPlatform
057  {
058  private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopPlatform.class );
059
060  public transient static FileSystem fileSys;
061  public transient static Configuration configuration;
062  public transient static Map<Object, Object> properties = new HashMap<Object, Object>();
063
064  protected String logger;
065
066  public BaseHadoopPlatform()
067    {
068    this.logger = System.getProperty( "log4j.logger" );
069    this.numMappers = 4;
070    this.numReducers = 1;
071    }
072
073  @Override
074  public boolean isMapReduce()
075    {
076    return true;
077    }
078
079  @Override
080  public void setNumMappers( int numMapTasks )
081    {
082    if( numMapTasks > 0 )
083      this.numMappers = numMapTasks;
084    }
085
086  @Override
087  public void setNumReducers( int numReduceTasks )
088    {
089    if( numReduceTasks > 0 )
090      this.numReducers = numReduceTasks;
091    }
092
093  @Override
094  public void setNumGatherPartitions( int numGatherPartitions )
095    {
096    if( numGatherPartitions > 0 )
097      this.numGatherPartitions = numGatherPartitions;
098    }
099
100  @Override
101  public Map<Object, Object> getProperties()
102    {
103    return new HashMap<Object, Object>( properties );
104    }
105
106  @Override
107  public void tearDown()
108    {
109    }
110
111  public abstract Config getConfiguration();
112
113  public boolean isHDFSAvailable()
114    {
115    try
116      {
117      FileSystem fileSystem = FileSystem.get( new URI( "hdfs:", null, null ), configuration );
118
119      return fileSystem != null;
120      }
121    catch( IOException exception ) // if no hdfs, a no filesystem for scheme io exception will be caught
122      {
123      LOG.warn( "unable to get hdfs filesystem", exception );
124      }
125    catch( URISyntaxException exception )
126      {
127      throw new RuntimeException( "internal failure", exception );
128      }
129
130    return false;
131    }
132
133  @Override
134  public void copyFromLocal( String inputFile ) throws IOException
135    {
136    if( !new File( inputFile ).exists() )
137      throw new FileNotFoundException( "data file not found: " + inputFile );
138
139    if( !isUseCluster() )
140      return;
141
142    Path path = new Path( safeFileName( inputFile ) );
143
144    if( !fileSys.exists( path ) )
145      FileUtil.copy( new File( inputFile ), fileSys, path, false, configuration );
146    }
147
148  @Override
149  public void copyToLocal( String outputFile ) throws IOException
150    {
151    if( !isUseCluster() )
152      return;
153
154    Path path = new Path( safeFileName( outputFile ) );
155
156    if( !fileSys.exists( path ) )
157      throw new FileNotFoundException( "data file not found: " + outputFile );
158
159    File file = new File( outputFile );
160
161    if( file.exists() )
162      file.delete();
163
164    if( fileSys.isFile( path ) )
165      {
166      // its a file, so just copy it over
167      FileUtil.copy( fileSys, path, file, false, configuration );
168      return;
169      }
170
171    // it's a directory
172    file.mkdirs();
173
174    FileStatus contents[] = fileSys.listStatus( path );
175
176    for( FileStatus fileStatus : contents )
177      {
178      Path currentPath = fileStatus.getPath();
179
180      if( currentPath.getName().startsWith( "_" ) ) // filter out temp and log dirs
181        continue;
182
183      FileUtil.copy( fileSys, currentPath, new File( file, currentPath.getName() ), false, configuration );
184      }
185    }
186
187  @Override
188  public boolean remoteExists( String outputFile ) throws IOException
189    {
190    return fileSys.exists( new Path( safeFileName( outputFile ) ) );
191    }
192
193  @Override
194  public boolean remoteRemove( String outputFile, boolean recursive ) throws IOException
195    {
196    return fileSys.delete( new Path( safeFileName( outputFile ) ), recursive );
197    }
198
199  @Override
200  public Tap getTap( Scheme scheme, String filename, SinkMode mode )
201    {
202    return new Hfs( scheme, safeFileName( filename ), mode );
203    }
204
205  @Override
206  public Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode )
207    {
208    if( sourceFields == null )
209      return new Hfs( new TextLine(), safeFileName( filename ), mode );
210
211    return new Hfs( new TextLine( sourceFields, sinkFields ), safeFileName( filename ), mode );
212    }
213
214  @Override
215  public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
216    {
217    return new Hfs( new TextDelimited( fields, hasHeader, delimiter, quote, types ), safeFileName( filename ), mode );
218    }
219
220  @Override
221  public Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode )
222    {
223    return new Hfs( new TextDelimited( fields, skipHeader, writeHeader, delimiter, quote, types ), safeFileName( filename ), mode );
224    }
225
226  @Override
227  public Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode )
228    {
229    return new Hfs( new TextDelimited( true, new DelimitedParser( delimiter, quote, fieldTypeResolver ) ), safeFileName( filename ), mode );
230    }
231
232  @Override
233  public Tap getPartitionTap( Tap sink, Partition partition, int openThreshold )
234    {
235    return new PartitionTap( (Hfs) sink, partition, openThreshold );
236    }
237
238  @Override
239  public Scheme getTestConfigDefScheme()
240    {
241    return new HadoopConfigDefScheme( new Fields( "line" ), isDAG() );
242    }
243
244  @Override
245  public Scheme getTestFailScheme()
246    {
247    return new HadoopFailScheme( new Fields( "line" ) );
248    }
249
250  @Override
251  public Comparator getLongComparator( boolean reverseSort )
252    {
253    return new TestLongComparator( reverseSort );
254    }
255
256  @Override
257  public Comparator getStringComparator( boolean reverseSort )
258    {
259    return new TestStringComparator( reverseSort );
260    }
261
262  @Override
263  public String getHiddenTemporaryPath()
264    {
265    return Hadoop18TapUtil.TEMPORARY_PATH;
266    }
267
268  /**
269   * Replaces characters, that are not allowed by HDFS with an "_".
270   *
271   * @param filename The filename to make safe
272   * @return The filename with all non-supported characters removed.
273   */
274  protected String safeFileName( String filename )
275    {
276    return filename.replace( ":", "_" ); // not using Util.cleansePathName as it removes /
277    }
278  }