/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.planner.PlatformInfo;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility methods for working with Hadoop {@link JobConf} instances, object serialization,
 * the distributed cache, and the local and default file systems.
 */
public class HadoopUtil
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopUtil.class );
  private static final String ENCODING = "US-ASCII";
  private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class;
  private static PlatformInfo platformInfo;

  public static void initLog4j( JobConf jobConf )
    {
    String values = jobConf.get( "log4j.logger", null );

    if( values == null || values.length() == 0 )
      return;

    if( !Util.hasClass( "org.apache.log4j.Logger" ) )
      {
      LOG.info( "org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties" );
      return;
      }

    String[] elements = values.split( "," );

    for( String element : elements )
      setLogLevel( element.split( "=" ) );
    }

  private static void setLogLevel( String[] logger )
    {
    // call log4j reflectively to avoid a hard log4j dependency; equivalent to:
    // org.apache.log4j.Logger.getLogger( logger[ 0 ] ).setLevel( org.apache.log4j.Level.toLevel( logger[ 1 ] ) );

    Object loggerObject = Util.invokeStaticMethod( "org.apache.log4j.Logger", "getLogger",
      new Object[]{logger[ 0 ]}, new Class[]{String.class} );

    Object levelObject = Util.invokeStaticMethod( "org.apache.log4j.Level", "toLevel",
      new Object[]{logger[ 1 ]}, new Class[]{String.class} );

    Util.invokeInstanceMethod( loggerObject, "setLevel",
      new Object[]{levelObject}, new Class[]{levelObject.getClass()} );
    }
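
  /**
   * Returns a copy of the given JobConf that does not share the parent's Credentials
   * instance (see the note in the method body). A minimal usage sketch; the property
   * key below is illustrative only:
   * <pre>{@code
   * JobConf parent = new JobConf();
   * JobConf child = HadoopUtil.copyJobConf( parent );
   * child.set( "some.example.key", "value" ); // hypothetical key, does not affect parent
   * }</pre>
   */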
  public static JobConf copyJobConf( JobConf parentJobConf )
    {
    if( parentJobConf == null )
      throw new NullPointerException( "parentJobConf" );

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    final Configuration configurationCopy = new Configuration( parentJobConf );
    final JobConf jobConf = new JobConf( configurationCopy );

    jobConf.getCredentials().addAll( parentJobConf.getCredentials() );

    return jobConf;
    }

  public static JobConf createJobConf( Map<Object, Object> properties, JobConf defaultJobconf )
    {
    JobConf jobConf = defaultJobconf == null ? new JobConf() : copyJobConf( defaultJobconf );

    if( properties == null )
      return jobConf;

    Set<Object> keys = new HashSet<Object>( properties.keySet() );

    // stringPropertyNames() only returns keys where both key and value are Strings,
    // so keep the original keys as well
    if( properties instanceof Properties )
      keys.addAll( ( (Properties) properties ).stringPropertyNames() );

    for( Object key : keys )
      {
      Object value = properties.get( key );

      if( value == null && properties instanceof Properties && key instanceof String )
        value = ( (Properties) properties ).getProperty( (String) key );

      if( value == null ) // don't stuff null values
        continue;

      // don't let these objects pass, even though toString is called below.
      if( value instanceof Class || value instanceof JobConf )
        continue;

      jobConf.set( key.toString(), value.toString() );
      }

    return jobConf;
    }
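
  /**
   * Copies all entries of the given Configuration into a plain Map. Together with
   * {@link #createJobConf(Map, JobConf)} this supports a simple round trip; a sketch:
   * <pre>{@code
   * JobConf conf = new JobConf();
   * Map<Object, Object> props = HadoopUtil.createProperties( conf );
   * JobConf rebuilt = HadoopUtil.createJobConf( props, null ); // carries the same settings
   * }</pre>
   */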
  public static Map<Object, Object> createProperties( Configuration jobConf )
    {
    Map<Object, Object> properties = new HashMap<Object, Object>();

    if( jobConf == null )
      return properties;

    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );

    return properties;
    }

  public static Thread getHDFSShutdownHook()
    {
    Exception caughtException;

    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        Runtime.getRuntime().removeShutdownHook( finalizer );

      return finalizer;
      }
    catch( NoSuchFieldException exception )
      {
      caughtException = exception;
      }
    catch( IllegalAccessException exception )
      {
      caughtException = exception;
      }
    catch( IOException exception )
      {
      caughtException = exception;
      }

    LOG.debug( "unable to find and remove client hdfs shutdown hook, received exception: {}", caughtException.getClass().getName() );

    return null;
    }

  public static String encodeBytes( byte[] bytes )
    {
    try
      {
      return new String( Base64.encodeBase64( bytes ), ENCODING );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  public static byte[] decodeBytes( String string )
    {
    try
      {
      byte[] bytes = string.getBytes( ENCODING );
      return Base64.decodeBase64( bytes );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  public static <T> ObjectSerializer instantiateSerializer( Configuration conf, Class<T> type ) throws ClassNotFoundException
    {
    Class<ObjectSerializer> flowSerializerClass;

    String serializerClassName = conf.get( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY );

    if( serializerClassName == null || serializerClassName.length() == 0 )
      flowSerializerClass = (Class<ObjectSerializer>) DEFAULT_OBJECT_SERIALIZER;
    else
      flowSerializerClass = (Class<ObjectSerializer>) Class.forName( serializerClassName );

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = flowSerializerClass.newInstance();

      if( objectSerializer instanceof Configurable )
        ( (Configurable) objectSerializer ).setConf( conf );
      }
    catch( Exception exception )
      {
      throw new IllegalArgumentException( "Unable to instantiate serializer \""
        + flowSerializerClass.getName()
        + "\" for class: "
        + type.getName(), exception );
      }

    if( !objectSerializer.accepts( type ) )
      throw new IllegalArgumentException( serializerClassName + " won't accept objects of class " + type.getName() );

    return objectSerializer;
    }
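
  /**
   * Serializes the given object with the configured ObjectSerializer and Base64-encodes
   * the result so it can be stored in a JobConf. A round-trip sketch; the Fields payload
   * is illustrative:
   * <pre>{@code
   * JobConf conf = new JobConf();
   * String encoded = HadoopUtil.serializeBase64( new Fields( "line" ), conf );
   * Fields restored = HadoopUtil.deserializeBase64( encoded, conf, Fields.class );
   * }</pre>
   */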
  public static <T> String serializeBase64( T object, JobConf conf ) throws IOException
    {
    return serializeBase64( object, conf, true );
    }

  public static <T> String serializeBase64( T object, JobConf conf, boolean compress ) throws IOException
    {
    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, object.getClass() );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return encodeBytes( objectSerializer.serialize( object, compress ) );
    }

  /**
   * This method deserializes the Base64 encoded String into an Object instance.
   *
   * @param string the Base64 encoded String to deserialize
   * @return an Object
   */
  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type ) throws IOException
    {
    return deserializeBase64( string, conf, type, true );
    }

  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type, boolean decompress ) throws IOException
    {
    if( string == null || string.length() == 0 )
      return null;

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, type );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return objectSerializer.deserialize( decodeBytes( string ), type, decompress );
    }

  public static Class findMainClass( Class defaultType )
    {
    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();

    for( StackTraceElement stackTraceElement : stackTrace )
      {
      if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( "org.apache.hadoop" ) )
        {
        try
          {
          LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() );

          return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() );
          }
        catch( ClassNotFoundException exception )
          {
          LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception );
          }
        }
      }

    LOG.info( "using default application jar, may cause class not found exceptions on the cluster" );

    return defaultType;
    }

  public static Map<String, String> getConfig( JobConf defaultConf, JobConf updatedConf )
    {
    Map<String, String> configs = new HashMap<String, String>();

    for( Map.Entry<String, String> entry : updatedConf )
      configs.put( entry.getKey(), entry.getValue() );

    for( Map.Entry<String, String> entry : defaultConf )
      {
      if( entry.getValue() == null )
        continue;

      String updatedValue = configs.get( entry.getKey() );

      // if the values are the same, purge the entry from the map to save space
      if( updatedValue != null && updatedValue.equals( entry.getValue() ) )
        configs.remove( entry.getKey() );
      }

    configs.remove( "mapred.working.dir" );
    configs.remove( "mapreduce.job.working.dir" ); // hadoop2

    return configs;
    }

  public static JobConf[] getJobConfs( JobConf job, List<Map<String, String>> configs )
    {
    JobConf[] jobConfs = new JobConf[ configs.size() ];

    for( int i = 0; i < jobConfs.length; i++ )
      jobConfs[ i ] = mergeConf( job, configs.get( i ), false );

    return jobConfs;
    }
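
  /**
   * Applies the given key/value map on top of the given JobConf. When {@code directly}
   * is false the job conf is copied first and left untouched. A sketch with an
   * illustrative override map:
   * <pre>{@code
   * Map<String, String> overrides = new HashMap<String, String>();
   * overrides.put( "mapred.reduce.tasks", "2" );
   * JobConf merged = HadoopUtil.mergeConf( job, overrides, false ); // job itself is unchanged
   * }</pre>
   */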
  public static JobConf mergeConf( JobConf job, Map<String, String> config, boolean directly )
    {
    JobConf currentConf = directly ? job : copyJobConf( job );

    for( String key : config.keySet() )
      {
      LOG.debug( "merging key: {} value: {}", key, config.get( key ) );

      currentConf.set( key, config.get( key ) );
      }

    return currentConf;
    }

  public static JobConf removePropertiesFrom( JobConf jobConf, String... keys )
    {
    Map<Object, Object> properties = createProperties( jobConf );

    for( String key : keys )
      properties.remove( key );

    return createJobConf( properties, null );
    }

  public static boolean removeStateFromDistCache( JobConf conf, String path ) throws IOException
    {
    return new Hfs( new TextLine(), path ).deleteResource( conf );
    }

  public static String writeStateToDistCache( JobConf conf, String id, String stepState )
    {
    LOG.info( "writing step state to dist cache, too large for job conf, size: {}", stepState.length() );

    String statePath = Hfs.getTempPath( conf ) + "/step-state-" + id;

    Hfs temp = new Hfs( new TextLine(), statePath, SinkMode.REPLACE );

    try
      {
      TupleEntryCollector writer = temp.openForWrite( new HadoopFlowProcess( conf ) );

      writer.add( new Tuple( stepState ) );

      writer.close();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to write step state to Hadoop FS: " + temp.getIdentifier(), exception );
      }

    URI uri = new Path( statePath ).toUri();
    DistributedCache.addCacheFile( uri, conf );

    LOG.info( "using step state path: {}", uri );

    return statePath;
    }
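
  /**
   * Reads step state previously placed in the distributed cache by
   * {@link #writeStateToDistCache(JobConf, String, String)}. A round-trip sketch; the
   * id and payload below are illustrative:
   * <pre>{@code
   * HadoopUtil.writeStateToDistCache( conf, "1A2B", "serialized step state" );
   * // later, on the cluster side, once the cache has been localized:
   * String state = HadoopUtil.readStateFromDistCache( jobConf, "1A2B" );
   * }</pre>
   */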
  public static String readStateFromDistCache( JobConf jobConf, String id ) throws IOException
    {
    Path[] files = DistributedCache.getLocalCacheFiles( jobConf );

    Path stepStatePath = null;

    for( Path file : files )
      {
      if( !file.toString().contains( "step-state-" + id ) )
        continue;

      stepStatePath = file;
      break;
      }

    if( stepStatePath == null )
      throw new FlowException( "unable to find step state from distributed cache" );

    LOG.info( "reading step state from local path: {}", stepStatePath );

    Hfs temp = new Lfs( new TextLine( new Fields( "line" ) ), stepStatePath.toString() );

    TupleEntryIterator reader = null;

    try
      {
      reader = temp.openForRead( new HadoopFlowProcess( jobConf ) );

      if( !reader.hasNext() )
        throw new FlowException( "step state path is empty: " + temp.getIdentifier() );

      return reader.next().getString( 0 );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to find state path: " + temp.getIdentifier(), exception );
      }
    finally
      {
      try
        {
        if( reader != null )
          reader.close();
        }
      catch( IOException exception )
        {
        LOG.warn( "error closing state path reader", exception );
        }
      }
    }

  public static PlatformInfo getPlatformInfo()
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal();

    return platformInfo;
    }

  private static PlatformInfo getPlatformInfoInternal()
    {
    URL url = JobConf.class.getResource( JobConf.class.getSimpleName() + ".class" );

    if( url == null || !url.toString().startsWith( "jar" ) )
      return new PlatformInfo( "Hadoop", null, null );

    String path = url.toString();
    String manifestPath = path.substring( 0, path.lastIndexOf( "!" ) + 1 ) + "/META-INF/MANIFEST.MF";

    Manifest manifest;

    try
      {
      manifest = new Manifest( new URL( manifestPath ).openStream() );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get manifest from {}", manifestPath, exception );

      return new PlatformInfo( "Hadoop", null, null );
      }

    Attributes attributes = manifest.getAttributes( "org/apache/hadoop" );

    if( attributes == null )
      {
      LOG.debug( "unable to get Hadoop manifest attributes" );
      return new PlatformInfo( "Hadoop", null, null );
      }

    String vendor = attributes.getValue( "Implementation-Vendor" );
    String version = attributes.getValue( "Implementation-Version" );

    return new PlatformInfo( "Hadoop", vendor, version );
    }

  /**
   * Adds the given paths to the distributed cache class path, distinguishing between
   * local artifacts (which may need to be synced to the cluster first) and artifacts
   * already on the remote file system.
   *
   * @param config    the job configuration to update
   * @param classpath the paths to add to the class path
   * @return a map from each local path to its corresponding remote path, for use with
   * {@link #syncPaths(JobConf, Map)}
   */
  public static Map<Path, Path> addToClassPath( JobConf config, List<String> classpath )
    {
    if( classpath == null )
      return null;

    // map each given path string to its fully qualified Path
    Map<String, Path> localPaths = new HashMap<String, Path>();
    Map<String, Path> remotePaths = new HashMap<String, Path>();

    resolvePaths( config, classpath, localPaths, remotePaths );

    try
      {
      LocalFileSystem localFS = getLocalFS( config );

      for( String path : localPaths.keySet() )
        {
        // only add local if no remote
        if( remotePaths.containsKey( path ) )
          continue;

        Path artifact = localPaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( localFS ), config );
        }

      FileSystem defaultFS = getDefaultFS( config );

      for( String path : remotePaths.keySet() )
        {
        // always add remote
        Path artifact = remotePaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( defaultFS ), config );
        }
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to set distributed cache paths", exception );
      }

    return getCommonPaths( localPaths, remotePaths );
    }
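
  /**
   * Copies any missing or stale local artifacts returned by
   * {@link #addToClassPath(JobConf, List)} up to the cluster file system. A sketch of
   * the intended pairing; the jar name is illustrative:
   * <pre>{@code
   * Map<Path, Path> commonPaths = HadoopUtil.addToClassPath( config, Arrays.asList( "lib/example.jar" ) );
   * HadoopUtil.syncPaths( config, commonPaths ); // copies only what is missing or newer locally
   * }</pre>
   */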
  public static void syncPaths( JobConf config, Map<Path, Path> commonPaths )
    {
    if( commonPaths == null )
      return;

    Map<Path, Path> copyPaths = getCopyPaths( config, commonPaths );

    FileSystem remoteFS = getDefaultFS( config );

    for( Map.Entry<Path, Path> entry : copyPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        LOG.info( "copying from: {}, to: {}", localPath, remotePath );
        remoteFS.copyFromLocalFile( localPath, remotePath );
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to copy local: " + localPath + " to remote: " + remotePath, exception );
        }
      }
    }

  private static Map<Path, Path> getCommonPaths( Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    Map<Path, Path> commonPaths = new HashMap<Path, Path>();

    for( Map.Entry<String, Path> entry : localPaths.entrySet() )
      {
      if( remotePaths.containsKey( entry.getKey() ) )
        commonPaths.put( entry.getValue(), remotePaths.get( entry.getKey() ) );
      }

    return commonPaths;
    }

  private static Map<Path, Path> getCopyPaths( JobConf config, Map<Path, Path> commonPaths )
    {
    Map<Path, Path> copyPaths = new HashMap<Path, Path>();

    FileSystem remoteFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    for( Map.Entry<Path, Path> entry : commonPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        boolean localExists = localFS.exists( localPath );
        boolean remoteExists = remoteFS.exists( remotePath );

        if( localExists && !remoteExists )
          {
          copyPaths.put( localPath, remotePath );
          }
        else if( localExists )
          {
          long localModTime = localFS.getFileStatus( localPath ).getModificationTime();
          long remoteModTime = remoteFS.getFileStatus( remotePath ).getModificationTime();

          if( localModTime > remoteModTime )
            copyPaths.put( localPath, remotePath );
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to get handle to underlying filesystem", exception );
        }
      }

    return copyPaths;
    }

  private static void resolvePaths( JobConf config, List<String> classpath, Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    FileSystem defaultFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    boolean defaultIsLocal = defaultFS.equals( localFS );

    for( String stringPath : classpath )
      {
      URI uri = URI.create( stringPath ); // fails if invalid uri
      Path path = new Path( uri.toString() );

      if( uri.getScheme() == null && !defaultIsLocal ) // we want to sync
        {
        Path localPath = path.makeQualified( localFS );

        if( !exists( localFS, localPath ) )
          throw new FlowException( "path not found: " + localPath );

        localPaths.put( stringPath, localPath );
        remotePaths.put( stringPath, path.makeQualified( defaultFS ) );
        }
      else if( localFS.equals( getFileSystem( config, path ) ) )
        {
        if( !exists( localFS, path ) )
          throw new FlowException( "path not found: " + path );

        localPaths.put( stringPath, path );
        }
      else
        {
        if( !exists( defaultFS, path ) )
          throw new FlowException( "path not found: " + path );

        remotePaths.put( stringPath, path );
        }
      }
    }

  private static boolean exists( FileSystem fileSystem, Path path )
    {
    try
      {
      return fileSystem.exists( path );
      }
    catch( IOException exception )
      {
      throw new FlowException( "could not test file exists: " + path, exception );
      }
    }

  private static FileSystem getFileSystem( JobConf config, Path path )
    {
    try
      {
      return path.getFileSystem( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  private static LocalFileSystem getLocalFS( JobConf config )
    {
    try
      {
      return FileSystem.getLocal( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  private static FileSystem getDefaultFS( JobConf config )
    {
    try
      {
      return FileSystem.get( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }
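
  /**
   * Returns true when the configuration selects Hadoop's local mode. A sketch of the
   * two properties this method inspects:
   * <pre>{@code
   * JobConf conf = new JobConf();
   * conf.set( "mapreduce.framework.name", "local" ); // hadoop 2 (YARN)
   * conf.set( "mapred.job.tracker", "local" );       // hadoop 1
   * assert HadoopUtil.isLocal( conf );
   * }</pre>
   */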
  public static boolean isLocal( JobConf conf )
    {
    // hadoop 1.0 and 2.0 use different properties to define local mode: we check the new YARN
    // property first
    String frameworkName = conf.get( "mapreduce.framework.name" );

    // we are running on hadoop 2.0 (YARN)
    if( frameworkName != null )
      return frameworkName.equals( "local" );

    // hadoop 1.0: use the old property to determine local mode, guarding against it being unset
    return "local".equals( conf.get( "mapred.job.tracker" ) );
    }

  public static void setLocal( JobConf conf )
    {
    // set both properties to local
    conf.set( "mapred.job.tracker", "local" );
    conf.set( "mapreduce.framework.name", "local" );
    }
  }