001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.platform.hadoop; 022 023 import java.io.File; 024 import java.io.IOException; 025 import java.util.Map; 026 027 import cascading.flow.FlowConnector; 028 import cascading.flow.FlowProps; 029 import cascading.flow.hadoop.HadoopFlowConnector; 030 import cascading.flow.hadoop.planner.HadoopPlanner; 031 import cascading.util.Util; 032 import org.apache.hadoop.fs.FileSystem; 033 import org.apache.hadoop.hdfs.MiniDFSCluster; 034 import org.apache.hadoop.mapred.JobConf; 035 import org.apache.hadoop.mapred.MiniMRCluster; 036 import org.slf4j.Logger; 037 import org.slf4j.LoggerFactory; 038 039 /** 040 * Class HadoopPlatform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance 041 * so that all *PlatformTest classes can be tested against Apache Hadoop. 042 * <p/> 043 * This platform works in three modes. 044 * <p/> 045 * Hadoop standalone mode is when Hadoop is NOT run as a cluster, and all 046 * child tasks are in process and in memory of the "client" side code. 047 * <p/> 048 * Hadoop mini cluster mode where a cluster is created on demand using the Hadoop MiniDFSCluster and MiniMRCluster 049 * utilities. 
When a PlatformTestCase requests to use a cluster, this is the default cluster. All properties are 050 * pulled from the current CLASSPATH via the JobConf. 051 * <p/> 052 * Lastly remote cluster mode is enabled when the System property "mapred.jar" is set. This is a Hadoop property 053 * specifying the Hadoop "job jar" to be used cluster side. This MUST be the Cascading test suite and dependencies 054 * packaged in a Hadoop compatible way. This is left to be implemented by the framework using this mode. Additionally 055 * these properties may optionally be set if not already in the CLASSPATH; fs.default.name and mapred.job.tracker. 056 */ 057 public class HadoopPlatform extends BaseHadoopPlatform 058 { 059 private static final Logger LOG = LoggerFactory.getLogger( HadoopPlatform.class ); 060 061 public transient static MiniDFSCluster dfs; 062 public transient static MiniMRCluster mr; 063 064 public HadoopPlatform() 065 { 066 } 067 068 @Override 069 public FlowConnector getFlowConnector( Map<Object, Object> properties ) 070 { 071 return new HadoopFlowConnector( properties ); 072 } 073 074 @Override 075 public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks ) 076 { 077 properties.put( "mapred.map.tasks", Integer.toString( numMapTasks ) ); 078 } 079 080 @Override 081 public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks ) 082 { 083 properties.put( "mapred.reduce.tasks", Integer.toString( numReduceTasks ) ); 084 } 085 086 @Override 087 public Integer getNumMapTasks( Map<Object, Object> properties ) 088 { 089 if( properties.get( "mapred.map.tasks" ) == null ) 090 return null; 091 092 return Integer.parseInt( properties.get( "mapred.map.tasks" ).toString() ); 093 } 094 095 @Override 096 public Integer getNumReduceTasks( Map<Object, Object> properties ) 097 { 098 if( properties.get( "mapred.reduce.tasks" ) == null ) 099 return null; 100 101 return Integer.parseInt( properties.get( "mapred.reduce.tasks" ).toString() ); 102 } 
  /**
   * Initializes the shared Hadoop configuration and file system, exactly once per JVM.
   * <p>
   * Selects one of three modes (see the class javadoc): local standalone when the cluster is not
   * requested; remote cluster when the System property "mapred.jar" is set; otherwise an in-process
   * MiniDFSCluster/MiniMRCluster pair. Finally copies accumulated global properties onto the JobConf.
   * <p>
   * NOTE(review): {@code jobConf}, {@code fileSys}, {@code numMapTasks}, {@code numReduceTasks},
   * {@code logger} and {@code properties} appear to be inherited from {@code BaseHadoopPlatform} —
   * not visible here, confirm against the base class.
   *
   * @throws IOException if the {@link FileSystem} or mini clusters cannot be created
   */
  @Override
  public synchronized void setUp() throws IOException
    {
    // already initialized by an earlier test; clusters and conf are shared across the suite
    if( jobConf != null )
      return;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();

      // enforce the local file system in local mode
      jobConf.set( "fs.default.name", "file:///" );
      jobConf.set( "mapred.job.tracker", "local" );
      jobConf.set( "mapreduce.jobtracker.staging.root.dir", "build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      // direct Hadoop's log and tmp output under the build directory unless the caller already chose
      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );

      // best-effort: mkdirs() result deliberately ignored, the dir may already exist
      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored

      JobConf conf = new JobConf();

      // remote cluster mode: a pre-packaged job jar was supplied, do not spin up mini clusters
      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

        // optional overrides; otherwise whatever the CLASSPATH configuration provides is used
        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
          {
          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
          jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
          }

        if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
          {
          LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          jobConf.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        // mini cluster mode: 4-node DFS first, then a 4-tracker MR cluster bound to its URI
        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();
        mr = new MiniMRCluster( 4, fileSys.getUri().toString(), 1, null, null, conf );

        jobConf = mr.createJobConf();
        }

      // jobConf.set( "mapred.map.max.attempts", "1" );
      // jobConf.set( "mapred.reduce.max.attempts", "1" );
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapred.job.reuse.jvm.num.tasks", -1 );
      // aggressive polling keeps test turnaround fast
      jobConf.setInt( "jobclient.completion.poll.interval", 50 );
      jobConf.setInt( "jobclient.progress.monitor.poll.interval", 50 );
      // speculative tasks only add noise and load in a test cluster
      ( (JobConf) jobConf ).setMapSpeculativeExecution( false );
      ( (JobConf) jobConf ).setReduceSpeculativeExecution( false );
      }

    ( (JobConf) jobConf ).setNumMapTasks( numMapTasks );
    ( (JobConf) jobConf ).setNumReduceTasks( numReduceTasks );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    HadoopPlanner.copyProperties( (JobConf) jobConf, globalProperties ); // copy any external properties

    HadoopPlanner.copyJobConf( properties, (JobConf) jobConf ); // put all properties on the jobconf
    }
  }