/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.planner.PlatformInfo;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility methods for working with Hadoop {@link JobConf} instances, object serialization to and from
 * Base64, the {@link DistributedCache}, and local/remote filesystem path resolution.
 */
public class HadoopUtil
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopUtil.class );
  private static final String ENCODING = "US-ASCII";
  private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class;
  private static PlatformInfo platformInfo;

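  /**
   * Configures log4j logger levels from the comma separated "log4j.logger" property on the given JobConf,
   * if log4j is available on the CLASSPATH. Each element has the form {@code logger=level}.
   */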
  public static void initLog4j( JobConf jobConf )
    {
    String values = jobConf.get( "log4j.logger", null );

    if( values == null || values.length() == 0 )
      return;

    if( !Util.hasClass( "org.apache.log4j.Logger" ) )
      {
      LOG.info( "org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties" );
      return;
      }

    String[] elements = values.split( "," );

    for( String element : elements )
      setLogLevel( element.split( "=" ) );
    }

  private static void setLogLevel( String[] logger )
    {
    // uses reflection to avoid a compile time log4j dependency
    // org.apache.log4j.Logger.getLogger( logger[ 0 ] ).setLevel( org.apache.log4j.Level.toLevel( logger[ 1 ] ) );

    Object loggerObject = Util.invokeStaticMethod( "org.apache.log4j.Logger", "getLogger",
      new Object[]{logger[ 0 ]}, new Class[]{String.class} );

    Object levelObject = Util.invokeStaticMethod( "org.apache.log4j.Level", "toLevel",
      new Object[]{logger[ 1 ]}, new Class[]{String.class} );

    Util.invokeInstanceMethod( loggerObject, "setLevel",
      new Object[]{levelObject}, new Class[]{levelObject.getClass()} );
    }

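  /**
   * Creates a copy of the given JobConf that does not share the parent's Credentials instance,
   * then copies the parent credentials into the new instance.
   *
   * @param parentJobConf the JobConf to copy, may not be null
   * @return a new, independent JobConf
   */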
  public static JobConf copyJobConf( JobConf parentJobConf )
    {
    if( parentJobConf == null )
      throw new NullPointerException( "parentJobConf" );

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    final Configuration configurationCopy = new Configuration( parentJobConf );
    final JobConf jobConf = new JobConf( configurationCopy );

    jobConf.getCredentials().addAll( parentJobConf.getCredentials() );

    return jobConf;
    }

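  /**
   * Creates a new JobConf from the given properties Map, optionally seeded from the given default JobConf.
   * Null values, Class values, and JobConf values are skipped; all other entries are stored via toString().
   *
   * @param properties     the properties to copy into the JobConf, may be null
   * @param defaultJobconf the JobConf to copy before applying the properties, may be null
   * @return a new JobConf instance
   */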
  public static JobConf createJobConf( Map<Object, Object> properties, JobConf defaultJobconf )
    {
    JobConf jobConf = defaultJobconf == null ? new JobConf() : copyJobConf( defaultJobconf );

    if( properties == null )
      return jobConf;

    Set<Object> keys = new HashSet<Object>( properties.keySet() );

    // keys will only be grabbed if both key/value are String, so keep orig keys
    if( properties instanceof Properties )
      keys.addAll( ( (Properties) properties ).stringPropertyNames() );

    for( Object key : keys )
      {
      Object value = properties.get( key );

      if( value == null && properties instanceof Properties && key instanceof String )
        value = ( (Properties) properties ).getProperty( (String) key );

      if( value == null ) // don't stuff null values
        continue;

      // don't let these objects pass, even though toString is called below.
      if( value instanceof Class || value instanceof JobConf )
        continue;

      jobConf.set( key.toString(), value.toString() );
      }

    return jobConf;
    }

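  /**
   * Copies all entries of the given Configuration into a new properties Map.
   *
   * @param jobConf the Configuration to read, may be null
   * @return a new Map containing every key/value pair, empty if jobConf is null
   */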
  public static Map<Object, Object> createProperties( Configuration jobConf )
    {
    Map<Object, Object> properties = new HashMap<Object, Object>();

    if( jobConf == null )
      return properties;

    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );

    return properties;
    }

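  /**
   * Finds the HDFS client finalizer thread via reflection, removes it as a JVM shutdown hook,
   * and returns it so the caller may invoke it at a more appropriate time.
   *
   * @return the removed shutdown hook Thread, or null if it could not be found
   */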
  public static Thread getHDFSShutdownHook()
    {
    Exception caughtException;

    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        Runtime.getRuntime().removeShutdownHook( finalizer );

      return finalizer;
      }
    catch( NoSuchFieldException exception )
      {
      caughtException = exception;
      }
    catch( IllegalAccessException exception )
      {
      caughtException = exception;
      }
    catch( IOException exception )
      {
      caughtException = exception;
      }

    LOG.debug( "unable to find and remove client hdfs shutdown hook, received exception: {}", caughtException.getClass().getName() );

    return null;
    }

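  /**
   * Encodes the given bytes as a Base64, US-ASCII String.
   */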
  public static String encodeBytes( byte[] bytes )
    {
    try
      {
      return new String( Base64.encodeBase64( bytes ), ENCODING );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

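  /**
   * Decodes the given Base64, US-ASCII String back into bytes.
   */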
  public static byte[] decodeBytes( String string )
    {
    try
      {
      byte[] bytes = string.getBytes( ENCODING );
      return Base64.decodeBase64( bytes );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

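  /**
   * Instantiates the {@link ObjectSerializer} named by the {@link ObjectSerializer#OBJECT_SERIALIZER_PROPERTY}
   * property, falling back to {@link JavaObjectSerializer} when the property is unset. The serializer is given
   * the current Configuration if it implements {@link Configurable}, and is verified to accept the given type.
   *
   * @throws ClassNotFoundException if the configured serializer class cannot be loaded
   */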
  public static <T> ObjectSerializer instantiateSerializer( Configuration conf, Class<T> type ) throws ClassNotFoundException
    {
    Class<ObjectSerializer> flowSerializerClass;

    String serializerClassName = conf.get( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY );

    if( serializerClassName == null || serializerClassName.length() == 0 )
      flowSerializerClass = (Class<ObjectSerializer>) DEFAULT_OBJECT_SERIALIZER;
    else
      flowSerializerClass = (Class<ObjectSerializer>) Class.forName( serializerClassName );

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = flowSerializerClass.newInstance();

      if( objectSerializer instanceof Configurable )
        ( (Configurable) objectSerializer ).setConf( conf );
      }
    catch( Exception exception )
      {
      throw new IllegalArgumentException( "unable to instantiate serializer \""
        + flowSerializerClass.getName()
        + "\" for class: "
        + type.getName(), exception );
      }

    if( !objectSerializer.accepts( type ) )
      throw new IllegalArgumentException( flowSerializerClass.getName() + " won't accept objects of class " + type.toString() );

    return objectSerializer;
    }

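  /**
   * Serializes the given object with the configured {@link ObjectSerializer}, compresses the result, and
   * returns it as a Base64 encoded String. The counterpart is
   * {@link #deserializeBase64(String, Configuration, Class)}. As a purely illustrative round trip, assuming
   * {@code fields} is a serializable {@link Fields} instance and {@code jobConf} is the current JobConf:
   * <pre>
   * String packed = HadoopUtil.serializeBase64( fields, jobConf );
   * Fields restored = HadoopUtil.deserializeBase64( packed, jobConf, Fields.class );
   * </pre>
   *
   * @throws IOException if the serializer cannot be instantiated or serialization fails
   */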
  public static <T> String serializeBase64( T object, JobConf conf ) throws IOException
    {
    return serializeBase64( object, conf, true );
    }

  public static <T> String serializeBase64( T object, JobConf conf, boolean compress ) throws IOException
    {
    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, object.getClass() );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return encodeBytes( objectSerializer.serialize( object, compress ) );
    }

  /**
   * Deserializes the given Base64 encoded String back into an object instance, decompressing the payload.
   *
   * @param string the Base64 encoded, serialized object, may be null or empty
   * @param conf   the current Configuration
   * @param type   the expected Class of the deserialized object
   * @return the deserialized instance, or null when the given String is null or empty
   */
  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type ) throws IOException
    {
    return deserializeBase64( string, conf, type, true );
    }

  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type, boolean decompress ) throws IOException
    {
    if( string == null || string.length() == 0 )
      return null;

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, type );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return objectSerializer.deserialize( decodeBytes( string ), type, decompress );
    }

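  /**
   * Walks the current stack trace looking for a {@code main} method outside the org.apache.hadoop
   * packages and returns its declaring class, so the enclosing application jar can be resolved.
   *
   * @param defaultType the Class to return when no suitable main class can be found or loaded
   * @return the resolved main class, or defaultType
   */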
  public static Class findMainClass( Class defaultType )
    {
    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();

    for( StackTraceElement stackTraceElement : stackTrace )
      {
      if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( "org.apache.hadoop" ) )
        {
        try
          {
          LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() );

          return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() );
          }
        catch( ClassNotFoundException exception )
          {
          LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception );
          }
        }
      }

    LOG.info( "using default application jar, may cause class not found exceptions on the cluster" );

    return defaultType;
    }

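  /**
   * Computes the delta between the given default and updated JobConfs, returning only the keys whose
   * values differ from the defaults. The per job working directory keys are also removed.
   *
   * @return a Map of the differing key/value pairs
   */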
  public static Map<String, String> getConfig( JobConf defaultConf, JobConf updatedConf )
    {
    Map<String, String> configs = new HashMap<String, String>();

    for( Map.Entry<String, String> entry : updatedConf )
      configs.put( entry.getKey(), entry.getValue() );

    for( Map.Entry<String, String> entry : defaultConf )
      {
      if( entry.getValue() == null )
        continue;

      String updatedValue = configs.get( entry.getKey() );

      // if both are null, let's purge the key from the map to save space
      if( updatedValue == null && entry.getValue() == null )
        configs.remove( entry.getKey() );

      // if the values are the same, let's also purge the key from the map to save space
      if( updatedValue != null && updatedValue.equals( entry.getValue() ) )
        configs.remove( entry.getKey() );

      configs.remove( "mapred.working.dir" );
      configs.remove( "mapreduce.job.working.dir" ); // hadoop2
      }

    return configs;
    }

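  /**
   * Creates one JobConf per given config Map by merging each Map into a copy of the given job.
   */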
  public static JobConf[] getJobConfs( JobConf job, List<Map<String, String>> configs )
    {
    JobConf[] jobConfs = new JobConf[ configs.size() ];

    for( int i = 0; i < jobConfs.length; i++ )
      jobConfs[ i ] = mergeConf( job, configs.get( i ), false );

    return jobConfs;
    }

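  /**
   * Merges the given key/value pairs into the given JobConf, either directly or into a copy.
   *
   * @param directly if true, modify and return the given job, otherwise modify and return a copy
   */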
  public static JobConf mergeConf( JobConf job, Map<String, String> config, boolean directly )
    {
    JobConf currentConf = directly ? job : copyJobConf( job );

    for( String key : config.keySet() )
      {
      LOG.debug( "merging key: {} value: {}", key, config.get( key ) );

      currentConf.set( key, config.get( key ) );
      }

    return currentConf;
    }

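  /**
   * Returns a new JobConf with the given keys removed.
   */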
  public static JobConf removePropertiesFrom( JobConf jobConf, String... keys )
    {
    Map<Object, Object> properties = createProperties( jobConf );

    for( String key : keys )
      properties.remove( key );

    return createJobConf( properties, null );
    }

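  /**
   * Deletes the step state resource previously written to the given path.
   *
   * @return true if the resource was deleted
   */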
  public static boolean removeStateFromDistCache( JobConf conf, String path ) throws IOException
    {
    return new Hfs( new TextLine(), path ).deleteResource( conf );
    }

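  /**
   * Writes the given step state to a temporary HDFS path and registers that path with the
   * {@link DistributedCache}, for use when the serialized state is too large for the JobConf.
   *
   * @return the HDFS path the state was written to
   */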
  public static String writeStateToDistCache( JobConf conf, String id, String stepState )
    {
    LOG.info( "writing step state to dist cache, too large for job conf, size: {}", stepState.length() );

    String statePath = Hfs.getTempPath( conf ) + "/step-state-" + id;

    Hfs temp = new Hfs( new TextLine(), statePath, SinkMode.REPLACE );

    try
      {
      TupleEntryCollector writer = temp.openForWrite( new HadoopFlowProcess( conf ) );

      writer.add( new Tuple( stepState ) );

      writer.close();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to write step state to Hadoop FS: " + temp.getIdentifier(), exception );
      }

    URI uri = new Path( statePath ).toUri();
    DistributedCache.addCacheFile( uri, conf );

    LOG.info( "using step state path: {}", uri );

    return statePath;
    }

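  /**
   * Reads the step state for the given id back from the local files provided by the
   * {@link DistributedCache}, as written by {@link #writeStateToDistCache(JobConf, String, String)}.
   *
   * @return the first line of the matching step state file
   * @throws IOException if the local cache files cannot be listed
   */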
  public static String readStateFromDistCache( JobConf jobConf, String id ) throws IOException
    {
    Path[] files = DistributedCache.getLocalCacheFiles( jobConf );

    Path stepStatePath = null;

    for( Path file : files )
      {
      if( !file.toString().contains( "step-state-" + id ) )
        continue;

      stepStatePath = file;
      break;
      }

    if( stepStatePath == null )
      throw new FlowException( "unable to find step state from distributed cache" );

    LOG.info( "reading step state from local path: {}", stepStatePath );

    Hfs temp = new Lfs( new TextLine( new Fields( "line" ) ), stepStatePath.toString() );

    TupleEntryIterator reader = null;

    try
      {
      reader = temp.openForRead( new HadoopFlowProcess( jobConf ) );

      if( !reader.hasNext() )
        throw new FlowException( "step state path is empty: " + temp.getIdentifier() );

      return reader.next().getString( 0 );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to find state path: " + temp.getIdentifier(), exception );
      }
    finally
      {
      try
        {
        if( reader != null )
          reader.close();
        }
      catch( IOException exception )
        {
        LOG.warn( "error closing state path reader", exception );
        }
      }
    }

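  /**
   * Returns the cached Hadoop {@link PlatformInfo}, resolving the vendor and version from the
   * manifest of the jar providing {@link JobConf} on first use.
   */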
  public static PlatformInfo getPlatformInfo()
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal();

    return platformInfo;
    }

  private static PlatformInfo getPlatformInfoInternal()
    {
    URL url = JobConf.class.getResource( JobConf.class.getSimpleName() + ".class" );

    if( url == null || !url.toString().startsWith( "jar" ) )
      return new PlatformInfo( "Hadoop", null, null );

    String path = url.toString();
    String manifestPath = path.substring( 0, path.lastIndexOf( "!" ) + 1 ) + "/META-INF/MANIFEST.MF";

    Manifest manifest;

    try
      {
      manifest = new Manifest( new URL( manifestPath ).openStream() );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get manifest from {}", manifestPath, exception );

      return new PlatformInfo( "Hadoop", null, null );
      }

    Attributes attributes = manifest.getAttributes( "org/apache/hadoop" );

    if( attributes == null )
      {
      LOG.debug( "unable to get Hadoop manifest attributes" );
      return new PlatformInfo( "Hadoop", null, null );
      }

    String vendor = attributes.getValue( "Implementation-Vendor" );
    String version = attributes.getValue( "Implementation-Version" );

    return new PlatformInfo( "Hadoop", vendor, version );
    }

  /**
   * Adds the given class path elements to the distributed cache class path, qualifying each element
   * against the local or default {@link FileSystem} as appropriate.
   *
   * @param config    the current JobConf
   * @param classpath the class path elements to add, may be null
   * @return a Map of local to remote Paths present on both file systems, or null when classpath is null
   */
  public static Map<Path, Path> addToClassPath( JobConf config, List<String> classpath )
    {
    if( classpath == null )
      return null;

    // map the given path strings to fully qualified local and remote Paths
    Map<String, Path> localPaths = new HashMap<String, Path>();
    Map<String, Path> remotePaths = new HashMap<String, Path>();

    resolvePaths( config, classpath, localPaths, remotePaths );

    try
      {
      LocalFileSystem localFS = getLocalFS( config );

      for( String path : localPaths.keySet() )
        {
        // only add local if no remote
        if( remotePaths.containsKey( path ) )
          continue;

        Path artifact = localPaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( localFS ), config );
        }

      FileSystem defaultFS = getDefaultFS( config );

      for( String path : remotePaths.keySet() )
        {
        // always add remote
        Path artifact = remotePaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( defaultFS ), config );
        }
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to set distributed cache paths", exception );
      }

    return getCommonPaths( localPaths, remotePaths );
    }

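  /**
   * Copies every local path in the given common paths Map to its remote counterpart when the remote
   * copy is missing or older than the local one, see {@link #addToClassPath(JobConf, List)}.
   */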
  public static void syncPaths( JobConf config, Map<Path, Path> commonPaths )
    {
    if( commonPaths == null )
      return;

    Map<Path, Path> copyPaths = getCopyPaths( config, commonPaths );

    FileSystem remoteFS = getDefaultFS( config );

    for( Map.Entry<Path, Path> entry : copyPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        LOG.info( "copying from: {}, to: {}", localPath, remotePath );
        remoteFS.copyFromLocalFile( localPath, remotePath );
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to copy local: " + localPath + " to remote: " + remotePath, exception );
        }
      }
    }

  private static Map<Path, Path> getCommonPaths( Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    Map<Path, Path> commonPaths = new HashMap<Path, Path>();

    for( Map.Entry<String, Path> entry : localPaths.entrySet() )
      {
      if( remotePaths.containsKey( entry.getKey() ) )
        commonPaths.put( entry.getValue(), remotePaths.get( entry.getKey() ) );
      }
    return commonPaths;
    }

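  /**
   * Returns the subset of common paths whose local file is missing remotely or is newer than the
   * remote copy, and therefore must be copied.
   */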
  private static Map<Path, Path> getCopyPaths( JobConf config, Map<Path, Path> commonPaths )
    {
    Map<Path, Path> copyPaths = new HashMap<Path, Path>();

    FileSystem remoteFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    for( Map.Entry<Path, Path> entry : commonPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        boolean localExists = localFS.exists( localPath );
        boolean remoteExist = remoteFS.exists( remotePath );

        if( localExists && !remoteExist )
          {
          copyPaths.put( localPath, remotePath );
          }
        else if( localExists )
          {
          long localModTime = localFS.getFileStatus( localPath ).getModificationTime();
          long remoteModTime = remoteFS.getFileStatus( remotePath ).getModificationTime();

          if( localModTime > remoteModTime )
            copyPaths.put( localPath, remotePath );
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to get handle to underlying filesystem", exception );
        }
      }

    return copyPaths;
    }

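  /**
   * Splits the given class path elements into local and remote paths. Paths without a scheme are
   * qualified against both the local and default file systems when the default is not local, otherwise
   * each path is assigned to whichever file system owns it. Missing paths cause a {@link FlowException}.
   */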
  private static void resolvePaths( JobConf config, List<String> classpath, Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    FileSystem defaultFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    boolean defaultIsLocal = defaultFS.equals( localFS );

    for( String stringPath : classpath )
      {
      URI uri = URI.create( stringPath ); // fails if invalid uri
      Path path = new Path( uri.toString() );

      if( uri.getScheme() == null && !defaultIsLocal ) // we want to sync
        {
        Path localPath = path.makeQualified( localFS );

        if( !exists( localFS, localPath ) )
          throw new FlowException( "path not found: " + localPath );

        localPaths.put( stringPath, localPath );
        remotePaths.put( stringPath, path.makeQualified( defaultFS ) );
        }
      else if( localFS.equals( getFileSystem( config, path ) ) )
        {
        if( !exists( localFS, path ) )
          throw new FlowException( "path not found: " + path );

        localPaths.put( stringPath, path );
        }
      else
        {
        if( !exists( defaultFS, path ) )
          throw new FlowException( "path not found: " + path );

        remotePaths.put( stringPath, path );
        }
      }
    }


  private static boolean exists( FileSystem fileSystem, Path path )
    {
    try
      {
      return fileSystem.exists( path );
      }
    catch( IOException exception )
      {
      throw new FlowException( "could not test file exists: " + path, exception );
      }
    }

  private static FileSystem getFileSystem( JobConf config, Path path )
    {
    try
      {
      return path.getFileSystem( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  private static LocalFileSystem getLocalFS( JobConf config )
    {
    try
      {
      return FileSystem.getLocal( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  private static FileSystem getDefaultFS( JobConf config )
    {
    try
      {
      return FileSystem.get( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

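  /**
   * Returns true if the given JobConf is configured for local mode, checking the Hadoop 2 (YARN)
   * "mapreduce.framework.name" property first, then the Hadoop 1 "mapred.job.tracker" property.
   */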
  public static boolean isLocal( JobConf conf )
    {
    // hadoop 1.0 and 2.0 use different properties to define local mode: we check the new YARN
    // property first
    String frameworkName = conf.get( "mapreduce.framework.name" );

    // we are running on hadoop 2.0 (YARN)
    if( frameworkName != null )
      return frameworkName.equals( "local" );

    // hadoop 1.0: use the old property to determine local mode, guarding against an unset value
    return "local".equals( conf.get( "mapred.job.tracker" ) );
    }

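  /**
   * Forces the given JobConf into local mode by setting both the Hadoop 1 and Hadoop 2 properties.
   */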
  public static void setLocal( JobConf conf )
    {
    // set both properties to local
    conf.set( "mapred.job.tracker", "local" );
    conf.set( "mapreduce.framework.name", "local" );
    }
  }