001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.platform.hadoop2;
022    
023    import java.io.File;
024    import java.io.IOException;
025    import java.util.Map;
026    import java.util.Random;
027    
028    import cascading.flow.FlowConnector;
029    import cascading.flow.FlowProps;
030    import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
031    import cascading.flow.hadoop2.Hadoop2MR1Planner;
032    import cascading.platform.hadoop.BaseHadoopPlatform;
033    import cascading.util.Util;
034    import org.apache.hadoop.fs.FileSystem;
035    import org.apache.hadoop.hdfs.MiniDFSCluster;
036    import org.apache.hadoop.mapred.JobConf;
037    import org.apache.hadoop.mapred.MiniMRClientCluster;
038    import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    
042    /**
043     * Class Hadoop2Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
044     * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x.
045     */
046    public class Hadoop2MR1Platform extends BaseHadoopPlatform
047      {
048      private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );
049      private transient static MiniDFSCluster dfs;
050      private transient static MiniMRClientCluster mr;
051    
052      public Hadoop2MR1Platform()
053        {
054        }
055    
056      @Override
057      public String getName()
058        {
059        return "hadoop2-mr1";
060        }
061    
062      @Override
063      public FlowConnector getFlowConnector( Map<Object, Object> properties )
064        {
065        return new Hadoop2MR1FlowConnector( properties );
066        }
067    
068      @Override
069      public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
070        {
071        properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
072        }
073    
074      @Override
075      public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
076        {
077        properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
078        }
079    
080      @Override
081      public Integer getNumMapTasks( Map<Object, Object> properties )
082        {
083        if( properties.get( "mapreduce.job.maps" ) == null )
084          return null;
085    
086        return Integer.parseInt( properties.get( "mapreduce.job.maps" ).toString() );
087        }
088    
089      @Override
090      public Integer getNumReduceTasks( Map<Object, Object> properties )
091        {
092        if( properties.get( "mapreduce.job.reduces" ) == null )
093          return null;
094    
095        return Integer.parseInt( properties.get( "mapreduce.job.reduces" ).toString() );
096        }
097    
098      @Override
099      public synchronized void setUp() throws IOException
100        {
101        if( jobConf != null )
102          return;
103    
104        if( !isUseCluster() )
105          {
106          LOG.info( "not using cluster" );
107          jobConf = new JobConf();
108          fileSys = FileSystem.get( jobConf );
109          }
110        else
111          {
112          LOG.info( "using cluster" );
113    
114          if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
115            System.setProperty( "hadoop.log.dir", "build/test/log" );
116    
117          if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
118            System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );
119    
120          new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored
121    
122          JobConf conf = new JobConf();
123    
124          if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
125            {
126            LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
127            jobConf = conf;
128    
129            ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );
130    
131            if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
132              {
133              LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
134              jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
135              }
136    
137            if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
138              {
139              LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
140              jobConf.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
141              }
142    
143            jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
144            fileSys = FileSystem.get( jobConf );
145            }
146          else
147            {
148            conf.setBoolean( "yarn.is.minicluster", true );
149    //      conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
150    //      conf.set( "yarn.scheduler.capacity.root.queues", "default" );
151    //      conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
152            // disable blacklisting hosts not to fail localhost during unit tests
153            conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );
154    
155            dfs = new MiniDFSCluster( conf, 4, true, null );
156            fileSys = dfs.getFileSystem();
157    
158            FileSystem.setDefaultUri( conf, fileSys.getUri() );
159    
160            mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );
161    
162            jobConf = mr.getConfig();
163            }
164    
165          jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
166          jobConf.setInt( "mapreduce.job.jvm.numtasks", -1 );
167          jobConf.setInt( "mapreduce.client.completion.pollinterval", 50 );
168          jobConf.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
169          jobConf.setBoolean( "mapreduce.map.speculative", false );
170          jobConf.setBoolean( "mapreduce.reduce.speculative", false );
171          }
172    
173        jobConf.setInt( "mapreduce.job.maps", numMapTasks );
174        jobConf.setInt( "mapreduce.job.reduces", numReduceTasks );
175    
176        Map<Object, Object> globalProperties = getGlobalProperties();
177    
178        if( logger != null )
179          globalProperties.put( "log4j.logger", logger );
180    
181        FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests
182    
183        Hadoop2MR1Planner.copyProperties( jobConf, globalProperties ); // copy any external properties
184    
185        Hadoop2MR1Planner.copyConfiguration( properties, jobConf ); // put all properties on the jobconf
186        }
187      }