/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.util;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.type.FileType;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.lib.MRReader;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.getCommonPaths;
import static org.apache.hadoop.yarn.api.ApplicationConstants.CLASS_PATH_SEPARATOR;
import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CLASSPATH;
import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.PWD;
import static org.apache.tez.common.TezUtils.createConfFromByteString;
import static org.apache.tez.common.TezUtils.createConfFromUserPayload;
import static org.apache.tez.mapreduce.hadoop.MRInputHelpers.parseMRInputPayload;

/**
 * Static helpers for running Cascading on Tez. They cover:
 * <ul>
 * <li>building {@link TezConfiguration} instances from property maps</li>
 * <li>unpacking the {@link Configuration} carried in Tez input/output payloads</li>
 * <li>registering classpath artifacts as YARN {@link LocalResource}s</li>
 * <li>populating MapReduce task properties for processors</li>
 * </ul>
 */
public class TezUtil
  {
  private static final Logger LOG = LoggerFactory.getLogger( TezUtil.class );

  /**
   * Wraps the given configuration in a new {@link JobConf}.
   * <p>
   * All {@code new JobConf} calls are routed through here so JobConf construction stays in one place.
   *
   * @param configuration the configuration to copy into the JobConf
   * @return a new JobConf built from {@code configuration}
   */
  public static JobConf asJobConf( Configuration configuration )
    {
    return new JobConf( configuration );
    }

  /**
   * Creates a {@link TezConfiguration} and overlays the given properties onto it.
   * <p>
   * The result starts as a copy of {@code defaultJobconf}, or as a new TezConfiguration when that is null.
   * Each property is then stored as {@code key.toString()} / {@code value.toString()}, with these exceptions:
   * <ul>
   * <li>Null values are skipped.</li>
   * <li>Values that are {@link Class} or {@link TezConfiguration} instances are skipped.</li>
   * <li>When {@code properties} is a {@link Properties}, its string property names are also visited.
   * That set includes keys inherited from the defaults chain. Such keys are resolved through
   * {@link Properties#getProperty(String)}.</li>
   * </ul>
   *
   * @param properties     properties to apply, may be null
   * @param defaultJobconf configuration to copy as the starting point, may be null
   * @return the new configuration
   */
  public static TezConfiguration createTezConf( Map<Object, Object> properties, TezConfiguration defaultJobconf )
    {
    TezConfiguration jobConf = defaultJobconf == null ? new TezConfiguration() : new TezConfiguration( defaultJobconf );

    if( properties == null )
      return jobConf;

    Set<Object> keys = new HashSet<>( properties.keySet() );

    // stringPropertyNames() only returns entries whose key and value are both Strings (including
    // defaults), so it is merged with the raw key set rather than replacing it
    if( properties instanceof Properties )
      keys.addAll( ( (Properties) properties ).stringPropertyNames() );

    for( Object key : keys )
      {
      Object value = properties.get( key );

      // keys that only exist in the Properties defaults chain are invisible to get()
      if( value == null && properties instanceof Properties && key instanceof String )
        value = ( (Properties) properties ).getProperty( (String) key );

      if( value == null ) // don't stuff null values
        continue;

      // don't let these objects pass, even though toString is called below.
      if( value instanceof Class || value instanceof TezConfiguration )
        continue;

      jobConf.set( key.toString(), value.toString() );
      }

    return jobConf;
    }

  /**
   * Returns the current Hadoop user.
   *
   * @return the result of {@link UserGroupInformation#getCurrentUser()}
   * @throws CascadingException if the current user cannot be determined
   */
  public static UserGroupInformation getCurrentUser()
    {
    try
      {
      return UserGroupInformation.getCurrentUser();
      }
    catch( IOException exception )
      {
      throw new CascadingException( "unable to get current user", exception );
      }
    }

  /**
   * Reads the edge source id stored under {@code cascading.node.source}.
   *
   * @param input         the input the id belongs to; only used in the error message
   * @param configuration configuration to read the id from
   * @return the source id
   * @throws IllegalStateException if no id is present
   */
  public static String getEdgeSourceID( LogicalInput input, Configuration configuration )
    {
    String id = configuration.get( "cascading.node.source" );

    if( id == null )
      throw new IllegalStateException( "no source id found: " + input.getClass().getName() );

    return id;
    }

  /**
   * Reads the edge sink id stored under {@code cascading.node.sink}.
   *
   * @param output        the output the id belongs to; only used in the error message
   * @param configuration configuration to read the id from
   * @return the sink id
   * @throws IllegalStateException if no id is present
   */
  public static String getEdgeSinkID( LogicalOutput output, Configuration configuration )
    {
    String id = configuration.get( "cascading.node.sink" );

    if( id == null )
      throw new IllegalStateException( "no sink id found: " + output.getClass().getName() );

    return id;
    }

  /**
   * Unpacks the {@link Configuration} carried in the user payload of the given input.
   * <p>
   * For a {@link MergedLogicalInput}, the configuration of its first member input is used.
   * An {@link MRInput} payload is parsed as an MR input payload, and its configuration bytes are decoded.
   * Any other {@link AbstractLogicalInput} payload is decoded directly as a configuration.
   *
   * @param input the input to inspect
   * @return the decoded configuration
   * @throws FlowException         if the payload cannot be decoded
   * @throws IllegalStateException if the input type is not supported, or a merged input has no members
   */
  public static Configuration getInputConfiguration( LogicalInput input )
    {
    LogicalInput target = input;

    try
      {
      if( target instanceof MergedLogicalInput )
        {
        target = (LogicalInput) Util.getFirst( ( (MergedLogicalInput) target ).getInputs() );

        // previously an empty merged input fell through to an NPE in the final error message
        if( target == null )
          throw new IllegalStateException( "merged input has no inputs: " + input.getClass().getName() );
        }

      // MRInput wraps its configuration inside an MR-specific payload, so test it before the generic branch
      if( target instanceof MRInput )
        return createConfFromByteString( parseMRInputPayload( ( (MRInput) target ).getContext().getUserPayload() ).getConfigurationBytes() );

      if( target instanceof AbstractLogicalInput )
        return createConfFromUserPayload( ( (AbstractLogicalInput) target ).getContext().getUserPayload() );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to unpack payload", exception );
      }

    throw new IllegalStateException( "unknown input type: " + target.getClass().getName() );
    }

  /**
   * Unpacks the {@link Configuration} carried in the user payload of the given output.
   *
   * @param output the output to inspect
   * @return the decoded configuration
   * @throws FlowException         if the payload cannot be decoded
   * @throws IllegalStateException if the output type is not supported
   */
  public static Configuration getOutputConfiguration( LogicalOutput output )
    {
    try
      {
      if( output instanceof MROutput )
        return TezUtils.createConfFromUserPayload( ( (MROutput) output ).getContext().getUserPayload() );

      if( output instanceof AbstractLogicalOutput )
        return createConfFromUserPayload( ( (AbstractLogicalOutput) output ).getContext().getUserPayload() );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to unpack payload", exception );
      }

    throw new IllegalStateException( "unknown output type: " + output.getClass().getName() );
    }

  /**
   * Records the path of the reader's current split under {@link FileType#CASCADING_SOURCE_PATH}.
   * <p>
   * Both the new ({@code org.apache.hadoop.mapreduce}) and old ({@code org.apache.hadoop.mapred})
   * {@code FileSplit} types are recognized. Any other split type, for example a grouped or combined
   * split, leaves the configuration untouched.
   * <p>
   * The branch is chosen from the split's runtime type rather than from MRInput's internal
   * {@code useNewApi} field. That field is only reachable through reflection and may be absent,
   * which previously caused an unboxing NPE.
   *
   * @param input         the MR input being read; retained for API compatibility
   * @param reader        the reader whose current split is inspected
   * @param configuration configuration to update
   */
  public static void setSourcePathForSplit( MRInput input, MRReader reader, Configuration configuration )
    {
    Object split = reader.getSplit();
    Path path = null;

    if( split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit )
      path = ( (org.apache.hadoop.mapreduce.lib.input.FileSplit) split ).getPath();
    else if( split instanceof org.apache.hadoop.mapred.FileSplit )
      path = ( (org.apache.hadoop.mapred.FileSplit) split ).getPath();

    if( path != null )
      configuration.set( FileType.CASCADING_SOURCE_PATH, path.toString() );
    }

  /**
   * Resolves the given classpath entries and registers each one as a YARN {@link LocalResource}.
   * <p>
   * Entries are resolved into local and remote paths by {@link HadoopUtil#resolvePaths}. How the
   * staging arguments are interpreted is defined there.
   * <ul>
   * <li>Entries with a local path are registered using the local file's status. The remote
   * counterpart's path is used as the resource URL when one exists.</li>
   * <li>Entries that only exist remotely are registered using their status on the default file system.</li>
   * </ul>
   *
   * @param config          configuration used to resolve paths and obtain the local/default file systems
   * @param stagingRoot     passed through to {@link HadoopUtil#resolvePaths}
   * @param resourceSubPath passed through to {@link HadoopUtil#resolvePaths}
   * @param classpath       entries to register; when null nothing is done and null is returned
   * @param resourceType    the resource type given to every registered resource
   * @param localResources  map populated with the new resources, keyed by file name
   * @param environment     environment updated with CLASSPATH entries for PATTERN resources; may be null
   * @return the result of {@link HadoopUtil#getCommonPaths} over the resolved paths, or null if
   * {@code classpath} is null
   * @throws FlowException if a file status cannot be read, or a file name is registered twice
   */
  public static Map<Path, Path> addToClassPath( Configuration config, String stagingRoot, String resourceSubPath, Collection<String> classpath,
                                                LocalResourceType resourceType, Map<String, LocalResource> localResources,
                                                Map<String, String> environment )
    {
    if( classpath == null )
      return null;

    // file name -> resolved (fully qualified) path, split by local vs. remote location
    Map<String, Path> localPaths = new HashMap<>();
    Map<String, Path> remotePaths = new HashMap<>();

    HadoopUtil.resolvePaths( config, classpath, stagingRoot, resourceSubPath, localPaths, remotePaths );

    try
      {
      LocalFileSystem localFS = HadoopUtil.getLocalFS( config );

      for( Map.Entry<String, Path> entry : localPaths.entrySet() )
        {
        String fileName = entry.getKey();
        Path artifact = entry.getValue();
        Path remotePath = remotePaths.get( fileName );

        if( remotePath == null )
          remotePath = artifact;

        // NOTE(review): length/mtime come from the local file while the URL may point at the remote copy.
        // YARN localization rejects resources whose timestamp differs from the file at the URL, so this
        // presumes the remote copy keeps the local modification time - confirm in HadoopUtil
        addResource( localResources, environment, fileName, localFS.getFileStatus( artifact ), remotePath, resourceType );
        }

      FileSystem defaultFS = HadoopUtil.getDefaultFS( config );

      for( Map.Entry<String, Path> entry : remotePaths.entrySet() )
        {
        String fileName = entry.getKey();

        // already registered from its local copy above
        if( localPaths.get( fileName ) != null )
          continue;

        Path artifact = entry.getValue();

        addResource( localResources, environment, fileName, defaultFS.getFileStatus( artifact ), artifact, resourceType );
        }
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to set remote resource paths", exception );
      }

    return getCommonPaths( localPaths, remotePaths );
    }

  /**
   * Registers a single file as an APPLICATION-visibility {@link LocalResource} under {@code fileName}.
   * <p>
   * For {@link LocalResourceType#PATTERN} resources:
   * <ul>
   * <li>The resource pattern is set to {@code (?:classes/|lib/).*}.</li>
   * <li>When {@code environment} is non-null, three entries are appended to its CLASSPATH value:
   * the resource's root, {@code lib} and {@code classes} directories, each with a {@code *} wildcard.</li>
   * </ul>
   *
   * @param localResources map to add the resource to
   * @param environment    environment whose CLASSPATH is extended for PATTERN resources; may be null
   * @param fileName       key of the resource, also used as its directory name on the classpath
   * @param stats          file status supplying the resource length and timestamp
   * @param fullPath       path used as the resource URL
   * @param type           the resource type
   * @throws FlowException if {@code fileName} is already present in {@code localResources}
   * @throws IOException   declared for callers; not thrown by the current implementation
   */
  protected static void addResource( Map<String, LocalResource> localResources, Map<String, String> environment, String fileName, FileStatus stats, Path fullPath, LocalResourceType type ) throws IOException
    {
    if( localResources.containsKey( fileName ) )
      throw new FlowException( "duplicate filename added to classpath resources: " + fileName );

    URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath( fullPath );
    long len = stats.getLen();
    long modificationTime = stats.getModificationTime();

    LocalResource resource = LocalResource.newInstance(
      yarnUrlFromPath,
      type,
      LocalResourceVisibility.APPLICATION,
      len,
      modificationTime );

    if( type == LocalResourceType.PATTERN )
      {
      // todo: parametrize this for dynamic inclusion below
      String pattern = "(?:classes/|lib/).*";

      resource.setPattern( pattern );

      if( environment != null )
        {
        String current = "";

        current += PWD.$$() + File.separator + fileName + File.separator + "*" + CLASS_PATH_SEPARATOR;
        current += PWD.$$() + File.separator + fileName + File.separator + "lib" + File.separator + "*" + CLASS_PATH_SEPARATOR;
        current += PWD.$$() + File.separator + fileName + File.separator + "classes" + File.separator + "*" + CLASS_PATH_SEPARATOR;

        String classPath = environment.get( CLASSPATH.name() );

        // only add a separator between a non-empty existing value and the new entries. The previous
        // startsWith() check appended one even when the value already ended with a separator, producing
        // empty classpath entries; it also added a leading separator to an empty value
        if( classPath == null )
          classPath = "";
        else if( !classPath.isEmpty() && !classPath.endsWith( CLASS_PATH_SEPARATOR ) )
          classPath += CLASS_PATH_SEPARATOR;

        classPath += current;

        LOG.info( "adding to cluster side classpath: {} ", classPath );

        environment.put( CLASSPATH.name(), classPath );
        }
      }

    localResources.put( fileName, resource );
    }

  /**
   * Populates MapReduce task properties in {@code config} from the Tez processor context.
   * <p>
   * A mock {@link TaskAttemptID} is derived from the context's application id, vertex index, task index
   * and attempt number. The following properties are then set from it:
   * <ul>
   * <li>task attempt id</li>
   * <li>task id</li>
   * <li>is-map flag</li>
   * <li>task partition</li>
   * </ul>
   *
   * @param context        the processor context to derive ids from
   * @param config         configuration to update
   * @param isMapperOutput whether the task should be flagged as a map task
   */
  public static void setMRProperties( ProcessorContext context, Configuration config, boolean isMapperOutput )
    {
    TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
      .createMockTaskAttemptID( context.getApplicationId().getClusterTimestamp(),
        context.getTaskVertexIndex(), context.getApplicationId().getId(),
        context.getTaskIndex(), context.getTaskAttemptNumber(), isMapperOutput );

    config.set( JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString() );
    config.set( JobContext.TASK_ID, taskAttemptId.getTaskID().toString() );
    config.setBoolean( JobContext.TASK_ISMAP, isMapperOutput );
    config.setInt( JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId() );
    }
  }