001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez; 023 024import java.io.File; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Iterator; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035 036import cascading.CascadingException; 037import cascading.flow.FlowElement; 038import cascading.flow.FlowElements; 039import cascading.flow.FlowException; 040import cascading.flow.FlowNode; 041import cascading.flow.FlowProcess; 042import cascading.flow.FlowRuntimeProps; 043import cascading.flow.hadoop.ConfigurationSetter; 044import cascading.flow.hadoop.util.HadoopUtil; 045import cascading.flow.planner.BaseFlowStep; 046import cascading.flow.planner.FlowStepJob; 047import cascading.flow.planner.graph.ElementGraph; 048import cascading.flow.planner.process.FlowNodeGraph; 049import cascading.flow.planner.process.ProcessEdge; 050import cascading.flow.stream.annotations.StreamMode; 051import cascading.flow.tez.planner.Hadoop2TezFlowStepJob; 052import cascading.flow.tez.util.TezUtil; 053import cascading.management.state.ClientState; 054import cascading.pipe.Boundary; 055import cascading.pipe.CoGroup; 056import cascading.pipe.Group; 057import cascading.pipe.GroupBy; 058import cascading.pipe.Merge; 059import cascading.pipe.Splice; 060import cascading.property.AppProps; 061import cascading.tap.CompositeTaps; 062import cascading.tap.Tap; 063import cascading.tap.hadoop.Hfs; 064import cascading.tap.hadoop.PartitionTap; 065import cascading.tap.hadoop.util.Hadoop18TapUtil; 066import cascading.tuple.Fields; 067import cascading.tuple.hadoop.TupleSerialization; 068import cascading.tuple.hadoop.util.GroupingSortingComparator; 069import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator; 070import cascading.tuple.hadoop.util.ReverseTupleComparator; 071import cascading.tuple.hadoop.util.TupleComparator; 072import cascading.tuple.io.KeyTuple; 073import cascading.tuple.io.TuplePair; 074import cascading.tuple.io.ValueTuple; 075import cascading.tuple.tez.util.GroupingSortingPartitioner; 076import cascading.tuple.tez.util.TuplePartitioner; 077import cascading.util.Util; 078import cascading.util.Version; 079import org.apache.hadoop.conf.Configuration; 080import org.apache.hadoop.fs.FileSystem; 081import org.apache.hadoop.fs.Path; 082import org.apache.hadoop.io.serializer.Serialization; 083import org.apache.hadoop.mapred.JobConf; 084import org.apache.hadoop.mapreduce.JobContext; 085import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 086import org.apache.hadoop.yarn.api.records.LocalResource; 087import org.apache.hadoop.yarn.api.records.LocalResourceType; 088import org.apache.tez.common.TezUtils; 089import org.apache.tez.dag.api.DAG; 090import org.apache.tez.dag.api.DataSinkDescriptor; 091import org.apache.tez.dag.api.DataSourceDescriptor; 092import org.apache.tez.dag.api.Edge; 093import org.apache.tez.dag.api.EdgeProperty; 094import org.apache.tez.dag.api.GroupInputEdge; 095import org.apache.tez.dag.api.InputDescriptor; 096import org.apache.tez.dag.api.OutputDescriptor; 097import org.apache.tez.dag.api.ProcessorDescriptor; 098import org.apache.tez.dag.api.TezConfiguration; 099import org.apache.tez.dag.api.UserPayload; 100import org.apache.tez.dag.api.Vertex; 101import org.apache.tez.dag.api.VertexGroup; 102import org.apache.tez.mapreduce.input.MRInput; 103import org.apache.tez.mapreduce.output.MROutput; 104import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; 105import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; 106import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; 107import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput; 108import org.apache.tez.runtime.library.input.UnorderedKVInput; 109import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; 110import org.apache.tez.runtime.library.output.UnorderedKVOutput; 111import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput; 112import org.slf4j.Logger; 113import org.slf4j.LoggerFactory; 114 115import static cascading.flow.hadoop.util.HadoopUtil.*; 116import static cascading.flow.tez.util.TezUtil.addToClassPath; 117import static cascading.tap.hadoop.DistCacheTap.CASCADING_LOCAL_RESOURCES; 118import static cascading.tap.hadoop.DistCacheTap.CASCADING_REMOTE_RESOURCES; 119import static cascading.util.Util.getFirst; 120import static java.util.Collections.singletonList; 121import static org.apache.hadoop.yarn.api.records.LocalResourceType.ARCHIVE; 122import static org.apache.hadoop.yarn.api.records.LocalResourceType.FILE; 123 124/** 125 * 126 */ 127public class Hadoop2TezFlowStep extends BaseFlowStep<TezConfiguration> 128 { 129 private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezFlowStep.class ); 130 131 private Map<String, LocalResource> allLocalResources = new HashMap<>(); 132 private Map<Path, Path> syncPaths = new HashMap<>(); 133 private Map<String, String> environment = new HashMap<>(); 134 135 public Hadoop2TezFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph ) 136 { 137 super( elementGraph, flowNodeGraph ); 138 } 139 140 @Override 141 public Map<Object, Object> getConfigAsProperties() 142 { 143 return HadoopUtil.createProperties( getConfig() ); 144 } 145 146 @Override 147 public TezConfiguration createInitializedConfig( FlowProcess<TezConfiguration> flowProcess, TezConfiguration parentConfig ) 148 { 149 TezConfiguration stepConf = parentConfig == null ? new TezConfiguration() : new TezConfiguration( parentConfig ); 150 151 Set<String> serializations = getFieldDeclaredSerializations( Serialization.class ); 152 153 TupleSerialization.setSerializations( stepConf, serializations ); 154 155 String versionString = Version.getRelease(); 156 157 if( versionString != null ) 158 stepConf.set( "cascading.version", versionString ); 159 160 stepConf.set( CASCADING_FLOW_STEP_ID, getID() ); 161 stepConf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) ); 162 163 String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath(); 164 List<String> classPath = ( (Hadoop2TezFlow) getFlow() ).getClassPath(); 165 166 // is updated in addToClassPath method 167 Map<String, LocalResource> dagResources = new HashMap<>(); 168 169 if( !classPath.isEmpty() ) 170 { 171 // jars in the root will be in the remote CLASSPATH, no need to add to the environment 172 Map<Path, Path> dagClassPath = addToClassPath( stepConf, flowStagingPath, null, classPath, FILE, dagResources, null ); 173 174 syncPaths.putAll( dagClassPath ); 175 } 176 177 String appJarPath = stepConf.get( AppProps.APP_JAR_PATH ); 178 179 if( appJarPath != null ) 180 { 181 // the PATTERN represents the insides of the app jar, those elements must be added to the remote CLASSPATH 182 List<String> classpath = singletonList( appJarPath ); 183 Map<Path, Path> pathMap = addToClassPath( stepConf, flowStagingPath, null, classpath, ARCHIVE, dagResources, environment ); 184 185 syncPaths.putAll( pathMap ); 186 187 // AM does not support environments like containers do, so the classpath has to be passed via configuration. 188 String fileName = new File( appJarPath ).getName(); 189 stepConf.set( TezConfiguration.TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX, 190 "$PWD/" + fileName + "/:$PWD/" + fileName + "/classes/:$PWD/" + fileName + "/lib/*:" ); 191 } 192 193 allLocalResources.putAll( dagResources ); 194 195 initFromStepConfigDef( stepConf ); 196 197 return stepConf; 198 } 199 200 @Override 201 protected FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedStepConfig ) 202 { 203 DAG dag = createDAG( flowProcess, initializedStepConfig ); 204 205 return new Hadoop2TezFlowStepJob( clientState, this, initializedStepConfig, dag ); 206 } 207 208 private DAG createDAG( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig ) 209 { 210 FlowNodeGraph nodeGraph = getFlowNodeGraph(); 211 Map<FlowNode, Vertex> vertexMap = new HashMap<>(); 212 DAG dag = DAG.create( getStepDisplayName( initializedConfig.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) ); 213 214 dag.addTaskLocalFiles( allLocalResources ); 215 216 Iterator<FlowNode> iterator = nodeGraph.getOrderedTopologicalIterator(); // ordering of nodes for consistent remote debugging 217 218 while( iterator.hasNext() ) 219 { 220 FlowNode flowNode = iterator.next(); 221 222 Vertex vertex = createVertex( flowProcess, initializedConfig, flowNode ); 223 dag.addVertex( vertex ); 224 225 vertexMap.put( flowNode, vertex ); 226 } 227 228 LinkedList<ProcessEdge> processedEdges = new LinkedList<>(); 229 230 for( ProcessEdge processEdge : nodeGraph.edgeSet() ) 231 { 232 if( processedEdges.contains( processEdge ) ) 233 continue; 234 235 FlowNode edgeTargetFlowNode = nodeGraph.getEdgeTarget( processEdge ); 236 237 FlowElement flowElement = processEdge.getFlowElement(); 238 List<FlowNode> sourceNodes = nodeGraph.getElementSourceProcesses( flowElement ); 239 240 EdgeProperty edgeProperty = createEdgeProperty( initializedConfig, processEdge ); 241 242 Vertex targetVertex = vertexMap.get( edgeTargetFlowNode ); 243 244 if( sourceNodes.size() == 1 || flowElement instanceof CoGroup || flowElement instanceof Boundary ) // todo: create group vertices around incoming ordinal 245 { 246 FlowNode edgeSourceFlowNode = nodeGraph.getEdgeSource( processEdge ); 247 Vertex sourceVertex = vertexMap.get( edgeSourceFlowNode ); 248 249 LOG.debug( "adding edge between: {} and {}", sourceVertex, targetVertex ); 250 251 dag.addEdge( Edge.create( sourceVertex, targetVertex, edgeProperty ) ); 252 } 253 else if( flowElement instanceof GroupBy || flowElement instanceof Merge ) // merge - source nodes > 1 254 { 255 List<String> sourceVerticesIDs = new ArrayList<>(); 256 List<Vertex> sourceVertices = new ArrayList<>(); 257 258 for( FlowNode edgeSourceFlowNode : sourceNodes ) 259 { 260 sourceVerticesIDs.add( edgeSourceFlowNode.getID() ); 261 sourceVertices.add( vertexMap.get( edgeSourceFlowNode ) ); 262 processedEdges.add( nodeGraph.getEdge( edgeSourceFlowNode, edgeTargetFlowNode ) ); 263 } 264 265 VertexGroup vertexGroup = dag.createVertexGroup( edgeTargetFlowNode.getID(), sourceVertices.toArray( new Vertex[ sourceVertices.size() ] ) ); 266 267 String inputClassName = flowElement instanceof Group ? OrderedGroupedMergedKVInput.class.getName() : ConcatenatedMergedKeyValueInput.class.getName(); 268 269 InputDescriptor inputDescriptor = InputDescriptor.create( inputClassName ).setUserPayload( edgeProperty.getEdgeDestination().getUserPayload() ); 270 271 String type = ( (Splice) flowElement ).isMerge() ? "merged" : "grouped"; 272 LOG.info( "adding {} edge between: {} and {}", type, Util.join( sourceVerticesIDs, "," ), targetVertex.getName() ); 273 dag.addEdge( GroupInputEdge.create( vertexGroup, targetVertex, edgeProperty, inputDescriptor ) ); 274 } 275 else 276 { 277 throw new UnsupportedOperationException( "can't make edge for: " + flowElement ); 278 } 279 } 280 281 return dag; 282 } 283 284 private EdgeProperty createEdgeProperty( TezConfiguration config, ProcessEdge processEdge ) 285 { 286 FlowElement flowElement = processEdge.getFlowElement(); 287 288 EdgeValues edgeValues = new EdgeValues( new TezConfiguration( config ), processEdge ); 289 290 edgeValues.keyClassName = KeyTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS 291 edgeValues.valueClassName = ValueTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS 292 edgeValues.keyComparatorClassName = TupleComparator.class.getName(); 293 edgeValues.keyPartitionerClassName = TuplePartitioner.class.getName(); 294 edgeValues.outputClassName = null; 295 edgeValues.inputClassName = null; 296 edgeValues.movementType = null; 297 edgeValues.sourceType = null; 298 edgeValues.schedulingType = null; 299 300 if( flowElement instanceof Group ) 301 applyGroup( edgeValues ); 302 else if( ( flowElement instanceof Boundary || flowElement instanceof Merge ) && processEdge.getSinkAnnotations().contains( StreamMode.Accumulated ) ) 303 applyBoundaryMergeAccumulated( edgeValues ); 304 else if( flowElement instanceof Boundary || flowElement instanceof Merge ) 305 applyBoundaryMerge( edgeValues ); 306 else 307 throw new IllegalStateException( "unsupported flow element: " + flowElement.getClass().getCanonicalName() ); 308 309 applyEdgeAnnotations( processEdge, edgeValues ); 310 311 return createEdgeProperty( edgeValues ); 312 } 313 314 private void applyEdgeAnnotations( ProcessEdge processEdge, EdgeValues edgeValues ) 315 { 316 processEdge.addEdgeAnnotation( edgeValues.movementType ); 317 processEdge.addEdgeAnnotation( edgeValues.sourceType ); 318 processEdge.addEdgeAnnotation( edgeValues.schedulingType ); 319 } 320 321 private EdgeValues applyBoundaryMerge( EdgeValues edgeValues ) 322 { 323 // todo: support for one to one 324 edgeValues.outputClassName = UnorderedPartitionedKVOutput.class.getName(); 325 edgeValues.inputClassName = UnorderedKVInput.class.getName(); 326 327 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 328 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 329 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 330 331 return edgeValues; 332 } 333 334 private EdgeValues applyBoundaryMergeAccumulated( EdgeValues edgeValues ) 335 { 336 edgeValues.outputClassName = UnorderedKVOutput.class.getName(); 337 edgeValues.inputClassName = UnorderedKVInput.class.getName(); 338 339 edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST; 340 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 341 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 342 343 return edgeValues; 344 } 345 346 private EdgeValues applyGroup( EdgeValues edgeValues ) 347 { 348 Group group = (Group) edgeValues.flowElement; 349 350 if( group.isSortReversed() ) 351 edgeValues.keyComparatorClassName = ReverseTupleComparator.class.getName(); 352 353 int ordinal = getFirst( edgeValues.ordinals ); 354 355 addComparators( edgeValues.config, "cascading.group.comparator", group.getKeySelectors(), edgeValues.getResolvedKeyFieldsMap().get( ordinal ) ); 356 357 if( !group.isGroupBy() ) 358 { 359 edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName(); 360 edgeValues.inputClassName = OrderedGroupedKVInput.class.getName(); 361 362 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 363 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 364 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 365 } 366 else 367 { 368 addComparators( edgeValues.config, "cascading.sort.comparator", group.getSortingSelectors(), edgeValues.getResolvedSortFieldsMap().get( ordinal ) ); 369 370 edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName(); 371 edgeValues.inputClassName = OrderedGroupedKVInput.class.getName(); 372 373 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 374 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 375 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 376 } 377 378 if( group.isSorted() ) 379 { 380 edgeValues.keyClassName = TuplePair.class.getName(); 381 edgeValues.keyPartitionerClassName = GroupingSortingPartitioner.class.getName(); 382 383 if( group.isSortReversed() ) 384 edgeValues.keyComparatorClassName = ReverseGroupingSortingComparator.class.getName(); 385 else 386 edgeValues.keyComparatorClassName = GroupingSortingComparator.class.getName(); 387 } 388 389 return edgeValues; 390 } 391 392 private EdgeProperty createEdgeProperty( EdgeValues edgeValues ) 393 { 394 TezConfiguration outputConfig = new TezConfiguration( edgeValues.getConfig() ); 395 outputConfig.set( "cascading.node.sink", FlowElements.id( edgeValues.getFlowElement() ) ); 396 outputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) ); 397 addFields( outputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() ); 398 addFields( outputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() ); 399 addFields( outputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() ); 400 401 UserPayload outputPayload = createIntermediatePayloadOutput( outputConfig, edgeValues ); 402 403 TezConfiguration inputConfig = new TezConfiguration( edgeValues.getConfig() ); 404 inputConfig.set( "cascading.node.source", FlowElements.id( edgeValues.getFlowElement() ) ); 405 inputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) ); 406 addFields( inputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() ); 407 addFields( inputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() ); 408 addFields( inputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() ); 409 410 UserPayload inputPayload = createIntermediatePayloadInput( inputConfig, edgeValues ); 411 412 return EdgeProperty.create( 413 edgeValues.getMovementType(), 414 edgeValues.getSourceType(), 415 edgeValues.getSchedulingType(), 416 OutputDescriptor.create( edgeValues.getOutputClassName() ).setUserPayload( outputPayload ), 417 InputDescriptor.create( edgeValues.getInputClassName() ).setUserPayload( inputPayload ) 418 ); 419 } 420 421 private UserPayload createIntermediatePayloadOutput( TezConfiguration config, EdgeValues edgeValues ) 422 { 423 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName ); 424 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName ); 425 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName ); 426 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName ); 427 428 setWorkingDirectory( config ); 429 430 return getPayload( config ); 431 } 432 433 private UserPayload createIntermediatePayloadInput( TezConfiguration config, EdgeValues edgeValues ) 434 { 435 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName ); 436 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName ); 437 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName ); 438 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName ); 439 440 setWorkingDirectory( config ); 441 442 return getPayload( config ); 443 } 444 445 private static void setWorkingDirectory( Configuration conf ) 446 { 447 String name = conf.get( JobContext.WORKING_DIR ); 448 449 if( name != null ) 450 return; 451 452 try 453 { 454 Path dir = FileSystem.get( conf ).getWorkingDirectory(); 455 conf.set( JobContext.WORKING_DIR, dir.toString() ); 456 } 457 catch( IOException exception ) 458 { 459 throw new RuntimeException( exception ); 460 } 461 } 462 463 public Vertex createVertex( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig, FlowNode flowNode ) 464 { 465 JobConf conf = new JobConf( initializedConfig ); 466 467 addInputOutputMapping( conf, flowNode ); 468 469 conf.setBoolean( "mapred.used.genericoptionsparser", true ); 470 471 Map<String, LocalResource> taskLocalResources = new HashMap<>(); 472 473 Map<FlowElement, Configuration> sourceConfigs = initFromSources( flowNode, flowProcess, conf, taskLocalResources ); 474 Map<FlowElement, Configuration> sinkConfigs = initFromSinks( flowNode, flowProcess, conf ); 475 476 initFromTraps( flowNode, flowProcess, conf ); 477 478 initFromNodeConfigDef( flowNode, conf ); 479 480 // force step to local mode if any tap is local 481 setLocalMode( initializedConfig, conf, null ); 482 483 conf.set( "cascading.flow.node.num", Integer.toString( flowNode.getOrdinal() ) ); 484 485 HadoopUtil.setIsInflow( conf ); // must be called after all taps configurations have been retrieved 486 487 int parallelism = getParallelism( flowNode, conf ); 488 489 if( parallelism == 0 ) 490 throw new FlowException( getName(), "the default number of gather partitions must be set, see cascading.flow.FlowRuntimeProps" ); 491 492 flowNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( parallelism ) ); 493 494 Vertex vertex = newVertex( flowNode, conf, parallelism ); 495 496 if( !taskLocalResources.isEmpty() ) 497 vertex.addTaskLocalFiles( taskLocalResources ); 498 499 for( FlowElement flowElement : sourceConfigs.keySet() ) 500 { 501 if( !( flowElement instanceof Tap ) ) 502 continue; 503 504 Configuration sourceConf = sourceConfigs.get( flowElement ); 505 506 // not setting the new-api value could result in failures if not set by the Scheme 507 if( sourceConf.get( "mapred.mapper.new-api" ) == null ) 508 HadoopUtil.setNewApi( sourceConf, sourceConf.get( "mapred.input.format.class", sourceConf.get( "mapreduce.job.inputformat.class" ) ) ); 509 510 // unfortunately we cannot just load the input format and set it on the builder with also pulling all other 511 // values out of the configuration. 512 MRInput.MRInputConfigBuilder configBuilder = MRInput.createConfigBuilder( sourceConf, null ); 513 514 // the default in Tez is true, this overrides 515 if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null ) 516 configBuilder.groupSplits( conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, true ) ); 517 518 // grouping splits loses file name info, breaking partition tap default impl 519 if( !CompositeTaps.unwindNarrow( PartitionTap.class, (Tap) flowElement ).isEmpty() ) // todo: generify 520 configBuilder.groupSplits( false ); 521 522 DataSourceDescriptor dataSourceDescriptor = configBuilder.build(); 523 524 vertex.addDataSource( FlowElements.id( flowElement ), dataSourceDescriptor ); 525 } 526 527 for( FlowElement flowElement : sinkConfigs.keySet() ) 528 { 529 if( !( flowElement instanceof Tap ) ) 530 continue; 531 532 Configuration sinkConf = sinkConfigs.get( flowElement ); 533 534 Class outputFormatClass; 535 String outputPath; 536 537 // we have to set sane defaults if not set by the tap 538 // typically the case of MultiSinkTap 539 String formatClassName = sinkConf.get( "mapred.output.format.class", sinkConf.get( "mapreduce.job.outputformat.class" ) ); 540 541 if( formatClassName == null ) 542 { 543 outputFormatClass = TextOutputFormat.class; // unused, use "new" api, its the default 544 outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused 545 } 546 else 547 { 548 outputFormatClass = Util.loadClass( formatClassName ); 549 outputPath = getOutputPath( sinkConf ); 550 } 551 552 if( outputPath == null && getOutputPath( sinkConf ) == null && isFileOutputFormat( outputFormatClass ) ) 553 outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused 554 555 MROutput.MROutputConfigBuilder configBuilder = MROutput.createConfigBuilder( sinkConf, outputFormatClass, outputPath ); 556 557 DataSinkDescriptor dataSinkDescriptor = configBuilder.build(); 558 559 vertex.addDataSink( FlowElements.id( flowElement ), dataSinkDescriptor ); 560 } 561 562 addRemoteDebug( flowNode, vertex ); 563 addRemoteProfiling( flowNode, vertex ); 564 565 if( vertex.getTaskLaunchCmdOpts() != null ) 566 flowNode.addProcessAnnotation( TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, vertex.getTaskLaunchCmdOpts() ); 567 568 return vertex; 569 } 570 571 protected String getOutputPath( Configuration sinkConf ) 572 { 573 return sinkConf.get( "mapred.output.dir", sinkConf.get( "mapreduce.output.fileoutputformat.outputdir" ) ); 574 } 575 576 protected boolean isFileOutputFormat( Class outputFormatClass ) 577 { 578 return org.apache.hadoop.mapred.FileOutputFormat.class.isAssignableFrom( outputFormatClass ) || 579 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom( outputFormatClass ); 580 } 581 582 protected int getParallelism( FlowNode flowNode, JobConf conf ) 583 { 584 // only count streamed taps, accumulated taps are always annotated 585 HashSet<Tap> sourceStreamedTaps = new HashSet<>( flowNode.getSourceTaps() ); 586 587 sourceStreamedTaps.removeAll( flowNode.getSourceElements( StreamMode.Accumulated ) ); 588 589 if( sourceStreamedTaps.size() != 0 ) 590 return -1; 591 592 int parallelism = Integer.MAX_VALUE; 593 594 for( Tap tap : flowNode.getSinkTaps() ) 595 { 596 int numSinkParts = tap.getScheme().getNumSinkParts(); 597 598 if( numSinkParts == 0 ) 599 continue; 600 601 if( parallelism != Integer.MAX_VALUE ) 602 LOG.info( "multiple sink taps in flow node declaring numSinkParts, choosing lowest value. see cascading.flow.FlowRuntimeProps for broader control." ); 603 604 parallelism = Math.min( parallelism, numSinkParts ); 605 } 606 607 if( parallelism != Integer.MAX_VALUE ) 608 return parallelism; 609 610 return conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 ); 611 } 612 613 private void addInputOutputMapping( JobConf conf, FlowNode flowNode ) 614 { 615 FlowNodeGraph flowNodeGraph = getFlowNodeGraph(); 616 Set<ProcessEdge> incomingEdges = flowNodeGraph.incomingEdgesOf( flowNode ); 617 618 for( ProcessEdge processEdge : incomingEdges ) 619 conf.set( "cascading.node.source." + processEdge.getFlowElementID(), processEdge.getSourceProcessID() ); 620 621 Set<ProcessEdge> outgoingEdges = flowNodeGraph.outgoingEdgesOf( flowNode ); 622 623 for( ProcessEdge processEdge : outgoingEdges ) 624 conf.set( "cascading.node.sink." + processEdge.getFlowElementID(), processEdge.getSinkProcessID() ); 625 } 626 627 protected Map<FlowElement, Configuration> initFromSources( FlowNode flowNode, FlowProcess<TezConfiguration> flowProcess, 628 Configuration conf, Map<String, LocalResource> taskLocalResources ) 629 { 630 Set<? extends FlowElement> accumulatedSources = flowNode.getSourceElements( StreamMode.Accumulated ); 631 632 for( FlowElement element : accumulatedSources ) 633 { 634 if( element instanceof Tap ) 635 { 636 JobConf current = new JobConf( conf ); 637 Tap tap = (Tap) element; 638 639 if( tap.getIdentifier() == null ) 640 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() ); 641 642 tap.sourceConfInit( flowProcess, current ); 643 644 Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) ); 645 646 if( !paths.isEmpty() ) 647 { 648 String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath(); 649 String resourceSubPath = Tap.id( tap ); 650 Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null ); 651 652 current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) ); 653 654 allLocalResources.putAll( taskLocalResources ); 655 syncPaths.putAll( pathMap ); 656 } 657 658 Map<String, String> map = flowProcess.diffConfigIntoMap( new TezConfiguration( conf ), new TezConfiguration( current ) ); 659 conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) ); 660 661 setLocalMode( conf, current, tap ); 662 } 663 } 664 665 Set<FlowElement> sources = new HashSet<>( flowNode.getSourceElements() ); 666 667 sources.removeAll( accumulatedSources ); 668 669 if( sources.isEmpty() ) 670 throw new IllegalStateException( "all sources marked as accumulated" ); 671 672 Map<FlowElement, Configuration> configs = new HashMap<>(); 673 674 for( FlowElement element : sources ) 675 { 676 JobConf current = new JobConf( conf ); 677 678 String id = FlowElements.id( element ); 679 680 current.set( "cascading.node.source", id ); 681 682 if( element instanceof Tap ) 683 { 684 Tap tap = (Tap) element; 685 686 if( tap.getIdentifier() == null ) 687 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() ); 688 689 tap.sourceConfInit( flowProcess, current ); 690 691 setLocalMode( conf, current, tap ); 692 } 693 694 configs.put( element, current ); 695 } 696 697 return configs; 698 } 699 700 protected Map<FlowElement, Configuration> initFromSinks( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf ) 701 { 702 Set<FlowElement> sinks = flowNode.getSinkElements(); 703 Map<FlowElement, Configuration> configs = new HashMap<>(); 704 705 for( FlowElement element : sinks ) 706 { 707 JobConf current = new JobConf( conf ); 708 709 if( element instanceof Tap ) 710 { 711 Tap tap = (Tap) element; 712 713 if( tap.getIdentifier() == null ) 714 throw new IllegalStateException( "tap may not have null identifier: " + element.toString() ); 715 716 tap.sinkConfInit( flowProcess, current ); 717 718 setLocalMode( conf, current, tap ); 719 } 720 721 String id = FlowElements.id( element ); 722 723 current.set( "cascading.node.sink", id ); 724 725 configs.put( element, current ); 726 } 727 728 return configs; 729 } 730 731 private void initFromNodeConfigDef( FlowNode flowNode, Configuration conf ) 732 { 733 initConfFromNodeConfigDef( flowNode.getElementGraph(), new ConfigurationSetter( conf ) ); 734 } 735 736 private void initFromStepConfigDef( Configuration conf ) 737 { 738 initConfFromStepConfigDef( new ConfigurationSetter( conf ) ); 739 } 740 741 protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf ) 742 { 743 Map<String, Tap> traps = flowNode.getTrapMap(); 744 745 if( !traps.isEmpty() ) 746 { 747 JobConf trapConf = new JobConf( conf ); 748 749 for( Tap tap : traps.values() ) 750 { 751 tap.sinkConfInit( flowProcess, trapConf ); 752 setLocalMode( conf, trapConf, tap ); 753 } 754 } 755 } 756 757 private Vertex newVertex( FlowNode flowNode, Configuration conf, int parallelism ) 758 { 759 conf.set( FlowNode.CASCADING_FLOW_NODE, pack( flowNode, conf ) ); // todo: pack into payload directly 760 761 ProcessorDescriptor descriptor = ProcessorDescriptor.create( FlowProcessor.class.getName() ); 762 763 descriptor.setUserPayload( getPayload( conf ) ); 764 765 Vertex vertex = Vertex.create( flowNode.getID(), descriptor, parallelism ); 766 767 if( environment != null ) 768 vertex.setTaskEnvironment( environment ); 769 770 return vertex; 771 } 772 773 private UserPayload getPayload( Configuration conf ) 774 { 775 try 776 { 777 return TezUtils.createUserPayloadFromConf( conf ); 778 } 779 catch( IOException exception ) 780 { 781 throw new CascadingException( exception ); 782 } 783 } 784 785 private String pack( Object object, Configuration conf ) 786 { 787 try 788 { 789 return serializeBase64( object, conf, true ); 790 } 791 catch( IOException exception ) 792 { 793 throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception ); 794 } 795 } 796 797 @Override 798 public void clean( TezConfiguration config ) 799 { 800 for( Tap sink : getSinkTaps() ) 801 { 802 if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) ) 803 { 804 try 805 { 806 sink.deleteResource( config ); 807 } 808 catch( Exception exception ) 809 { 810 // sink all exceptions, don't fail app 811 logWarn( "unable to remove temporary file: " + sink, exception ); 812 } 813 } 814 else 815 { 816 cleanTapMetaData( config, sink ); 817 } 818 } 819 820 for( Tap tap : getTraps() ) 821 cleanTapMetaData( config, tap ); 822 } 823 824 private void cleanTapMetaData( TezConfiguration config, Tap tap ) 825 { 826 try 827 { 828 Hadoop18TapUtil.cleanupTapMetaData( config, tap ); 829 } 830 catch( IOException exception ) 831 { 832 // ignore exception 833 } 834 } 835 836 public void syncArtifacts() 837 { 838 // this may not be strictly necessary, but there is a condition where setting the access time 839 // fails, so there may be one were setting the modification time fails. if so, we can compensate. 840 Map<String, Long> timestamps = HadoopUtil.syncPaths( getConfig(), syncPaths, true ); 841 842 for( Map.Entry<String, Long> entry : timestamps.entrySet() ) 843 { 844 LocalResource localResource = allLocalResources.get( entry.getKey() ); 845 846 if( localResource != null ) 847 localResource.setTimestamp( entry.getValue() ); 848 } 849 } 850 851 private void setLocalMode( Configuration parent, JobConf current, Tap tap ) 852 { 853 // force step to local mode 854 if( !HadoopUtil.isLocal( current ) ) 855 return; 856 857 if( tap != null ) 858 logInfo( "tap forcing step to tez local mode: " + tap.getIdentifier() ); 859 860 HadoopUtil.setLocal( parent ); 861 } 862 863 private void addRemoteDebug( FlowNode flowNode, Vertex vertex ) 864 { 865 String value = System.getProperty( "test.debug.node", null ); 866 867 if( Util.isEmpty( value ) ) 868 return; 869 870 if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() ) 871 return; 872 873 LOG.warn( "remote debugging enabled with property: {}, on node: {}, with node id: {}", "test.debug.node", value, flowNode.getID() ); 874 875 String opts = vertex.getTaskLaunchCmdOpts(); 876 877 if( opts == null ) 878 opts = ""; 879 880 String address = System.getProperty( "test.debug.address", "localhost:5005" ).trim(); 881 882 opts += " -agentlib:jdwp=transport=dt_socket,server=n,address=" + address + ",suspend=y"; 883 884 vertex.setTaskLaunchCmdOpts( opts ); 885 } 886 887 private void addRemoteProfiling( FlowNode flowNode, Vertex vertex ) 888 { 889 String value = System.getProperty( "test.profile.node", null ); 890 891 if( Util.isEmpty( value ) ) 892 return; 893 894 if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() ) 895 return; 896 897 LOG.warn( "remote profiling enabled with property: {}, on node: {}, with node id: {}", "test.profile.node", value, flowNode.getID() ); 898 899 String opts = vertex.getTaskLaunchCmdOpts(); 900 901 if( opts == null ) 902 opts = ""; 903 904 String path = System.getProperty( "test.profile.path", "/tmp/jfr/" ); 905 906 if( !path.endsWith( "/" ) ) 907 path += "/"; 908 909 LOG.warn( "remote profiling property: {}, logging to path: {}", "test.profile.path", path ); 910 911 opts += String.format( " -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=%1$s%2$s,disk=true,repository=%1$s%2$s", path, flowNode.getID() ); 912 913 vertex.setTaskLaunchCmdOpts( opts ); 914 } 915 916 private int asInt( String value ) 917 { 918 try 919 { 920 return Integer.parseInt( value ); 921 } 922 catch( NumberFormatException exception ) 923 { 924 return -1; 925 } 926 } 927 928 public Map<String, LocalResource> getAllLocalResources() 929 { 930 return allLocalResources; 931 } 932 933 private static class EdgeValues 934 { 935 FlowElement flowElement; 936 TezConfiguration config; 937 Set<Integer> ordinals; 938 String keyClassName; 939 String valueClassName; 940 String keyComparatorClassName; 941 String keyPartitionerClassName; 942 String outputClassName; 943 String inputClassName; 944 EdgeProperty.DataMovementType movementType; 945 EdgeProperty.DataSourceType sourceType; 946 EdgeProperty.SchedulingType schedulingType; 947 948 Map<Integer, Fields> resolvedKeyFieldsMap; 949 Map<Integer, Fields> resolvedSortFieldsMap; 950 Map<Integer, Fields> resolvedValueFieldsMap; 951 952 private EdgeValues( TezConfiguration config, ProcessEdge processEdge ) 953 { 954 this.config = config; 955 this.flowElement = processEdge.getFlowElement(); 956 this.ordinals = processEdge.getSourceProvidedOrdinals(); 957 958 this.resolvedKeyFieldsMap = processEdge.getResolvedKeyFields(); 959 this.resolvedSortFieldsMap = processEdge.getResolvedSortFields(); 960 this.resolvedValueFieldsMap = processEdge.getResolvedValueFields(); 961 } 962 963 public FlowElement getFlowElement() 964 { 965 return flowElement; 966 } 967 968 public TezConfiguration getConfig() 969 { 970 return config; 971 } 972 973 public Set getOrdinals() 974 { 975 return ordinals; 976 } 977 978 public String getKeyClassName() 979 { 980 return keyClassName; 981 } 982 983 public String getValueClassName() 984 { 985 return valueClassName; 986 } 987 988 public String getKeyComparatorClassName() 989 { 990 return keyComparatorClassName; 991 } 992 993 public String getKeyPartitionerClassName() 994 { 995 return keyPartitionerClassName; 996 } 997 998 public String getOutputClassName() 999 { 1000 return outputClassName; 1001 } 1002 1003 public String getInputClassName() 1004 { 1005 return inputClassName; 1006 } 1007 1008 public EdgeProperty.DataMovementType getMovementType() 1009 { 1010 return movementType; 1011 } 1012 1013 public EdgeProperty.DataSourceType getSourceType() 1014 { 1015 return sourceType; 1016 } 1017 1018 public EdgeProperty.SchedulingType getSchedulingType() 1019 { 1020 return schedulingType; 1021 } 1022 1023 public Map<Integer, Fields> getResolvedKeyFieldsMap() 1024 { 1025 return resolvedKeyFieldsMap; 1026 } 1027 1028 public Map<Integer, Fields> getResolvedSortFieldsMap() 1029 { 1030 return resolvedSortFieldsMap; 1031 } 1032 1033 public Map<Integer, Fields> getResolvedValueFieldsMap() 1034 { 1035 return resolvedValueFieldsMap; 1036 } 1037 } 1038 }