/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.flow.FlowProps;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.util.Util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopPlatform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
 * so that all *PlatformTest classes can be tested against Apache Hadoop.
 * <p/>
 * This platform works in three modes.
 * <p/>
 * Hadoop standalone mode is when Hadoop is NOT run as a cluster, and all
 * child tasks are in process and in memory of the "client" side code.
 * <p/>
 * Hadoop mini cluster mode is where a cluster is created on demand using the Hadoop MiniDFSCluster and MiniMRCluster
 * utilities. When a PlatformTestCase requests to use a cluster, this is the default cluster. All properties are
 * pulled from the current CLASSPATH via the JobConf.
 * <p/>
 * Lastly, remote cluster mode is enabled when the System property "mapred.jar" is set. This is a Hadoop property
 * specifying the Hadoop "job jar" to be used cluster side. This MUST be the Cascading test suite and dependencies
 * packaged in a Hadoop compatible way. This is left to be implemented by the framework using this mode. Additionally
 * these properties may optionally be set if not already in the CLASSPATH: fs.default.name and mapred.job.tracker.
 * <p/>
 * Note that {@link #setUp()} selects remote cluster mode whenever {@code getApplicationJar()} returns a
 * non-null value; how that value is derived is defined outside of this class.
 */
public class HadoopPlatform extends BaseHadoopPlatform<JobConf>
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlatform.class );

  /** The mini DFS cluster, assigned by {@link #setUp()} only when the on-demand mini cluster is started. */
  public transient static MiniDFSCluster dfs;
  /** The mini MapReduce cluster, assigned by {@link #setUp()} only when the on-demand mini cluster is started. */
  public transient static MiniMRCluster mr;

  /** Creates a new platform instance; all initialization is deferred to {@link #setUp()}. */
  public HadoopPlatform()
    {
    }

  /**
   * Creates a new {@link HadoopFlowConnector} using the given properties.
   *
   * @param properties the properties handed to the connector
   * @return a new HadoopFlowConnector instance
   */
  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new HadoopFlowConnector( properties );
    }

  /**
   * Stores the requested number of map tasks under the "mapred.map.tasks" key, as a String.
   *
   * @param properties  the properties to update
   * @param numMapTasks the number of map tasks
   */
  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapred.map.tasks", Integer.toString( numMapTasks ) );
    }

  /**
   * Stores the requested number of reduce tasks under the "mapred.reduce.tasks" key, as a String.
   *
   * @param properties     the properties to update
   * @param numReduceTasks the number of reduce tasks
   */
  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapred.reduce.tasks", Integer.toString( numReduceTasks ) );
    }

  /**
   * Reads the number of map tasks from the "mapred.map.tasks" key.
   *
   * @param properties the properties to read from
   * @return the parsed value, or {@code null} if the key has no value
   * @throws NumberFormatException if the stored value is not a parsable integer
   */
  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    Object value = properties.get( "mapred.map.tasks" );

    if( value == null )
      return null;

    return Integer.parseInt( value.toString() );
    }

  /**
   * Reads the number of reduce tasks from the "mapred.reduce.tasks" key.
   *
   * @param properties the properties to read from
   * @return the parsed value, or {@code null} if the key has no value
   * @throws NumberFormatException if the stored value is not a parsable integer
   */
  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    Object value = properties.get( "mapred.reduce.tasks" );

    if( value == null )
      return null;

    return Integer.parseInt( value.toString() );
    }

  /**
   * Returns a new {@link JobConf} copied from the shared configuration, so callers
   * do not receive the shared instance itself.
   *
   * @return a new JobConf instance
   */
  @Override
  public JobConf getConfiguration()
    {
    return new JobConf( configuration );
    }

  /**
   * Creates a {@link HadoopFlowProcess} bound to {@link FlowSession#NULL} and a copy of the current configuration.
   *
   * @return a new HadoopFlowProcess instance
   */
  @Override
  public FlowProcess getFlowProcess()
    {
    return new HadoopFlowProcess( FlowSession.NULL, getConfiguration(), true );
    }

  /**
   * Initializes the shared configuration and file system for one of the three supported modes.
   * <p/>
   * This method returns immediately if the configuration has already been created. Otherwise:
   * <ul>
   * <li>if {@code isUseCluster()} is false, a local (standalone) configuration is created,</li>
   * <li>else if {@code getApplicationJar()} is not null, a remote cluster configuration is created,</li>
   * <li>else a MiniDFSCluster and MiniMRCluster are started and their configuration is used.</li>
   * </ul>
   * Lastly the number of mappers and reducers is applied, and the global properties are copied
   * onto the configuration, and the resulting configuration is copied into the shared properties.
   * <p/>
   * NOTE(review): this method synchronizes on the instance while assigning the static {@link #dfs} and
   * {@link #mr} fields; confirm that concurrent calls from different instances cannot happen.
   *
   * @throws IOException if the file system or a mini cluster cannot be created
   */
  @Override
  public synchronized void setUp() throws IOException
    {
    if( configuration != null )
      return;

    JobConf jobConf;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();
      configuration = jobConf; // published before any call that may throw, as a failed setUp is not retried

      // enforce the local file system in local mode
      jobConf.set( "fs.default.name", "file:///" );
      jobConf.set( "mapred.job.tracker", "local" );

      // always overrides any staging dir found on the CLASSPATH
      jobConf.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "cascading-hadoop/build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "cascading-hadoop/build/test/tmp" );

      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored

      JobConf conf = new JobConf();
      String applicationJar = getApplicationJar();

      if( applicationJar != null )
        {
        LOG.info( "using a remote cluster with jar: {}", applicationJar );
        jobConf = conf;
        configuration = jobConf;

        jobConf.setJar( applicationJar );

        // optional overrides, applied only when given as System properties
        String defaultFS = System.getProperty( "fs.default.name" );

        if( !Util.isEmpty( defaultFS ) )
          {
          LOG.info( "using {}={}", "fs.default.name", defaultFS );
          jobConf.set( "fs.default.name", defaultFS );
          }

        String jobTracker = System.getProperty( "mapred.job.tracker" );

        if( !Util.isEmpty( jobTracker ) )
          {
          LOG.info( "using {}={}", "mapred.job.tracker", jobTracker );
          jobConf.set( "mapred.job.tracker", jobTracker );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        // start the on-demand mini cluster, the MR cluster is pointed at the mini DFS file system
        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();
        mr = new MiniMRCluster( 4, fileSys.getUri().toString(), 1, null, null, conf );

        jobConf = mr.createJobConf();
        configuration = jobConf;
        }

//      jobConf.set( "mapred.map.max.attempts", "1" );
//      jobConf.set( "mapred.reduce.max.attempts", "1" );
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapred.job.reuse.jvm.num.tasks", -1 );
      jobConf.setInt( "jobclient.completion.poll.interval", 50 );
      jobConf.setInt( "jobclient.progress.monitor.poll.interval", 50 );
      jobConf.setMapSpeculativeExecution( false );
      jobConf.setReduceSpeculativeExecution( false );
      }

    jobConf.setNumMapTasks( numMappers );
    jobConf.setNumReduceTasks( numReducers );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    HadoopPlanner.copyProperties( jobConf, globalProperties ); // copy any external properties

    HadoopPlanner.copyJobConf( properties, jobConf ); // put all properties on the jobconf
    }
  }