001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.platform.hadoop;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.Map;
026
027import cascading.flow.FlowConnector;
028import cascading.flow.FlowProcess;
029import cascading.flow.FlowProps;
030import cascading.flow.FlowSession;
031import cascading.flow.hadoop.HadoopFlowConnector;
032import cascading.flow.hadoop.HadoopFlowProcess;
033import cascading.flow.hadoop.planner.HadoopPlanner;
034import cascading.util.Util;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.hdfs.MiniDFSCluster;
037import org.apache.hadoop.mapred.JobConf;
038import org.apache.hadoop.mapred.MiniMRCluster;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Class HadoopPlatform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
044 * so that all *PlatformTest classes can be tested against Apache Hadoop.
045 * <p/>
046 * This platform works in three modes.
047 * <p/>
048 * Hadoop standalone mode is when Hadoop is NOT run as a cluster, and all
049 * child tasks are in process and in memory of the "client" side code.
050 * <p/>
051 * Hadoop mini cluster mode where a cluster is created on demand using the Hadoop MiniDFSCluster and MiniMRCluster
052 * utilities. When a PlatformTestCase requests to use a cluster, this is the default cluster. All properties are
053 * pulled from the current CLASSPATH via the JobConf.
054 * <p/>
055 * Lastly remote cluster mode is enabled when the System property "mapred.jar" is set. This is a Hadoop property
056 * specifying the Hadoop "job jar" to be used cluster side. This MUST be the Cascading test suite and dependencies
057 * packaged in a Hadoop compatible way. This is left to be implemented by the framework using this mode. Additionally
058 * these properties may optionally be set if not already in the CLASSPATH; fs.default.name and mapred.job.tracker.
059 */
060public class HadoopPlatform extends BaseHadoopPlatform<JobConf>
061  {
062  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlatform.class );
063
064  public transient static MiniDFSCluster dfs;
065  public transient static MiniMRCluster mr;
066
067  public HadoopPlatform()
068    {
069    }
070
071  @Override
072  public FlowConnector getFlowConnector( Map<Object, Object> properties )
073    {
074    return new HadoopFlowConnector( properties );
075    }
076
077  @Override
078  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
079    {
080    properties.put( "mapred.map.tasks", Integer.toString( numMapTasks ) );
081    }
082
083  @Override
084  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
085    {
086    properties.put( "mapred.reduce.tasks", Integer.toString( numReduceTasks ) );
087    }
088
089  @Override
090  public Integer getNumMapTasks( Map<Object, Object> properties )
091    {
092    if( properties.get( "mapred.map.tasks" ) == null )
093      return null;
094
095    return Integer.parseInt( properties.get( "mapred.map.tasks" ).toString() );
096    }
097
098  @Override
099  public Integer getNumReduceTasks( Map<Object, Object> properties )
100    {
101    if( properties.get( "mapred.reduce.tasks" ) == null )
102      return null;
103
104    return Integer.parseInt( properties.get( "mapred.reduce.tasks" ).toString() );
105    }
106
107  @Override
108  public JobConf getConfiguration()
109    {
110    return new JobConf( configuration );
111    }
112
113  @Override
114  public FlowProcess getFlowProcess()
115    {
116    return new HadoopFlowProcess( FlowSession.NULL, (JobConf) getConfiguration(), true );
117    }
118
119  @Override
120  public synchronized void setUp() throws IOException
121    {
122    if( configuration != null )
123      return;
124
125    if( !isUseCluster() )
126      {
127      LOG.info( "not using cluster" );
128      configuration = new JobConf();
129
130      // enforce the local file system in local mode
131      configuration.set( "fs.default.name", "file:///" );
132      configuration.set( "mapred.job.tracker", "local" );
133      configuration.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );
134
135      String stagingDir = configuration.get( "mapreduce.jobtracker.staging.root.dir" );
136
137      if( Util.isEmpty( stagingDir ) )
138        configuration.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );
139
140      fileSys = FileSystem.get( configuration );
141      }
142    else
143      {
144      LOG.info( "using cluster" );
145
146      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
147        System.setProperty( "hadoop.log.dir", "cascading-hadoop/build/test/log" );
148
149      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
150        System.setProperty( "hadoop.tmp.dir", "cascading-hadoop/build/test/tmp" );
151
152      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored
153
154      JobConf conf = new JobConf();
155
156      if( getApplicationJar() != null )
157        {
158        LOG.info( "using a remote cluster with jar: {}", getApplicationJar() );
159        configuration = conf;
160
161        ( (JobConf) configuration ).setJar( getApplicationJar() );
162
163        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
164          {
165          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
166          configuration.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
167          }
168
169        if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
170          {
171          LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
172          configuration.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
173          }
174
175        configuration.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
176        fileSys = FileSystem.get( configuration );
177        }
178      else
179        {
180        dfs = new MiniDFSCluster( conf, 4, true, null );
181        fileSys = dfs.getFileSystem();
182        mr = new MiniMRCluster( 4, fileSys.getUri().toString(), 1, null, null, conf );
183
184        configuration = mr.createJobConf();
185        }
186
187//      jobConf.set( "mapred.map.max.attempts", "1" );
188//      jobConf.set( "mapred.reduce.max.attempts", "1" );
189      configuration.set( "mapred.child.java.opts", "-Xmx512m" );
190      configuration.setInt( "mapred.job.reuse.jvm.num.tasks", -1 );
191      configuration.setInt( "jobclient.completion.poll.interval", 50 );
192      configuration.setInt( "jobclient.progress.monitor.poll.interval", 50 );
193      ( (JobConf) configuration ).setMapSpeculativeExecution( false );
194      ( (JobConf) configuration ).setReduceSpeculativeExecution( false );
195      }
196
197    ( (JobConf) configuration ).setNumMapTasks( numMappers );
198    ( (JobConf) configuration ).setNumReduceTasks( numReducers );
199
200    Map<Object, Object> globalProperties = getGlobalProperties();
201
202    if( logger != null )
203      globalProperties.put( "log4j.logger", logger );
204
205    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests
206
207    HadoopPlanner.copyProperties( (JobConf) configuration, globalProperties ); // copy any external properties
208
209    HadoopPlanner.copyJobConf( properties, (JobConf) configuration ); // put all properties on the jobconf
210    }
211  }