001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.util;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.Collection;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Map;
030import java.util.Properties;
031import java.util.Set;
032
033import cascading.CascadingException;
034import cascading.flow.FlowException;
035import cascading.flow.hadoop.util.HadoopUtil;
036import cascading.tap.type.FileType;
037import cascading.util.Util;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.LocalFileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.mapred.JobConf;
044import org.apache.hadoop.mapred.JobContext;
045import org.apache.hadoop.mapred.TaskAttemptID;
046import org.apache.hadoop.security.UserGroupInformation;
047import org.apache.hadoop.yarn.api.records.LocalResource;
048import org.apache.hadoop.yarn.api.records.LocalResourceType;
049import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
050import org.apache.hadoop.yarn.api.records.URL;
051import org.apache.hadoop.yarn.util.ConverterUtils;
052import org.apache.tez.common.TezUtils;
053import org.apache.tez.dag.api.TezConfiguration;
054import org.apache.tez.mapreduce.input.MRInput;
055import org.apache.tez.mapreduce.lib.MRReader;
056import org.apache.tez.mapreduce.output.MROutput;
057import org.apache.tez.runtime.api.AbstractLogicalInput;
058import org.apache.tez.runtime.api.AbstractLogicalOutput;
059import org.apache.tez.runtime.api.LogicalInput;
060import org.apache.tez.runtime.api.LogicalOutput;
061import org.apache.tez.runtime.api.MergedLogicalInput;
062import org.apache.tez.runtime.api.ProcessorContext;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import static cascading.flow.hadoop.util.HadoopUtil.getCommonPaths;
067import static org.apache.hadoop.yarn.api.ApplicationConstants.CLASS_PATH_SEPARATOR;
068import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CLASSPATH;
069import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.PWD;
070import static org.apache.tez.common.TezUtils.createConfFromByteString;
071import static org.apache.tez.common.TezUtils.createConfFromUserPayload;
072import static org.apache.tez.mapreduce.hadoop.MRInputHelpers.parseMRInputPayload;
073
074/**
075 *
076 */
077public class TezUtil
078  {
079  private static final Logger LOG = LoggerFactory.getLogger( TezUtil.class );
080
081  /**
082   * Attempting to localize all new JobConf calls
083   *
084   * @param configuration
085   * @return
086   */
087  public static JobConf asJobConf( Configuration configuration )
088    {
089    return new JobConf( configuration );
090    }
091
092  public static TezConfiguration createTezConf( Map<Object, Object> properties, TezConfiguration defaultJobconf )
093    {
094    TezConfiguration jobConf = defaultJobconf == null ? new TezConfiguration() : new TezConfiguration( defaultJobconf );
095
096    if( properties == null )
097      return jobConf;
098
099    Set<Object> keys = new HashSet<Object>( properties.keySet() );
100
101    // keys will only be grabbed if both key/value are String, so keep orig keys
102    if( properties instanceof Properties )
103      keys.addAll( ( (Properties) properties ).stringPropertyNames() );
104
105    for( Object key : keys )
106      {
107      Object value = properties.get( key );
108
109      if( value == null && properties instanceof Properties && key instanceof String )
110        value = ( (Properties) properties ).getProperty( (String) key );
111
112      if( value == null ) // don't stuff null values
113        continue;
114
115      // don't let these objects pass, even though toString is called below.
116      if( value instanceof Class || value instanceof TezConfiguration )
117        continue;
118
119      jobConf.set( key.toString(), value.toString() );
120      }
121
122    return jobConf;
123    }
124
125  public static UserGroupInformation getCurrentUser()
126    {
127    try
128      {
129      return UserGroupInformation.getCurrentUser();
130      }
131    catch( IOException exception )
132      {
133      throw new CascadingException( "unable to get current user", exception );
134      }
135    }
136
137  public static String getEdgeSourceID( LogicalInput input, Configuration configuration )
138    {
139    String id = configuration.get( "cascading.node.source" );
140
141    if( id == null )
142      throw new IllegalStateException( "no source id found: " + input.getClass().getName() );
143
144    return id;
145    }
146
147  public static String getEdgeSinkID( LogicalOutput output, Configuration configuration )
148    {
149    String id = configuration.get( "cascading.node.sink" );
150
151    if( id == null )
152      throw new IllegalStateException( "no sink id found: " + output.getClass().getName() );
153
154    return id;
155    }
156
157  public static Configuration getInputConfiguration( LogicalInput input )
158    {
159    try
160      {
161      if( input instanceof MergedLogicalInput )
162        input = (LogicalInput) Util.getFirst( ( (MergedLogicalInput) input ).getInputs() );
163
164      if( input instanceof MRInput )
165        return createConfFromByteString( parseMRInputPayload( ( (MRInput) input ).getContext().getUserPayload() ).getConfigurationBytes() );
166
167      if( input instanceof AbstractLogicalInput )
168        return createConfFromUserPayload( ( (AbstractLogicalInput) input ).getContext().getUserPayload() );
169      }
170    catch( IOException exception )
171      {
172      throw new FlowException( "unable to unpack payload", exception );
173      }
174
175    throw new IllegalStateException( "unknown input type: " + input.getClass().getName() );
176    }
177
178  public static Configuration getOutputConfiguration( LogicalOutput output )
179    {
180    try
181      {
182      if( output instanceof MROutput )
183        return TezUtils.createConfFromUserPayload( ( (MROutput) output ).getContext().getUserPayload() );
184
185      if( output instanceof AbstractLogicalOutput )
186        return createConfFromUserPayload( ( (AbstractLogicalOutput) output ).getContext().getUserPayload() );
187      }
188    catch( IOException exception )
189      {
190      throw new FlowException( "unable to unpack payload", exception );
191      }
192
193    throw new IllegalStateException( "unknown input type: " + output.getClass().getName() );
194    }
195
196  public static void setSourcePathForSplit( MRInput input, MRReader reader, Configuration configuration )
197    {
198    Path path = null;
199
200    if( Util.returnInstanceFieldIfExistsSafe( input, "useNewApi" ) )
201      {
202      org.apache.hadoop.mapreduce.InputSplit newInputSplit = (org.apache.hadoop.mapreduce.InputSplit) reader.getSplit();
203
204      if( newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit )
205        path = ( (org.apache.hadoop.mapreduce.lib.input.FileSplit) newInputSplit ).getPath();
206      }
207    else
208      {
209      org.apache.hadoop.mapred.InputSplit oldInputSplit = (org.apache.hadoop.mapred.InputSplit) reader.getSplit();
210
211      if( oldInputSplit instanceof org.apache.hadoop.mapred.FileSplit )
212        path = ( (org.apache.hadoop.mapred.FileSplit) oldInputSplit ).getPath();
213      }
214
215    if( path != null )
216      configuration.set( FileType.CASCADING_SOURCE_PATH, path.toString() );
217    }
218
219  public static Map<Path, Path> addToClassPath( Configuration config, String stagingRoot, String resourceSubPath, Collection<String> classpath,
220                                                LocalResourceType resourceType, Map<String, LocalResource> localResources,
221                                                Map<String, String> environment )
222    {
223    if( classpath == null )
224      return null;
225
226    // given to fully qualified
227    Map<String, Path> localPaths = new HashMap<>();
228    Map<String, Path> remotePaths = new HashMap<>();
229
230    HadoopUtil.resolvePaths( config, classpath, stagingRoot, resourceSubPath, localPaths, remotePaths );
231
232    try
233      {
234      LocalFileSystem localFS = HadoopUtil.getLocalFS( config );
235
236      for( String fileName : localPaths.keySet() )
237        {
238        Path artifact = localPaths.get( fileName );
239        Path remotePath = remotePaths.get( fileName );
240
241        if( remotePath == null )
242          remotePath = artifact;
243
244        addResource( localResources, environment, fileName, localFS.getFileStatus( artifact ), remotePath, resourceType );
245        }
246
247      FileSystem defaultFS = HadoopUtil.getDefaultFS( config );
248
249      for( String fileName : remotePaths.keySet() )
250        {
251        Path artifact = remotePaths.get( fileName );
252        Path localPath = localPaths.get( fileName );
253
254        if( localPath != null )
255          continue;
256
257        addResource( localResources, environment, fileName, defaultFS.getFileStatus( artifact ), artifact, resourceType );
258        }
259      }
260    catch( IOException exception )
261      {
262      throw new FlowException( "unable to set remote resource paths", exception );
263      }
264
265    return getCommonPaths( localPaths, remotePaths );
266    }
267
268  protected static void addResource( Map<String, LocalResource> localResources, Map<String, String> environment, String fileName, FileStatus stats, Path fullPath, LocalResourceType type ) throws IOException
269    {
270    if( localResources.containsKey( fileName ) )
271      throw new FlowException( "duplicate filename added to classpath resources: " + fileName );
272
273    URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath( fullPath );
274    long len = stats.getLen();
275    long modificationTime = stats.getModificationTime();
276
277    LocalResource resource = LocalResource.newInstance(
278      yarnUrlFromPath,
279      type,
280      LocalResourceVisibility.APPLICATION,
281      len,
282      modificationTime );
283
284    if( type == LocalResourceType.PATTERN )
285      {
286      // todo: parametrize this for dynamic inclusion below
287      String pattern = "(?:classes/|lib/).*";
288
289      resource.setPattern( pattern );
290
291      if( environment != null )
292        {
293        String current = "";
294
295        current += PWD.$$() + File.separator + fileName + File.separator + "*" + CLASS_PATH_SEPARATOR;
296        current += PWD.$$() + File.separator + fileName + File.separator + "lib" + File.separator + "*" + CLASS_PATH_SEPARATOR;
297        current += PWD.$$() + File.separator + fileName + File.separator + "classes" + File.separator + "*" + CLASS_PATH_SEPARATOR;
298
299        String classPath = environment.get( CLASSPATH.name() );
300
301        if( classPath == null )
302          classPath = "";
303        else if( !classPath.startsWith( CLASS_PATH_SEPARATOR ) )
304          classPath += CLASS_PATH_SEPARATOR;
305
306        classPath += current;
307
308        LOG.info( "adding to cluster side classpath: {} ", classPath );
309
310        environment.put( CLASSPATH.name(), classPath );
311        }
312      }
313
314    localResources.put( fileName, resource );
315    }
316
317  public static void setMRProperties( ProcessorContext context, Configuration config, boolean isMapperOutput )
318    {
319    TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
320      .createMockTaskAttemptID( context.getApplicationId().getClusterTimestamp(),
321        context.getTaskVertexIndex(), context.getApplicationId().getId(),
322        context.getTaskIndex(), context.getTaskAttemptNumber(), isMapperOutput );
323
324    config.set( JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString() );
325    config.set( JobContext.TASK_ID, taskAttemptId.getTaskID().toString() );
326    config.setBoolean( JobContext.TASK_ISMAP, isMapperOutput );
327    config.setInt( JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId() );
328    }
329  }