001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Iterator;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035
036import cascading.CascadingException;
037import cascading.flow.FlowElement;
038import cascading.flow.FlowElements;
039import cascading.flow.FlowException;
040import cascading.flow.FlowNode;
041import cascading.flow.FlowProcess;
042import cascading.flow.FlowRuntimeProps;
043import cascading.flow.hadoop.ConfigurationSetter;
044import cascading.flow.hadoop.util.HadoopUtil;
045import cascading.flow.planner.BaseFlowStep;
046import cascading.flow.planner.FlowStepJob;
047import cascading.flow.planner.graph.ElementGraph;
048import cascading.flow.planner.process.FlowNodeGraph;
049import cascading.flow.planner.process.ProcessEdge;
050import cascading.flow.stream.annotations.StreamMode;
051import cascading.flow.tez.planner.Hadoop2TezFlowStepJob;
052import cascading.flow.tez.util.TezUtil;
053import cascading.management.state.ClientState;
054import cascading.pipe.Boundary;
055import cascading.pipe.CoGroup;
056import cascading.pipe.Group;
057import cascading.pipe.GroupBy;
058import cascading.pipe.Merge;
059import cascading.pipe.Splice;
060import cascading.property.AppProps;
061import cascading.tap.CompositeTaps;
062import cascading.tap.Tap;
063import cascading.tap.hadoop.Hfs;
064import cascading.tap.hadoop.PartitionTap;
065import cascading.tap.hadoop.util.Hadoop18TapUtil;
066import cascading.tuple.Fields;
067import cascading.tuple.hadoop.TupleSerialization;
068import cascading.tuple.hadoop.util.GroupingSortingComparator;
069import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
070import cascading.tuple.hadoop.util.ReverseTupleComparator;
071import cascading.tuple.hadoop.util.TupleComparator;
072import cascading.tuple.io.KeyTuple;
073import cascading.tuple.io.TuplePair;
074import cascading.tuple.io.ValueTuple;
075import cascading.tuple.tez.util.GroupingSortingPartitioner;
076import cascading.tuple.tez.util.TuplePartitioner;
077import cascading.util.Util;
078import cascading.util.Version;
079import org.apache.hadoop.conf.Configuration;
080import org.apache.hadoop.fs.FileSystem;
081import org.apache.hadoop.fs.Path;
082import org.apache.hadoop.io.serializer.Serialization;
083import org.apache.hadoop.mapred.JobConf;
084import org.apache.hadoop.mapreduce.JobContext;
085import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
086import org.apache.hadoop.yarn.api.records.LocalResource;
087import org.apache.hadoop.yarn.api.records.LocalResourceType;
088import org.apache.tez.common.TezUtils;
089import org.apache.tez.dag.api.DAG;
090import org.apache.tez.dag.api.DataSinkDescriptor;
091import org.apache.tez.dag.api.DataSourceDescriptor;
092import org.apache.tez.dag.api.Edge;
093import org.apache.tez.dag.api.EdgeProperty;
094import org.apache.tez.dag.api.GroupInputEdge;
095import org.apache.tez.dag.api.InputDescriptor;
096import org.apache.tez.dag.api.OutputDescriptor;
097import org.apache.tez.dag.api.ProcessorDescriptor;
098import org.apache.tez.dag.api.TezConfiguration;
099import org.apache.tez.dag.api.UserPayload;
100import org.apache.tez.dag.api.Vertex;
101import org.apache.tez.dag.api.VertexGroup;
102import org.apache.tez.mapreduce.input.MRInput;
103import org.apache.tez.mapreduce.output.MROutput;
104import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
105import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
106import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
107import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
108import org.apache.tez.runtime.library.input.UnorderedKVInput;
109import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
110import org.apache.tez.runtime.library.output.UnorderedKVOutput;
111import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
112import org.slf4j.Logger;
113import org.slf4j.LoggerFactory;
114
115import static cascading.flow.hadoop.util.HadoopUtil.*;
116import static cascading.flow.tez.util.TezUtil.addToClassPath;
117import static cascading.tap.hadoop.DistCacheTap.CASCADING_LOCAL_RESOURCES;
118import static cascading.tap.hadoop.DistCacheTap.CASCADING_REMOTE_RESOURCES;
119import static cascading.util.Util.getFirst;
120import static java.util.Collections.singletonList;
121import static org.apache.hadoop.yarn.api.records.LocalResourceType.ARCHIVE;
122import static org.apache.hadoop.yarn.api.records.LocalResourceType.FILE;
123
124/**
125 *
126 */
127public class Hadoop2TezFlowStep extends BaseFlowStep<TezConfiguration>
128  {
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezFlowStep.class );

  /**
   * Local resources collected for this step.
   * Filled by createInitializedConfig (class path and app jar) and by initFromSources (accumulated source taps).
   * Registered on the DAG in createDAG.
   */
  private Map<String, LocalResource> allLocalResources = new HashMap<>();
  /**
   * Path pairs returned by TezUtil.addToClassPath for every staged resource.
   * NOTE(review): presumably local path -> staged remote path; the consumer is not visible here, so confirm.
   */
  private Map<Path, Path> syncPaths = new HashMap<>();
  /** Task environment filled by addToClassPath for the app jar, then applied to every vertex in newVertex. */
  private Map<String, String> environment = new HashMap<>();

  /**
   * Creates a new Tez flow step.
   *
   * @param elementGraph  the element graph of this step
   * @param flowNodeGraph the flow nodes of this step; createDAG turns each of them into a Tez vertex
   */
  public Hadoop2TezFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }
139
140  @Override
141  public Map<Object, Object> getConfigAsProperties()
142    {
143    return HadoopUtil.createProperties( getConfig() );
144    }
145
146  @Override
147  public TezConfiguration createInitializedConfig( FlowProcess<TezConfiguration> flowProcess, TezConfiguration parentConfig )
148    {
149    TezConfiguration stepConf = parentConfig == null ? new TezConfiguration() : new TezConfiguration( parentConfig );
150
151    Set<String> serializations = getFieldDeclaredSerializations( Serialization.class );
152
153    TupleSerialization.setSerializations( stepConf, serializations );
154
155    String versionString = Version.getRelease();
156
157    if( versionString != null )
158      stepConf.set( "cascading.version", versionString );
159
160    stepConf.set( CASCADING_FLOW_STEP_ID, getID() );
161    stepConf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );
162
163    String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath();
164    List<String> classPath = ( (Hadoop2TezFlow) getFlow() ).getClassPath();
165
166    // is updated in addToClassPath method
167    Map<String, LocalResource> dagResources = new HashMap<>();
168
169    if( !classPath.isEmpty() )
170      {
171      // jars in the root will be in the remote CLASSPATH, no need to add to the environment
172      Map<Path, Path> dagClassPath = addToClassPath( stepConf, flowStagingPath, null, classPath, FILE, dagResources, null );
173
174      syncPaths.putAll( dagClassPath );
175      }
176
177    String appJarPath = stepConf.get( AppProps.APP_JAR_PATH );
178
179    if( appJarPath != null )
180      {
181      // the PATTERN represents the insides of the app jar, those elements must be added to the remote CLASSPATH
182      List<String> classpath = singletonList( appJarPath );
183      Map<Path, Path> pathMap = addToClassPath( stepConf, flowStagingPath, null, classpath, ARCHIVE, dagResources, environment );
184
185      syncPaths.putAll( pathMap );
186
187      // AM does not support environments like containers do, so the classpath has to be passed via configuration.
188      String fileName = new File( appJarPath ).getName();
189      stepConf.set( TezConfiguration.TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX,
190        "$PWD/" + fileName + "/:$PWD/" + fileName + "/classes/:$PWD/" + fileName + "/lib/*:" );
191      }
192
193    allLocalResources.putAll( dagResources );
194
195    initFromStepConfigDef( stepConf );
196
197    return stepConf;
198    }
199
200  @Override
201  protected FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedStepConfig )
202    {
203    DAG dag = createDAG( flowProcess, initializedStepConfig );
204
205    return new Hadoop2TezFlowStepJob( clientState, this, initializedStepConfig, dag );
206    }
207
  /**
   * Translates this step's {@link FlowNodeGraph} into a Tez {@link DAG}.
   * <p>
   * One {@link Vertex} is created per {@link FlowNode} by {@link #createVertex}, visiting the nodes in topological
   * order. Edges are then built as follows:
   * <ul>
   * <li>A ProcessEdge with a single source node, or carrying a CoGroup or Boundary, becomes a plain {@link Edge}.</li>
   * <li>A GroupBy or Merge fed by several source nodes has all of its incoming edges collapsed into a single
   * {@link GroupInputEdge} over a {@link VertexGroup}.</li>
   * </ul>
   *
   * @param flowProcess       the current flow process, passed through to vertex creation
   * @param initializedConfig the step configuration returned by {@link #createInitializedConfig}
   * @return the assembled DAG
   * @throws IllegalStateException         if an edge carries an element createEdgeProperty does not support
   * @throws UnsupportedOperationException if a multi-source edge carries something other than a GroupBy or Merge
   */
  private DAG createDAG( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig )
    {
    FlowNodeGraph nodeGraph = getFlowNodeGraph();
    Map<FlowNode, Vertex> vertexMap = new HashMap<>();
    DAG dag = DAG.create( getStepDisplayName( initializedConfig.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    // register the resources collected so far in allLocalResources with the DAG
    dag.addTaskLocalFiles( allLocalResources );

    Iterator<FlowNode> iterator = nodeGraph.getOrderedTopologicalIterator(); // ordering of nodes for consistent remote debugging

    while( iterator.hasNext() )
      {
      FlowNode flowNode = iterator.next();

      Vertex vertex = createVertex( flowProcess, initializedConfig, flowNode );
      dag.addVertex( vertex );

      vertexMap.put( flowNode, vertex );
      }

    // edges already folded into a GroupInputEdge; skipped when the loop reaches them later
    LinkedList<ProcessEdge> processedEdges = new LinkedList<>();

    for( ProcessEdge processEdge : nodeGraph.edgeSet() )
      {
      if( processedEdges.contains( processEdge ) )
        continue;

      FlowNode edgeTargetFlowNode = nodeGraph.getEdgeTarget( processEdge );

      FlowElement flowElement = processEdge.getFlowElement();
      List<FlowNode> sourceNodes = nodeGraph.getElementSourceProcesses( flowElement );

      // note: also records the chosen edge semantics as annotations on processEdge
      EdgeProperty edgeProperty = createEdgeProperty( initializedConfig, processEdge );

      Vertex targetVertex = vertexMap.get( edgeTargetFlowNode );

      if( sourceNodes.size() == 1 || flowElement instanceof CoGroup || flowElement instanceof Boundary ) // todo: create group vertices around incoming ordinal
        {
        FlowNode edgeSourceFlowNode = nodeGraph.getEdgeSource( processEdge );
        Vertex sourceVertex = vertexMap.get( edgeSourceFlowNode );

        LOG.debug( "adding edge between: {} and {}", sourceVertex, targetVertex );

        dag.addEdge( Edge.create( sourceVertex, targetVertex, edgeProperty ) );
        }
      else if( flowElement instanceof GroupBy || flowElement instanceof Merge ) // merge - source nodes > 1
        {
        List<String> sourceVerticesIDs = new ArrayList<>();
        List<Vertex> sourceVertices = new ArrayList<>();

        // gather every source vertex feeding this element and mark their edges as handled
        for( FlowNode edgeSourceFlowNode : sourceNodes )
          {
          sourceVerticesIDs.add( edgeSourceFlowNode.getID() );
          sourceVertices.add( vertexMap.get( edgeSourceFlowNode ) );
          processedEdges.add( nodeGraph.getEdge( edgeSourceFlowNode, edgeTargetFlowNode ) );
          }

        // the vertex group is named after the target node id
        VertexGroup vertexGroup = dag.createVertexGroup( edgeTargetFlowNode.getID(), sourceVertices.toArray( new Vertex[ sourceVertices.size() ] ) );

        // GroupBy merges ordered grouped inputs, Merge concatenates unordered ones
        String inputClassName = flowElement instanceof Group ? OrderedGroupedMergedKVInput.class.getName() : ConcatenatedMergedKeyValueInput.class.getName();

        // reuse the payload computed for the edge destination
        InputDescriptor inputDescriptor = InputDescriptor.create( inputClassName ).setUserPayload( edgeProperty.getEdgeDestination().getUserPayload() );

        String type = ( (Splice) flowElement ).isMerge() ? "merged" : "grouped";
        LOG.info( "adding {} edge between: {} and {}", type, Util.join( sourceVerticesIDs, "," ), targetVertex.getName() );
        dag.addEdge( GroupInputEdge.create( vertexGroup, targetVertex, edgeProperty, inputDescriptor ) );
        }
      else
        {
        throw new UnsupportedOperationException( "can't make edge for: " + flowElement );
        }
      }

    return dag;
    }
283
284  private EdgeProperty createEdgeProperty( TezConfiguration config, ProcessEdge processEdge )
285    {
286    FlowElement flowElement = processEdge.getFlowElement();
287
288    EdgeValues edgeValues = new EdgeValues( new TezConfiguration( config ), processEdge );
289
290    edgeValues.keyClassName = KeyTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS
291    edgeValues.valueClassName = ValueTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS
292    edgeValues.keyComparatorClassName = TupleComparator.class.getName();
293    edgeValues.keyPartitionerClassName = TuplePartitioner.class.getName();
294    edgeValues.outputClassName = null;
295    edgeValues.inputClassName = null;
296    edgeValues.movementType = null;
297    edgeValues.sourceType = null;
298    edgeValues.schedulingType = null;
299
300    if( flowElement instanceof Group )
301      applyGroup( edgeValues );
302    else if( ( flowElement instanceof Boundary || flowElement instanceof Merge ) && processEdge.getSinkAnnotations().contains( StreamMode.Accumulated ) )
303      applyBoundaryMergeAccumulated( edgeValues );
304    else if( flowElement instanceof Boundary || flowElement instanceof Merge )
305      applyBoundaryMerge( edgeValues );
306    else
307      throw new IllegalStateException( "unsupported flow element: " + flowElement.getClass().getCanonicalName() );
308
309    applyEdgeAnnotations( processEdge, edgeValues );
310
311    return createEdgeProperty( edgeValues );
312    }
313
  /**
   * Records the Tez edge semantics chosen for this edge as annotations on the process edge.
   *
   * @param processEdge the edge to annotate
   * @param edgeValues  the values whose movement, source and scheduling types were set by one of the apply* methods
   */
  private void applyEdgeAnnotations( ProcessEdge processEdge, EdgeValues edgeValues )
    {
    processEdge.addEdgeAnnotation( edgeValues.movementType );
    processEdge.addEdgeAnnotation( edgeValues.sourceType );
    processEdge.addEdgeAnnotation( edgeValues.schedulingType );
    }
320
321  private EdgeValues applyBoundaryMerge( EdgeValues edgeValues )
322    {
323    // todo: support for one to one
324    edgeValues.outputClassName = UnorderedPartitionedKVOutput.class.getName();
325    edgeValues.inputClassName = UnorderedKVInput.class.getName();
326
327    edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
328    edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
329    edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
330
331    return edgeValues;
332    }
333
334  private EdgeValues applyBoundaryMergeAccumulated( EdgeValues edgeValues )
335    {
336    edgeValues.outputClassName = UnorderedKVOutput.class.getName();
337    edgeValues.inputClassName = UnorderedKVInput.class.getName();
338
339    edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST;
340    edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
341    edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
342
343    return edgeValues;
344    }
345
346  private EdgeValues applyGroup( EdgeValues edgeValues )
347    {
348    Group group = (Group) edgeValues.flowElement;
349
350    if( group.isSortReversed() )
351      edgeValues.keyComparatorClassName = ReverseTupleComparator.class.getName();
352
353    int ordinal = getFirst( edgeValues.ordinals );
354
355    addComparators( edgeValues.config, "cascading.group.comparator", group.getKeySelectors(), edgeValues.getResolvedKeyFieldsMap().get( ordinal ) );
356
357    if( !group.isGroupBy() )
358      {
359      edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
360      edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();
361
362      edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
363      edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
364      edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
365      }
366    else
367      {
368      addComparators( edgeValues.config, "cascading.sort.comparator", group.getSortingSelectors(), edgeValues.getResolvedSortFieldsMap().get( ordinal ) );
369
370      edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
371      edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();
372
373      edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
374      edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
375      edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
376      }
377
378    if( group.isSorted() )
379      {
380      edgeValues.keyClassName = TuplePair.class.getName();
381      edgeValues.keyPartitionerClassName = GroupingSortingPartitioner.class.getName();
382
383      if( group.isSortReversed() )
384        edgeValues.keyComparatorClassName = ReverseGroupingSortingComparator.class.getName();
385      else
386        edgeValues.keyComparatorClassName = GroupingSortingComparator.class.getName();
387      }
388
389    return edgeValues;
390    }
391
392  private EdgeProperty createEdgeProperty( EdgeValues edgeValues )
393    {
394    TezConfiguration outputConfig = new TezConfiguration( edgeValues.getConfig() );
395    outputConfig.set( "cascading.node.sink", FlowElements.id( edgeValues.getFlowElement() ) );
396    outputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) );
397    addFields( outputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() );
398    addFields( outputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() );
399    addFields( outputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() );
400
401    UserPayload outputPayload = createIntermediatePayloadOutput( outputConfig, edgeValues );
402
403    TezConfiguration inputConfig = new TezConfiguration( edgeValues.getConfig() );
404    inputConfig.set( "cascading.node.source", FlowElements.id( edgeValues.getFlowElement() ) );
405    inputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) );
406    addFields( inputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() );
407    addFields( inputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() );
408    addFields( inputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() );
409
410    UserPayload inputPayload = createIntermediatePayloadInput( inputConfig, edgeValues );
411
412    return EdgeProperty.create(
413      edgeValues.getMovementType(),
414      edgeValues.getSourceType(),
415      edgeValues.getSchedulingType(),
416      OutputDescriptor.create( edgeValues.getOutputClassName() ).setUserPayload( outputPayload ),
417      InputDescriptor.create( edgeValues.getInputClassName() ).setUserPayload( inputPayload )
418    );
419    }
420
421  private UserPayload createIntermediatePayloadOutput( TezConfiguration config, EdgeValues edgeValues )
422    {
423    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName );
424    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName );
425    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName );
426    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName );
427
428    setWorkingDirectory( config );
429
430    return getPayload( config );
431    }
432
433  private UserPayload createIntermediatePayloadInput( TezConfiguration config, EdgeValues edgeValues )
434    {
435    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName );
436    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName );
437    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName );
438    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName );
439
440    setWorkingDirectory( config );
441
442    return getPayload( config );
443    }
444
445  private static void setWorkingDirectory( Configuration conf )
446    {
447    String name = conf.get( JobContext.WORKING_DIR );
448
449    if( name != null )
450      return;
451
452    try
453      {
454      Path dir = FileSystem.get( conf ).getWorkingDirectory();
455      conf.set( JobContext.WORKING_DIR, dir.toString() );
456      }
457    catch( IOException exception )
458      {
459      throw new RuntimeException( exception );
460      }
461    }
462
463  public Vertex createVertex( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig, FlowNode flowNode )
464    {
465    JobConf conf = new JobConf( initializedConfig );
466
467    addInputOutputMapping( conf, flowNode );
468
469    conf.setBoolean( "mapred.used.genericoptionsparser", true );
470
471    Map<String, LocalResource> taskLocalResources = new HashMap<>();
472
473    Map<FlowElement, Configuration> sourceConfigs = initFromSources( flowNode, flowProcess, conf, taskLocalResources );
474    Map<FlowElement, Configuration> sinkConfigs = initFromSinks( flowNode, flowProcess, conf );
475
476    initFromTraps( flowNode, flowProcess, conf );
477
478    initFromNodeConfigDef( flowNode, conf );
479
480    // force step to local mode if any tap is local
481    setLocalMode( initializedConfig, conf, null );
482
483    conf.set( "cascading.flow.node.num", Integer.toString( flowNode.getOrdinal() ) );
484
485    HadoopUtil.setIsInflow( conf ); // must be called after all taps configurations have been retrieved
486
487    int parallelism = getParallelism( flowNode, conf );
488
489    if( parallelism == 0 )
490      throw new FlowException( getName(), "the default number of gather partitions must be set, see cascading.flow.FlowRuntimeProps" );
491
492    flowNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( parallelism ) );
493
494    Vertex vertex = newVertex( flowNode, conf, parallelism );
495
496    if( !taskLocalResources.isEmpty() )
497      vertex.addTaskLocalFiles( taskLocalResources );
498
499    for( FlowElement flowElement : sourceConfigs.keySet() )
500      {
501      if( !( flowElement instanceof Tap ) )
502        continue;
503
504      Configuration sourceConf = sourceConfigs.get( flowElement );
505
506      // not setting the new-api value could result in failures if not set by the Scheme
507      if( sourceConf.get( "mapred.mapper.new-api" ) == null )
508        HadoopUtil.setNewApi( sourceConf, sourceConf.get( "mapred.input.format.class", sourceConf.get( "mapreduce.job.inputformat.class" ) ) );
509
510      // unfortunately we cannot just load the input format and set it on the builder with also pulling all other
511      // values out of the configuration.
512      MRInput.MRInputConfigBuilder configBuilder = MRInput.createConfigBuilder( sourceConf, null );
513
514      // the default in Tez is true, this overrides
515      if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
516        configBuilder.groupSplits( conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, true ) );
517
518      // grouping splits loses file name info, breaking partition tap default impl
519      if( !CompositeTaps.unwindNarrow( PartitionTap.class, (Tap) flowElement ).isEmpty() ) // todo: generify
520        configBuilder.groupSplits( false );
521
522      DataSourceDescriptor dataSourceDescriptor = configBuilder.build();
523
524      vertex.addDataSource( FlowElements.id( flowElement ), dataSourceDescriptor );
525      }
526
527    for( FlowElement flowElement : sinkConfigs.keySet() )
528      {
529      if( !( flowElement instanceof Tap ) )
530        continue;
531
532      Configuration sinkConf = sinkConfigs.get( flowElement );
533
534      Class outputFormatClass;
535      String outputPath;
536
537      // we have to set sane defaults if not set by the tap
538      // typically the case of MultiSinkTap
539      String formatClassName = sinkConf.get( "mapred.output.format.class", sinkConf.get( "mapreduce.job.outputformat.class" ) );
540
541      if( formatClassName == null )
542        {
543        outputFormatClass = TextOutputFormat.class; // unused, use "new" api, its the default
544        outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused
545        }
546      else
547        {
548        outputFormatClass = Util.loadClass( formatClassName );
549        outputPath = getOutputPath( sinkConf );
550        }
551
552      if( outputPath == null && getOutputPath( sinkConf ) == null && isFileOutputFormat( outputFormatClass ) )
553        outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused
554
555      MROutput.MROutputConfigBuilder configBuilder = MROutput.createConfigBuilder( sinkConf, outputFormatClass, outputPath );
556
557      DataSinkDescriptor dataSinkDescriptor = configBuilder.build();
558
559      vertex.addDataSink( FlowElements.id( flowElement ), dataSinkDescriptor );
560      }
561
562    addRemoteDebug( flowNode, vertex );
563    addRemoteProfiling( flowNode, vertex );
564
565    if( vertex.getTaskLaunchCmdOpts() != null )
566      flowNode.addProcessAnnotation( TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, vertex.getTaskLaunchCmdOpts() );
567
568    return vertex;
569    }
570
571  protected String getOutputPath( Configuration sinkConf )
572    {
573    return sinkConf.get( "mapred.output.dir", sinkConf.get( "mapreduce.output.fileoutputformat.outputdir" ) );
574    }
575
576  protected boolean isFileOutputFormat( Class outputFormatClass )
577    {
578    return org.apache.hadoop.mapred.FileOutputFormat.class.isAssignableFrom( outputFormatClass ) ||
579      org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom( outputFormatClass );
580    }
581
582  protected int getParallelism( FlowNode flowNode, JobConf conf )
583    {
584    // only count streamed taps, accumulated taps are always annotated
585    HashSet<Tap> sourceStreamedTaps = new HashSet<>( flowNode.getSourceTaps() );
586
587    sourceStreamedTaps.removeAll( flowNode.getSourceElements( StreamMode.Accumulated ) );
588
589    if( sourceStreamedTaps.size() != 0 )
590      return -1;
591
592    int parallelism = Integer.MAX_VALUE;
593
594    for( Tap tap : flowNode.getSinkTaps() )
595      {
596      int numSinkParts = tap.getScheme().getNumSinkParts();
597
598      if( numSinkParts == 0 )
599        continue;
600
601      if( parallelism != Integer.MAX_VALUE )
602        LOG.info( "multiple sink taps in flow node declaring numSinkParts, choosing lowest value. see cascading.flow.FlowRuntimeProps for broader control." );
603
604      parallelism = Math.min( parallelism, numSinkParts );
605      }
606
607    if( parallelism != Integer.MAX_VALUE )
608      return parallelism;
609
610    return conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );
611    }
612
613  private void addInputOutputMapping( JobConf conf, FlowNode flowNode )
614    {
615    FlowNodeGraph flowNodeGraph = getFlowNodeGraph();
616    Set<ProcessEdge> incomingEdges = flowNodeGraph.incomingEdgesOf( flowNode );
617
618    for( ProcessEdge processEdge : incomingEdges )
619      conf.set( "cascading.node.source." + processEdge.getFlowElementID(), processEdge.getSourceProcessID() );
620
621    Set<ProcessEdge> outgoingEdges = flowNodeGraph.outgoingEdgesOf( flowNode );
622
623    for( ProcessEdge processEdge : outgoingEdges )
624      conf.set( "cascading.node.sink." + processEdge.getFlowElementID(), processEdge.getSinkProcessID() );
625    }
626
627  protected Map<FlowElement, Configuration> initFromSources( FlowNode flowNode, FlowProcess<TezConfiguration> flowProcess,
628                                                             Configuration conf, Map<String, LocalResource> taskLocalResources )
629    {
630    Set<? extends FlowElement> accumulatedSources = flowNode.getSourceElements( StreamMode.Accumulated );
631
632    for( FlowElement element : accumulatedSources )
633      {
634      if( element instanceof Tap )
635        {
636        JobConf current = new JobConf( conf );
637        Tap tap = (Tap) element;
638
639        if( tap.getIdentifier() == null )
640          throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );
641
642        tap.sourceConfInit( flowProcess, current );
643
644        Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) );
645
646        if( !paths.isEmpty() )
647          {
648          String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath();
649          String resourceSubPath = Tap.id( tap );
650          Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null );
651
652          current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) );
653
654          allLocalResources.putAll( taskLocalResources );
655          syncPaths.putAll( pathMap );
656          }
657
658        Map<String, String> map = flowProcess.diffConfigIntoMap( new TezConfiguration( conf ), new TezConfiguration( current ) );
659        conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );
660
661        setLocalMode( conf, current, tap );
662        }
663      }
664
665    Set<FlowElement> sources = new HashSet<>( flowNode.getSourceElements() );
666
667    sources.removeAll( accumulatedSources );
668
669    if( sources.isEmpty() )
670      throw new IllegalStateException( "all sources marked as accumulated" );
671
672    Map<FlowElement, Configuration> configs = new HashMap<>();
673
674    for( FlowElement element : sources )
675      {
676      JobConf current = new JobConf( conf );
677
678      String id = FlowElements.id( element );
679
680      current.set( "cascading.node.source", id );
681
682      if( element instanceof Tap )
683        {
684        Tap tap = (Tap) element;
685
686        if( tap.getIdentifier() == null )
687          throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );
688
689        tap.sourceConfInit( flowProcess, current );
690
691        setLocalMode( conf, current, tap );
692        }
693
694      configs.put( element, current );
695      }
696
697    return configs;
698    }
699
700  protected Map<FlowElement, Configuration> initFromSinks( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
701    {
702    Set<FlowElement> sinks = flowNode.getSinkElements();
703    Map<FlowElement, Configuration> configs = new HashMap<>();
704
705    for( FlowElement element : sinks )
706      {
707      JobConf current = new JobConf( conf );
708
709      if( element instanceof Tap )
710        {
711        Tap tap = (Tap) element;
712
713        if( tap.getIdentifier() == null )
714          throw new IllegalStateException( "tap may not have null identifier: " + element.toString() );
715
716        tap.sinkConfInit( flowProcess, current );
717
718        setLocalMode( conf, current, tap );
719        }
720
721      String id = FlowElements.id( element );
722
723      current.set( "cascading.node.sink", id );
724
725      configs.put( element, current );
726      }
727
728    return configs;
729    }
730
731  private void initFromNodeConfigDef( FlowNode flowNode, Configuration conf )
732    {
733    initConfFromNodeConfigDef( flowNode.getElementGraph(), new ConfigurationSetter( conf ) );
734    }
735
736  private void initFromStepConfigDef( Configuration conf )
737    {
738    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
739    }
740
741  protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
742    {
743    Map<String, Tap> traps = flowNode.getTrapMap();
744
745    if( !traps.isEmpty() )
746      {
747      JobConf trapConf = new JobConf( conf );
748
749      for( Tap tap : traps.values() )
750        {
751        tap.sinkConfInit( flowProcess, trapConf );
752        setLocalMode( conf, trapConf, tap );
753        }
754      }
755    }
756
757  private Vertex newVertex( FlowNode flowNode, Configuration conf, int parallelism )
758    {
759    conf.set( FlowNode.CASCADING_FLOW_NODE, pack( flowNode, conf ) ); // todo: pack into payload directly
760
761    ProcessorDescriptor descriptor = ProcessorDescriptor.create( FlowProcessor.class.getName() );
762
763    descriptor.setUserPayload( getPayload( conf ) );
764
765    Vertex vertex = Vertex.create( flowNode.getID(), descriptor, parallelism );
766
767    if( environment != null )
768      vertex.setTaskEnvironment( environment );
769
770    return vertex;
771    }
772
773  private UserPayload getPayload( Configuration conf )
774    {
775    try
776      {
777      return TezUtils.createUserPayloadFromConf( conf );
778      }
779    catch( IOException exception )
780      {
781      throw new CascadingException( exception );
782      }
783    }
784
785  private String pack( Object object, Configuration conf )
786    {
787    try
788      {
789      return serializeBase64( object, conf, true );
790      }
791    catch( IOException exception )
792      {
793      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
794      }
795    }
796
797  @Override
798  public void clean( TezConfiguration config )
799    {
800    for( Tap sink : getSinkTaps() )
801      {
802      if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
803        {
804        try
805          {
806          sink.deleteResource( config );
807          }
808        catch( Exception exception )
809          {
810          // sink all exceptions, don't fail app
811          logWarn( "unable to remove temporary file: " + sink, exception );
812          }
813        }
814      else
815        {
816        cleanTapMetaData( config, sink );
817        }
818      }
819
820    for( Tap tap : getTraps() )
821      cleanTapMetaData( config, tap );
822    }
823
824  private void cleanTapMetaData( TezConfiguration config, Tap tap )
825    {
826    try
827      {
828      Hadoop18TapUtil.cleanupTapMetaData( config, tap );
829      }
830    catch( IOException exception )
831      {
832      // ignore exception
833      }
834    }
835
836  public void syncArtifacts()
837    {
838    // this may not be strictly necessary, but there is a condition where setting the access time
839    // fails, so there may be one were setting the modification time fails. if so, we can compensate.
840    Map<String, Long> timestamps = HadoopUtil.syncPaths( getConfig(), syncPaths, true );
841
842    for( Map.Entry<String, Long> entry : timestamps.entrySet() )
843      {
844      LocalResource localResource = allLocalResources.get( entry.getKey() );
845
846      if( localResource != null )
847        localResource.setTimestamp( entry.getValue() );
848      }
849    }
850
851  private void setLocalMode( Configuration parent, JobConf current, Tap tap )
852    {
853    // force step to local mode
854    if( !HadoopUtil.isLocal( current ) )
855      return;
856
857    if( tap != null )
858      logInfo( "tap forcing step to tez local mode: " + tap.getIdentifier() );
859
860    HadoopUtil.setLocal( parent );
861    }
862
863  private void addRemoteDebug( FlowNode flowNode, Vertex vertex )
864    {
865    String value = System.getProperty( "test.debug.node", null );
866
867    if( Util.isEmpty( value ) )
868      return;
869
870    if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() )
871      return;
872
873    LOG.warn( "remote debugging enabled with property: {}, on node: {}, with node id: {}", "test.debug.node", value, flowNode.getID() );
874
875    String opts = vertex.getTaskLaunchCmdOpts();
876
877    if( opts == null )
878      opts = "";
879
880    String address = System.getProperty( "test.debug.address", "localhost:5005" ).trim();
881
882    opts += " -agentlib:jdwp=transport=dt_socket,server=n,address=" + address + ",suspend=y";
883
884    vertex.setTaskLaunchCmdOpts( opts );
885    }
886
887  private void addRemoteProfiling( FlowNode flowNode, Vertex vertex )
888    {
889    String value = System.getProperty( "test.profile.node", null );
890
891    if( Util.isEmpty( value ) )
892      return;
893
894    if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() )
895      return;
896
897    LOG.warn( "remote profiling enabled with property: {}, on node: {}, with node id: {}", "test.profile.node", value, flowNode.getID() );
898
899    String opts = vertex.getTaskLaunchCmdOpts();
900
901    if( opts == null )
902      opts = "";
903
904    String path = System.getProperty( "test.profile.path", "/tmp/jfr/" );
905
906    if( !path.endsWith( "/" ) )
907      path += "/";
908
909    LOG.warn( "remote profiling property: {}, logging to path: {}", "test.profile.path", path );
910
911    opts += String.format( " -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=%1$s%2$s,disk=true,repository=%1$s%2$s", path, flowNode.getID() );
912
913    vertex.setTaskLaunchCmdOpts( opts );
914    }
915
916  private int asInt( String value )
917    {
918    try
919      {
920      return Integer.parseInt( value );
921      }
922    catch( NumberFormatException exception )
923      {
924      return -1;
925      }
926    }
927
928  public Map<String, LocalResource> getAllLocalResources()
929    {
930    return allLocalResources;
931    }
932
933  private static class EdgeValues
934    {
935    FlowElement flowElement;
936    TezConfiguration config;
937    Set<Integer> ordinals;
938    String keyClassName;
939    String valueClassName;
940    String keyComparatorClassName;
941    String keyPartitionerClassName;
942    String outputClassName;
943    String inputClassName;
944    EdgeProperty.DataMovementType movementType;
945    EdgeProperty.DataSourceType sourceType;
946    EdgeProperty.SchedulingType schedulingType;
947
948    Map<Integer, Fields> resolvedKeyFieldsMap;
949    Map<Integer, Fields> resolvedSortFieldsMap;
950    Map<Integer, Fields> resolvedValueFieldsMap;
951
952    private EdgeValues( TezConfiguration config, ProcessEdge processEdge )
953      {
954      this.config = config;
955      this.flowElement = processEdge.getFlowElement();
956      this.ordinals = processEdge.getSourceProvidedOrdinals();
957
958      this.resolvedKeyFieldsMap = processEdge.getResolvedKeyFields();
959      this.resolvedSortFieldsMap = processEdge.getResolvedSortFields();
960      this.resolvedValueFieldsMap = processEdge.getResolvedValueFields();
961      }
962
963    public FlowElement getFlowElement()
964      {
965      return flowElement;
966      }
967
968    public TezConfiguration getConfig()
969      {
970      return config;
971      }
972
973    public Set getOrdinals()
974      {
975      return ordinals;
976      }
977
978    public String getKeyClassName()
979      {
980      return keyClassName;
981      }
982
983    public String getValueClassName()
984      {
985      return valueClassName;
986      }
987
988    public String getKeyComparatorClassName()
989      {
990      return keyComparatorClassName;
991      }
992
993    public String getKeyPartitionerClassName()
994      {
995      return keyPartitionerClassName;
996      }
997
998    public String getOutputClassName()
999      {
1000      return outputClassName;
1001      }
1002
1003    public String getInputClassName()
1004      {
1005      return inputClassName;
1006      }
1007
1008    public EdgeProperty.DataMovementType getMovementType()
1009      {
1010      return movementType;
1011      }
1012
1013    public EdgeProperty.DataSourceType getSourceType()
1014      {
1015      return sourceType;
1016      }
1017
1018    public EdgeProperty.SchedulingType getSchedulingType()
1019      {
1020      return schedulingType;
1021      }
1022
1023    public Map<Integer, Fields> getResolvedKeyFieldsMap()
1024      {
1025      return resolvedKeyFieldsMap;
1026      }
1027
1028    public Map<Integer, Fields> getResolvedSortFieldsMap()
1029      {
1030      return resolvedSortFieldsMap;
1031      }
1032
1033    public Map<Integer, Fields> getResolvedValueFieldsMap()
1034      {
1035      return resolvedValueFieldsMap;
1036      }
1037    }
1038  }