/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStepFactory;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.PlannerInfo;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.FlowStepFactory;
import cascading.flow.planner.rule.RuleRegistry;
import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
import cascading.property.AppProps;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.util.TempHfs;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopPlanner is the core Hadoop MapReduce planner used by default through a {@link cascading.flow.FlowConnector}
 * sub-class.
 * <p/>
 * Notes:
 * <p/>
 * <strong>Custom JobConf properties</strong><br/>
 * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)}
 * on a map properties object before constructing a new {@link cascading.flow.FlowConnector} sub-class.
 * <p/>
 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
 * Flow instances.
 * <p/>
 * For example, {@code properties.put( "mapred.child.java.opts", "-Xmx512m" );} would convince Hadoop
 * to spawn all child jvms with a heap of 512MB.
 */
public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class );

  /** Platform name this planner reports through {@link #getPlannerInfo(String)}. */
  public static final String PLATFORM_NAME = "hadoop";

  /** Field jobConf - the default JobConf all planned Flows derive their configuration from, built in {@link #initialize}. */
  private JobConf defaultJobConf;
  /** Field intermediateSchemeClass - Scheme type handed to {@link TempHfs} instances created by {@link #makeTempTap}. */
  private Class intermediateSchemeClass;

  /**
   * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass
   * custom default Hadoop JobConf properties to Hadoop.
   *
   * @param properties of type Map
   * @param jobConf    of type JobConf
   */
  public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf )
    {
    // JobConf is Iterable<Map.Entry<String, String>>; later entries overwrite existing keys in properties
    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );
    }

  /**
   * Method createJobConf returns a new JobConf instance using the values in the given properties argument.
   *
   * @param properties of type Map
   * @return a JobConf instance
   */
  public static JobConf createJobConf( Map<Object, Object> properties )
    {
    JobConf conf = new JobConf();

    copyProperties( conf, properties );

    return conf;
    }

  /**
   * Method copyProperties adds the given Map values to the given JobConf object.
   *
   * @param jobConf    of type JobConf
   * @param properties of type Map
   */
  public static void copyProperties( JobConf jobConf, Map<Object, Object> properties )
    {
    if( properties instanceof Properties )
      {
      // stringPropertyNames() walks the Properties defaults chain, which entrySet() would miss
      Properties props = (Properties) properties;
      Set<String> keys = props.stringPropertyNames();

      for( String key : keys )
        jobConf.set( key, props.getProperty( key ) );
      }
    else
      {
      for( Map.Entry<Object, Object> entry : properties.entrySet() )
        {
        // null values are skipped since JobConf#set rejects null
        if( entry.getValue() != null )
          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
        }
      }
    }

  /**
   * Returns the planner identity (planner class simple name, "hadoop" platform, registry name) for the given registry.
   *
   * @param registryName the name of the RuleRegistry being applied
   * @return a new PlannerInfo instance
   */
  @Override
  public PlannerInfo getPlannerInfo( String registryName )
    {
    return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName );
    }

  /**
   * Returns the default JobConf built during {@link #initialize}; null before initialization.
   */
  @Override
  public JobConf getDefaultConfig()
    {
    return defaultJobConf;
    }

  /**
   * Resolves Hadoop platform name/version info from the jar that provides {@link JobConf}.
   */
  @Override
  public PlatformInfo getPlatformInfo()
    {
    return HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
    }

  /**
   * Builds the default JobConf from the given properties and resolves the application jar.
   * <p/>
   * The jar is resolved in precedence order: an already-set jar on the JobConf, the
   * {@link AppProps} application jar class, the {@link AppProps} application jar path, and
   * finally the jar containing the detected main class. The winning path is written back into
   * the properties via {@link AppProps#setApplicationJarPath}.
   *
   * @param flowConnector the FlowConnector driving this planner
   * @param properties    default properties for all resulting Flows
   */
  @Override
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    super.initialize( flowConnector, properties );

    defaultJobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) );
    checkPlatform( defaultJobConf );
    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );

    Class type = AppProps.getApplicationJarClass( properties );
    if( defaultJobConf.getJar() == null && type != null )
      defaultJobConf.setJarByClass( type );

    String path = AppProps.getApplicationJarPath( properties );
    if( defaultJobConf.getJar() == null && path != null )
      defaultJobConf.setJar( path );

    // last resort: locate the jar containing the application's main class
    if( defaultJobConf.getJar() == null )
      defaultJobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) );

    AppProps.setApplicationJarPath( properties, defaultJobConf.getJar() );

    LOG.info( "using application jar: {}", defaultJobConf.getJar() );
    }

  /**
   * Registers the default element factory that materializes temp taps between map/reduce steps.
   *
   * @param ruleRegistry the registry to receive the defaults
   */
  @Override
  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
    {
    super.configRuleRegistryDefaults( ruleRegistry );

    ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() );
    }

  /**
   * Warns when a YARN (Hadoop 2.x) configuration is detected, since this planner targets Hadoop 1.x MR.
   *
   * @param conf the configuration to inspect
   */
  protected void checkPlatform( Configuration conf )
    {
    if( HadoopUtil.isYARN( conf ) )
      LOG.warn( "running YARN based flows on Hadoop 1.x may cause problems, please use the 'cascading-hadoop2-mr1' dependencies" );
    }

  /**
   * Creates the HadoopFlow instance this planner populates.
   *
   * @param flowDef the FlowDef describing the flow
   * @return a new HadoopFlow
   */
  @Override
  protected HadoopFlow createFlow( FlowDef flowDef )
    {
    return new HadoopFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef );
    }

  /**
   * Returns a step factory that produces {@link HadoopFlowStep} instances, one per MapReduce job.
   */
  @Override
  public FlowStepFactory<JobConf> getFlowStepFactory()
    {
    return new BaseFlowStepFactory<JobConf>( getFlowNodeFactory() )
      {
      @Override
      public FlowStep<JobConf> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph )
        {
        return new HadoopFlowStep( stepElementGraph, flowNodeGraph );
        }
      };
    }

  /**
   * Returns the default filesystem URI scheme for the given tap; the tap is expected to be an {@link Hfs} instance.
   *
   * @param tap of type Tap
   * @return the default URI scheme
   */
  public URI getDefaultURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultJobConf );
    }

  /**
   * Returns the URI scheme of the given tap; the tap is expected to be an {@link Hfs} instance.
   *
   * @param tap of type Tap
   * @return the tap's URI scheme
   */
  public URI getURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getURIScheme( defaultJobConf );
    }

  /**
   * Creates a temporary HDFS tap for intermediate data between steps.
   *
   * @param prefix optional path prefix; when null, a unique name is requested from TempHfs
   * @param name   base name for the tap
   * @return a new TempHfs tap
   */
  @Override
  protected Tap makeTempTap( String prefix, String name )
    {
    // must give Taps unique names
    return new TempHfs( defaultJobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
    }
  }