001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez; 023 024import java.io.File; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Iterator; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035 036import cascading.CascadingException; 037import cascading.flow.FlowElement; 038import cascading.flow.FlowElements; 039import cascading.flow.FlowException; 040import cascading.flow.FlowNode; 041import cascading.flow.FlowProcess; 042import cascading.flow.FlowRuntimeProps; 043import cascading.flow.hadoop.ConfigurationSetter; 044import cascading.flow.hadoop.util.HadoopUtil; 045import cascading.flow.planner.BaseFlowStep; 046import cascading.flow.planner.FlowStepJob; 047import cascading.flow.planner.graph.ElementGraph; 048import cascading.flow.planner.process.FlowNodeGraph; 049import cascading.flow.planner.process.ProcessEdge; 050import cascading.flow.stream.annotations.StreamMode; 051import cascading.flow.tez.planner.Hadoop2TezFlowStepJob; 052import cascading.flow.tez.util.TezUtil; 053import cascading.management.state.ClientState; 054import cascading.pipe.Boundary; 055import cascading.pipe.CoGroup; 056import cascading.pipe.Group; 057import cascading.pipe.GroupBy; 058import cascading.pipe.Merge; 059import cascading.pipe.Splice; 060import cascading.property.AppProps; 061import cascading.tap.CompositeTaps; 062import cascading.tap.Tap; 063import cascading.tap.hadoop.Hfs; 064import cascading.tap.hadoop.PartitionTap; 065import cascading.tap.hadoop.util.Hadoop18TapUtil; 066import cascading.tuple.Fields; 067import cascading.tuple.hadoop.TupleSerialization; 068import cascading.tuple.hadoop.util.GroupingSortingComparator; 069import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator; 070import cascading.tuple.hadoop.util.ReverseTupleComparator; 071import cascading.tuple.hadoop.util.TupleComparator; 072import cascading.tuple.io.KeyTuple; 073import cascading.tuple.io.TuplePair; 074import cascading.tuple.io.ValueTuple; 075import cascading.tuple.tez.util.GroupingSortingPartitioner; 076import cascading.tuple.tez.util.TuplePartitioner; 077import cascading.util.Util; 078import cascading.util.Version; 079import org.apache.hadoop.conf.Configuration; 080import org.apache.hadoop.fs.FileSystem; 081import org.apache.hadoop.fs.Path; 082import org.apache.hadoop.mapred.JobConf; 083import org.apache.hadoop.mapreduce.JobContext; 084import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 085import org.apache.hadoop.yarn.api.records.LocalResource; 086import org.apache.hadoop.yarn.api.records.LocalResourceType; 087import org.apache.tez.common.TezUtils; 088import org.apache.tez.dag.api.DAG; 089import org.apache.tez.dag.api.DataSinkDescriptor; 090import org.apache.tez.dag.api.DataSourceDescriptor; 091import org.apache.tez.dag.api.Edge; 092import org.apache.tez.dag.api.EdgeProperty; 093import org.apache.tez.dag.api.GroupInputEdge; 094import org.apache.tez.dag.api.InputDescriptor; 095import org.apache.tez.dag.api.OutputDescriptor; 096import org.apache.tez.dag.api.ProcessorDescriptor; 097import org.apache.tez.dag.api.TezConfiguration; 098import org.apache.tez.dag.api.UserPayload; 099import org.apache.tez.dag.api.Vertex; 100import org.apache.tez.dag.api.VertexGroup; 101import org.apache.tez.mapreduce.input.MRInput; 102import org.apache.tez.mapreduce.output.MROutput; 103import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; 104import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; 105import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; 106import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput; 107import org.apache.tez.runtime.library.input.UnorderedKVInput; 108import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; 109import org.apache.tez.runtime.library.output.UnorderedKVOutput; 110import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113 114import static cascading.flow.hadoop.util.HadoopUtil.*; 115import static cascading.flow.tez.util.TezUtil.addToClassPath; 116import static cascading.tap.hadoop.DistCacheTap.CASCADING_LOCAL_RESOURCES; 117import static cascading.tap.hadoop.DistCacheTap.CASCADING_REMOTE_RESOURCES; 118import static cascading.util.Util.getFirst; 119import static java.util.Collections.singletonList; 120import static org.apache.hadoop.yarn.api.records.LocalResourceType.ARCHIVE; 121import static org.apache.hadoop.yarn.api.records.LocalResourceType.FILE; 122 123/** 124 * 125 */ 126public class Hadoop2TezFlowStep extends BaseFlowStep<TezConfiguration> 127 { 128 private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezFlowStep.class ); 129 130 private Map<String, LocalResource> allLocalResources = new HashMap<>(); 131 private Map<Path, Path> syncPaths = new HashMap<>(); 132 private Map<String, String> environment = new HashMap<>(); 133 134 public Hadoop2TezFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph ) 135 { 136 super( elementGraph, flowNodeGraph ); 137 } 138 139 @Override 140 public Map<Object, Object> getConfigAsProperties() 141 { 142 return HadoopUtil.createProperties( getConfig() ); 143 } 144 145 @Override 146 public TezConfiguration createInitializedConfig( FlowProcess<TezConfiguration> flowProcess, TezConfiguration parentConfig ) 147 { 148 TezConfiguration stepConf = parentConfig == null ? new TezConfiguration() : new TezConfiguration( parentConfig ); 149 150 TupleSerialization.setSerializations( stepConf ); 151 152 String versionString = Version.getRelease(); 153 154 if( versionString != null ) 155 stepConf.set( "cascading.version", versionString ); 156 157 stepConf.set( CASCADING_FLOW_STEP_ID, getID() ); 158 stepConf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) ); 159 160 String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath(); 161 List<String> classPath = ( (Hadoop2TezFlow) getFlow() ).getClassPath(); 162 163 // is updated in addToClassPath method 164 Map<String, LocalResource> dagResources = new HashMap<>(); 165 166 if( !classPath.isEmpty() ) 167 { 168 // jars in the root will be in the remote CLASSPATH, no need to add to the environment 169 Map<Path, Path> dagClassPath = addToClassPath( stepConf, flowStagingPath, null, classPath, FILE, dagResources, null ); 170 171 syncPaths.putAll( dagClassPath ); 172 } 173 174 String appJarPath = stepConf.get( AppProps.APP_JAR_PATH ); 175 176 if( appJarPath != null ) 177 { 178 // the PATTERN represents the insides of the app jar, those elements must be added to the remote CLASSPATH 179 List<String> classpath = singletonList( appJarPath ); 180 Map<Path, Path> pathMap = addToClassPath( stepConf, flowStagingPath, null, classpath, ARCHIVE, dagResources, environment ); 181 182 syncPaths.putAll( pathMap ); 183 184 // AM does not support environments like containers do, so the classpath has to be passed via configuration. 185 String fileName = new File( appJarPath ).getName(); 186 stepConf.set( TezConfiguration.TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX, 187 "$PWD/" + fileName + "/:$PWD/" + fileName + "/classes/:$PWD/" + fileName + "/lib/*:" ); 188 } 189 190 allLocalResources.putAll( dagResources ); 191 192 initFromStepConfigDef( stepConf ); 193 194 return stepConf; 195 } 196 197 @Override 198 protected FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedStepConfig ) 199 { 200 DAG dag = createDAG( flowProcess, initializedStepConfig ); 201 202 return new Hadoop2TezFlowStepJob( clientState, this, initializedStepConfig, dag ); 203 } 204 205 private DAG createDAG( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig ) 206 { 207 FlowNodeGraph nodeGraph = getFlowNodeGraph(); 208 Map<FlowNode, Vertex> vertexMap = new HashMap<>(); 209 DAG dag = DAG.create( getStepDisplayName( initializedConfig.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) ); 210 211 dag.addTaskLocalFiles( allLocalResources ); 212 213 Iterator<FlowNode> iterator = nodeGraph.getOrderedTopologicalIterator(); // ordering of nodes for consistent remote debugging 214 215 while( iterator.hasNext() ) 216 { 217 FlowNode flowNode = iterator.next(); 218 219 Vertex vertex = createVertex( flowProcess, initializedConfig, flowNode ); 220 dag.addVertex( vertex ); 221 222 vertexMap.put( flowNode, vertex ); 223 } 224 225 LinkedList<ProcessEdge> processedEdges = new LinkedList<>(); 226 227 for( ProcessEdge processEdge : nodeGraph.edgeSet() ) 228 { 229 if( processedEdges.contains( processEdge ) ) 230 continue; 231 232 FlowNode edgeTargetFlowNode = nodeGraph.getEdgeTarget( processEdge ); 233 234 FlowElement flowElement = processEdge.getFlowElement(); 235 List<FlowNode> sourceNodes = nodeGraph.getElementSourceProcesses( flowElement ); 236 237 EdgeProperty edgeProperty = createEdgeProperty( initializedConfig, processEdge ); 238 239 Vertex targetVertex = vertexMap.get( edgeTargetFlowNode ); 240 241 if( sourceNodes.size() == 1 || flowElement instanceof CoGroup || flowElement instanceof Boundary ) // todo: create group vertices around incoming ordinal 242 { 243 FlowNode edgeSourceFlowNode = nodeGraph.getEdgeSource( processEdge ); 244 Vertex sourceVertex = vertexMap.get( edgeSourceFlowNode ); 245 246 LOG.debug( "adding edge between: {} and {}", sourceVertex, targetVertex ); 247 248 dag.addEdge( Edge.create( sourceVertex, targetVertex, edgeProperty ) ); 249 } 250 else if( flowElement instanceof GroupBy || flowElement instanceof Merge ) // merge - source nodes > 1 251 { 252 List<String> sourceVerticesIDs = new ArrayList<>(); 253 List<Vertex> sourceVertices = new ArrayList<>(); 254 255 for( FlowNode edgeSourceFlowNode : sourceNodes ) 256 { 257 sourceVerticesIDs.add( edgeSourceFlowNode.getID() ); 258 sourceVertices.add( vertexMap.get( edgeSourceFlowNode ) ); 259 processedEdges.add( nodeGraph.getEdge( edgeSourceFlowNode, edgeTargetFlowNode ) ); 260 } 261 262 VertexGroup vertexGroup = dag.createVertexGroup( edgeTargetFlowNode.getID(), sourceVertices.toArray( new Vertex[ sourceVertices.size() ] ) ); 263 264 String inputClassName = flowElement instanceof Group ? OrderedGroupedMergedKVInput.class.getName() : ConcatenatedMergedKeyValueInput.class.getName(); 265 266 InputDescriptor inputDescriptor = InputDescriptor.create( inputClassName ).setUserPayload( edgeProperty.getEdgeDestination().getUserPayload() ); 267 268 String type = ( (Splice) flowElement ).isMerge() ? "merged" : "grouped"; 269 LOG.info( "adding {} edge between: {} and {}", type, Util.join( sourceVerticesIDs, "," ), targetVertex.getName() ); 270 dag.addEdge( GroupInputEdge.create( vertexGroup, targetVertex, edgeProperty, inputDescriptor ) ); 271 } 272 else 273 { 274 throw new UnsupportedOperationException( "can't make edge for: " + flowElement ); 275 } 276 } 277 278 return dag; 279 } 280 281 private EdgeProperty createEdgeProperty( TezConfiguration config, ProcessEdge processEdge ) 282 { 283 FlowElement flowElement = processEdge.getFlowElement(); 284 285 EdgeValues edgeValues = new EdgeValues( new TezConfiguration( config ), processEdge ); 286 287 edgeValues.keyClassName = KeyTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS 288 edgeValues.valueClassName = ValueTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS 289 edgeValues.keyComparatorClassName = TupleComparator.class.getName(); 290 edgeValues.keyPartitionerClassName = TuplePartitioner.class.getName(); 291 edgeValues.outputClassName = null; 292 edgeValues.inputClassName = null; 293 edgeValues.movementType = null; 294 edgeValues.sourceType = null; 295 edgeValues.schedulingType = null; 296 297 if( flowElement instanceof Group ) 298 applyGroup( edgeValues ); 299 else if( ( flowElement instanceof Boundary || flowElement instanceof Merge ) && processEdge.getSinkAnnotations().contains( StreamMode.Accumulated ) ) 300 applyBoundaryMergeAccumulated( edgeValues ); 301 else if( flowElement instanceof Boundary || flowElement instanceof Merge ) 302 applyBoundaryMerge( edgeValues ); 303 else 304 throw new IllegalStateException( "unsupported flow element: " + flowElement.getClass().getCanonicalName() ); 305 306 applyEdgeAnnotations( processEdge, edgeValues ); 307 308 return createEdgeProperty( edgeValues ); 309 } 310 311 private void applyEdgeAnnotations( ProcessEdge processEdge, EdgeValues edgeValues ) 312 { 313 processEdge.addEdgeAnnotation( edgeValues.movementType ); 314 processEdge.addEdgeAnnotation( edgeValues.sourceType ); 315 processEdge.addEdgeAnnotation( edgeValues.schedulingType ); 316 } 317 318 private EdgeValues applyBoundaryMerge( EdgeValues edgeValues ) 319 { 320 // todo: support for one to one 321 edgeValues.outputClassName = UnorderedPartitionedKVOutput.class.getName(); 322 edgeValues.inputClassName = UnorderedKVInput.class.getName(); 323 324 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 325 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 326 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 327 328 return edgeValues; 329 } 330 331 private EdgeValues applyBoundaryMergeAccumulated( EdgeValues edgeValues ) 332 { 333 edgeValues.outputClassName = UnorderedKVOutput.class.getName(); 334 edgeValues.inputClassName = UnorderedKVInput.class.getName(); 335 336 edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST; 337 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 338 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 339 340 return edgeValues; 341 } 342 343 private EdgeValues applyGroup( EdgeValues edgeValues ) 344 { 345 Group group = (Group) edgeValues.flowElement; 346 347 if( group.isSortReversed() ) 348 edgeValues.keyComparatorClassName = ReverseTupleComparator.class.getName(); 349 350 int ordinal = getFirst( edgeValues.ordinals ); 351 352 addComparators( edgeValues.config, "cascading.group.comparator", group.getKeySelectors(), edgeValues.getResolvedKeyFieldsMap().get( ordinal ) ); 353 354 if( !group.isGroupBy() ) 355 { 356 edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName(); 357 edgeValues.inputClassName = OrderedGroupedKVInput.class.getName(); 358 359 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 360 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 361 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 362 } 363 else 364 { 365 addComparators( edgeValues.config, "cascading.sort.comparator", group.getSortingSelectors(), edgeValues.getResolvedSortFieldsMap().get( ordinal ) ); 366 367 edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName(); 368 edgeValues.inputClassName = OrderedGroupedKVInput.class.getName(); 369 370 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 371 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 372 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 373 } 374 375 if( group.isSorted() ) 376 { 377 edgeValues.keyClassName = TuplePair.class.getName(); 378 edgeValues.keyPartitionerClassName = GroupingSortingPartitioner.class.getName(); 379 380 if( group.isSortReversed() ) 381 edgeValues.keyComparatorClassName = ReverseGroupingSortingComparator.class.getName(); 382 else 383 edgeValues.keyComparatorClassName = GroupingSortingComparator.class.getName(); 384 } 385 386 return edgeValues; 387 } 388 389 private EdgeProperty createEdgeProperty( EdgeValues edgeValues ) 390 { 391 TezConfiguration outputConfig = new TezConfiguration( edgeValues.getConfig() ); 392 outputConfig.set( "cascading.node.sink", FlowElements.id( edgeValues.getFlowElement() ) ); 393 outputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) ); 394 addFields( outputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() ); 395 addFields( outputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() ); 396 addFields( outputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() ); 397 398 UserPayload outputPayload = createIntermediatePayloadOutput( outputConfig, edgeValues ); 399 400 TezConfiguration inputConfig = new TezConfiguration( edgeValues.getConfig() ); 401 inputConfig.set( "cascading.node.source", FlowElements.id( edgeValues.getFlowElement() ) ); 402 inputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) ); 403 addFields( inputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() ); 404 addFields( inputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() ); 405 addFields( inputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() ); 406 407 UserPayload inputPayload = createIntermediatePayloadInput( inputConfig, edgeValues ); 408 409 return EdgeProperty.create( 410 edgeValues.getMovementType(), 411 edgeValues.getSourceType(), 412 edgeValues.getSchedulingType(), 413 OutputDescriptor.create( edgeValues.getOutputClassName() ).setUserPayload( outputPayload ), 414 InputDescriptor.create( edgeValues.getInputClassName() ).setUserPayload( inputPayload ) 415 ); 416 } 417 418 private UserPayload createIntermediatePayloadOutput( TezConfiguration config, EdgeValues edgeValues ) 419 { 420 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName ); 421 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName ); 422 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName ); 423 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName ); 424 425 setWorkingDirectory( config ); 426 427 return getPayload( config ); 428 } 429 430 private UserPayload createIntermediatePayloadInput( TezConfiguration config, EdgeValues edgeValues ) 431 { 432 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName ); 433 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName ); 434 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName ); 435 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName ); 436 437 setWorkingDirectory( config ); 438 439 return getPayload( config ); 440 } 441 442 private static void setWorkingDirectory( Configuration conf ) 443 { 444 String name = conf.get( JobContext.WORKING_DIR ); 445 446 if( name != null ) 447 return; 448 449 try 450 { 451 Path dir = FileSystem.get( conf ).getWorkingDirectory(); 452 conf.set( JobContext.WORKING_DIR, dir.toString() ); 453 } 454 catch( IOException exception ) 455 { 456 throw new RuntimeException( exception ); 457 } 458 } 459 460 public Vertex createVertex( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig, FlowNode flowNode ) 461 { 462 JobConf conf = new JobConf( initializedConfig ); 463 464 addInputOutputMapping( conf, flowNode ); 465 466 conf.setBoolean( "mapred.used.genericoptionsparser", true ); 467 468 Map<String, LocalResource> taskLocalResources = new HashMap<>(); 469 470 Map<FlowElement, Configuration> sourceConfigs = initFromSources( flowNode, flowProcess, conf, taskLocalResources ); 471 Map<FlowElement, Configuration> sinkConfigs = initFromSinks( flowNode, flowProcess, conf ); 472 473 initFromTraps( flowNode, flowProcess, conf ); 474 475 initFromNodeConfigDef( flowNode, conf ); 476 477 // force step to local mode if any tap is local 478 setLocalMode( initializedConfig, conf, null ); 479 480 conf.set( "cascading.flow.node.num", Integer.toString( flowNode.getOrdinal() ) ); 481 482 HadoopUtil.setIsInflow( conf ); // must be called after all taps configurations have been retrieved 483 484 int parallelism = getParallelism( flowNode, conf ); 485 486 if( parallelism == 0 ) 487 throw new FlowException( getName(), "the default number of gather partitions must be set, see cascading.flow.FlowRuntimeProps" ); 488 489 flowNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( parallelism ) ); 490 491 Vertex vertex = newVertex( flowNode, conf, parallelism ); 492 493 if( !taskLocalResources.isEmpty() ) 494 vertex.addTaskLocalFiles( taskLocalResources ); 495 496 for( FlowElement flowElement : sourceConfigs.keySet() ) 497 { 498 if( !( flowElement instanceof Tap ) ) 499 continue; 500 501 Configuration sourceConf = sourceConfigs.get( flowElement ); 502 503 // not setting the new-api value could result in failures if not set by the Scheme 504 if( sourceConf.get( "mapred.mapper.new-api" ) == null ) 505 HadoopUtil.setNewApi( sourceConf, sourceConf.get( "mapred.input.format.class", sourceConf.get( "mapreduce.job.inputformat.class" ) ) ); 506 507 // unfortunately we cannot just load the input format and set it on the builder with also pulling all other 508 // values out of the configuration. 509 MRInput.MRInputConfigBuilder configBuilder = MRInput.createConfigBuilder( sourceConf, null ); 510 511 // the default in Tez is true, this overrides 512 if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null ) 513 configBuilder.groupSplits( conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, true ) ); 514 515 // grouping splits loses file name info, breaking partition tap default impl 516 if( !CompositeTaps.unwindNarrow( PartitionTap.class, (Tap) flowElement ).isEmpty() ) // todo: generify 517 configBuilder.groupSplits( false ); 518 519 DataSourceDescriptor dataSourceDescriptor = configBuilder.build(); 520 521 vertex.addDataSource( FlowElements.id( flowElement ), dataSourceDescriptor ); 522 } 523 524 for( FlowElement flowElement : sinkConfigs.keySet() ) 525 { 526 if( !( flowElement instanceof Tap ) ) 527 continue; 528 529 Configuration sinkConf = sinkConfigs.get( flowElement ); 530 531 Class outputFormatClass; 532 String outputPath; 533 534 // we have to set sane defaults if not set by the tap 535 // typically the case of MultiSinkTap 536 String formatClassName = sinkConf.get( "mapred.output.format.class", sinkConf.get( "mapreduce.job.outputformat.class" ) ); 537 538 if( formatClassName == null ) 539 { 540 outputFormatClass = TextOutputFormat.class; // unused, use "new" api, its the default 541 outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused 542 } 543 else 544 { 545 outputFormatClass = Util.loadClass( formatClassName ); 546 outputPath = getOutputPath( sinkConf ); 547 } 548 549 if( outputPath == null && getOutputPath( sinkConf ) == null && isFileOutputFormat( outputFormatClass ) ) 550 outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused 551 552 MROutput.MROutputConfigBuilder configBuilder = MROutput.createConfigBuilder( sinkConf, outputFormatClass, outputPath ); 553 554 DataSinkDescriptor dataSinkDescriptor = configBuilder.build(); 555 556 vertex.addDataSink( FlowElements.id( flowElement ), dataSinkDescriptor ); 557 } 558 559 addRemoteDebug( flowNode, vertex ); 560 addRemoteProfiling( flowNode, vertex ); 561 562 if( vertex.getTaskLaunchCmdOpts() != null ) 563 flowNode.addProcessAnnotation( TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, vertex.getTaskLaunchCmdOpts() ); 564 565 return vertex; 566 } 567 568 protected String getOutputPath( Configuration sinkConf ) 569 { 570 return sinkConf.get( "mapred.output.dir", sinkConf.get( "mapreduce.output.fileoutputformat.outputdir" ) ); 571 } 572 573 protected boolean isFileOutputFormat( Class outputFormatClass ) 574 { 575 return org.apache.hadoop.mapred.FileOutputFormat.class.isAssignableFrom( outputFormatClass ) || 576 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom( outputFormatClass ); 577 } 578 579 protected int getParallelism( FlowNode flowNode, JobConf conf ) 580 { 581 // only count streamed taps, accumulated taps are always annotated 582 HashSet<Tap> sourceStreamedTaps = new HashSet<>( flowNode.getSourceTaps() ); 583 584 sourceStreamedTaps.removeAll( flowNode.getSourceElements( StreamMode.Accumulated ) ); 585 586 if( sourceStreamedTaps.size() != 0 ) 587 return -1; 588 589 int parallelism = Integer.MAX_VALUE; 590 591 for( Tap tap : flowNode.getSinkTaps() ) 592 { 593 int numSinkParts = tap.getScheme().getNumSinkParts(); 594 595 if( numSinkParts == 0 ) 596 continue; 597 598 if( parallelism != Integer.MAX_VALUE ) 599 LOG.info( "multiple sink taps in flow node declaring numSinkParts, choosing lowest value. see cascading.flow.FlowRuntimeProps for broader control." ); 600 601 parallelism = Math.min( parallelism, numSinkParts ); 602 } 603 604 if( parallelism != Integer.MAX_VALUE ) 605 return parallelism; 606 607 return conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 ); 608 } 609 610 private void addInputOutputMapping( JobConf conf, FlowNode flowNode ) 611 { 612 FlowNodeGraph flowNodeGraph = getFlowNodeGraph(); 613 Set<ProcessEdge> incomingEdges = flowNodeGraph.incomingEdgesOf( flowNode ); 614 615 for( ProcessEdge processEdge : incomingEdges ) 616 conf.set( "cascading.node.source." + processEdge.getFlowElementID(), processEdge.getSourceProcessID() ); 617 618 Set<ProcessEdge> outgoingEdges = flowNodeGraph.outgoingEdgesOf( flowNode ); 619 620 for( ProcessEdge processEdge : outgoingEdges ) 621 conf.set( "cascading.node.sink." + processEdge.getFlowElementID(), processEdge.getSinkProcessID() ); 622 } 623 624 protected Map<FlowElement, Configuration> initFromSources( FlowNode flowNode, FlowProcess<TezConfiguration> flowProcess, 625 Configuration conf, Map<String, LocalResource> taskLocalResources ) 626 { 627 Set<? extends FlowElement> accumulatedSources = flowNode.getSourceElements( StreamMode.Accumulated ); 628 629 for( FlowElement element : accumulatedSources ) 630 { 631 if( element instanceof Tap ) 632 { 633 JobConf current = new JobConf( conf ); 634 Tap tap = (Tap) element; 635 636 if( tap.getIdentifier() == null ) 637 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() ); 638 639 tap.sourceConfInit( flowProcess, current ); 640 641 Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) ); 642 643 if( !paths.isEmpty() ) 644 { 645 String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath(); 646 String resourceSubPath = Tap.id( tap ); 647 Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null ); 648 649 current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) ); 650 651 allLocalResources.putAll( taskLocalResources ); 652 syncPaths.putAll( pathMap ); 653 } 654 655 Map<String, String> map = flowProcess.diffConfigIntoMap( new TezConfiguration( conf ), new TezConfiguration( current ) ); 656 conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) ); 657 658 setLocalMode( conf, current, tap ); 659 } 660 } 661 662 Set<FlowElement> sources = new HashSet<>( flowNode.getSourceElements() ); 663 664 sources.removeAll( accumulatedSources ); 665 666 if( sources.isEmpty() ) 667 throw new IllegalStateException( "all sources marked as accumulated" ); 668 669 Map<FlowElement, Configuration> configs = new HashMap<>(); 670 671 for( FlowElement element : sources ) 672 { 673 JobConf current = new JobConf( conf ); 674 675 String id = FlowElements.id( element ); 676 677 current.set( "cascading.node.source", id ); 678 679 if( element instanceof Tap ) 680 { 681 Tap tap = (Tap) element; 682 683 if( tap.getIdentifier() == null ) 684 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() ); 685 686 tap.sourceConfInit( flowProcess, current ); 687 688 setLocalMode( conf, current, tap ); 689 } 690 691 configs.put( element, current ); 692 } 693 694 return configs; 695 } 696 697 protected Map<FlowElement, Configuration> initFromSinks( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf ) 698 { 699 Set<FlowElement> sinks = flowNode.getSinkElements(); 700 Map<FlowElement, Configuration> configs = new HashMap<>(); 701 702 for( FlowElement element : sinks ) 703 { 704 JobConf current = new JobConf( conf ); 705 706 if( element instanceof Tap ) 707 { 708 Tap tap = (Tap) element; 709 710 if( tap.getIdentifier() == null ) 711 throw new IllegalStateException( "tap may not have null identifier: " + element.toString() ); 712 713 tap.sinkConfInit( flowProcess, current ); 714 715 setLocalMode( conf, current, tap ); 716 } 717 718 String id = FlowElements.id( element ); 719 720 current.set( "cascading.node.sink", id ); 721 722 configs.put( element, current ); 723 } 724 725 return configs; 726 } 727 728 private void initFromNodeConfigDef( FlowNode flowNode, Configuration conf ) 729 { 730 initConfFromNodeConfigDef( flowNode.getElementGraph(), new ConfigurationSetter( conf ) ); 731 } 732 733 private void initFromStepConfigDef( Configuration conf ) 734 { 735 initConfFromStepConfigDef( new ConfigurationSetter( conf ) ); 736 } 737 738 protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf ) 739 { 740 Map<String, Tap> traps = flowNode.getTrapMap(); 741 742 if( !traps.isEmpty() ) 743 { 744 JobConf trapConf = new JobConf( conf ); 745 746 for( Tap tap : traps.values() ) 747 { 748 tap.sinkConfInit( flowProcess, trapConf ); 749 setLocalMode( conf, trapConf, tap ); 750 } 751 } 752 } 753 754 private Vertex newVertex( FlowNode flowNode, Configuration conf, int parallelism ) 755 { 756 conf.set( FlowNode.CASCADING_FLOW_NODE, pack( flowNode, conf ) ); // todo: pack into payload directly 757 758 ProcessorDescriptor descriptor = ProcessorDescriptor.create( FlowProcessor.class.getName() ); 759 760 descriptor.setUserPayload( getPayload( conf ) ); 761 762 Vertex vertex = Vertex.create( flowNode.getID(), descriptor, parallelism ); 763 764 if( environment != null ) 765 vertex.setTaskEnvironment( environment ); 766 767 return vertex; 768 } 769 770 private UserPayload getPayload( Configuration conf ) 771 { 772 try 773 { 774 return TezUtils.createUserPayloadFromConf( conf ); 775 } 776 catch( IOException exception ) 777 { 778 throw new CascadingException( exception ); 779 } 780 } 781 782 private String pack( Object object, Configuration conf ) 783 { 784 try 785 { 786 return serializeBase64( object, conf, true ); 787 } 788 catch( IOException exception ) 789 { 790 throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception ); 791 } 792 } 793 794 @Override 795 public void clean( TezConfiguration config ) 796 { 797 for( Tap sink : getSinkTaps() ) 798 { 799 if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) ) 800 { 801 try 802 { 803 sink.deleteResource( config ); 804 } 805 catch( Exception exception ) 806 { 807 // sink all exceptions, don't fail app 808 logWarn( "unable to remove temporary file: " + sink, exception ); 809 } 810 } 811 else 812 { 813 cleanTapMetaData( config, sink ); 814 } 815 } 816 817 for( Tap tap : getTraps() ) 818 cleanTapMetaData( config, tap ); 819 } 820 821 private void cleanTapMetaData( TezConfiguration config, Tap tap ) 822 { 823 try 824 { 825 Hadoop18TapUtil.cleanupTapMetaData( config, tap ); 826 } 827 catch( IOException exception ) 828 { 829 // ignore exception 830 } 831 } 832 833 public void syncArtifacts() 834 { 835 // this may not be strictly necessary, but there is a condition where setting the access time 836 // fails, so there may be one were setting the modification time fails. if so, we can compensate. 837 Map<String, Long> timestamps = HadoopUtil.syncPaths( getConfig(), syncPaths, true ); 838 839 for( Map.Entry<String, Long> entry : timestamps.entrySet() ) 840 { 841 LocalResource localResource = allLocalResources.get( entry.getKey() ); 842 843 if( localResource != null ) 844 localResource.setTimestamp( entry.getValue() ); 845 } 846 } 847 848 private void setLocalMode( Configuration parent, JobConf current, Tap tap ) 849 { 850 // force step to local mode 851 if( !HadoopUtil.isLocal( current ) ) 852 return; 853 854 if( tap != null ) 855 logInfo( "tap forcing step to tez local mode: " + tap.getIdentifier() ); 856 857 HadoopUtil.setLocal( parent ); 858 } 859 860 private void addRemoteDebug( FlowNode flowNode, Vertex vertex ) 861 { 862 String value = System.getProperty( "test.debug.node", null ); 863 864 if( Util.isEmpty( value ) ) 865 return; 866 867 if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() ) 868 return; 869 870 LOG.warn( "remote debugging enabled with property: {}, on node: {}, with node id: {}", "test.debug.node", value, flowNode.getID() ); 871 872 String opts = vertex.getTaskLaunchCmdOpts(); 873 874 if( opts == null ) 875 opts = ""; 876 877 String address = System.getProperty( "test.debug.address", "localhost:5005" ).trim(); 878 879 opts += " -agentlib:jdwp=transport=dt_socket,server=n,address=" + address + ",suspend=y"; 880 881 vertex.setTaskLaunchCmdOpts( opts ); 882 } 883 884 private void addRemoteProfiling( FlowNode flowNode, Vertex vertex ) 885 { 886 String value = System.getProperty( "test.profile.node", null ); 887 888 if( Util.isEmpty( value ) ) 889 return; 890 891 if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() ) 892 return; 893 894 LOG.warn( "remote profiling enabled with property: {}, on node: {}, with node id: {}", "test.profile.node", value, flowNode.getID() ); 895 896 String opts = vertex.getTaskLaunchCmdOpts(); 897 898 if( opts == null ) 899 opts = ""; 900 901 String path = System.getProperty( "test.profile.path", "/tmp/jfr/" ); 902 903 if( !path.endsWith( "/" ) ) 904 path += "/"; 905 906 LOG.warn( "remote profiling property: {}, logging to path: {}", "test.profile.path", path ); 907 908 opts += String.format( " -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=%1$s%2$s,disk=true,repository=%1$s%2$s", path, flowNode.getID() ); 909 910 vertex.setTaskLaunchCmdOpts( opts ); 911 } 912 913 private int asInt( String value ) 914 { 915 try 916 { 917 return Integer.parseInt( value ); 918 } 919 catch( NumberFormatException exception ) 920 { 921 return -1; 922 } 923 } 924 925 public Map<String, LocalResource> getAllLocalResources() 926 { 927 return allLocalResources; 928 } 929 930 private static class EdgeValues 931 { 932 FlowElement flowElement; 933 TezConfiguration config; 934 Set<Integer> ordinals; 935 String keyClassName; 936 String valueClassName; 937 String keyComparatorClassName; 938 String keyPartitionerClassName; 939 String outputClassName; 940 String inputClassName; 941 EdgeProperty.DataMovementType movementType; 942 EdgeProperty.DataSourceType sourceType; 943 EdgeProperty.SchedulingType schedulingType; 944 945 Map<Integer, Fields> resolvedKeyFieldsMap; 946 Map<Integer, Fields> resolvedSortFieldsMap; 947 Map<Integer, Fields> resolvedValueFieldsMap; 948 949 private EdgeValues( TezConfiguration config, ProcessEdge processEdge ) 950 { 951 this.config = config; 952 this.flowElement = processEdge.getFlowElement(); 953 this.ordinals = processEdge.getSourceProvidedOrdinals(); 954 955 this.resolvedKeyFieldsMap = processEdge.getResolvedKeyFields(); 956 this.resolvedSortFieldsMap = processEdge.getResolvedSortFields(); 957 this.resolvedValueFieldsMap = processEdge.getResolvedValueFields(); 958 } 959 960 public FlowElement getFlowElement() 961 { 962 return flowElement; 963 } 964 965 public TezConfiguration getConfig() 966 { 967 return config; 968 } 969 970 public Set getOrdinals() 971 { 972 return ordinals; 973 } 974 975 public String getKeyClassName() 976 { 977 return keyClassName; 978 } 979 980 public String getValueClassName() 981 { 982 return valueClassName; 983 } 984 985 public String getKeyComparatorClassName() 986 { 987 return keyComparatorClassName; 988 } 989 990 public String getKeyPartitionerClassName() 991 { 992 return keyPartitionerClassName; 993 } 994 995 public String getOutputClassName() 996 { 997 return outputClassName; 998 } 999 1000 public String getInputClassName() 1001 { 1002 return inputClassName; 1003 } 1004 1005 public EdgeProperty.DataMovementType getMovementType() 1006 { 1007 return movementType; 1008 } 1009 1010 public EdgeProperty.DataSourceType getSourceType() 1011 { 1012 return sourceType; 1013 } 1014 1015 public EdgeProperty.SchedulingType getSchedulingType() 1016 { 1017 return schedulingType; 1018 } 1019 1020 public Map<Integer, Fields> getResolvedKeyFieldsMap() 1021 { 1022 return resolvedKeyFieldsMap; 1023 } 1024 1025 public Map<Integer, Fields> getResolvedSortFieldsMap() 1026 { 1027 return resolvedSortFieldsMap; 1028 } 1029 1030 public Map<Integer, Fields> getResolvedValueFieldsMap() 1031 { 1032 return resolvedValueFieldsMap; 1033 } 1034 } 1035 }