001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.util; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Map; 029import java.util.Properties; 030import java.util.Set; 031 032import cascading.CascadingException; 033import cascading.flow.FlowException; 034import cascading.flow.hadoop.util.HadoopUtil; 035import cascading.tap.hadoop.io.MultiInputSplit; 036import cascading.util.Util; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.LocalFileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.mapred.JobConf; 043import org.apache.hadoop.mapred.JobContext; 044import org.apache.hadoop.mapred.TaskAttemptID; 045import org.apache.hadoop.security.UserGroupInformation; 046import org.apache.hadoop.yarn.api.records.LocalResource; 047import org.apache.hadoop.yarn.api.records.LocalResourceType; 048import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 049import org.apache.hadoop.yarn.api.records.URL; 050import org.apache.hadoop.yarn.util.ConverterUtils; 051import org.apache.tez.common.TezUtils; 052import org.apache.tez.dag.api.TezConfiguration; 053import org.apache.tez.mapreduce.input.MRInput; 054import org.apache.tez.mapreduce.lib.MRReader; 055import org.apache.tez.mapreduce.output.MROutput; 056import org.apache.tez.runtime.api.AbstractLogicalInput; 057import org.apache.tez.runtime.api.AbstractLogicalOutput; 058import org.apache.tez.runtime.api.LogicalInput; 059import org.apache.tez.runtime.api.LogicalOutput; 060import org.apache.tez.runtime.api.MergedLogicalInput; 061import org.apache.tez.runtime.api.ProcessorContext; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065import static cascading.flow.hadoop.util.HadoopUtil.getCommonPaths; 066import static org.apache.hadoop.yarn.api.ApplicationConstants.CLASS_PATH_SEPARATOR; 067import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CLASSPATH; 068import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.PWD; 069import static org.apache.tez.common.TezUtils.createConfFromByteString; 070import static org.apache.tez.common.TezUtils.createConfFromUserPayload; 071import static org.apache.tez.mapreduce.hadoop.MRInputHelpers.parseMRInputPayload; 072 073/** 074 * 075 */ 076public class TezUtil 077 { 078 private static final Logger LOG = LoggerFactory.getLogger( TezUtil.class ); 079 080 /** 081 * Attempting to localize all new JobConf calls 082 * 083 * @param configuration 084 * @return 085 */ 086 public static JobConf asJobConf( Configuration configuration ) 087 { 088 return new JobConf( configuration ); 089 } 090 091 public static TezConfiguration createTezConf( Map<Object, Object> properties, TezConfiguration defaultJobconf ) 092 { 093 TezConfiguration jobConf = defaultJobconf == null ? new TezConfiguration() : new TezConfiguration( defaultJobconf ); 094 095 if( properties == null ) 096 return jobConf; 097 098 Set<Object> keys = new HashSet<Object>( properties.keySet() ); 099 100 // keys will only be grabbed if both key/value are String, so keep orig keys 101 if( properties instanceof Properties ) 102 keys.addAll( ( (Properties) properties ).stringPropertyNames() ); 103 104 for( Object key : keys ) 105 { 106 Object value = properties.get( key ); 107 108 if( value == null && properties instanceof Properties && key instanceof String ) 109 value = ( (Properties) properties ).getProperty( (String) key ); 110 111 if( value == null ) // don't stuff null values 112 continue; 113 114 // don't let these objects pass, even though toString is called below. 115 if( value instanceof Class || value instanceof TezConfiguration ) 116 continue; 117 118 jobConf.set( key.toString(), value.toString() ); 119 } 120 121 return jobConf; 122 } 123 124 public static UserGroupInformation getCurrentUser() 125 { 126 try 127 { 128 return UserGroupInformation.getCurrentUser(); 129 } 130 catch( IOException exception ) 131 { 132 throw new CascadingException( "unable to get current user", exception ); 133 } 134 } 135 136 public static String getEdgeSourceID( LogicalInput input, Configuration configuration ) 137 { 138 String id = configuration.get( "cascading.node.source" ); 139 140 if( id == null ) 141 throw new IllegalStateException( "no source id found: " + input.getClass().getName() ); 142 143 return id; 144 } 145 146 public static String getEdgeSinkID( LogicalOutput output, Configuration configuration ) 147 { 148 String id = configuration.get( "cascading.node.sink" ); 149 150 if( id == null ) 151 throw new IllegalStateException( "no sink id found: " + output.getClass().getName() ); 152 153 return id; 154 } 155 156 public static Configuration getInputConfiguration( LogicalInput input ) 157 { 158 try 159 { 160 if( input instanceof MergedLogicalInput ) 161 input = (LogicalInput) Util.getFirst( ( (MergedLogicalInput) input ).getInputs() ); 162 163 if( input instanceof MRInput ) 164 return createConfFromByteString( parseMRInputPayload( ( (MRInput) input ).getContext().getUserPayload() ).getConfigurationBytes() ); 165 166 if( input instanceof AbstractLogicalInput ) 167 return createConfFromUserPayload( ( (AbstractLogicalInput) input ).getContext().getUserPayload() ); 168 } 169 catch( IOException exception ) 170 { 171 throw new FlowException( "unable to unpack payload", exception ); 172 } 173 174 throw new IllegalStateException( "unknown input type: " + input.getClass().getName() ); 175 } 176 177 public static Configuration getOutputConfiguration( LogicalOutput output ) 178 { 179 try 180 { 181 if( output instanceof MROutput ) 182 return TezUtils.createConfFromUserPayload( ( (MROutput) output ).getContext().getUserPayload() ); 183 184 if( output instanceof AbstractLogicalOutput ) 185 return createConfFromUserPayload( ( (AbstractLogicalOutput) output ).getContext().getUserPayload() ); 186 } 187 catch( IOException exception ) 188 { 189 throw new FlowException( "unable to unpack payload", exception ); 190 } 191 192 throw new IllegalStateException( "unknown input type: " + output.getClass().getName() ); 193 } 194 195 public static void setSourcePathForSplit( MRInput input, MRReader reader, Configuration configuration ) 196 { 197 Path path = null; 198 199 if( Util.returnInstanceFieldIfExistsSafe( input, "useNewApi" ) ) 200 { 201 org.apache.hadoop.mapreduce.InputSplit newInputSplit = (org.apache.hadoop.mapreduce.InputSplit) reader.getSplit(); 202 203 if( newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit ) 204 path = ( (org.apache.hadoop.mapreduce.lib.input.FileSplit) newInputSplit ).getPath(); 205 } 206 else 207 { 208 org.apache.hadoop.mapred.InputSplit oldInputSplit = (org.apache.hadoop.mapred.InputSplit) reader.getSplit(); 209 210 if( oldInputSplit instanceof org.apache.hadoop.mapred.FileSplit ) 211 path = ( (org.apache.hadoop.mapred.FileSplit) oldInputSplit ).getPath(); 212 } 213 214 if( path != null ) 215 configuration.set( MultiInputSplit.CASCADING_SOURCE_PATH, path.toString() ); 216 } 217 218 public static Map<Path, Path> addToClassPath( Configuration config, String stagingRoot, String resourceSubPath, Collection<String> classpath, 219 LocalResourceType resourceType, Map<String, LocalResource> localResources, 220 Map<String, String> environment ) 221 { 222 if( classpath == null ) 223 return null; 224 225 // given to fully qualified 226 Map<String, Path> localPaths = new HashMap<>(); 227 Map<String, Path> remotePaths = new HashMap<>(); 228 229 HadoopUtil.resolvePaths( config, classpath, stagingRoot, resourceSubPath, localPaths, remotePaths ); 230 231 try 232 { 233 LocalFileSystem localFS = HadoopUtil.getLocalFS( config ); 234 235 for( String fileName : localPaths.keySet() ) 236 { 237 Path artifact = localPaths.get( fileName ); 238 Path remotePath = remotePaths.get( fileName ); 239 240 if( remotePath == null ) 241 remotePath = artifact; 242 243 addResource( localResources, environment, fileName, localFS.getFileStatus( artifact ), remotePath, resourceType ); 244 } 245 246 FileSystem defaultFS = HadoopUtil.getDefaultFS( config ); 247 248 for( String fileName : remotePaths.keySet() ) 249 { 250 Path artifact = remotePaths.get( fileName ); 251 Path localPath = localPaths.get( fileName ); 252 253 if( localPath != null ) 254 continue; 255 256 addResource( localResources, environment, fileName, defaultFS.getFileStatus( artifact ), artifact, resourceType ); 257 } 258 } 259 catch( IOException exception ) 260 { 261 throw new FlowException( "unable to set remote resource paths", exception ); 262 } 263 264 return getCommonPaths( localPaths, remotePaths ); 265 } 266 267 protected static void addResource( Map<String, LocalResource> localResources, Map<String, String> environment, String fileName, FileStatus stats, Path fullPath, LocalResourceType type ) throws IOException 268 { 269 if( localResources.containsKey( fileName ) ) 270 throw new FlowException( "duplicate filename added to classpath resources: " + fileName ); 271 272 URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath( fullPath ); 273 long len = stats.getLen(); 274 long modificationTime = stats.getModificationTime(); 275 276 LocalResource resource = LocalResource.newInstance( 277 yarnUrlFromPath, 278 type, 279 LocalResourceVisibility.APPLICATION, 280 len, 281 modificationTime ); 282 283 if( type == LocalResourceType.PATTERN ) 284 { 285 // todo: parametrize this for dynamic inclusion below 286 String pattern = "(?:classes/|lib/).*"; 287 288 resource.setPattern( pattern ); 289 290 if( environment != null ) 291 { 292 String current = ""; 293 294 current += PWD.$$() + File.separator + fileName + File.separator + "*" + CLASS_PATH_SEPARATOR; 295 current += PWD.$$() + File.separator + fileName + File.separator + "lib" + File.separator + "*" + CLASS_PATH_SEPARATOR; 296 current += PWD.$$() + File.separator + fileName + File.separator + "classes" + File.separator + "*" + CLASS_PATH_SEPARATOR; 297 298 String classPath = environment.get( CLASSPATH.name() ); 299 300 if( classPath == null ) 301 classPath = ""; 302 else if( !classPath.startsWith( CLASS_PATH_SEPARATOR ) ) 303 classPath += CLASS_PATH_SEPARATOR; 304 305 classPath += current; 306 307 LOG.info( "adding to cluster side classpath: {} ", classPath ); 308 309 environment.put( CLASSPATH.name(), classPath ); 310 } 311 } 312 313 localResources.put( fileName, resource ); 314 } 315 316 public static void setMRProperties( ProcessorContext context, Configuration config, boolean isMapperOutput ) 317 { 318 TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl 319 .createMockTaskAttemptID( context.getApplicationId().getClusterTimestamp(), 320 context.getTaskVertexIndex(), context.getApplicationId().getId(), 321 context.getTaskIndex(), context.getTaskAttemptNumber(), isMapperOutput ); 322 323 config.set( JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString() ); 324 config.set( JobContext.TASK_ID, taskAttemptId.getTaskID().toString() ); 325 config.setBoolean( JobContext.TASK_ISMAP, isMapperOutput ); 326 config.setInt( JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId() ); 327 } 328 }