/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop2;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;

import cascading.flow.FlowConnector;
import cascading.flow.FlowProps;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.flow.hadoop2.Hadoop2MR1Planner;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.util.Util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hadoop2Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
 * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x.
 * <p/>
 * Task-count accessors read/write the Hadoop 2 property names ("mapreduce.job.maps"/"mapreduce.job.reduces"),
 * and {@link #setUp()} lazily bootstraps either a local runner, a remote cluster, or an in-process
 * DFS + MR mini-cluster, depending on system properties.
 */
public class Hadoop2MR1Platform extends BaseHadoopPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );

  // mini-cluster instances are static so they are created at most once per JVM and
  // shared by every test; transient since a platform instance itself may be serialized
  private transient static MiniDFSCluster dfs;
  private transient static MiniMRClientCluster mr;

  public Hadoop2MR1Platform()
    {
    }

  /** Returns the platform identifier used to select *PlatformTest variants. */
  @Override
  public String getName()
    {
    return "hadoop2-mr1";
    }

  /** Creates a Hadoop 2 MR1 flow connector configured with the given properties. */
  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new Hadoop2MR1FlowConnector( properties );
    }

  /** Sets the requested number of map tasks using the Hadoop 2 property name. */
  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
    }

  /** Sets the requested number of reduce tasks using the Hadoop 2 property name. */
  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
    }

  /** Returns the configured map task count, or null when the property is unset. */
  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.maps" ) == null )
      return null;

    // value may have been stored as a String or any Object; parse via toString()
    return Integer.parseInt( properties.get( "mapreduce.job.maps" ).toString() );
    }

  /** Returns the configured reduce task count, or null when the property is unset. */
  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.reduces" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapreduce.job.reduces" ).toString() );
    }

  /**
   * Lazily initializes the Hadoop environment shared by all tests, in one of three modes:
   * <ul>
   * <li>local in-process runner when {@code isUseCluster()} is false;</li>
   * <li>an externally-running remote cluster when the "mapred.jar" system property is set;</li>
   * <li>otherwise an in-JVM MiniDFSCluster plus MiniMRClientCluster.</li>
   * </ul>
   * Synchronized and idempotent: the first caller wins, later callers return immediately.
   * <p/>
   * NOTE(review): jobConf, fileSys, numMapTasks, numReduceTasks, logger, properties and
   * getGlobalProperties()/isUseCluster() are presumably inherited from BaseHadoopPlatform — not
   * visible in this file; confirm against the superclass.
   *
   * @throws IOException if the file system or mini-cluster cannot be started
   */
  @Override
  public synchronized void setUp() throws IOException
    {
    // already initialized by an earlier test — nothing to do
    if( jobConf != null )
      return;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();
      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      // the mini-cluster daemons read these system properties; provide test defaults
      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );

      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored

      JobConf conf = new JobConf();

      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        // remote-cluster mode: ship the given job jar and point at the external services
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
          {
          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
          jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
          }

        if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
          {
          LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          jobConf.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        // mini-cluster mode: start DFS and MR inside this JVM
        conf.setBoolean( "yarn.is.minicluster", true );
//      conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
//      conf.set( "yarn.scheduler.capacity.root.queues", "default" );
//      conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
        // disable blacklisting hosts not to fail localhost during unit tests
        conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );

        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();

        // make the mini-DFS the default file system before starting the MR cluster
        FileSystem.setDefaultUri( conf, fileSys.getUri() );

        mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );

        jobConf = mr.getConfig();
        }

      // common cluster tuning: cap task heap, reuse JVMs, and poll aggressively to speed up tests
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapreduce.job.jvm.numtasks", -1 );
      jobConf.setInt( "mapreduce.client.completion.pollinterval", 50 );
      jobConf.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
      jobConf.setBoolean( "mapreduce.map.speculative", false );
      jobConf.setBoolean( "mapreduce.reduce.speculative", false );
      }

    jobConf.setInt( "mapreduce.job.maps", numMapTasks );
    jobConf.setInt( "mapreduce.job.reduces", numReduceTasks );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    Hadoop2MR1Planner.copyProperties( jobConf, globalProperties ); // copy any external properties

    Hadoop2MR1Planner.copyConfiguration( properties, jobConf ); // put all properties on the jobconf
    }
  }