/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.Group;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.util.LogUtil;
import cascading.util.Util;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.invokeInstanceMethod;

/**
 * Utility methods for working with Hadoop {@link Configuration} and {@link JobConf} instances:
 * copying and merging configurations, Base64 (de)serialization of objects into configuration
 * properties, and synchronizing local artifacts with the cluster filesystem.
 */
public class HadoopUtil
  {
  public static final String CASCADING_FLOW_EXECUTING = "cascading.flow.executing";

  private static final Logger LOG = LoggerFactory.getLogger( HadoopUtil.class );
  private static final String ENCODING = "US-ASCII";
  private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class;

  private static PlatformInfo platformInfo;

  public static void setIsInflow( Configuration conf )
    {
    conf.setBoolean( CASCADING_FLOW_EXECUTING, true );
    }

  public static boolean isInflow( Configuration conf )
    {
    return conf.getBoolean( CASCADING_FLOW_EXECUTING, false );
    }

  public static void initLog4j( JobConf configuration )
    {
    initLog4j( (Configuration) configuration );
    }

  public static void initLog4j( Configuration configuration )
    {
    String values = configuration.get( "log4j.logger", null );

    if( values == null || values.length() == 0 )
      return;

    if( !Util.hasClass( "org.apache.log4j.Logger" ) )
      {
      LOG.info( "org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties" );
      return;
      }

    String[] elements = values.split( "," );

    for( String element : elements )
      LogUtil.setLog4jLevel( element.split( "=" ) );
    }

  // only place JobConf should ever be returned
  public static JobConf asJobConfInstance( Configuration configuration )
    {
    if( configuration instanceof JobConf )
      return (JobConf) configuration;

    return new JobConf( configuration );
    }

  public static <C> C copyJobConf( C parentJobConf )
    {
    return copyConfiguration( parentJobConf );
    }

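  /**
   * Creates a copy of the given JobConf that does not share Credentials with the parent, see the
   * in-line comment below. A minimal usage sketch (the property name shown is illustrative):
   * <pre>{@code
   * JobConf parent = new JobConf();
   * parent.set( "some.example.property", "value" ); // hypothetical key
   *
   * JobConf child = HadoopUtil.copyJobConf( parent );
   * child.set( "some.example.property", "other" ); // parent remains unchanged
   * }</pre>
   */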
  public static JobConf copyJobConf( JobConf parentJobConf )
    {
    if( parentJobConf == null )
      throw new IllegalArgumentException( "parent may not be null" );

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    final Configuration configurationCopy = new Configuration( parentJobConf );
    final JobConf jobConf = new JobConf( configurationCopy );

    jobConf.getCredentials().addAll( parentJobConf.getCredentials() );

    return jobConf;
    }

  public static JobConf createJobConf( Map<Object, Object> properties, JobConf defaultJobconf )
    {
    JobConf jobConf = defaultJobconf == null ? new JobConf() : copyJobConf( defaultJobconf );

    if( properties == null )
      return jobConf;

    return copyConfiguration( properties, jobConf );
    }

  public static <C> C copyConfiguration( C parent )
    {
    if( parent == null )
      throw new IllegalArgumentException( "parent may not be null" );

    if( !( parent instanceof Configuration ) )
      throw new IllegalArgumentException( "parent must be of type Configuration" );

    Configuration conf = (Configuration) parent;

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    Configuration configurationCopy = new Configuration( conf );

    Configuration copiedConf = callCopyConstructor( parent.getClass(), configurationCopy );

    if( Util.hasInstanceMethod( parent, "getCredentials", null ) )
      {
      Object result = invokeInstanceMethod( parent, "getCredentials", null, null );
      Object credentials = invokeInstanceMethod( copiedConf, "getCredentials", null, null );

      invokeInstanceMethod( credentials, "addAll", new Object[]{result}, new Class[]{credentials.getClass()} );
      }

    return (C) copiedConf;
    }

  protected static <C extends Configuration> C callCopyConstructor( Class type, Configuration parent )
    {
    try
      {
      Constructor<C> constructor = type.getConstructor( parent.getClass() );

      return constructor.newInstance( parent );
      }
    catch( NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException exception )
      {
      throw new CascadingException( "unable to create copy of: " + type, exception );
      }
    }

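  /**
   * Copies all non-null String key/value pairs from the given Map into the given Configuration
   * and returns it. A minimal usage sketch (the property name shown is illustrative):
   * <pre>{@code
   * Properties properties = new Properties();
   * properties.setProperty( "some.example.property", "value" ); // hypothetical key
   *
   * JobConf jobConf = HadoopUtil.copyConfiguration( properties, new JobConf() );
   * }</pre>
   */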
  public static <C extends Configuration> C copyConfiguration( Map<Object, Object> srcProperties, C dstConfiguration )
    {
    Set<Object> keys = new HashSet<Object>( srcProperties.keySet() );

    // keys will only be grabbed if both key/value are String, so keep orig keys
    if( srcProperties instanceof Properties )
      keys.addAll( ( (Properties) srcProperties ).stringPropertyNames() );

    for( Object key : keys )
      {
      Object value = srcProperties.get( key );

      if( value == null && srcProperties instanceof Properties && key instanceof String )
        value = ( (Properties) srcProperties ).getProperty( (String) key );

      if( value == null ) // don't stuff null values
        continue;

      // don't let these objects pass, even though toString is called below.
      if( value instanceof Class || value instanceof JobConf )
        continue;

      dstConfiguration.set( key.toString(), value.toString() );
      }

    return dstConfiguration;
    }

  public static Map<Object, Object> createProperties( Configuration jobConf )
    {
    Map<Object, Object> properties = new HashMap<Object, Object>();

    if( jobConf == null )
      return properties;

    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );

    return properties;
    }

  public static Thread getHDFSShutdownHook()
    {
    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        Runtime.getRuntime().removeShutdownHook( finalizer );

      return finalizer;
      }
    catch( NoSuchFieldException | IllegalAccessException | IOException exception )
      {
      LOG.debug( "unable to find and remove client hdfs shutdown hook, received exception: {}", exception.getClass().getName() );
      }

    return null;
    }

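  /**
   * Base64 encodes the given bytes into a US-ASCII String; {@link #decodeBytes(String)} reverses
   * the operation. A round-trip sketch:
   * <pre>{@code
   * byte[] original = new byte[]{1, 2, 3};
   *
   * String encoded = HadoopUtil.encodeBytes( original );
   * byte[] decoded = HadoopUtil.decodeBytes( encoded ); // same contents as original
   * }</pre>
   */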
  public static String encodeBytes( byte[] bytes )
    {
    try
      {
      return new String( Base64.encodeBase64( bytes ), ENCODING );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  public static byte[] decodeBytes( String string )
    {
    try
      {
      byte[] bytes = string.getBytes( ENCODING );
      return Base64.decodeBase64( bytes );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  public static <T> ObjectSerializer instantiateSerializer( Configuration conf, Class<T> type ) throws ClassNotFoundException
    {
    Class<ObjectSerializer> flowSerializerClass;

    String serializerClassName = conf.get( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY );

    if( serializerClassName == null || serializerClassName.length() == 0 )
      flowSerializerClass = (Class<ObjectSerializer>) DEFAULT_OBJECT_SERIALIZER;
    else
      flowSerializerClass = (Class<ObjectSerializer>) Class.forName( serializerClassName );

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = flowSerializerClass.newInstance();

      if( objectSerializer instanceof Configurable )
        ( (Configurable) objectSerializer ).setConf( conf );
      }
    catch( Exception exception )
      {
      throw new IllegalArgumentException( "Unable to instantiate serializer \""
        + flowSerializerClass.getName()
        + "\" for class: "
        + type.getName(), exception );
      }

    if( !objectSerializer.accepts( type ) )
      throw new IllegalArgumentException( flowSerializerClass.getName() + " won't accept objects of class " + type.toString() );

    return objectSerializer;
    }

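  /**
   * Serializes the given object with the configured {@link ObjectSerializer} and Base64 encodes
   * the result, suitable for storage as a configuration property. A round-trip sketch (the
   * property name shown is illustrative):
   * <pre>{@code
   * Configuration conf = new Configuration();
   *
   * String packed = HadoopUtil.serializeBase64( new Fields( "name" ), conf );
   * conf.set( "some.example.property", packed ); // hypothetical key
   *
   * Fields fields = HadoopUtil.deserializeBase64( conf.getRaw( "some.example.property" ), conf, Fields.class );
   * }</pre>
   */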
  public static <T> String serializeBase64( T object, Configuration conf ) throws IOException
    {
    return serializeBase64( object, conf, true );
    }

  public static <T> String serializeBase64( T object, Configuration conf, boolean compress ) throws IOException
    {
    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, object.getClass() );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return encodeBytes( objectSerializer.serialize( object, compress ) );
    }

  /**
   * This method deserializes the Base64 encoded String into an Object instance.
   *
   * @param string the Base64 encoded String to deserialize, may be null or empty
   * @param conf   the current Configuration
   * @param type   the expected Class of the resulting instance
   * @return an Object, or null if the given String was null or empty
   * @throws IOException if the configured ObjectSerializer cannot be instantiated
   */
  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type ) throws IOException
    {
    return deserializeBase64( string, conf, type, true );
    }

  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type, boolean decompress ) throws IOException
    {
    if( string == null || string.length() == 0 )
      return null;

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, type );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return objectSerializer.deserialize( decodeBytes( string ), type, decompress );
    }

  public static Class findMainClass( Class defaultType )
    {
    return Util.findMainClass( defaultType, "org.apache.hadoop" );
    }

  public static Map<String, String> getConfig( Configuration defaultConf, Configuration updatedConf )
    {
    Map<String, String> configs = new HashMap<String, String>();

    for( Map.Entry<String, String> entry : updatedConf )
      configs.put( entry.getKey(), entry.getValue() );

    for( Map.Entry<String, String> entry : defaultConf )
      {
      if( entry.getValue() == null )
        continue;

      String updatedValue = configs.get( entry.getKey() );

      // if the values are the same, purge from the map to save space
      if( updatedValue != null && updatedValue.equals( entry.getValue() ) )
        configs.remove( entry.getKey() );
      }

    configs.remove( "mapred.working.dir" );
    configs.remove( "mapreduce.job.working.dir" ); // hadoop2

    return configs;
    }

  public static JobConf[] getJobConfs( Configuration job, List<Map<String, String>> configs )
    {
    JobConf[] jobConfs = new JobConf[ configs.size() ];

    for( int i = 0; i < jobConfs.length; i++ )
      jobConfs[ i ] = (JobConf) mergeConf( job, configs.get( i ), false );

    return jobConfs;
    }

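  /**
   * Merges the given key/value Map into the given Configuration, either directly or into a copy,
   * depending on the {@code directly} flag. A minimal usage sketch (the property name shown is
   * illustrative):
   * <pre>{@code
   * Map<String, String> config = new HashMap<>();
   * config.put( "some.example.property", "value" ); // hypothetical key
   *
   * JobConf merged = HadoopUtil.mergeConf( new JobConf(), config, false ); // original left untouched
   * }</pre>
   */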
  public static <J extends Configuration> J mergeConf( J job, Map<String, String> config, boolean directly )
    {
    Configuration currentConf = directly ? job : ( job instanceof JobConf ? copyJobConf( (JobConf) job ) : new Configuration( job ) );

    for( String key : config.keySet() )
      {
      LOG.debug( "merging key: {} value: {}", key, config.get( key ) );

      currentConf.set( key, config.get( key ) );
      }

    return (J) currentConf;
    }

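  /**
   * Returns a new JobConf based Configuration with the given keys removed; the given
   * Configuration itself is left unchanged. A minimal sketch (the property name shown is
   * illustrative):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.set( "some.example.property", "value" ); // hypothetical key
   *
   * Configuration pruned = HadoopUtil.removePropertiesFrom( conf, "some.example.property" );
   * }</pre>
   */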
  public static Configuration removePropertiesFrom( Configuration jobConf, String... keys )
    {
    Map<Object, Object> properties = createProperties( jobConf );

    for( String key : keys )
      properties.remove( key );

    return copyConfiguration( properties, new JobConf() );
    }

  public static boolean removeStateFromDistCache( Configuration conf, String path ) throws IOException
    {
    return new Hfs( new TextLine(), path ).deleteResource( conf );
    }

  public static PlatformInfo getPlatformInfo()
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( JobConf.class, "org/apache/hadoop", "Hadoop" );

    return platformInfo;
    }

  public static PlatformInfo getPlatformInfo( Class type, String attributePath, String platformName )
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( type, attributePath, platformName );

    return platformInfo;
    }

  public static PlatformInfo createPlatformInfo( Class type, String attributePath, String platformName )
    {
    return getPlatformInfoInternal( type, attributePath, platformName );
    }

  private static PlatformInfo getPlatformInfoInternal( Class type, String attributePath, String platformName )
    {
    URL url = type.getResource( type.getSimpleName() + ".class" );

    if( url == null || !url.toString().startsWith( "jar" ) )
      return new PlatformInfo( platformName, null, null );

    String path = url.toString();
    path = path.substring( 0, path.lastIndexOf( "!" ) + 1 );

    String manifestPath = path + "/META-INF/MANIFEST.MF";
    String parsedVersion = Util.findVersion( path.substring( 0, path.length() - 1 ) );

    Manifest manifest;

    try
      {
      manifest = new Manifest( new URL( manifestPath ).openStream() );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get manifest from {}: {}", manifestPath, exception.getMessage() );

      return new PlatformInfo( platformName, null, parsedVersion );
      }

    Attributes attributes = manifest.getAttributes( attributePath );

    if( attributes == null )
      attributes = manifest.getMainAttributes();

    if( attributes == null )
      {
      LOG.debug( "unable to get platform manifest attributes" );
      return new PlatformInfo( platformName, null, parsedVersion );
      }

    String vendor = attributes.getValue( "Implementation-Vendor" );
    String version = attributes.getValue( "Implementation-Version" );

    if( Util.isEmpty( version ) )
      version = parsedVersion;

    return new PlatformInfo( platformName, vendor, version );
    }

  /**
   * Copies paths from one local path to a remote path. If syncTimes is true, the remote
   * modification time is changed to match the local 'from' path; the access time is left as-is.
   * <p>
   * Returns a map of file-name to remote modification times if the remote time is different than the local time.
   *
   * @param config      the current Configuration
   * @param commonPaths a map of local path to remote path, may be null
   * @param syncTimes   true if the remote modification time should be synced with the local path
   * @return a Map of file name to modification time
   */
  public static Map<String, Long> syncPaths( Configuration config, Map<Path, Path> commonPaths, boolean syncTimes )
    {
    if( commonPaths == null )
      return Collections.emptyMap();

    Map<String, Long> timestampMap = new HashMap<>();

    Map<Path, Path> copyPaths = getCopyPaths( config, commonPaths ); // tests remote file existence or if stale

    LocalFileSystem localFS = getLocalFS( config );
    FileSystem remoteFS = getDefaultFS( config );

    for( Map.Entry<Path, Path> entry : copyPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        LOG.info( "copying from: {}, to: {}", localPath, remotePath );
        remoteFS.copyFromLocalFile( localPath, remotePath );

        if( !syncTimes )
          {
          timestampMap.put( remotePath.getName(), remoteFS.getFileStatus( remotePath ).getModificationTime() );
          continue;
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to copy local: " + localPath + " to remote: " + remotePath, exception );
        }

      FileStatus localFileStatus = null;

      try
        {
        // sync the modified times so we can lazily upload jars to hdfs after job is started
        // otherwise modified time will be local to hdfs
        localFileStatus = localFS.getFileStatus( localPath );
        remoteFS.setTimes( remotePath, localFileStatus.getModificationTime(), -1 ); // don't set the access time
        }
      catch( IOException exception )
        {
        LOG.info( "unable to set local modification time on remote file: {}, 'dfs.namenode.accesstime.precision' may be set to 0 on HDFS.", remotePath );

        if( localFileStatus != null )
          timestampMap.put( remotePath.getName(), localFileStatus.getModificationTime() );
        }
      }

    return timestampMap;
    }

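  /**
   * Returns a map of local path to remote path for every resource name present in both given
   * maps, as produced by {@link #resolvePaths(Configuration, Collection, String, String, Map, Map)}.
   * A sketch of the typical pipeline, assuming a hypothetical jar at {@code build/libs/app.jar}:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * Map<String, Path> localPaths = new HashMap<>();
   * Map<String, Path> remotePaths = new HashMap<>();
   *
   * HadoopUtil.resolvePaths( conf, Collections.singletonList( "build/libs/app.jar" ), null, null, localPaths, remotePaths );
   *
   * Map<Path, Path> commonPaths = HadoopUtil.getCommonPaths( localPaths, remotePaths );
   * HadoopUtil.syncPaths( conf, commonPaths, true );
   * }</pre>
   */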
  public static Map<Path, Path> getCommonPaths( Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    Map<Path, Path> commonPaths = new HashMap<Path, Path>();

    for( Map.Entry<String, Path> entry : localPaths.entrySet() )
      {
      if( remotePaths.containsKey( entry.getKey() ) )
        commonPaths.put( entry.getValue(), remotePaths.get( entry.getKey() ) );
      }

    return commonPaths;
    }

  private static Map<Path, Path> getCopyPaths( Configuration config, Map<Path, Path> commonPaths )
    {
    Map<Path, Path> copyPaths = new HashMap<Path, Path>();

    FileSystem remoteFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    for( Map.Entry<Path, Path> entry : commonPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        boolean localExists = localFS.exists( localPath );
        boolean remoteExists = remoteFS.exists( remotePath );

        if( localExists && !remoteExists )
          {
          copyPaths.put( localPath, remotePath );
          }
        else if( localExists )
          {
          long localModTime = localFS.getFileStatus( localPath ).getModificationTime();
          long remoteModTime = remoteFS.getFileStatus( remotePath ).getModificationTime();

          if( localModTime > remoteModTime )
            copyPaths.put( localPath, remotePath );
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to get handle to underlying filesystem", exception );
        }
      }

    return copyPaths;
    }

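  /**
   * Splits the given classpath entries into local and remote path maps, keyed by resource name.
   * Scheme-less entries on a non-local default filesystem appear in both maps so they can later
   * be synced; entries already on the local or default filesystem appear only in the respective
   * map. Missing paths cause a {@link FlowException}.
   */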
  public static void resolvePaths( Configuration config, Collection<String> classpath, String remoteRoot, String resourceSubPath, Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    FileSystem defaultFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    Path remoteRootPath = new Path( remoteRoot == null ? "./.staging" : remoteRoot );

    if( resourceSubPath != null )
      remoteRootPath = new Path( remoteRootPath, resourceSubPath );

    remoteRootPath = defaultFS.makeQualified( remoteRootPath );

    boolean defaultIsLocal = defaultFS.equals( localFS );

    for( String stringPath : classpath )
      {
      Path path = new Path( stringPath );

      URI uri = path.toUri();

      if( uri.getScheme() == null && !defaultIsLocal ) // we want to sync
        {
        Path localPath = localFS.makeQualified( path );

        if( !exists( localFS, localPath ) )
          throw new FlowException( "path not found: " + localPath );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        remotePaths.put( name, defaultFS.makeQualified( new Path( remoteRootPath, path.getName() ) ) );
        }
      else if( localFS.equals( getFileSystem( config, path ) ) )
        {
        if( !exists( localFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path localPath = localFS.makeQualified( path );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        }
      else
        {
        if( !exists( defaultFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path defaultPath = defaultFS.makeQualified( path );

        String name = defaultPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        remotePaths.put( name, defaultPath );
        }
      }
    }

  private static boolean exists( FileSystem fileSystem, Path path )
    {
    try
      {
      return fileSystem.exists( path );
      }
    catch( IOException exception )
      {
      throw new FlowException( "could not test file exists: " + path, exception );
      }
    }

  private static FileSystem getFileSystem( Configuration config, Path path )
    {
    try
      {
      return path.getFileSystem( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static LocalFileSystem getLocalFS( Configuration config )
    {
    try
      {
      return FileSystem.getLocal( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static FileSystem getDefaultFS( Configuration config )
    {
    try
      {
      return FileSystem.get( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

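  /**
   * Returns true if the given Configuration denotes a standalone/local Hadoop or Tez runtime.
   * For example:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * HadoopUtil.setLocal( conf );
   *
   * assert HadoopUtil.isLocal( conf );
   * }</pre>
   */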
  public static boolean isLocal( Configuration conf )
    {
    // hadoop 1.0 and 2.0 use different properties to define local mode: we check the new YARN
    // property first
    String frameworkName = conf.get( "mapreduce.framework.name" );

    // we are running on hadoop 2.0 (YARN)
    if( frameworkName != null )
      return frameworkName.equals( "local" );

    // for Tez
    String tezLocal = conf.get( "tez.local.mode" );

    if( tezLocal != null )
      return tezLocal.equals( "true" );

    // hadoop 1.0: use the old property to determine the local mode
    String hadoop1 = conf.get( "mapred.job.tracker" );

    if( hadoop1 == null )
      {
      LOG.warn( "could not successfully test if Hadoop based platform is in standalone/local mode, no valid properties set, returning false - tests for: mapreduce.framework.name, tez.local.mode, and mapred.job.tracker" );
      return false;
      }

    return hadoop1.equals( "local" );
    }

  public static boolean isYARN( Configuration conf )
    {
    return conf.get( "mapreduce.framework.name" ) != null;
    }

  public static void setLocal( Configuration conf )
    {
    // set both properties to local
    conf.set( "mapred.job.tracker", "local" );

    // yarn
    conf.set( "mapreduce.framework.name", "local" );

    // tez
    conf.set( "tez.local.mode", "true" );
    conf.set( "tez.runtime.optimize.local.fetch", "true" );
    }

  private static boolean interfaceAssignableFromClassName( Class<?> xface, String className )
    {
    if( className == null || xface == null )
      return false;

    try
      {
      return xface.isAssignableFrom( Class.forName( className ) );
      }
    catch( ClassNotFoundException cnfe )
      {
      return false; // let downstream figure it out
      }
    }

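  /**
   * Sets the {@code mapred.mapper.new-api} flag based on whether the given class name denotes a
   * stable ({@code org.apache.hadoop.mapred}) or new ({@code org.apache.hadoop.mapreduce})
   * InputFormat. For example:
   * <pre>{@code
   * Configuration conf = new Configuration();
   *
   * HadoopUtil.setNewApi( conf, "org.apache.hadoop.mapred.TextInputFormat" ); // sets new-api to false
   * }</pre>
   */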
  public static boolean setNewApi( Configuration conf, String className )
    {
    if( className == null ) // silently return and let the error be caught downstream
      return false;

    boolean isStable = className.startsWith( "org.apache.hadoop.mapred." )
      || interfaceAssignableFromClassName( org.apache.hadoop.mapred.InputFormat.class, className );

    boolean isNew = className.startsWith( "org.apache.hadoop.mapreduce." )
      || interfaceAssignableFromClassName( org.apache.hadoop.mapreduce.InputFormat.class, className );

    if( isStable )
      conf.setBoolean( "mapred.mapper.new-api", false );
    else if( isNew )
      conf.setBoolean( "mapred.mapper.new-api", true );
    else
      throw new IllegalStateException( "cannot determine if class denotes stable or new api, please set 'mapred.mapper.new-api' to the appropriate value" );

    return true;
    }

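  /**
   * Appends the given path, resolved against the current working directory, to the
   * {@code mapred.input.dir} property. A minimal sketch (the paths shown are illustrative):
   * <pre>{@code
   * Configuration conf = new Configuration();
   *
   * HadoopUtil.addInputPath( conf, new Path( "input/first" ) );
   * HadoopUtil.addInputPath( conf, new Path( "input/second" ) ); // comma separated with the first
   * }</pre>
   */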
  public static void addInputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    String dirStr = StringUtils.escapeString( path.toString() );
    String dirs = conf.get( "mapred.input.dir" );
    conf.set( "mapred.input.dir", dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr );
    }

  public static void setOutputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    conf.set( "mapred.output.dir", path.toString() );
    }

  private static Path getWorkingDirectory( Configuration conf )
    {
    String name = conf.get( "mapred.working.dir" );
    if( name != null )
      {
      return new Path( name );
      }
    else
      {
      try
        {
        Path dir = FileSystem.get( conf ).getWorkingDirectory();
        conf.set( "mapred.working.dir", dir.toString() );
        return dir;
        }
      catch( IOException e )
        {
        throw new RuntimeException( e );
        }
      }
    }

  public static Path getOutputPath( Configuration conf )
    {
    String name = conf.get( "mapred.output.dir" );
    return name == null ? null : new Path( name );
    }

  public static String pack( Object object, Configuration conf )
    {
    if( object == null )
      return "";

    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

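  /**
   * Packs the given ordinal to Fields map into the named configuration property;
   * {@link #getFields(Configuration, String)} reverses the operation. A round-trip sketch (the
   * property name shown is illustrative):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * Map<Integer, Fields> fields = new HashMap<>();
   * fields.put( 0, new Fields( "name" ) );
   *
   * HadoopUtil.addFields( conf, "some.example.fields", fields ); // hypothetical key
   *
   * Map<Integer, Fields> unpacked = HadoopUtil.getFields( conf, "some.example.fields" );
   * }</pre>
   */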
  public static void addFields( Configuration conf, String property, Map<Integer, Fields> fields )
    {
    if( fields == null || fields.isEmpty() )
      return;

    Map<String, Fields> toPack = new HashMap<>();

    for( Map.Entry<Integer, Fields> entry : fields.entrySet() )
      toPack.put( entry.getKey().toString(), entry.getValue() );

    conf.set( property, pack( toPack, conf ) );
    }

  public static Map<Integer, Fields> getFields( Configuration conf, String property ) throws IOException
    {
    String value = conf.getRaw( property );

    if( value == null || value.isEmpty() )
      return Collections.emptyMap();

    Map<String, Fields> map = deserializeBase64( value, conf, Map.class, true );
    Map<Integer, Fields> result = new HashMap<>();

    for( Map.Entry<String, Fields> entry : map.entrySet() )
      result.put( Integer.parseInt( entry.getKey() ), entry.getValue() );

    return result;
    }

  public static void addComparators( Configuration conf, String property, Map<String, Fields> map, BaseFlowStep flowStep, Group group )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators.
    Set<Scope> previousScopes = flowStep.getPreviousScopes( group );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows Fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

  public static void addComparators( Configuration conf, String property, Map<String, Fields> map, Fields resolvedFields )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    while( fieldsIterator.hasNext() )
      {
      Fields fields = fieldsIterator.next();

      if( fields.hasComparators() )
        {
        conf.set( property, pack( fields, conf ) );
        return;
        }
      }

    if( resolvedFields.size() != 0 ) // allows Fields.UNKNOWN to be used
      conf.setInt( property + ".size", resolvedFields.size() );
    }
  }