001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035
036import cascading.CascadingException;
037import cascading.flow.FlowElement;
038import cascading.flow.FlowElements;
039import cascading.flow.FlowException;
040import cascading.flow.FlowNode;
041import cascading.flow.FlowProcess;
042import cascading.flow.FlowRuntimeProps;
043import cascading.flow.hadoop.ConfigurationSetter;
044import cascading.flow.hadoop.util.HadoopUtil;
045import cascading.flow.planner.BaseFlowStep;
046import cascading.flow.planner.FlowStepJob;
047import cascading.flow.planner.graph.ElementGraph;
048import cascading.flow.planner.process.FlowNodeGraph;
049import cascading.flow.planner.process.ProcessEdge;
050import cascading.flow.stream.annotations.StreamMode;
051import cascading.flow.tez.planner.Hadoop2TezFlowStepJob;
052import cascading.flow.tez.util.TezUtil;
053import cascading.management.state.ClientState;
054import cascading.pipe.Boundary;
055import cascading.pipe.CoGroup;
056import cascading.pipe.Group;
057import cascading.pipe.GroupBy;
058import cascading.pipe.Merge;
059import cascading.pipe.Splice;
060import cascading.property.AppProps;
061import cascading.tap.CompositeTaps;
062import cascading.tap.Tap;
063import cascading.tap.hadoop.Hfs;
064import cascading.tap.hadoop.PartitionTap;
065import cascading.tap.hadoop.util.Hadoop18TapUtil;
066import cascading.tuple.Fields;
067import cascading.tuple.hadoop.TupleSerialization;
068import cascading.tuple.hadoop.util.GroupingSortingComparator;
069import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
070import cascading.tuple.hadoop.util.ReverseTupleComparator;
071import cascading.tuple.hadoop.util.TupleComparator;
072import cascading.tuple.io.KeyTuple;
073import cascading.tuple.io.TuplePair;
074import cascading.tuple.io.ValueTuple;
075import cascading.tuple.tez.util.GroupingSortingPartitioner;
076import cascading.tuple.tez.util.TuplePartitioner;
077import cascading.util.Util;
078import cascading.util.Version;
079import org.apache.hadoop.conf.Configuration;
080import org.apache.hadoop.fs.FileSystem;
081import org.apache.hadoop.fs.Path;
082import org.apache.hadoop.mapred.JobConf;
083import org.apache.hadoop.mapreduce.JobContext;
084import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
085import org.apache.hadoop.yarn.api.records.LocalResource;
086import org.apache.hadoop.yarn.api.records.LocalResourceType;
087import org.apache.tez.common.TezUtils;
088import org.apache.tez.dag.api.DAG;
089import org.apache.tez.dag.api.DataSinkDescriptor;
090import org.apache.tez.dag.api.DataSourceDescriptor;
091import org.apache.tez.dag.api.Edge;
092import org.apache.tez.dag.api.EdgeProperty;
093import org.apache.tez.dag.api.GroupInputEdge;
094import org.apache.tez.dag.api.InputDescriptor;
095import org.apache.tez.dag.api.OutputDescriptor;
096import org.apache.tez.dag.api.ProcessorDescriptor;
097import org.apache.tez.dag.api.TezConfiguration;
098import org.apache.tez.dag.api.UserPayload;
099import org.apache.tez.dag.api.Vertex;
100import org.apache.tez.dag.api.VertexGroup;
101import org.apache.tez.mapreduce.input.MRInput;
102import org.apache.tez.mapreduce.output.MROutput;
103import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
104import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
105import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
106import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
107import org.apache.tez.runtime.library.input.UnorderedKVInput;
108import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
109import org.apache.tez.runtime.library.output.UnorderedKVOutput;
110import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113
114import static cascading.flow.hadoop.util.HadoopUtil.*;
115import static cascading.flow.tez.util.TezUtil.addToClassPath;
116import static cascading.tap.hadoop.DistCacheTap.CASCADING_LOCAL_RESOURCES;
117import static cascading.tap.hadoop.DistCacheTap.CASCADING_REMOTE_RESOURCES;
118import static cascading.util.Util.getFirst;
119import static java.util.Collections.singletonList;
120import static org.apache.hadoop.yarn.api.records.LocalResourceType.ARCHIVE;
121import static org.apache.hadoop.yarn.api.records.LocalResourceType.FILE;
122
123/**
124 *
125 */
126public class Hadoop2TezFlowStep extends BaseFlowStep<TezConfiguration>
127  {
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezFlowStep.class );

  /** Local resources keyed by resource name; added to in createInitializedConfig and initFromSources, passed to DAG#addTaskLocalFiles in createDAG. */
  private Map<String, LocalResource> allLocalResources = new HashMap<>();
  /** Path pairs returned by TezUtil#addToClassPath (presumably local path to staged remote path -- TODO confirm against TezUtil). */
  private Map<Path, Path> syncPaths = new HashMap<>();
  /** Task environment filled by addToClassPath for the app jar, and applied to every vertex in newVertex. */
  private Map<String, String> environment = new HashMap<>();
133
134  public Hadoop2TezFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
135    {
136    super( elementGraph, flowNodeGraph );
137    }
138
139  @Override
140  public Map<Object, Object> getConfigAsProperties()
141    {
142    return HadoopUtil.createProperties( getConfig() );
143    }
144
145  @Override
146  public TezConfiguration createInitializedConfig( FlowProcess<TezConfiguration> flowProcess, TezConfiguration parentConfig )
147    {
148    TezConfiguration stepConf = parentConfig == null ? new TezConfiguration() : new TezConfiguration( parentConfig );
149
150    TupleSerialization.setSerializations( stepConf );
151
152    String versionString = Version.getRelease();
153
154    if( versionString != null )
155      stepConf.set( "cascading.version", versionString );
156
157    stepConf.set( CASCADING_FLOW_STEP_ID, getID() );
158    stepConf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );
159
160    String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath();
161    List<String> classPath = ( (Hadoop2TezFlow) getFlow() ).getClassPath();
162
163    // is updated in addToClassPath method
164    Map<String, LocalResource> dagResources = new HashMap<>();
165
166    if( !classPath.isEmpty() )
167      {
168      // jars in the root will be in the remote CLASSPATH, no need to add to the environment
169      Map<Path, Path> dagClassPath = addToClassPath( stepConf, flowStagingPath, null, classPath, FILE, dagResources, null );
170
171      syncPaths.putAll( dagClassPath );
172      }
173
174    String appJarPath = stepConf.get( AppProps.APP_JAR_PATH );
175
176    if( appJarPath != null )
177      {
178      // the PATTERN represents the insides of the app jar, those elements must be added to the remote CLASSPATH
179      List<String> classpath = singletonList( appJarPath );
180      Map<Path, Path> pathMap = addToClassPath( stepConf, flowStagingPath, null, classpath, ARCHIVE, dagResources, environment );
181
182      syncPaths.putAll( pathMap );
183
184      // AM does not support environments like containers do, so the classpath has to be passed via configuration.
185      String fileName = new File( appJarPath ).getName();
186      stepConf.set( TezConfiguration.TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX,
187        "$PWD/" + fileName + "/:$PWD/" + fileName + "/classes/:$PWD/" + fileName + "/lib/*:" );
188      }
189
190    allLocalResources.putAll( dagResources );
191
192    initFromStepConfigDef( stepConf );
193
194    return stepConf;
195    }
196
197  @Override
198  protected FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedStepConfig )
199    {
200    DAG dag = createDAG( flowProcess, initializedStepConfig );
201
202    return new Hadoop2TezFlowStepJob( clientState, this, initializedStepConfig, dag );
203    }
204
205  private DAG createDAG( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig )
206    {
207    FlowNodeGraph nodeGraph = getFlowNodeGraph();
208    Map<FlowNode, Vertex> vertexMap = new HashMap<>();
209    DAG dag = DAG.create( getStepDisplayName( initializedConfig.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );
210
211    dag.addTaskLocalFiles( allLocalResources );
212
213    Iterator<FlowNode> iterator = nodeGraph.getOrderedTopologicalIterator(); // ordering of nodes for consistent remote debugging
214
215    while( iterator.hasNext() )
216      {
217      FlowNode flowNode = iterator.next();
218
219      Vertex vertex = createVertex( flowProcess, initializedConfig, flowNode );
220      dag.addVertex( vertex );
221
222      vertexMap.put( flowNode, vertex );
223      }
224
225    LinkedList<ProcessEdge> processedEdges = new LinkedList<>();
226
227    for( ProcessEdge processEdge : nodeGraph.edgeSet() )
228      {
229      if( processedEdges.contains( processEdge ) )
230        continue;
231
232      FlowNode edgeTargetFlowNode = nodeGraph.getEdgeTarget( processEdge );
233
234      FlowElement flowElement = processEdge.getFlowElement();
235      List<FlowNode> sourceNodes = nodeGraph.getElementSourceProcesses( flowElement );
236
237      EdgeProperty edgeProperty = createEdgeProperty( initializedConfig, processEdge );
238
239      Vertex targetVertex = vertexMap.get( edgeTargetFlowNode );
240
241      if( sourceNodes.size() == 1 || flowElement instanceof CoGroup || flowElement instanceof Boundary ) // todo: create group vertices around incoming ordinal
242        {
243        FlowNode edgeSourceFlowNode = nodeGraph.getEdgeSource( processEdge );
244        Vertex sourceVertex = vertexMap.get( edgeSourceFlowNode );
245
246        LOG.debug( "adding edge between: {} and {}", sourceVertex, targetVertex );
247
248        dag.addEdge( Edge.create( sourceVertex, targetVertex, edgeProperty ) );
249        }
250      else if( flowElement instanceof GroupBy || flowElement instanceof Merge ) // merge - source nodes > 1
251        {
252        List<String> sourceVerticesIDs = new ArrayList<>();
253        List<Vertex> sourceVertices = new ArrayList<>();
254
255        for( FlowNode edgeSourceFlowNode : sourceNodes )
256          {
257          sourceVerticesIDs.add( edgeSourceFlowNode.getID() );
258          sourceVertices.add( vertexMap.get( edgeSourceFlowNode ) );
259          processedEdges.add( nodeGraph.getEdge( edgeSourceFlowNode, edgeTargetFlowNode ) );
260          }
261
262        VertexGroup vertexGroup = dag.createVertexGroup( edgeTargetFlowNode.getID(), sourceVertices.toArray( new Vertex[ sourceVertices.size() ] ) );
263
264        String inputClassName = flowElement instanceof Group ? OrderedGroupedMergedKVInput.class.getName() : ConcatenatedMergedKeyValueInput.class.getName();
265
266        InputDescriptor inputDescriptor = InputDescriptor.create( inputClassName ).setUserPayload( edgeProperty.getEdgeDestination().getUserPayload() );
267
268        String type = ( (Splice) flowElement ).isMerge() ? "merged" : "grouped";
269        LOG.info( "adding {} edge between: {} and {}", type, Util.join( sourceVerticesIDs, "," ), targetVertex.getName() );
270        dag.addEdge( GroupInputEdge.create( vertexGroup, targetVertex, edgeProperty, inputDescriptor ) );
271        }
272      else
273        {
274        throw new UnsupportedOperationException( "can't make edge for: " + flowElement );
275        }
276      }
277
278    return dag;
279    }
280
281  private EdgeProperty createEdgeProperty( TezConfiguration config, ProcessEdge processEdge )
282    {
283    FlowElement flowElement = processEdge.getFlowElement();
284
285    EdgeValues edgeValues = new EdgeValues( new TezConfiguration( config ), processEdge );
286
287    edgeValues.keyClassName = KeyTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS
288    edgeValues.valueClassName = ValueTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS
289    edgeValues.keyComparatorClassName = TupleComparator.class.getName();
290    edgeValues.keyPartitionerClassName = TuplePartitioner.class.getName();
291    edgeValues.outputClassName = null;
292    edgeValues.inputClassName = null;
293    edgeValues.movementType = null;
294    edgeValues.sourceType = null;
295    edgeValues.schedulingType = null;
296
297    if( flowElement instanceof Group )
298      applyGroup( edgeValues );
299    else if( ( flowElement instanceof Boundary || flowElement instanceof Merge ) && processEdge.getSinkAnnotations().contains( StreamMode.Accumulated ) )
300      applyBoundaryMergeAccumulated( edgeValues );
301    else if( flowElement instanceof Boundary || flowElement instanceof Merge )
302      applyBoundaryMerge( edgeValues );
303    else
304      throw new IllegalStateException( "unsupported flow element: " + flowElement.getClass().getCanonicalName() );
305
306    applyEdgeAnnotations( processEdge, edgeValues );
307
308    return createEdgeProperty( edgeValues );
309    }
310
  /**
   * Records the chosen Tez data movement, data source and scheduling types on the process edge as edge annotations.
   *
   * @param processEdge the edge to annotate
   * @param edgeValues  values already populated by one of the apply* methods
   */
  private void applyEdgeAnnotations( ProcessEdge processEdge, EdgeValues edgeValues )
    {
    processEdge.addEdgeAnnotation( edgeValues.movementType );
    processEdge.addEdgeAnnotation( edgeValues.sourceType );
    processEdge.addEdgeAnnotation( edgeValues.schedulingType );
    }
317
318  private EdgeValues applyBoundaryMerge( EdgeValues edgeValues )
319    {
320    // todo: support for one to one
321    edgeValues.outputClassName = UnorderedPartitionedKVOutput.class.getName();
322    edgeValues.inputClassName = UnorderedKVInput.class.getName();
323
324    edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
325    edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
326    edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
327
328    return edgeValues;
329    }
330
331  private EdgeValues applyBoundaryMergeAccumulated( EdgeValues edgeValues )
332    {
333    edgeValues.outputClassName = UnorderedKVOutput.class.getName();
334    edgeValues.inputClassName = UnorderedKVInput.class.getName();
335
336    edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST;
337    edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
338    edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
339
340    return edgeValues;
341    }
342
343  private EdgeValues applyGroup( EdgeValues edgeValues )
344    {
345    Group group = (Group) edgeValues.flowElement;
346
347    if( group.isSortReversed() )
348      edgeValues.keyComparatorClassName = ReverseTupleComparator.class.getName();
349
350    int ordinal = getFirst( edgeValues.ordinals );
351
352    addComparators( edgeValues.config, "cascading.group.comparator", group.getKeySelectors(), edgeValues.getResolvedKeyFieldsMap().get( ordinal ) );
353
354    if( !group.isGroupBy() )
355      {
356      edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
357      edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();
358
359      edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
360      edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
361      edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
362      }
363    else
364      {
365      addComparators( edgeValues.config, "cascading.sort.comparator", group.getSortingSelectors(), edgeValues.getResolvedSortFieldsMap().get( ordinal ) );
366
367      edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
368      edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();
369
370      edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
371      edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
372      edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
373      }
374
375    if( group.isSorted() )
376      {
377      edgeValues.keyClassName = TuplePair.class.getName();
378      edgeValues.keyPartitionerClassName = GroupingSortingPartitioner.class.getName();
379
380      if( group.isSortReversed() )
381        edgeValues.keyComparatorClassName = ReverseGroupingSortingComparator.class.getName();
382      else
383        edgeValues.keyComparatorClassName = GroupingSortingComparator.class.getName();
384      }
385
386    return edgeValues;
387    }
388
389  private EdgeProperty createEdgeProperty( EdgeValues edgeValues )
390    {
391    TezConfiguration outputConfig = new TezConfiguration( edgeValues.getConfig() );
392    outputConfig.set( "cascading.node.sink", FlowElements.id( edgeValues.getFlowElement() ) );
393    outputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) );
394    addFields( outputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() );
395    addFields( outputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() );
396    addFields( outputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() );
397
398    UserPayload outputPayload = createIntermediatePayloadOutput( outputConfig, edgeValues );
399
400    TezConfiguration inputConfig = new TezConfiguration( edgeValues.getConfig() );
401    inputConfig.set( "cascading.node.source", FlowElements.id( edgeValues.getFlowElement() ) );
402    inputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) );
403    addFields( inputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() );
404    addFields( inputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() );
405    addFields( inputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() );
406
407    UserPayload inputPayload = createIntermediatePayloadInput( inputConfig, edgeValues );
408
409    return EdgeProperty.create(
410      edgeValues.getMovementType(),
411      edgeValues.getSourceType(),
412      edgeValues.getSchedulingType(),
413      OutputDescriptor.create( edgeValues.getOutputClassName() ).setUserPayload( outputPayload ),
414      InputDescriptor.create( edgeValues.getInputClassName() ).setUserPayload( inputPayload )
415    );
416    }
417
418  private UserPayload createIntermediatePayloadOutput( TezConfiguration config, EdgeValues edgeValues )
419    {
420    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName );
421    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName );
422    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName );
423    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName );
424
425    setWorkingDirectory( config );
426
427    return getPayload( config );
428    }
429
430  private UserPayload createIntermediatePayloadInput( TezConfiguration config, EdgeValues edgeValues )
431    {
432    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName );
433    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName );
434    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName );
435    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName );
436
437    setWorkingDirectory( config );
438
439    return getPayload( config );
440    }
441
442  private static void setWorkingDirectory( Configuration conf )
443    {
444    String name = conf.get( JobContext.WORKING_DIR );
445
446    if( name != null )
447      return;
448
449    try
450      {
451      Path dir = FileSystem.get( conf ).getWorkingDirectory();
452      conf.set( JobContext.WORKING_DIR, dir.toString() );
453      }
454    catch( IOException exception )
455      {
456      throw new RuntimeException( exception );
457      }
458    }
459
460  public Vertex createVertex( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig, FlowNode flowNode )
461    {
462    JobConf conf = new JobConf( initializedConfig );
463
464    addInputOutputMapping( conf, flowNode );
465
466    conf.setBoolean( "mapred.used.genericoptionsparser", true );
467
468    Map<String, LocalResource> taskLocalResources = new HashMap<>();
469
470    Map<FlowElement, Configuration> sourceConfigs = initFromSources( flowNode, flowProcess, conf, taskLocalResources );
471    Map<FlowElement, Configuration> sinkConfigs = initFromSinks( flowNode, flowProcess, conf );
472
473    initFromTraps( flowNode, flowProcess, conf );
474
475    initFromNodeConfigDef( flowNode, conf );
476
477    // force step to local mode if any tap is local
478    setLocalMode( initializedConfig, conf, null );
479
480    conf.set( "cascading.flow.node.num", Integer.toString( flowNode.getOrdinal() ) );
481
482    HadoopUtil.setIsInflow( conf ); // must be called after all taps configurations have been retrieved
483
484    int parallelism = getParallelism( flowNode, conf );
485
486    if( parallelism == 0 )
487      throw new FlowException( getName(), "the default number of gather partitions must be set, see cascading.flow.FlowRuntimeProps" );
488
489    flowNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( parallelism ) );
490
491    Vertex vertex = newVertex( flowNode, conf, parallelism );
492
493    if( !taskLocalResources.isEmpty() )
494      vertex.addTaskLocalFiles( taskLocalResources );
495
496    for( FlowElement flowElement : sourceConfigs.keySet() )
497      {
498      if( !( flowElement instanceof Tap ) )
499        continue;
500
501      Configuration sourceConf = sourceConfigs.get( flowElement );
502
503      // not setting the new-api value could result in failures if not set by the Scheme
504      if( sourceConf.get( "mapred.mapper.new-api" ) == null )
505        HadoopUtil.setNewApi( sourceConf, sourceConf.get( "mapred.input.format.class", sourceConf.get( "mapreduce.job.inputformat.class" ) ) );
506
507      // unfortunately we cannot just load the input format and set it on the builder with also pulling all other
508      // values out of the configuration.
509      MRInput.MRInputConfigBuilder configBuilder = MRInput.createConfigBuilder( sourceConf, null );
510
511      // the default in Tez is true, this overrides
512      if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
513        configBuilder.groupSplits( conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, true ) );
514
515      // grouping splits loses file name info, breaking partition tap default impl
516      if( !CompositeTaps.unwindNarrow( PartitionTap.class, (Tap) flowElement ).isEmpty() ) // todo: generify
517        configBuilder.groupSplits( false );
518
519      DataSourceDescriptor dataSourceDescriptor = configBuilder.build();
520
521      vertex.addDataSource( FlowElements.id( flowElement ), dataSourceDescriptor );
522      }
523
524    for( FlowElement flowElement : sinkConfigs.keySet() )
525      {
526      if( !( flowElement instanceof Tap ) )
527        continue;
528
529      Configuration sinkConf = sinkConfigs.get( flowElement );
530
531      Class outputFormatClass;
532      String outputPath;
533
534      // we have to set sane defaults if not set by the tap
535      // typically the case of MultiSinkTap
536      String formatClassName = sinkConf.get( "mapred.output.format.class", sinkConf.get( "mapreduce.job.outputformat.class" ) );
537
538      if( formatClassName == null )
539        {
540        outputFormatClass = TextOutputFormat.class; // unused, use "new" api, its the default
541        outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused
542        }
543      else
544        {
545        outputFormatClass = Util.loadClass( formatClassName );
546        outputPath = getOutputPath( sinkConf );
547        }
548
549      if( outputPath == null && getOutputPath( sinkConf ) == null && isFileOutputFormat( outputFormatClass ) )
550        outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused
551
552      MROutput.MROutputConfigBuilder configBuilder = MROutput.createConfigBuilder( sinkConf, outputFormatClass, outputPath );
553
554      DataSinkDescriptor dataSinkDescriptor = configBuilder.build();
555
556      vertex.addDataSink( FlowElements.id( flowElement ), dataSinkDescriptor );
557      }
558
559    addRemoteDebug( flowNode, vertex );
560    addRemoteProfiling( flowNode, vertex );
561
562    if( vertex.getTaskLaunchCmdOpts() != null )
563      flowNode.addProcessAnnotation( TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, vertex.getTaskLaunchCmdOpts() );
564
565    return vertex;
566    }
567
568  protected String getOutputPath( Configuration sinkConf )
569    {
570    return sinkConf.get( "mapred.output.dir", sinkConf.get( "mapreduce.output.fileoutputformat.outputdir" ) );
571    }
572
573  protected boolean isFileOutputFormat( Class outputFormatClass )
574    {
575    return org.apache.hadoop.mapred.FileOutputFormat.class.isAssignableFrom( outputFormatClass ) ||
576      org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom( outputFormatClass );
577    }
578
579  protected int getParallelism( FlowNode flowNode, JobConf conf )
580    {
581    // only count streamed taps, accumulated taps are always annotated
582    HashSet<Tap> sourceStreamedTaps = new HashSet<>( flowNode.getSourceTaps() );
583
584    sourceStreamedTaps.removeAll( flowNode.getSourceElements( StreamMode.Accumulated ) );
585
586    if( sourceStreamedTaps.size() != 0 )
587      return -1;
588
589    int parallelism = Integer.MAX_VALUE;
590
591    for( Tap tap : flowNode.getSinkTaps() )
592      {
593      int numSinkParts = tap.getScheme().getNumSinkParts();
594
595      if( numSinkParts == 0 )
596        continue;
597
598      if( parallelism != Integer.MAX_VALUE )
599        LOG.info( "multiple sink taps in flow node declaring numSinkParts, choosing lowest value. see cascading.flow.FlowRuntimeProps for broader control." );
600
601      parallelism = Math.min( parallelism, numSinkParts );
602      }
603
604    if( parallelism != Integer.MAX_VALUE )
605      return parallelism;
606
607    return conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );
608    }
609
610  private void addInputOutputMapping( JobConf conf, FlowNode flowNode )
611    {
612    FlowNodeGraph flowNodeGraph = getFlowNodeGraph();
613    Set<ProcessEdge> incomingEdges = flowNodeGraph.incomingEdgesOf( flowNode );
614
615    for( ProcessEdge processEdge : incomingEdges )
616      conf.set( "cascading.node.source." + processEdge.getFlowElementID(), processEdge.getSourceProcessID() );
617
618    Set<ProcessEdge> outgoingEdges = flowNodeGraph.outgoingEdgesOf( flowNode );
619
620    for( ProcessEdge processEdge : outgoingEdges )
621      conf.set( "cascading.node.sink." + processEdge.getFlowElementID(), processEdge.getSinkProcessID() );
622    }
623
624  protected Map<FlowElement, Configuration> initFromSources( FlowNode flowNode, FlowProcess<TezConfiguration> flowProcess,
625                                                             Configuration conf, Map<String, LocalResource> taskLocalResources )
626    {
627    Set<? extends FlowElement> accumulatedSources = flowNode.getSourceElements( StreamMode.Accumulated );
628
629    for( FlowElement element : accumulatedSources )
630      {
631      if( element instanceof Tap )
632        {
633        JobConf current = new JobConf( conf );
634        Tap tap = (Tap) element;
635
636        if( tap.getIdentifier() == null )
637          throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );
638
639        tap.sourceConfInit( flowProcess, current );
640
641        Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) );
642
643        if( !paths.isEmpty() )
644          {
645          String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath();
646          String resourceSubPath = Tap.id( tap );
647          Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null );
648
649          current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) );
650
651          allLocalResources.putAll( taskLocalResources );
652          syncPaths.putAll( pathMap );
653          }
654
655        Map<String, String> map = flowProcess.diffConfigIntoMap( new TezConfiguration( conf ), new TezConfiguration( current ) );
656        conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );
657
658        setLocalMode( conf, current, tap );
659        }
660      }
661
662    Set<FlowElement> sources = new HashSet<>( flowNode.getSourceElements() );
663
664    sources.removeAll( accumulatedSources );
665
666    if( sources.isEmpty() )
667      throw new IllegalStateException( "all sources marked as accumulated" );
668
669    Map<FlowElement, Configuration> configs = new HashMap<>();
670
671    for( FlowElement element : sources )
672      {
673      JobConf current = new JobConf( conf );
674
675      String id = FlowElements.id( element );
676
677      current.set( "cascading.node.source", id );
678
679      if( element instanceof Tap )
680        {
681        Tap tap = (Tap) element;
682
683        if( tap.getIdentifier() == null )
684          throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );
685
686        tap.sourceConfInit( flowProcess, current );
687
688        setLocalMode( conf, current, tap );
689        }
690
691      configs.put( element, current );
692      }
693
694    return configs;
695    }
696
697  protected Map<FlowElement, Configuration> initFromSinks( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
698    {
699    Set<FlowElement> sinks = flowNode.getSinkElements();
700    Map<FlowElement, Configuration> configs = new HashMap<>();
701
702    for( FlowElement element : sinks )
703      {
704      JobConf current = new JobConf( conf );
705
706      if( element instanceof Tap )
707        {
708        Tap tap = (Tap) element;
709
710        if( tap.getIdentifier() == null )
711          throw new IllegalStateException( "tap may not have null identifier: " + element.toString() );
712
713        tap.sinkConfInit( flowProcess, current );
714
715        setLocalMode( conf, current, tap );
716        }
717
718      String id = FlowElements.id( element );
719
720      current.set( "cascading.node.sink", id );
721
722      configs.put( element, current );
723      }
724
725    return configs;
726    }
727
728  private void initFromNodeConfigDef( FlowNode flowNode, Configuration conf )
729    {
730    initConfFromNodeConfigDef( flowNode.getElementGraph(), new ConfigurationSetter( conf ) );
731    }
732
733  private void initFromStepConfigDef( Configuration conf )
734    {
735    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
736    }
737
738  protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
739    {
740    Map<String, Tap> traps = flowNode.getTrapMap();
741
742    if( !traps.isEmpty() )
743      {
744      JobConf trapConf = new JobConf( conf );
745
746      for( Tap tap : traps.values() )
747        {
748        tap.sinkConfInit( flowProcess, trapConf );
749        setLocalMode( conf, trapConf, tap );
750        }
751      }
752    }
753
754  private Vertex newVertex( FlowNode flowNode, Configuration conf, int parallelism )
755    {
756    conf.set( FlowNode.CASCADING_FLOW_NODE, pack( flowNode, conf ) ); // todo: pack into payload directly
757
758    ProcessorDescriptor descriptor = ProcessorDescriptor.create( FlowProcessor.class.getName() );
759
760    descriptor.setUserPayload( getPayload( conf ) );
761
762    Vertex vertex = Vertex.create( flowNode.getID(), descriptor, parallelism );
763
764    if( environment != null )
765      vertex.setTaskEnvironment( environment );
766
767    return vertex;
768    }
769
770  private UserPayload getPayload( Configuration conf )
771    {
772    try
773      {
774      return TezUtils.createUserPayloadFromConf( conf );
775      }
776    catch( IOException exception )
777      {
778      throw new CascadingException( exception );
779      }
780    }
781
782  private String pack( Object object, Configuration conf )
783    {
784    try
785      {
786      return serializeBase64( object, conf, true );
787      }
788    catch( IOException exception )
789      {
790      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
791      }
792    }
793
794  @Override
795  public void clean( TezConfiguration config )
796    {
797    for( Tap sink : getSinkTaps() )
798      {
799      if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
800        {
801        try
802          {
803          sink.deleteResource( config );
804          }
805        catch( Exception exception )
806          {
807          // sink all exceptions, don't fail app
808          logWarn( "unable to remove temporary file: " + sink, exception );
809          }
810        }
811      else
812        {
813        cleanTapMetaData( config, sink );
814        }
815      }
816
817    for( Tap tap : getTraps() )
818      cleanTapMetaData( config, tap );
819    }
820
821  private void cleanTapMetaData( TezConfiguration config, Tap tap )
822    {
823    try
824      {
825      Hadoop18TapUtil.cleanupTapMetaData( config, tap );
826      }
827    catch( IOException exception )
828      {
829      // ignore exception
830      }
831    }
832
833  public void syncArtifacts()
834    {
835    // this may not be strictly necessary, but there is a condition where setting the access time
836    // fails, so there may be one were setting the modification time fails. if so, we can compensate.
837    Map<String, Long> timestamps = HadoopUtil.syncPaths( getConfig(), syncPaths, true );
838
839    for( Map.Entry<String, Long> entry : timestamps.entrySet() )
840      {
841      LocalResource localResource = allLocalResources.get( entry.getKey() );
842
843      if( localResource != null )
844        localResource.setTimestamp( entry.getValue() );
845      }
846    }
847
848  private void setLocalMode( Configuration parent, JobConf current, Tap tap )
849    {
850    // force step to local mode
851    if( !HadoopUtil.isLocal( current ) )
852      return;
853
854    if( tap != null )
855      logInfo( "tap forcing step to tez local mode: " + tap.getIdentifier() );
856
857    HadoopUtil.setLocal( parent );
858    }
859
860  private void addRemoteDebug( FlowNode flowNode, Vertex vertex )
861    {
862    String value = System.getProperty( "test.debug.node", null );
863
864    if( Util.isEmpty( value ) )
865      return;
866
867    if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() )
868      return;
869
870    LOG.warn( "remote debugging enabled with property: {}, on node: {}, with node id: {}", "test.debug.node", value, flowNode.getID() );
871
872    String opts = vertex.getTaskLaunchCmdOpts();
873
874    if( opts == null )
875      opts = "";
876
877    String address = System.getProperty( "test.debug.address", "localhost:5005" ).trim();
878
879    opts += " -agentlib:jdwp=transport=dt_socket,server=n,address=" + address + ",suspend=y";
880
881    vertex.setTaskLaunchCmdOpts( opts );
882    }
883
884  private void addRemoteProfiling( FlowNode flowNode, Vertex vertex )
885    {
886    String value = System.getProperty( "test.profile.node", null );
887
888    if( Util.isEmpty( value ) )
889      return;
890
891    if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() )
892      return;
893
894    LOG.warn( "remote profiling enabled with property: {}, on node: {}, with node id: {}", "test.profile.node", value, flowNode.getID() );
895
896    String opts = vertex.getTaskLaunchCmdOpts();
897
898    if( opts == null )
899      opts = "";
900
901    String path = System.getProperty( "test.profile.path", "/tmp/jfr/" );
902
903    if( !path.endsWith( "/" ) )
904      path += "/";
905
906    LOG.warn( "remote profiling property: {}, logging to path: {}", "test.profile.path", path );
907
908    opts += String.format( " -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=%1$s%2$s,disk=true,repository=%1$s%2$s", path, flowNode.getID() );
909
910    vertex.setTaskLaunchCmdOpts( opts );
911    }
912
913  private int asInt( String value )
914    {
915    try
916      {
917      return Integer.parseInt( value );
918      }
919    catch( NumberFormatException exception )
920      {
921      return -1;
922      }
923    }
924
925  public Map<String, LocalResource> getAllLocalResources()
926    {
927    return allLocalResources;
928    }
929
930  private static class EdgeValues
931    {
932    FlowElement flowElement;
933    TezConfiguration config;
934    Set<Integer> ordinals;
935    String keyClassName;
936    String valueClassName;
937    String keyComparatorClassName;
938    String keyPartitionerClassName;
939    String outputClassName;
940    String inputClassName;
941    EdgeProperty.DataMovementType movementType;
942    EdgeProperty.DataSourceType sourceType;
943    EdgeProperty.SchedulingType schedulingType;
944
945    Map<Integer, Fields> resolvedKeyFieldsMap;
946    Map<Integer, Fields> resolvedSortFieldsMap;
947    Map<Integer, Fields> resolvedValueFieldsMap;
948
949    private EdgeValues( TezConfiguration config, ProcessEdge processEdge )
950      {
951      this.config = config;
952      this.flowElement = processEdge.getFlowElement();
953      this.ordinals = processEdge.getSourceProvidedOrdinals();
954
955      this.resolvedKeyFieldsMap = processEdge.getResolvedKeyFields();
956      this.resolvedSortFieldsMap = processEdge.getResolvedSortFields();
957      this.resolvedValueFieldsMap = processEdge.getResolvedValueFields();
958      }
959
960    public FlowElement getFlowElement()
961      {
962      return flowElement;
963      }
964
965    public TezConfiguration getConfig()
966      {
967      return config;
968      }
969
970    public Set getOrdinals()
971      {
972      return ordinals;
973      }
974
975    public String getKeyClassName()
976      {
977      return keyClassName;
978      }
979
980    public String getValueClassName()
981      {
982      return valueClassName;
983      }
984
985    public String getKeyComparatorClassName()
986      {
987      return keyComparatorClassName;
988      }
989
990    public String getKeyPartitionerClassName()
991      {
992      return keyPartitionerClassName;
993      }
994
995    public String getOutputClassName()
996      {
997      return outputClassName;
998      }
999
1000    public String getInputClassName()
1001      {
1002      return inputClassName;
1003      }
1004
1005    public EdgeProperty.DataMovementType getMovementType()
1006      {
1007      return movementType;
1008      }
1009
1010    public EdgeProperty.DataSourceType getSourceType()
1011      {
1012      return sourceType;
1013      }
1014
1015    public EdgeProperty.SchedulingType getSchedulingType()
1016      {
1017      return schedulingType;
1018      }
1019
1020    public Map<Integer, Fields> getResolvedKeyFieldsMap()
1021      {
1022      return resolvedKeyFieldsMap;
1023      }
1024
1025    public Map<Integer, Fields> getResolvedSortFieldsMap()
1026      {
1027      return resolvedSortFieldsMap;
1028      }
1029
1030    public Map<Integer, Fields> getResolvedValueFieldsMap()
1031      {
1032      return resolvedValueFieldsMap;
1033      }
1034    }
1035  }