001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.util; 022 023import java.io.IOException; 024import java.io.UnsupportedEncodingException; 025import java.lang.reflect.Constructor; 026import java.lang.reflect.Field; 027import java.lang.reflect.InvocationTargetException; 028import java.net.URI; 029import java.net.URL; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Properties; 038import java.util.Set; 039import java.util.jar.Attributes; 040import java.util.jar.Manifest; 041 042import cascading.CascadingException; 043import cascading.flow.FlowException; 044import cascading.flow.planner.BaseFlowStep; 045import cascading.flow.planner.PlatformInfo; 046import cascading.flow.planner.Scope; 047import cascading.pipe.Group; 048import cascading.scheme.hadoop.TextLine; 049import cascading.tap.hadoop.Hfs; 050import cascading.tuple.Fields; 051import cascading.util.LogUtil; 052import cascading.util.Util; 053import org.apache.commons.codec.binary.Base64; 054import org.apache.hadoop.conf.Configurable; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FileStatus; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.LocalFileSystem; 059import org.apache.hadoop.fs.Path; 060import org.apache.hadoop.mapred.JobConf; 061import org.apache.hadoop.util.StringUtils; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065import static cascading.util.Util.invokeInstanceMethod; 066 067/** 068 * 069 */ 070public class HadoopUtil 071 { 072 public static final String CASCADING_FLOW_EXECUTING = "cascading.flow.executing"; 073 074 private static final Logger LOG = LoggerFactory.getLogger( HadoopUtil.class ); 075 private static final String ENCODING = "US-ASCII"; 076 private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class; 077 078 private static PlatformInfo platformInfo; 079 080 public static void setIsInflow( Configuration conf ) 081 { 082 conf.setBoolean( CASCADING_FLOW_EXECUTING, true ); 083 } 084 085 public static boolean isInflow( Configuration conf ) 086 { 087 return conf.getBoolean( CASCADING_FLOW_EXECUTING, false ); 088 } 089 090 public static void initLog4j( JobConf configuration ) 091 { 092 initLog4j( (Configuration) configuration ); 093 } 094 095 public static void initLog4j( Configuration configuration ) 096 { 097 String values = configuration.get( "log4j.logger", null ); 098 099 if( values == null || values.length() == 0 ) 100 return; 101 102 if( !Util.hasClass( "org.apache.log4j.Logger" ) ) 103 { 104 LOG.info( "org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties" ); 105 return; 106 } 107 108 String[] elements = values.split( "," ); 109 110 for( String element : elements ) 111 LogUtil.setLog4jLevel( element.split( "=" ) ); 112 } 113 114 // only place JobConf should ever be returned 115 public static JobConf asJobConfInstance( Configuration configuration ) 116 { 117 if( configuration instanceof JobConf ) 118 return (JobConf) configuration; 119 120 return new JobConf( configuration ); 121 } 122 123 public static <C> C copyJobConf( C parentJobConf ) 124 { 125 return copyConfiguration( parentJobConf ); 126 } 127 128 public static JobConf copyJobConf( JobConf parentJobConf ) 129 { 130 if( parentJobConf == null ) 131 throw new IllegalArgumentException( "parent may not be null" ); 132 133 // see https://github.com/Cascading/cascading/pull/21 134 // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in 135 // case those Credentials are mutated later on down the road (which they will be, during job submission, in 136 // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing. 137 final Configuration configurationCopy = new Configuration( parentJobConf ); 138 final JobConf jobConf = new JobConf( configurationCopy ); 139 140 jobConf.getCredentials().addAll( parentJobConf.getCredentials() ); 141 142 return jobConf; 143 } 144 145 public static JobConf createJobConf( Map<Object, Object> properties, JobConf defaultJobconf ) 146 { 147 JobConf jobConf = defaultJobconf == null ? new JobConf() : copyJobConf( defaultJobconf ); 148 149 if( properties == null ) 150 return jobConf; 151 152 return copyConfiguration( properties, jobConf ); 153 } 154 155 public static <C> C copyConfiguration( C parent ) 156 { 157 if( parent == null ) 158 throw new IllegalArgumentException( "parent may not be null" ); 159 160 if( !( parent instanceof Configuration ) ) 161 throw new IllegalArgumentException( "parent must be of type Configuration" ); 162 163 Configuration conf = (Configuration) parent; 164 165 // see https://github.com/Cascading/cascading/pull/21 166 // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in 167 // case those Credentials are mutated later on down the road (which they will be, during job submission, in 168 // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing. 169 Configuration configurationCopy = new Configuration( conf ); 170 171 Configuration copiedConf = callCopyConstructor( parent.getClass(), configurationCopy ); 172 173 if( Util.hasInstanceMethod( parent, "getCredentials", null ) ) 174 { 175 Object result = invokeInstanceMethod( parent, "getCredentials", null, null ); 176 Object credentials = invokeInstanceMethod( copiedConf, "getCredentials", null, null ); 177 178 invokeInstanceMethod( credentials, "addAll", new Object[]{result}, new Class[]{credentials.getClass()} ); 179 } 180 181 return (C) copiedConf; 182 } 183 184 protected static <C extends Configuration> C callCopyConstructor( Class type, Configuration parent ) 185 { 186 try 187 { 188 Constructor<C> constructor = type.getConstructor( parent.getClass() ); 189 190 return constructor.newInstance( parent ); 191 } 192 catch( NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException exception ) 193 { 194 throw new CascadingException( "unable to create copy of: " + type ); 195 } 196 } 197 198 public static <C extends Configuration> C copyConfiguration( Map<Object, Object> srcProperties, C dstConfiguration ) 199 { 200 Set<Object> keys = new HashSet<Object>( srcProperties.keySet() ); 201 202 // keys will only be grabbed if both key/value are String, so keep orig keys 203 if( srcProperties instanceof Properties ) 204 keys.addAll( ( (Properties) srcProperties ).stringPropertyNames() ); 205 206 for( Object key : keys ) 207 { 208 Object value = srcProperties.get( key ); 209 210 if( value == null && srcProperties instanceof Properties && key instanceof String ) 211 value = ( (Properties) srcProperties ).getProperty( (String) key ); 212 213 if( value == null ) // don't stuff null values 214 continue; 215 216 // don't let these objects pass, even though toString is called below. 217 if( value instanceof Class || value instanceof JobConf ) 218 continue; 219 220 dstConfiguration.set( key.toString(), value.toString() ); 221 } 222 223 return dstConfiguration; 224 } 225 226 public static Map<Object, Object> createProperties( Configuration jobConf ) 227 { 228 Map<Object, Object> properties = new HashMap<Object, Object>(); 229 230 if( jobConf == null ) 231 return properties; 232 233 for( Map.Entry<String, String> entry : jobConf ) 234 properties.put( entry.getKey(), entry.getValue() ); 235 236 return properties; 237 } 238 239 public static Thread getHDFSShutdownHook() 240 { 241 Exception caughtException; 242 243 try 244 { 245 // we must init the FS so the finalizer is registered 246 FileSystem.getLocal( new JobConf() ); 247 248 Field field = FileSystem.class.getDeclaredField( "clientFinalizer" ); 249 field.setAccessible( true ); 250 251 Thread finalizer = (Thread) field.get( null ); 252 253 if( finalizer != null ) 254 Runtime.getRuntime().removeShutdownHook( finalizer ); 255 256 return finalizer; 257 } 258 catch( NoSuchFieldException exception ) 259 { 260 caughtException = exception; 261 } 262 catch( IllegalAccessException exception ) 263 { 264 caughtException = exception; 265 } 266 catch( IOException exception ) 267 { 268 caughtException = exception; 269 } 270 271 LOG.debug( "unable to find and remove client hdfs shutdown hook, received exception: {}", caughtException.getClass().getName() ); 272 273 return null; 274 } 275 276 public static String encodeBytes( byte[] bytes ) 277 { 278 try 279 { 280 return new String( Base64.encodeBase64( bytes ), ENCODING ); 281 } 282 catch( UnsupportedEncodingException exception ) 283 { 284 throw new RuntimeException( exception ); 285 } 286 } 287 288 public static byte[] decodeBytes( String string ) 289 { 290 try 291 { 292 byte[] bytes = string.getBytes( ENCODING ); 293 return Base64.decodeBase64( bytes ); 294 } 295 catch( UnsupportedEncodingException exception ) 296 { 297 throw new RuntimeException( exception ); 298 } 299 } 300 301 public static <T> ObjectSerializer instantiateSerializer( Configuration conf, Class<T> type ) throws ClassNotFoundException 302 { 303 Class<ObjectSerializer> flowSerializerClass; 304 305 String serializerClassName = conf.get( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY ); 306 307 if( serializerClassName == null || serializerClassName.length() == 0 ) 308 flowSerializerClass = (Class<ObjectSerializer>) DEFAULT_OBJECT_SERIALIZER; 309 else 310 flowSerializerClass = (Class<ObjectSerializer>) Class.forName( serializerClassName ); 311 312 ObjectSerializer objectSerializer; 313 314 try 315 { 316 objectSerializer = flowSerializerClass.newInstance(); 317 318 if( objectSerializer instanceof Configurable ) 319 ( (Configurable) objectSerializer ).setConf( conf ); 320 } 321 catch( Exception exception ) 322 { 323 exception.printStackTrace(); 324 throw new IllegalArgumentException( "Unable to instantiate serializer \"" 325 + flowSerializerClass.getName() 326 + "\" for class: " 327 + type.getName() ); 328 } 329 330 if( !objectSerializer.accepts( type ) ) 331 throw new IllegalArgumentException( serializerClassName + " won't accept objects of class " + type.toString() ); 332 333 return objectSerializer; 334 } 335 336 public static <T> String serializeBase64( T object, Configuration conf ) throws IOException 337 { 338 return serializeBase64( object, conf, true ); 339 } 340 341 public static <T> String serializeBase64( T object, Configuration conf, boolean compress ) throws IOException 342 { 343 ObjectSerializer objectSerializer; 344 345 try 346 { 347 objectSerializer = instantiateSerializer( conf, object.getClass() ); 348 } 349 catch( ClassNotFoundException exception ) 350 { 351 throw new IOException( exception ); 352 } 353 354 return encodeBytes( objectSerializer.serialize( object, compress ) ); 355 } 356 357 /** 358 * This method deserializes the Base64 encoded String into an Object instance. 359 * 360 * @param string 361 * @return an Object 362 */ 363 public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type ) throws IOException 364 { 365 return deserializeBase64( string, conf, type, true ); 366 } 367 368 public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type, boolean decompress ) throws IOException 369 { 370 if( string == null || string.length() == 0 ) 371 return null; 372 373 ObjectSerializer objectSerializer; 374 375 try 376 { 377 objectSerializer = instantiateSerializer( conf, type ); 378 } 379 catch( ClassNotFoundException exception ) 380 { 381 throw new IOException( exception ); 382 } 383 384 return objectSerializer.deserialize( decodeBytes( string ), type, decompress ); 385 } 386 387 public static Class findMainClass( Class defaultType ) 388 { 389 return Util.findMainClass( defaultType, "org.apache.hadoop" ); 390 } 391 392 public static Map<String, String> getConfig( Configuration defaultConf, Configuration updatedConf ) 393 { 394 Map<String, String> configs = new HashMap<String, String>(); 395 396 for( Map.Entry<String, String> entry : updatedConf ) 397 configs.put( entry.getKey(), entry.getValue() ); 398 399 for( Map.Entry<String, String> entry : defaultConf ) 400 { 401 if( entry.getValue() == null ) 402 continue; 403 404 String updatedValue = configs.get( entry.getKey() ); 405 406 // if both null, lets purge from map to save space 407 if( updatedValue == null && entry.getValue() == null ) 408 configs.remove( entry.getKey() ); 409 410 // if the values are the same, lets also purge from map to save space 411 if( updatedValue != null && updatedValue.equals( entry.getValue() ) ) 412 configs.remove( entry.getKey() ); 413 414 configs.remove( "mapred.working.dir" ); 415 configs.remove( "mapreduce.job.working.dir" ); // hadoop2 416 } 417 418 return configs; 419 } 420 421 public static JobConf[] getJobConfs( Configuration job, List<Map<String, String>> configs ) 422 { 423 JobConf[] jobConfs = new JobConf[ configs.size() ]; 424 425 for( int i = 0; i < jobConfs.length; i++ ) 426 jobConfs[ i ] = (JobConf) mergeConf( job, configs.get( i ), false ); 427 428 return jobConfs; 429 } 430 431 public static <J extends Configuration> J mergeConf( J job, Map<String, String> config, boolean directly ) 432 { 433 Configuration currentConf = directly ? job : ( job instanceof JobConf ? copyJobConf( (JobConf) job ) : new Configuration( job ) ); 434 435 for( String key : config.keySet() ) 436 { 437 LOG.debug( "merging key: {} value: {}", key, config.get( key ) ); 438 439 currentConf.set( key, config.get( key ) ); 440 } 441 442 return (J) currentConf; 443 } 444 445 public static Configuration removePropertiesFrom( Configuration jobConf, String... keys ) 446 { 447 Map<Object, Object> properties = createProperties( jobConf ); 448 449 for( String key : keys ) 450 properties.remove( key ); 451 452 return copyConfiguration( properties, new JobConf() ); 453 } 454 455 public static boolean removeStateFromDistCache( Configuration conf, String path ) throws IOException 456 { 457 return new Hfs( new TextLine(), path ).deleteResource( conf ); 458 } 459 460 public static PlatformInfo getPlatformInfo() 461 { 462 if( platformInfo == null ) 463 platformInfo = getPlatformInfoInternal( JobConf.class, "org/apache/hadoop", "Hadoop" ); 464 465 return platformInfo; 466 } 467 468 public static PlatformInfo getPlatformInfo( Class type, String attributePath, String platformName ) 469 { 470 if( platformInfo == null ) 471 platformInfo = getPlatformInfoInternal( type, attributePath, platformName ); 472 473 return platformInfo; 474 } 475 476 public static PlatformInfo createPlatformInfo( Class type, String attributePath, String platformName ) 477 { 478 return getPlatformInfoInternal( type, attributePath, platformName ); 479 } 480 481 private static PlatformInfo getPlatformInfoInternal( Class type, String attributePath, String platformName ) 482 { 483 URL url = type.getResource( type.getSimpleName() + ".class" ); 484 485 if( url == null || !url.toString().startsWith( "jar" ) ) 486 return new PlatformInfo( platformName, null, null ); 487 488 String path = url.toString(); 489 path = path.substring( 0, path.lastIndexOf( "!" ) + 1 ); 490 491 String manifestPath = path + "/META-INF/MANIFEST.MF"; 492 String parsedVersion = Util.findVersion( path.substring( 0, path.length() - 1 ) ); 493 494 Manifest manifest; 495 496 try 497 { 498 manifest = new Manifest( new URL( manifestPath ).openStream() ); 499 } 500 catch( IOException exception ) 501 { 502 LOG.warn( "unable to get manifest from {}: {}", manifestPath, exception.getMessage() ); 503 504 return new PlatformInfo( platformName, null, parsedVersion ); 505 } 506 507 Attributes attributes = manifest.getAttributes( attributePath ); 508 509 if( attributes == null ) 510 attributes = manifest.getMainAttributes(); 511 512 if( attributes == null ) 513 { 514 LOG.debug( "unable to get platform manifest attributes" ); 515 return new PlatformInfo( platformName, null, parsedVersion ); 516 } 517 518 String vendor = attributes.getValue( "Implementation-Vendor" ); 519 String version = attributes.getValue( "Implementation-Version" ); 520 521 if( Util.isEmpty( version ) ) 522 version = parsedVersion; 523 524 return new PlatformInfo( platformName, vendor, version ); 525 } 526 527 /** 528 * Copies paths from one local path to a remote path. If syncTimes is true, both modification and access time are 529 * changed to match the local 'from' path. 530 * <p/> 531 * Returns a map of file-name to remote modification times if the remote time is different than the local time. 532 * 533 * @param config 534 * @param commonPaths 535 * @param syncTimes 536 */ 537 public static Map<String, Long> syncPaths( Configuration config, Map<Path, Path> commonPaths, boolean syncTimes ) 538 { 539 if( commonPaths == null ) 540 return Collections.emptyMap(); 541 542 Map<String, Long> timestampMap = new HashMap<>(); 543 544 Map<Path, Path> copyPaths = getCopyPaths( config, commonPaths ); // tests remote file existence or if stale 545 546 LocalFileSystem localFS = getLocalFS( config ); 547 FileSystem remoteFS = getDefaultFS( config ); 548 549 for( Map.Entry<Path, Path> entry : copyPaths.entrySet() ) 550 { 551 Path localPath = entry.getKey(); 552 Path remotePath = entry.getValue(); 553 554 try 555 { 556 LOG.info( "copying from: {}, to: {}", localPath, remotePath ); 557 remoteFS.copyFromLocalFile( localPath, remotePath ); 558 559 if( !syncTimes ) 560 { 561 timestampMap.put( remotePath.getName(), remoteFS.getFileStatus( remotePath ).getModificationTime() ); 562 continue; 563 } 564 } 565 catch( IOException exception ) 566 { 567 throw new FlowException( "unable to copy local: " + localPath + " to remote: " + remotePath, exception ); 568 } 569 570 FileStatus localFileStatus = null; 571 572 try 573 { 574 // sync the modified times so we can lazily upload jars to hdfs after job is started 575 // otherwise modified time will be local to hdfs 576 localFileStatus = localFS.getFileStatus( localPath ); 577 remoteFS.setTimes( remotePath, localFileStatus.getModificationTime(), -1 ); // don't set the access time 578 } 579 catch( IOException exception ) 580 { 581 LOG.info( "unable to set local modification time on remote file: {}, 'dfs.namenode.accesstime.precision' may be set to 0 on HDFS.", remotePath ); 582 583 if( localFileStatus != null ) 584 timestampMap.put( remotePath.getName(), localFileStatus.getModificationTime() ); 585 } 586 } 587 588 return timestampMap; 589 } 590 591 public static Map<Path, Path> getCommonPaths( Map<String, Path> localPaths, Map<String, Path> remotePaths ) 592 { 593 Map<Path, Path> commonPaths = new HashMap<Path, Path>(); 594 595 for( Map.Entry<String, Path> entry : localPaths.entrySet() ) 596 { 597 if( remotePaths.containsKey( entry.getKey() ) ) 598 commonPaths.put( entry.getValue(), remotePaths.get( entry.getKey() ) ); 599 } 600 601 return commonPaths; 602 } 603 604 private static Map<Path, Path> getCopyPaths( Configuration config, Map<Path, Path> commonPaths ) 605 { 606 Map<Path, Path> copyPaths = new HashMap<Path, Path>(); 607 608 FileSystem remoteFS = getDefaultFS( config ); 609 FileSystem localFS = getLocalFS( config ); 610 611 for( Map.Entry<Path, Path> entry : commonPaths.entrySet() ) 612 { 613 Path localPath = entry.getKey(); 614 Path remotePath = entry.getValue(); 615 616 try 617 { 618 boolean localExists = localFS.exists( localPath ); 619 boolean remoteExist = remoteFS.exists( remotePath ); 620 621 if( localExists && !remoteExist ) 622 { 623 copyPaths.put( localPath, remotePath ); 624 } 625 else if( localExists ) 626 { 627 long localModTime = localFS.getFileStatus( localPath ).getModificationTime(); 628 long remoteModTime = remoteFS.getFileStatus( remotePath ).getModificationTime(); 629 630 if( localModTime > remoteModTime ) 631 copyPaths.put( localPath, remotePath ); 632 } 633 } 634 catch( IOException exception ) 635 { 636 throw new FlowException( "unable to get handle to underlying filesystem", exception ); 637 } 638 } 639 640 return copyPaths; 641 } 642 643 public static void resolvePaths( Configuration config, Collection<String> classpath, String remoteRoot, String resourceSubPath, Map<String, Path> localPaths, Map<String, Path> remotePaths ) 644 { 645 FileSystem defaultFS = getDefaultFS( config ); 646 FileSystem localFS = getLocalFS( config ); 647 648 Path remoteRootPath = new Path( remoteRoot == null ? "./.staging" : remoteRoot ); 649 650 if( resourceSubPath != null ) 651 remoteRootPath = new Path( remoteRootPath, resourceSubPath ); 652 653 remoteRootPath = defaultFS.makeQualified( remoteRootPath ); 654 655 boolean defaultIsLocal = defaultFS.equals( localFS ); 656 657 for( String stringPath : classpath ) 658 { 659 Path path = new Path( stringPath ); 660 661 URI uri = path.toUri(); 662 663 if( uri.getScheme() == null && !defaultIsLocal ) // we want to sync 664 { 665 Path localPath = localFS.makeQualified( path ); 666 667 if( !exists( localFS, localPath ) ) 668 throw new FlowException( "path not found: " + localPath ); 669 670 String name = localPath.getName(); 671 672 if( resourceSubPath != null ) 673 name = resourceSubPath + "/" + name; 674 675 localPaths.put( name, localPath ); 676 remotePaths.put( name, defaultFS.makeQualified( new Path( remoteRootPath, path.getName() ) ) ); 677 } 678 else if( localFS.equals( getFileSystem( config, path ) ) ) 679 { 680 if( !exists( localFS, path ) ) 681 throw new FlowException( "path not found: " + path ); 682 683 Path localPath = localFS.makeQualified( path ); 684 685 String name = localPath.getName(); 686 687 if( resourceSubPath != null ) 688 name = resourceSubPath + "/" + name; 689 690 localPaths.put( name, localPath ); 691 } 692 else 693 { 694 if( !exists( defaultFS, path ) ) 695 throw new FlowException( "path not found: " + path ); 696 697 Path defaultPath = defaultFS.makeQualified( path ); 698 699 String name = defaultPath.getName(); 700 701 if( resourceSubPath != null ) 702 name = resourceSubPath + "/" + name; 703 704 remotePaths.put( name, defaultPath ); 705 } 706 } 707 } 708 709 private static boolean exists( FileSystem fileSystem, Path path ) 710 { 711 try 712 { 713 return fileSystem.exists( path ); 714 } 715 catch( IOException exception ) 716 { 717 throw new FlowException( "could not test file exists: " + path ); 718 } 719 } 720 721 private static FileSystem getFileSystem( Configuration config, Path path ) 722 { 723 try 724 { 725 return path.getFileSystem( config ); 726 } 727 catch( IOException exception ) 728 { 729 throw new FlowException( "unable to get handle to underlying filesystem", exception ); 730 } 731 } 732 733 public static LocalFileSystem getLocalFS( Configuration config ) 734 { 735 try 736 { 737 return FileSystem.getLocal( config ); 738 } 739 catch( IOException exception ) 740 { 741 throw new FlowException( "unable to get handle to underlying filesystem", exception ); 742 } 743 } 744 745 public static FileSystem getDefaultFS( Configuration config ) 746 { 747 try 748 { 749 return FileSystem.get( config ); 750 } 751 catch( IOException exception ) 752 { 753 throw new FlowException( "unable to get handle to underlying filesystem", exception ); 754 } 755 } 756 757 public static boolean isLocal( Configuration conf ) 758 { 759 // hadoop 1.0 and 2.0 use different properties to define local mode: we check the new YARN 760 // property first 761 String frameworkName = conf.get( "mapreduce.framework.name" ); 762 763 // we are running on hadoop 2.0 (YARN) 764 if( frameworkName != null ) 765 return frameworkName.equals( "local" ); 766 767 // for Tez 768 String tezLocal = conf.get( "tez.local.mode" ); 769 770 if( tezLocal != null ) 771 return tezLocal.equals( "true" ); 772 773 // hadoop 1.0: use the old property to determine the local mode 774 String hadoop1 = conf.get( "mapred.job.tracker" ); 775 776 if( hadoop1 == null ) 777 { 778 LOG.warn( "could not successfully test if Hadoop based platform is in standalone/local mode, no valid properties set, returning false - tests for: mapreduce.framework.name, tez.local.mode, and mapred.job.tracker" ); 779 return false; 780 } 781 782 return hadoop1.equals( "local" ); 783 } 784 785 public static boolean isYARN( Configuration conf ) 786 { 787 return conf.get( "mapreduce.framework.name" ) != null; 788 } 789 790 public static void setLocal( Configuration conf ) 791 { 792 // set both properties to local 793 conf.set( "mapred.job.tracker", "local" ); 794 795 // yarn 796 conf.set( "mapreduce.framework.name", "local" ); 797 798 // tez 799 conf.set( "tez.local.mode", "true" ); 800 conf.set( "tez.runtime.optimize.local.fetch", "true" ); 801 } 802 803 private static boolean interfaceAssignableFromClassName( Class<?> xface, String className ) 804 { 805 if( ( className == null ) || ( xface == null ) ) 806 return false; 807 808 try 809 { 810 Class<?> klass = Class.forName( className ); 811 if( klass == null ) 812 return false; 813 814 if( !xface.isAssignableFrom( klass ) ) 815 return false; 816 817 return true; 818 } 819 catch( ClassNotFoundException cnfe ) 820 { 821 return false; // let downstream figure it out 822 } 823 } 824 825 public static boolean setNewApi( Configuration conf, String className ) 826 { 827 if( className == null ) // silently return and let the error be caught downstream 828 return false; 829 830 boolean isStable = className.startsWith( "org.apache.hadoop.mapred." ) 831 || interfaceAssignableFromClassName( org.apache.hadoop.mapred.InputFormat.class, className ); 832 833 boolean isNew = className.startsWith( "org.apache.hadoop.mapreduce." ) 834 || interfaceAssignableFromClassName( org.apache.hadoop.mapreduce.InputFormat.class, className ); 835 836 if( isStable ) 837 conf.setBoolean( "mapred.mapper.new-api", false ); 838 else if( isNew ) 839 conf.setBoolean( "mapred.mapper.new-api", true ); 840 else 841 throw new IllegalStateException( "cannot determine if class denotes stable or new api, please set 'mapred.mapper.new-api' to the appropriate value" ); 842 843 return true; 844 } 845 846 public static void addInputPath( Configuration conf, Path path ) 847 { 848 Path workingDirectory = getWorkingDirectory( conf ); 849 path = new Path( workingDirectory, path ); 850 String dirStr = StringUtils.escapeString( path.toString() ); 851 String dirs = conf.get( "mapred.input.dir" ); 852 conf.set( "mapred.input.dir", dirs == null ? dirStr : 853 dirs + StringUtils.COMMA_STR + dirStr ); 854 } 855 856 public static void setOutputPath( Configuration conf, Path path ) 857 { 858 Path workingDirectory = getWorkingDirectory( conf ); 859 path = new Path( workingDirectory, path ); 860 conf.set( "mapred.output.dir", path.toString() ); 861 } 862 863 private static Path getWorkingDirectory( Configuration conf ) 864 { 865 String name = conf.get( "mapred.working.dir" ); 866 if( name != null ) 867 { 868 return new Path( name ); 869 } 870 else 871 { 872 try 873 { 874 Path dir = FileSystem.get( conf ).getWorkingDirectory(); 875 conf.set( "mapred.working.dir", dir.toString() ); 876 return dir; 877 } 878 catch( IOException e ) 879 { 880 throw new RuntimeException( e ); 881 } 882 } 883 } 884 885 public static Path getOutputPath( Configuration conf ) 886 { 887 String name = conf.get( "mapred.output.dir" ); 888 return name == null ? null : new Path( name ); 889 } 890 891 public static String pack( Object object, Configuration conf ) 892 { 893 if( object == null ) 894 return ""; 895 896 try 897 { 898 return serializeBase64( object, conf, true ); 899 } 900 catch( IOException exception ) 901 { 902 throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception ); 903 } 904 } 905 906 public static void addFields( Configuration conf, String property, Map<Integer, Fields> fields ) 907 { 908 if( fields == null || fields.isEmpty() ) 909 return; 910 911 Map<String, Fields> toPack = new HashMap<>(); 912 913 for( Map.Entry<Integer, Fields> entry : fields.entrySet() ) 914 toPack.put( entry.getKey().toString(), entry.getValue() ); 915 916 conf.set( property, pack( toPack, conf ) ); 917 } 918 919 public static Map<Integer, Fields> getFields( Configuration conf, String property ) throws IOException 920 { 921 String value = conf.getRaw( property ); 922 923 if( value == null || value.isEmpty() ) 924 return Collections.emptyMap(); 925 926 Map<String, Fields> map = deserializeBase64( value, conf, Map.class, true ); 927 Map<Integer, Fields> result = new HashMap<>(); 928 929 for( Map.Entry<String, Fields> entry : map.entrySet() ) 930 result.put( Integer.parseInt( entry.getKey() ), entry.getValue() ); 931 932 return result; 933 } 934 935 public static void addComparators( Configuration conf, String property, Map<String, Fields> map, BaseFlowStep flowStep, Group group ) 936 { 937 Iterator<Fields> fieldsIterator = map.values().iterator(); 938 939 if( !fieldsIterator.hasNext() ) 940 return; 941 942 Fields fields = fieldsIterator.next(); 943 944 if( fields.hasComparators() ) 945 { 946 conf.set( property, pack( fields, conf ) ); 947 return; 948 } 949 950 // use resolved fields if there are no comparators. 951 Set<Scope> previousScopes = flowStep.getPreviousScopes( group ); 952 953 fields = previousScopes.iterator().next().getOutValuesFields(); 954 955 if( fields.size() != 0 ) // allows fields.UNKNOWN to be used 956 conf.setInt( property + ".size", fields.size() ); 957 } 958 959 public static void addComparators( Configuration conf, String property, Map<String, Fields> map, Fields resolvedFields ) 960 { 961 Iterator<Fields> fieldsIterator = map.values().iterator(); 962 963 if( !fieldsIterator.hasNext() ) 964 return; 965 966 while( fieldsIterator.hasNext() ) 967 { 968 Fields fields = fieldsIterator.next(); 969 970 if( fields.hasComparators() ) 971 { 972 conf.set( property, pack( fields, conf ) ); 973 return; 974 } 975 } 976 977 if( resolvedFields.size() != 0 ) // allows fields.UNKNOWN to be used 978 conf.setInt( property + ".size", resolvedFields.size() ); 979 } 980 }