/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.AssemblyPlanner;
import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowElement;
import cascading.flow.FlowStep;
import cascading.flow.Flows;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.process.FlowNodeFactory;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.FlowStepFactory;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.flow.planner.rule.ProcessLevel;
import cascading.flow.planner.rule.RuleRegistry;
import cascading.flow.planner.rule.RuleRegistrySet;
import cascading.flow.planner.rule.RuleResult;
import cascading.flow.planner.rule.RuleSetExec;
import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
import cascading.flow.planner.rule.util.TraceWriter;
import cascading.operation.AssertionLevel;
import cascading.operation.DebugLevel;
import cascading.pipe.Checkpoint;
import cascading.pipe.OperatorException;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.property.ConfigDef;
import cascading.property.PropertyUtil;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.Fields;
import cascading.util.Update;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.*;
import static java.util.Arrays.asList;

/**
 * Class FlowPlanner is the base class for all planner implementations.
 * <p>
 * This planner supports tracing the execution of each rule. See the {@code TRACE_*} properties declared on this
 * class to enable it.
 */
public abstract class FlowPlanner<F extends BaseFlow, Config>
  {
  /**
   * Enables the planner to write out basic planner information including the initial element-graph,
   * completed element-graph, and the completed step-graph dot files.
   */
  public static final String TRACE_PLAN_PATH = "cascading.planner.plan.path";

  /**
   * Enables the planner to write out detail level planner information for each rule, including recursive
   * transforms.
   * <p>
   * Use this to debug rules. This does increase overhead during planning.
   */
  public static final String TRACE_PLAN_TRANSFORM_PATH = "cascading.planner.plan.transforms.path";

  /**
   * Enables the planner to write out planner statistics for each planner phase and rule.
   */
  public static final String TRACE_STATS_PATH = "cascading.planner.stats.path";

  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );

  /** Field defaultProperties, assigned by {@link #initialize(FlowConnector, Map)} */
  protected Map<Object, Object> defaultProperties;

  /** Field checkpointTapRootPath, {@code null} unless the current FlowDef declares a run id */
  protected String checkpointTapRootPath = null;

  /** Field defaultAssertionLevel, used when the FlowDef does not declare one */
  protected AssertionLevel defaultAssertionLevel;
  /** Field defaultDebugLevel, used when the FlowDef does not declare one */
  protected DebugLevel defaultDebugLevel;

  /**
   * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
   * <p>
   * Reads the {@code cascading.flowconnector.assertionlevel} property, defaulting to {@code STRICT}.
   *
   * @param properties of type {@code Map<Object, Object>}
   * @return AssertionLevel the configured AssertionLevel
   */
  static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
    {
    String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );

    return AssertionLevel.valueOf( assertionLevel );
    }

  /**
   * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
   * <p>
   * Reads the {@code cascading.flowconnector.debuglevel} property, defaulting to {@code DEFAULT}.
   *
   * @param properties of type {@code Map<Object, Object>}
   * @return DebugLevel the configured DebugLevel
   */
  static DebugLevel getDebugLevel( Map<Object, Object> properties )
    {
    String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );

    return DebugLevel.valueOf( debugLevel );
    }

  // instance initializer: registers the concrete planner class on every construction
    {
    Update.registerPlanner( getClass() );
    }

  /**
   * Method getDefaultProperties returns the properties handed to {@link #initialize(FlowConnector, Map)}.
   *
   * @return the default properties, {@code null} if this planner has not been initialized
   */
  public Map<Object, Object> getDefaultProperties()
    {
    return defaultProperties;
    }

  public abstract Config getDefaultConfig();

  public abstract PlannerInfo getPlannerInfo( String name );

  public abstract PlatformInfo getPlatformInfo();

  /**
   * Method initialize captures the given properties and derives the default assertion and debug levels from them.
   *
   * @param flowConnector the connector creating this planner (not used by this base implementation)
   * @param properties    the default properties for all flows planned by this instance
   */
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    this.defaultProperties = properties;
    this.defaultAssertionLevel = getAssertionLevel( properties );
    this.defaultDebugLevel = getDebugLevel( properties );
    }

  /**
   * Method buildFlow verifies the given {@link FlowDef}, executes the given rule registries against the resulting
   * element graph, and returns an initialized flow.
   *
   * @param flowDef         the flow definition to plan
   * @param ruleRegistrySet the rule registries to execute
   * @return the planned and initialized flow
   * @throws PlannerException if verification or planning fails, see {@link #handleExceptionDuringPlanning(FlowDef, Exception, FlowElementGraph)}
   */
  public F buildFlow( FlowDef flowDef, RuleRegistrySet ruleRegistrySet )
    {
    FlowElementGraph flowElementGraph = null;

    try
      {
      flowDef = normalizeTaps( flowDef );

      verifyAllTaps( flowDef );

      F flow = createFlow( flowDef );

      Pipe[] tails = resolveTails( flowDef, flow );

      verifyAssembly( flowDef, tails );

      flowElementGraph = createFlowElementGraph( flowDef, tails );

      TraceWriter traceWriter = new TraceWriter( flow );
      RuleSetExec ruleSetExec = new RuleSetExec( traceWriter, this, flow, ruleRegistrySet, flowDef, flowElementGraph );

      RuleResult ruleResult = ruleSetExec.exec();

      traceWriter.writeTracePlan( null, "0-initial-flow-element-graph", flowElementGraph );

      FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();

      finalFlowElementGraph = flow.updateSchemes( finalFlowElementGraph );

      Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
      Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();

      FlowStepGraph flowStepGraph = new FlowStepGraph( getFlowStepFactory(), finalFlowElementGraph, stepToNodes, nodeToPipeline );

      traceWriter.writeFinal( "1-final-flow-registry", ruleResult );
      traceWriter.writeTracePlan( null, "2-final-flow-element-graph", finalFlowElementGraph );
      traceWriter.writeTracePlan( null, "3-final-flow-step-graph", flowStepGraph );
      traceWriter.writeTracePlanSteps( "4-final-flow-steps", flowStepGraph );

      flow.setPlannerInfo( getPlannerInfo( ruleResult.getRegistry().getName() ) );

      flow.initialize( finalFlowElementGraph, flowStepGraph );

      return flow;
      }
    catch( Exception exception )
      {
      // flowElementGraph is still null if the failure happened before the graph was created
      throw handleExceptionDuringPlanning( flowDef, exception, flowElementGraph );
      }
    }

  protected abstract F createFlow( FlowDef flowDef );

  public abstract FlowStepFactory<Config> getFlowStepFactory();

  public FlowNodeFactory getFlowNodeFactory()
    {
    return new BaseFlowNodeFactory();
    }

  /**
   * Method configRuleRegistryDefaults is a hook for sub-classes; this base implementation does nothing.
   *
   * @param ruleRegistry the registry to configure
   */
  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
    {

    }

  /**
   * Method resolveTails returns the tails declared on the FlowDef after every registered
   * {@link AssemblyPlanner} has been applied to them.
   */
  protected Pipe[] resolveTails( FlowDef flowDef, F flow )
    {
    Pipe[] tails = flowDef.getTailsArray();

    tails = resolveAssemblyPlanners( flowDef, flow, tails );

    return tails;
    }

  /**
   * Method resolveAssemblyPlanners feeds the current tails through each {@link AssemblyPlanner} in declaration
   * order, each planner receiving the (unmodifiable) result of the previous one.
   *
   * @throws PlannerException if any assembly planner returns no tails
   */
  protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
    {
    List<Pipe> tails = Arrays.asList( pipes );

    List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();

    for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
      {
      tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );

      // a null result is reported the same way as an empty one instead of surfacing as a bare NPE
      if( tails == null || tails.isEmpty() )
        throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );

      tails = Collections.unmodifiableList( tails );
      }

    return tails.toArray( new Pipe[ tails.size() ] );
    }

  /** Method verifyAssembly verifies the end points, traps and checkpoints of the given assembly. */
  protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
    {
    verifyPipeAssemblyEndPoints( flowDef, tails );
    verifyTraps( flowDef, tails );
    verifyCheckpoints( flowDef, tails );
    }

  /** Method verifyAllTaps verifies every tap declared on the FlowDef can play its declared role. */
  protected void verifyAllTaps( FlowDef flowDef )
    {
    verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );

    verifyTaps( flowDef.getSources(), true, true );
    verifyTaps( flowDef.getSinks(), false, true );
    verifyTaps( flowDef.getTraps(), false, false );

    // are both sources and sinks
    verifyTaps( flowDef.getCheckpoints(), true, false );
    verifyTaps( flowDef.getCheckpoints(), false, false );
    }

  /**
   * Method createFlowElementGraph builds the initial element graph from copies of the FlowDef tap maps.
   * <p>
   * As a side effect {@link #checkpointTapRootPath} is recalculated from the FlowDef.
   */
  protected FlowElementGraph createFlowElementGraph( FlowDef flowDef, Pipe[] flowTails )
    {
    Map<String, Tap> sources = flowDef.getSourcesCopy();
    Map<String, Tap> sinks = flowDef.getSinksCopy();
    Map<String, Tap> traps = flowDef.getTrapsCopy();
    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();

    checkpointTapRootPath = makeCheckpointRootPath( flowDef );

    return new FlowElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointTapRootPath != null );
    }

  /**
   * Returns a copy of the given FlowDef in which equal Tap instances have been replaced by one shared instance.
   */
  private FlowDef normalizeTaps( FlowDef flowDef )
    {
    Set<Tap> taps = new HashSet<>();

    Map<String, Tap> sources = flowDef.getSourcesCopy();
    Map<String, Tap> sinks = flowDef.getSinksCopy();
    Map<String, Tap> traps = flowDef.getTrapsCopy();
    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();

    // all four maps must be added before normalizing so 'taps' holds the canonical instance of each tap
    boolean sourcesHasDupes = addTaps( sources, taps );
    boolean sinksHasDupes = addTaps( sinks, taps );
    boolean trapsHasDupes = addTaps( traps, taps );
    boolean checkpointsHasDupes = addTaps( checkpoints, taps );

    if( sourcesHasDupes )
      normalize( taps, sources );

    if( sinksHasDupes )
      normalize( taps, sinks );

    if( trapsHasDupes )
      normalize( taps, traps );

    if( checkpointsHasDupes )
      normalize( taps, checkpoints );

    return Flows.copy( flowDef, sources, sinks, traps, checkpoints );
    }

  /**
   * Adds all values of current to taps.
   *
   * @return true if at least one value was equal to a tap already in the set (or to another value in current)
   */
  private boolean addTaps( Map<String, Tap> current, Set<Tap> taps )
    {
    int size = taps.size();

    taps.addAll( current.values() );

    // if all the added values are not unique, taps.size will be less than original size + num tap instances
    return size + current.size() != taps.size();
    }

  /** Replaces every value in current with the equal instance held in taps. */
  private void normalize( Set<Tap> taps, Map<String, Tap> current )
    {
    for( Tap tap : taps )
      {
      for( Map.Entry<String, Tap> entry : current.entrySet() )
        {
        if( entry.getValue().equals( tap ) ) // force equivalent instance to being the same instance
          entry.setValue( tap );
        }
      }
    }

  /**
   * Returns {@code flowName/runID}, or {@code null} if the FlowDef declares no run id.
   *
   * @throws PlannerException if a run id is declared without a flow name
   */
  private String makeCheckpointRootPath( FlowDef flowDef )
    {
    String flowName = flowDef.getName();
    String runID = flowDef.getRunID();

    if( runID == null )
      return null;

    if( flowName == null )
      throw new PlannerException( "flow name is required when providing a run id" );

    return flowName + "/" + runID;
    }

  /**
   * Method verifySourceNotSinks fails if any sink tap is also declared as a source tap.
   *
   * @throws PlannerException if a tap is used as both source and sink
   */
  protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
    {
    Collection<Tap> sourcesSet = sources.values();

    for( Tap tap : sinks.values() )
      {
      if( sourcesSet.contains( tap ) )
        throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
      }
    }

  /**
   * Method verifyTaps verifies every given tap can be used in the requested role.
   *
   * @param taps          of type {@code Map<String, Tap>}
   * @param areSources    true to require {@code isSource()}, false to require {@code isSink()}
   * @param mayNotBeEmpty true if an empty map is an error
   * @throws PlannerException if the map is empty but may not be, or a tap cannot play the requested role
   */
  protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
    {
    if( mayNotBeEmpty && taps.isEmpty() )
      throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );

    for( Map.Entry<String, Tap> entry : taps.entrySet() )
      {
      String tapName = entry.getKey();
      Tap tap = entry.getValue();

      if( areSources && !tap.isSource() )
        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + tap );
      else if( !areSources && !tap.isSink() )
        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + tap );
      }
    }

  /**
   * Method verifyPipeAssemblyEndPoints verifies
   * <p>
   * there aren't dupe names in tails (dupe head names are only logged as a warning), and
   * all the sink and source tap names match up with tail and head pipes.
   *
   * @throws PlannerException if any tail/head pipe or sink/source tap is left unbound
   */
  // todo: force dupe names to throw exceptions
  protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
    {
    Set<String> sourceNames = flowDef.getSources().keySet();
    Set<String> sinkNames = flowDef.getSinks().keySet();

    Set<String> tapNames = new HashSet<String>();

    tapNames.addAll( sourceNames );
    tapNames.addAll( sinkNames );

    // handle tails
    Set<Pipe> tails = new HashSet<Pipe>();
    Set<String> tailNames = new HashSet<String>();

    for( Pipe pipe : flowTails )
      {
      if( pipe instanceof SubAssembly )
        {
        // duplicates inside a SubAssembly are reported against the SubAssembly itself
        for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
          registerTail( pipe, tail, tapNames, tailNames, tails );
        }
      else
        {
        registerTail( pipe, pipe, tapNames, tailNames, tails );
        }
      }

    Set<String> unboundTails = new HashSet<String>( tailNames );
    unboundTails.removeAll( sinkNames );

    if( !unboundTails.isEmpty() )
      {
      // computed against all tail names so only the sinks without a matching tail are listed
      Set<String> unboundSinks = new HashSet<String>( sinkNames );
      unboundSinks.removeAll( tailNames );

      throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + join( quote( unboundTails, "'" ), ", " ) + "], remaining sink tap names: [" + join( quote( unboundSinks, "'" ), ", " ) + "]" );
      }

    // unlike heads, pipes can input to another pipe and simultaneously be a sink
    // so there is no way to know all the intentional tails, so they aren't listed below in the exception
    Set<String> remainingSinks = new HashSet<String>( sinkNames );
    remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );

    if( !remainingSinks.isEmpty() )
      throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );

    // handle heads
    Set<Pipe> heads = new HashSet<Pipe>();
    Set<String> headNames = new HashSet<String>();

    for( Pipe pipe : flowTails )
      {
      for( Pipe head : pipe.getHeads() )
        {
        String headName = head.getName();

        if( !tapNames.contains( headName ) )
          throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );

        if( headNames.contains( headName ) && !heads.contains( head ) )
          LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );

        headNames.add( headName );
        heads.add( head );
        }
      }

    Set<String> unboundHeads = new HashSet<String>( headNames );
    unboundHeads.removeAll( sourceNames );

    // computed against all head names so only the sources without a matching head are listed
    Set<String> unboundSources = new HashSet<String>( sourceNames );
    unboundSources.removeAll( headNames );

    if( !unboundHeads.isEmpty() )
      throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + join( quote( unboundHeads, "'" ), ", " ) + "], remaining source tap names: [" + join( quote( unboundSources, "'" ), ", " ) + "]" );

    // unboundHeads is necessarily empty here, the trailing list is retained to keep the message text unchanged
    if( !unboundSources.isEmpty() )
      throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + join( quote( unboundSources, "'" ), ", " ) + "], remaining head pipe names: [" + join( quote( unboundHeads, "'" ), ", " ) + "]" );
    }

  /**
   * Verifies a single tail is bound to a tap name and is not a second, distinct pipe with an already seen name,
   * then records it in tailNames and tails.
   *
   * @param reportedOn the pipe a duplicate name is reported against
   * @param tail       the tail pipe being registered
   */
  private void registerTail( Pipe reportedOn, Pipe tail, Set<String> tapNames, Set<String> tailNames, Set<Pipe> tails )
    {
    String tailName = tail.getName();

    if( !tapNames.contains( tailName ) )
      throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );

    if( tailNames.contains( tailName ) && !tails.contains( tail ) )
      throw new PlannerException( reportedOn, "duplicate tail name found: " + tailName );

    tailNames.add( tailName );
    tails.add( tail );
    }

  /**
   * Method verifyTraps verifies no trap tap is also a source or sink, and every trap name is a pipe name
   * in the assembly.
   *
   * @throws PlannerException if either condition is violated
   */
  protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
    {
    verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );

    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );

    for( String name : flowDef.getTraps().keySet() )
      {
      if( !names.contains( name ) )
        throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
      }
    }

  /**
   * Method verifyCheckpoints verifies no checkpoint tap is also a source or sink, every checkpoint tap scheme
   * is undeclared, and every checkpoint name resolves to exactly one {@link Checkpoint} pipe in the assembly.
   *
   * @throws PlannerException if any condition is violated
   */
  protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
    {
    verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );

    for( Tap checkpointTap : flowDef.getCheckpoints().values() )
      {
      Scheme scheme = checkpointTap.getScheme();

      if( !scheme.getSourceFields().equals( Fields.UNKNOWN ) || !scheme.getSinkFields().equals( Fields.ALL ) )
        throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
      }

    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );

    for( String name : flowDef.getCheckpoints().keySet() )
      {
      if( !names.contains( name ) )
        throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" );

      // de-duplicate so the same Checkpoint instance is only counted once
      Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );

      int count = 0;

      for( Pipe pipe : pipes )
        {
        if( pipe instanceof Checkpoint )
          count++;
        }

      if( count == 0 )
        throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );

      if( count > 1 )
        throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
      }
    }

  /**
   * Fails if any of the given taps is also present in the source or sink tap maps.
   *
   * @param role the role of the given taps, used in the error message
   */
  private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
    {
    Collection<Tap> sourceTaps = sources.values();
    Collection<Tap> sinkTaps = sinks.values();

    for( Tap tap : taps.values() )
      {
      if( sourceTaps.contains( tap ) )
        throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );

      if( sinkTaps.contains( tap ) )
        throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
      }
    }

  /**
   * If there are rules for a given {@link cascading.flow.planner.rule.ProcessLevel} on the current platform
   * there must be sub-graphs partitioned at that level.
   *
   * @param ruleResult the result to verify
   * @return the exception raised by verification, or {@code null} if the result is valid
   */
  public Exception verifyResult( RuleResult ruleResult )
    {
    try
      {
      verifyResultInternal( ruleResult );
      }
    catch( Exception exception )
      {
      return exception;
      }

    return null;
    }

  /**
   * Method verifyResultInternal verifies, for every process level the registry declares (visited in reverse
   * order), that the result holds partitions for that level and that the partitions cover their parent graph.
   *
   * @throws PlannerException on the first violation found
   */
  protected void verifyResultInternal( RuleResult ruleResult )
    {
    Set<ProcessLevel> processLevels = getReverseOrderedProcessLevels( ruleResult );
    String registryName = ruleResult.getRegistry().getName();

    for( ProcessLevel processLevel : processLevels )
      {
      switch( processLevel )
        {
        case Assembly:

          FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();

          if( finalFlowElementGraph.vertexSet().isEmpty() )
            throw new PlannerException( "final assembly graph is empty: " + registryName );

          break;

        case Step:

          verifyPartitions( ruleResult.getAssemblyToStepGraphMap(), registryName, "step", "assembly", "flow" );

          break;

        case Node:

          verifyPartitions( ruleResult.getStepToNodeGraphMap(), registryName, "node", "step", "step" );

          break;

        case Pipeline:

          // all nodes are partitioned into pipelines, but if partitioned all elements should be represented
          verifyPartitions( ruleResult.getNodeToPipelineGraphMap(), registryName, "pipeline", "node", "node" );

          break;
        }
      }
    }

  /**
   * Verifies the given parent to children partitioning: the map is not empty, every parent has at least one child,
   * no parent lists the same child twice, and the union of the children's elements covers every element
   * (by identity) of the parent.
   *
   * @param parentToChildren the partitioning to verify
   * @param registryName     the registry name appended to every message
   * @param childLabel       singular name of the child level, pluralized with a trailing 's'
   * @param parentLabel      singular name of the parent level
   * @param duplicateScope   name of the scope a duplicate child is reported in
   */
  private void verifyPartitions( Map<ElementGraph, List<? extends ElementGraph>> parentToChildren, String registryName, String childLabel, String parentLabel, String duplicateScope )
    {
    if( parentToChildren.isEmpty() )
      throw new PlannerException( "no " + childLabel + "s partitioned: " + registryName );

    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : parentToChildren.entrySet() )
      {
      ElementGraph parent = entry.getKey();
      List<? extends ElementGraph> children = entry.getValue();

      if( children.isEmpty() )
        throw new PlannerException( "no " + childLabel + "s partitioned from " + parentLabel + ": " + registryName, parent );

      Set<ElementGraph> uniqueChildren = new HashSet<>( children.size() );

      for( ElementGraph child : children )
        {
        if( !uniqueChildren.add( child ) )
          throw new PlannerException( "found duplicate " + childLabel + " in " + duplicateScope + ": " + registryName, child );
        }

      Set<FlowElement> elements = createIdentitySet();

      for( ElementGraph child : children )
        elements.addAll( child.vertexSet() );

      Set<FlowElement> missing = differenceIdentity( parent.vertexSet(), elements );

      if( !missing.isEmpty() )
        {
        String message = "union of " + childLabel + "s have " + missing.size() + " fewer elements than parent " + parentLabel + ": " + registryName + ", missing: [" + join( missing, ", " ) + "]";
        throw new PlannerException( message, parent );
        }
      }
    }

  /**
   * Method handleExceptionDuringPlanning converts any exception raised while planning into a
   * {@link PlannerException} carrying the element graph for debugging.
   * <p>
   * A PlannerException is returned as is, receiving the given graph only if it has none. For an
   * {@link ElementGraphException} the cause (or the exception itself when it has no cause) is wrapped.
   *
   * @param flowDef          the flow definition being planned
   * @param exception        the exception raised
   * @param flowElementGraph the current element graph, may be {@code null}
   * @return the PlannerException to throw
   */
  protected PlannerException handleExceptionDuringPlanning( FlowDef flowDef, Exception exception, FlowElementGraph flowElementGraph )
    {
    if( exception instanceof PlannerException )
      {
      PlannerException plannerException = (PlannerException) exception;

      if( plannerException.elementGraph == null )
        plannerException.elementGraph = flowElementGraph;

      return plannerException;
      }

    if( exception instanceof ElementGraphException )
      {
      Throwable cause = exception.getCause();

      if( cause == null )
        cause = exception;

      String message = makePlanningFailureMessage( flowDef, cause );

      // operator and tap failures are reported without the pipe
      if( cause instanceof OperatorException || cause instanceof TapException )
        return new PlannerException( message, cause, flowElementGraph );

      return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, flowElementGraph );
      }

    return new PlannerException( makePlanningFailureMessage( flowDef, exception ), exception, flowElementGraph );
    }

  /**
   * Builds the failure message for the given cause.
   * <p>
   * captures pipegraph for debugging, forward message in case cause or trace is lost
   */
  private String makePlanningFailureMessage( FlowDef flowDef, Throwable cause )
    {
    String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );

    if( cause.getMessage() != null )
      message = String.format( "%s: [%s]", message, cause.getMessage() );

    return message;
    }

  /** Class TempTapElementFactory creates the temporary or checkpoint tap for a given pipe via the enclosing planner. */
  public class TempTapElementFactory extends IntermediateTapElementFactory
    {
    @Override
    public FlowElement create( ElementGraph graph, FlowElement flowElement )
      {
      return makeTempTap( (FlowElementGraph) graph, (Pipe) flowElement );
      }
    }

  /**
   * Returns the tap to insert for the given pipe.
   * <p>
   * A checkpoint tap registered under the pipe name is preferred. Otherwise a {@link Checkpoint} pipe receives a
   * temp tap under {@link #checkpointTapRootPath} marked with the {@code cascading.checkpoint} property, and any
   * other pipe receives a plain temp tap. The result is always passed through the temporary tap decorator.
   */
  private Tap makeTempTap( FlowElementGraph graph, Pipe pipe )
    {
    Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );

    if( checkpointTap != null )
      {
      LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
      checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
      }

    if( checkpointTap == null )
      {
      // only restart from a checkpoint pipe or checkpoint tap below
      if( pipe instanceof Checkpoint )
        {
        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
        // mark as an anonymous checkpoint
        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
        }
      else
        {
        checkpointTap = makeTempTap( pipe.getName() );
        }
      }

    return decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS );
    }

  /**
   * Wraps the given tap in the decorator class named by the given property, if that property is set.
   *
   * @param pipe           the pipe passed to the property lookup along with the default properties
   * @param tempTap        the tap to decorate
   * @param decoratorClass the property key naming the decorator class
   * @return the decorated tap, or the given tap if no decorator class is configured
   */
  private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass )
    {
    String decoratorClassName = PropertyUtil.getProperty( defaultProperties, pipe, decoratorClass );

    if( Util.isEmpty( decoratorClassName ) )
      return tempTap;

    LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClass, tempTap );

    tempTap = Util.newInstance( decoratorClassName, tempTap );

    return tempTap;
    }

  /** Method makeTempTap creates a temporary tap with the given name and no prefix. */
  protected Tap makeTempTap( String name )
    {
    return makeTempTap( null, name );
    }

  /** Method getDebugLevel returns the FlowDef debug level, falling back to the planner default. */
  protected DebugLevel getDebugLevel( FlowDef flowDef )
    {
    return flowDef.getDebugLevel() == null ? this.defaultDebugLevel : flowDef.getDebugLevel();
    }

  /** Method getAssertionLevel returns the FlowDef assertion level, falling back to the planner default. */
  protected AssertionLevel getAssertionLevel( FlowDef flowDef )
    {
    return flowDef.getAssertionLevel() == null ? this.defaultAssertionLevel : flowDef.getAssertionLevel();
    }

  protected abstract Tap makeTempTap( String prefix, String name );

  /** Returns the process levels declared by the result's registry, sorted in reverse natural order. */
  private Set<ProcessLevel> getReverseOrderedProcessLevels( RuleResult ruleResult )
    {
    Set<ProcessLevel> ordered = new TreeSet<>( Collections.reverseOrder() );

    ordered.addAll( ruleResult.getRegistry().getProcessLevels() );

    return ordered;
    }
  }