/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import cascading.flow.FlowConnector;
import cascading.flow.FlowProps;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.util.Util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopPlatform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
 * so that all *PlatformTest classes can be tested against Apache Hadoop.
 * <p/>
 * This platform works in three modes.
 * <p/>
 * In Hadoop standalone mode, Hadoop is NOT run as a cluster; all child tasks run in process, in the memory of
 * the "client" side code.
 * <p/>
 * In Hadoop mini cluster mode, a cluster is created on demand using the Hadoop MiniDFSCluster and MiniMRCluster
 * utilities. This is the default when a PlatformTestCase requests a cluster. All properties are pulled from the
 * current CLASSPATH via the JobConf.
 * <p/>
 * Lastly, remote cluster mode is enabled when the System property "mapred.jar" is set. This is a Hadoop property
 * specifying the Hadoop "job jar" to be used cluster side. The jar MUST contain the Cascading test suite and its
 * dependencies packaged in a Hadoop compatible way; building it is left to the framework using this mode.
 * Additionally, these properties may optionally be set if not already in the CLASSPATH: fs.default.name and
 * mapred.job.tracker.
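 * <p/>
 * For example, remote cluster mode might be selected with JVM system properties along these lines (the jar path
 * and host names below are placeholders, not values mandated by this codebase):
 * <pre>
 * -Dmapred.jar=build/cascading-tests.jar \
 * -Dfs.default.name=hdfs://namenode.example.com:8020 \
 * -Dmapred.job.tracker=jobtracker.example.com:8021
 * </pre>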
 */
public class HadoopPlatform extends BaseHadoopPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlatform.class );

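  // in-process mini cluster instances, created on first setUp() and shared by all tests in the same JVM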
  public static transient MiniDFSCluster dfs;
  public static transient MiniMRCluster mr;

  public HadoopPlatform()
    {
    }

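  /**
   * Returns a new {@link HadoopFlowConnector} configured with the given properties.
   */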
  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new HadoopFlowConnector( properties );
    }

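  /**
   * Requests the given number of map tasks via the Hadoop "mapred.map.tasks" property.
   */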
  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapred.map.tasks", Integer.toString( numMapTasks ) );
    }

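  /**
   * Requests the given number of reduce tasks via the Hadoop "mapred.reduce.tasks" property.
   */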
  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapred.reduce.tasks", Integer.toString( numReduceTasks ) );
    }

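  /**
   * Returns the requested number of map tasks, or null if "mapred.map.tasks" is unset.
   */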
  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapred.map.tasks" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapred.map.tasks" ).toString() );
    }

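  /**
   * Returns the requested number of reduce tasks, or null if "mapred.reduce.tasks" is unset.
   */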
  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapred.reduce.tasks" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapred.reduce.tasks" ).toString() );
    }

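  /**
   * Lazily initializes the platform, choosing one of the three modes described above: a local in-memory
   * configuration when no cluster is requested, a remote cluster when the "mapred.jar" System property is set,
   * or an on demand MiniDFSCluster/MiniMRCluster pair otherwise.
   */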
  @Override
  public synchronized void setUp() throws IOException
    {
    if( jobConf != null )
      return;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();

      // enforce the local file system in local mode
      jobConf.set( "fs.default.name", "file:///" );
      jobConf.set( "mapred.job.tracker", "local" );
      jobConf.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );

      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // return value intentionally ignored

      JobConf conf = new JobConf();

      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
          {
          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
          jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
          }

        if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
          {
          LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          jobConf.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
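        // spin up a 4 data node DFS mini cluster, then a 4 tracker MR mini cluster on top of its file system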
        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();
        mr = new MiniMRCluster( 4, fileSys.getUri().toString(), 1, null, null, conf );

        jobConf = mr.createJobConf();
        }

//      jobConf.set( "mapred.map.max.attempts", "1" );
//      jobConf.set( "mapred.reduce.max.attempts", "1" );
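      // tune the cluster for tests: small child heaps, unlimited JVM reuse, fast polling, no speculative execution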
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapred.job.reuse.jvm.num.tasks", -1 );
      jobConf.setInt( "jobclient.completion.poll.interval", 50 );
      jobConf.setInt( "jobclient.progress.monitor.poll.interval", 50 );
      ( (JobConf) jobConf ).setMapSpeculativeExecution( false );
      ( (JobConf) jobConf ).setReduceSpeculativeExecution( false );
      }

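    // apply the requested task counts and global test properties to the shared configuration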
    ( (JobConf) jobConf ).setNumMapTasks( numMapTasks );
    ( (JobConf) jobConf ).setNumReduceTasks( numReduceTasks );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    HadoopPlanner.copyProperties( (JobConf) jobConf, globalProperties ); // copy any external properties

    HadoopPlanner.copyJobConf( properties, (JobConf) jobConf ); // put all properties on the jobconf
    }
  }