/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.AssemblyPlanner;
import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowElement;
import cascading.flow.Flows;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.process.FlowNodeFactory;
import cascading.flow.planner.process.FlowStepFactory;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.flow.planner.rule.ProcessLevel;
import cascading.flow.planner.rule.RuleRegistry;
import cascading.flow.planner.rule.RuleRegistrySet;
import cascading.flow.planner.rule.RuleResult;
import cascading.flow.planner.rule.RuleSetExec;
import cascading.flow.planner.rule.transformer.IntermediatePipeElementFactory;
import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
import cascading.flow.planner.rule.util.TraceWriter;
import cascading.operation.AssertionLevel;
import cascading.operation.DebugLevel;
import cascading.operation.Identity;
import cascading.pipe.Checkpoint;
import cascading.pipe.Each;
import cascading.pipe.OperatorException;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.property.ConfigDef;
import cascading.property.PropertyUtil;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tuple.Fields;
import cascading.util.Update;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.TraceUtil.formatTraces;
import static cascading.util.Util.*;
import static java.util.Arrays.asList;

/**
 * Class FlowPlanner is the base class for all planner implementations.
 * <p>
 * This planner supports tracing execution of each rule. See the appropriate properties on this
 * class to enable.
 *
 * @param <F>      the concrete {@link BaseFlow} type created by {@link #createFlow(FlowDef)}
 * @param <Config> the platform configuration type returned by {@link #getDefaultConfig()}
 */
public abstract class FlowPlanner<F extends BaseFlow, Config>
  {
  /**
   * Enables the planner to write out basic planner information including the initial element-graph,
   * completed element-graph, and the completed step-graph dot files.
   */
  public static final String TRACE_PLAN_PATH = "cascading.planner.plan.path";

  /**
   * Enables the planner to write out detail level planner information for each rule, including recursive
   * transforms.
   * <p>
   * Use this to debug rules. This does increase overhead during planning.
   */
  public static final String TRACE_PLAN_TRANSFORM_PATH = "cascading.planner.plan.transforms.path";

  /**
   * Enables the planner to write out planner statistics for each planner phase and rule.
   */
  public static final String TRACE_STATS_PATH = "cascading.planner.stats.path";

  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );

  /** Field properties */
  protected Map<Object, Object> defaultProperties;

  /** Root path for checkpoint taps, {@code null} unless the current FlowDef declares a run id. */
  protected String checkpointTapRootPath = null;

  /** Field assertionLevel */
  protected AssertionLevel defaultAssertionLevel;
  /** Field debugLevel */
  protected DebugLevel defaultDebugLevel;

  /**
   * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
   * <p>
   * Reads the {@code cascading.flowconnector.assertionlevel} property, falling back to
   * {@link AssertionLevel#STRICT} when it is not set.
   *
   * @param properties of type {@code Map<Object, Object>}
   * @return AssertionLevel the configured AssertionLevel
   */
  static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
    {
    String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );

    return AssertionLevel.valueOf( assertionLevel );
    }

  /**
   * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
   * <p>
   * Reads the {@code cascading.flowconnector.debuglevel} property, falling back to
   * {@link DebugLevel#DEFAULT} when it is not set.
   *
   * @param properties of type {@code Map<Object, Object>}
   * @return DebugLevel the configured DebugLevel
   */
  static DebugLevel getDebugLevel( Map<Object, Object> properties )
    {
    String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );

    return DebugLevel.valueOf( debugLevel );
    }

  // instance initializer, registers the concrete planner class on every construction
    {
    Update.registerPlanner( getClass() );
    }

  /**
   * Method getDefaultProperties returns the properties handed to
   * {@link #initialize(FlowConnector, Map)}.
   *
   * @return the default properties, {@code null} until the planner is initialized
   */
  public Map<Object, Object> getDefaultProperties()
    {
    return defaultProperties;
    }

  public abstract Config getDefaultConfig();

  public abstract PlannerInfo getPlannerInfo( String name );

  public abstract PlatformInfo getPlatformInfo();

  /**
   * Method initialize captures the given properties and derives the default assertion and debug
   * levels from them.
   *
   * @param flowConnector the connector initializing this planner, unused by this base implementation
   * @param properties    the properties to retain as defaults
   */
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    this.defaultProperties = properties;
    this.defaultAssertionLevel = getAssertionLevel( properties );
    this.defaultDebugLevel = getDebugLevel( properties );
    }

  /**
   * Method buildFlow verifies the given {@link FlowDef}, executes the given rule registries against
   * the resulting element graph, and returns an initialized flow.
   *
   * @param flowDef         the flow definition to plan
   * @param ruleRegistrySet the rule registries to execute
   * @return the planned and initialized flow
   * @throws PlannerException wrapping or forwarding any exception raised during planning, see
   *                          {@link #handleExceptionDuringPlanning(FlowDef, Exception, FlowElementGraph)}
   */
  public F buildFlow( FlowDef flowDef, RuleRegistrySet ruleRegistrySet )
    {
    // remains null if planning fails before the graph is created
    FlowElementGraph flowElementGraph = null;

    try
      {
      flowDef = normalizeTaps( flowDef );

      verifyAllTaps( flowDef );

      F flow = createFlow( flowDef );

      Pipe[] tails = resolveTails( flowDef, flow );

      verifyAssembly( flowDef, tails );

      flowElementGraph = createFlowElementGraph( flowDef, tails );

      TraceWriter traceWriter = new TraceWriter( flow );
      RuleSetExec ruleSetExec = new RuleSetExec( traceWriter, this, flow, ruleRegistrySet, flowDef, flowElementGraph );

      RuleResult ruleResult = ruleSetExec.exec();

      traceWriter.writeTracePlan( null, "0-initial-flow-element-graph", flowElementGraph );

      FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();

      finalFlowElementGraph = flow.updateSchemes( finalFlowElementGraph );

      Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
      Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();

      FlowStepGraph flowStepGraph = new FlowStepGraph( getFlowStepFactory(), finalFlowElementGraph, stepToNodes, nodeToPipeline );

      traceWriter.writeFinal( "1-final-flow-registry", ruleResult );
      traceWriter.writeTracePlan( null, "2-final-flow-element-graph", finalFlowElementGraph );
      traceWriter.writeTracePlan( null, "3-final-flow-step-graph", flowStepGraph );
      traceWriter.writeTracePlanSteps( "4-final-flow-steps", flowStepGraph );

      flow.setPlannerInfo( getPlannerInfo( ruleResult.getRegistry().getName() ) );

      flow.initialize( finalFlowElementGraph, flowStepGraph );

      return flow;
      }
    catch( Exception exception )
      {
      throw handleExceptionDuringPlanning( flowDef, exception, flowElementGraph );
      }
    }

  protected abstract F createFlow( FlowDef flowDef );

  public abstract FlowStepFactory<Config> getFlowStepFactory();

  /**
   * Method getFlowNodeFactory returns a new {@link BaseFlowNodeFactory}.
   *
   * @return a new FlowNodeFactory instance
   */
  public FlowNodeFactory getFlowNodeFactory()
    {
    return new BaseFlowNodeFactory();
    }

  /**
   * Method configRuleRegistryDefaults registers the default element factories on the given registry.
   *
   * @param ruleRegistry the registry to configure
   */
  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
    {
    ruleRegistry.addDefaultElementFactory( IntermediatePipeElementFactory.IDENTITY, new IdentityElementFactgory() );
    }

  /**
   * Method resolveTails returns the tails declared on the FlowDef after every registered
   * {@link AssemblyPlanner} has been applied.
   *
   * @param flowDef the flow definition
   * @param flow    the flow being planned
   * @return the resolved tail pipes
   */
  protected Pipe[] resolveTails( FlowDef flowDef, F flow )
    {
    Pipe[] tails = flowDef.getTailsArray();

    tails = resolveAssemblyPlanners( flowDef, flow, tails );

    return tails;
    }

  /**
   * Method resolveAssemblyPlanners feeds the current tails through each {@link AssemblyPlanner}
   * in turn, each planner receiving the tails returned by the previous one.
   *
   * @param flowDef the flow definition providing the assembly planners
   * @param flow    the flow being planned
   * @param pipes   the initial tails
   * @return the tails returned by the last planner, or the given pipes if there are no planners
   * @throws PlannerException if any planner returns zero tails
   */
  protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
    {
    List<Pipe> tails = Arrays.asList( pipes );

    List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();

    for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
      {
      tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );

      if( tails.isEmpty() )
        throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );

      // the next planner must not modify the list it is handed
      tails = Collections.unmodifiableList( tails );
      }

    return tails.toArray( new Pipe[ tails.size() ] );
    }

  /**
   * Method verifyAssembly verifies the end points, traps and checkpoints of the given assembly.
   *
   * @param flowDef the flow definition
   * @param tails   the resolved tail pipes
   */
  protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
    {
    verifyPipeAssemblyEndPoints( flowDef, tails );
    verifyTraps( flowDef, tails );
    verifyCheckpoints( flowDef, tails );
    }

  /**
   * Method verifyAllTaps verifies that no tap is both a source and a sink, and that every tap may
   * be used in its declared role. Sources and sinks are required, traps and checkpoints are optional.
   *
   * @param flowDef the flow definition
   */
  protected void verifyAllTaps( FlowDef flowDef )
    {
    verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );

    verifyTaps( flowDef.getSources(), true, true );
    verifyTaps( flowDef.getSinks(), false, true );
    verifyTaps( flowDef.getTraps(), false, false );

    // are both sources and sinks
    verifyTaps( flowDef.getCheckpoints(), true, false );
    verifyTaps( flowDef.getCheckpoints(), false, false );
    }

  /**
   * Method createFlowElementGraph creates the initial element graph from copies of the FlowDef tap
   * maps, and updates {@link #checkpointTapRootPath} as a side effect.
   *
   * @param flowDef   the flow definition
   * @param flowTails the resolved tail pipes
   * @return a new FlowElementGraph
   */
  protected FlowElementGraph createFlowElementGraph( FlowDef flowDef, Pipe[] flowTails )
    {
    Map<String, Tap> sources = flowDef.getSourcesCopy();
    Map<String, Tap> sinks = flowDef.getSinksCopy();
    Map<String, Tap> traps = flowDef.getTrapsCopy();
    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();

    checkpointTapRootPath = makeCheckpointRootPath( flowDef );

    return new FlowElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointTapRootPath != null );
    }

  /**
   * Returns a copy of the given FlowDef in which taps that are equal, but not the same instance,
   * are replaced with a single shared instance.
   */
  private FlowDef normalizeTaps( FlowDef flowDef )
    {
    Set<Tap> taps = new HashSet<>();

    Map<String, Tap> sources = flowDef.getSourcesCopy();
    Map<String, Tap> sinks = flowDef.getSinksCopy();
    Map<String, Tap> traps = flowDef.getTrapsCopy();
    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();

    // all maps must be added before normalizing so the set holds the first seen instance of each tap
    boolean sourcesHasDupes = addTaps( sources, taps );
    boolean sinksHasDupes = addTaps( sinks, taps );
    boolean trapsHasDupes = addTaps( traps, taps );
    boolean checkpointsHasDupes = addTaps( checkpoints, taps );

    if( sourcesHasDupes )
      normalize( taps, sources );

    if( sinksHasDupes )
      normalize( taps, sinks );

    if( trapsHasDupes )
      normalize( taps, traps );

    if( checkpointsHasDupes )
      normalize( taps, checkpoints );

    return Flows.copy( flowDef, sources, sinks, traps, checkpoints );
    }

  /**
   * Adds all values of the given map to the given set, returning true if any value was already
   * present in the set or repeated within the map.
   */
  private boolean addTaps( Map<String, Tap> current, Set<Tap> taps )
    {
    int size = taps.size();

    taps.addAll( current.values() );

    // if all the added values are not unique, taps.size will be less than original size + num tap instances
    return size + current.size() != taps.size();
    }

  /** Replaces every map value equal to a tap in the given set with the instance held by the set. */
  private void normalize( Set<Tap> taps, Map<String, Tap> current )
    {
    for( Tap tap : taps )
      {
      for( Map.Entry<String, Tap> entry : current.entrySet() )
        {
        if( entry.getValue().equals( tap ) ) // force equivalent instance to being the same instance
          entry.setValue( tap );
        }
      }
    }

  /**
   * Returns {@code flowName/runID}, or {@code null} if the FlowDef has no run id.
   *
   * @throws PlannerException if a run id is given without a flow name
   */
  private String makeCheckpointRootPath( FlowDef flowDef )
    {
    String flowName = flowDef.getName();
    String runID = flowDef.getRunID();

    if( runID == null )
      return null;

    if( flowName == null )
      throw new PlannerException( "flow name is required when providing a run id" );

    return flowName + "/" + runID;
    }

  /**
   * Method verifySourceNotSinks verifies that no sink tap is also used as a source tap.
   *
   * @param sources of type {@code Map<String, Tap>}
   * @param sinks   of type {@code Map<String, Tap>}
   * @throws PlannerException if a tap is found in both maps
   */
  protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
    {
    Collection<Tap> sourcesSet = sources.values();

    for( Tap tap : sinks.values() )
      {
      if( sourcesSet.contains( tap ) )
        throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
      }
    }

  /**
   * Method verifyTaps verifies every given tap can be used in the requested role.
   *
   * @param taps          of type {@code Map<String, Tap>}
   * @param areSources    of type boolean, true to verify the taps as sources, false as sinks
   * @param mayNotBeEmpty of type boolean, true if an empty map is an error
   * @throws PlannerException if the map is empty but required, or a tap cannot fill the role
   */
  protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
    {
    if( mayNotBeEmpty && taps.isEmpty() )
      throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );

    for( Map.Entry<String, Tap> entry : taps.entrySet() )
      {
      String tapName = entry.getKey();
      Tap tap = entry.getValue();

      if( areSources && !tap.isSource() )
        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + tap );
      else if( !areSources && !tap.isSink() )
        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + tap );
      }
    }

  /**
   * Method verifyEndPoints verifies
   * <p>
   * there aren't dupe names in heads or tails.
   * all the sink and source tap names match up with tail and head pipes
   *
   * @param flowDef   the flow definition providing the source and sink tap names
   * @param flowTails the resolved tail pipes
   * @throws PlannerException if a head or tail is not bound to a tap, a tap is not bound to a pipe,
   *                          or two different tails share a name
   */
  // todo: force dupe names to throw exceptions
  protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
    {
    Set<String> tapNames = new HashSet<>();

    tapNames.addAll( flowDef.getSources().keySet() );
    tapNames.addAll( flowDef.getSinks().keySet() );

    // handle tails
    Set<Pipe> tails = new HashSet<>();
    Set<String> tailNames = new HashSet<>();

    for( Pipe pipe : flowTails )
      {
      if( pipe instanceof SubAssembly )
        {
        // a duplicate within a sub-assembly is reported against the sub-assembly itself
        for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
          verifyTail( tail, pipe, tapNames, tailNames, tails );
        }
      else
        {
        verifyTail( pipe, pipe, tapNames, tailNames, tails );
        }
      }

    tailNames.removeAll( flowDef.getSinks().keySet() );
    Set<String> remainingSinks = new HashSet<>( flowDef.getSinks().keySet() );
    remainingSinks.removeAll( tailNames );

    if( !tailNames.isEmpty() )
      throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + join( quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );

    // unlike heads, pipes can input to another pipe and simultaneously be a sink
    // so there is no way to know all the intentional tails, so they aren't listed below in the exception
    remainingSinks = new HashSet<>( flowDef.getSinks().keySet() );
    remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );

    if( !remainingSinks.isEmpty() )
      throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );

    // handle heads
    Set<Pipe> heads = new HashSet<>();
    Set<String> headNames = new HashSet<>();

    for( Pipe pipe : flowTails )
      {
      for( Pipe head : pipe.getHeads() )
        {
        String headName = head.getName();

        if( !tapNames.contains( headName ) )
          throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );

        if( headNames.contains( headName ) && !heads.contains( head ) )
          LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );

        headNames.add( headName );
        heads.add( head );
        }
      }

    // keep the full set, headNames is reduced to the unbound names below
    Set<String> allHeadNames = new HashSet<>( headNames );
    headNames.removeAll( flowDef.getSources().keySet() );
    Set<String> remainingSources = new HashSet<>( flowDef.getSources().keySet() );
    remainingSources.removeAll( headNames );

    if( !headNames.isEmpty() )
      throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "]" );

    remainingSources = new HashSet<>( flowDef.getSources().keySet() );
    remainingSources.removeAll( allHeadNames );

    // NOTE(review): headNames is necessarily empty here (see the check above), so the trailing
    // "remaining head pipe names" part of this message is always "[]"; kept for message compatibility
    if( !remainingSources.isEmpty() )
      throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "]" );
    }

  /**
   * Verifies a single tail is bound to a known tap name and does not share its name with a
   * different tail, then records it in the given tail sets.
   *
   * @param tail           the tail pipe to verify
   * @param duplicateOwner the pipe to report if the tail name is a duplicate
   * @param tapNames       all source and sink tap names
   * @param tailNames      the tail names seen so far, updated by this method
   * @param tails          the tail pipes seen so far, updated by this method
   */
  private void verifyTail( Pipe tail, Pipe duplicateOwner, Set<String> tapNames, Set<String> tailNames, Set<Pipe> tails )
    {
    String tailName = tail.getName();

    if( !tapNames.contains( tailName ) )
      throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );

    // the same tail instance may legitimately be seen more than once
    if( tailNames.contains( tailName ) && !tails.contains( tail ) )
      throw new PlannerException( duplicateOwner, "duplicate tail name found: " + tailName );

    tailNames.add( tailName );
    tails.add( tail );
    }

  /**
   * Method verifyTraps verifies no trap tap is also a source or sink, and that every trap name
   * matches a pipe name in the assembly.
   *
   * @param flowDef   the flow definition
   * @param flowTails the resolved tail pipes
   * @throws PlannerException if a trap is invalid
   */
  protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
    {
    verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );

    Set<String> names = new HashSet<>( asList( Pipe.names( flowTails ) ) );

    for( String name : flowDef.getTraps().keySet() )
      {
      if( !names.contains( name ) )
        throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
      }
    }

  /**
   * Method verifyCheckpoints verifies no checkpoint tap is also a source or sink, that every
   * checkpoint tap scheme is undeclared, and that every checkpoint name matches exactly one
   * {@link Checkpoint} pipe in the assembly.
   *
   * @param flowDef   the flow definition
   * @param flowTails the resolved tail pipes
   * @throws PlannerException if a checkpoint is invalid
   */
  protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
    {
    verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );

    for( Tap checkpointTap : flowDef.getCheckpoints().values() )
      {
      Scheme scheme = checkpointTap.getScheme();
      boolean isUndeclared = scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL );

      if( !isUndeclared )
        throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
      }

    Set<String> names = new HashSet<>( asList( Pipe.names( flowTails ) ) );

    for( String name : flowDef.getCheckpoints().keySet() )
      {
      if( !names.contains( name ) )
        throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" );

      Set<Pipe> pipes = new HashSet<>( asList( Pipe.named( name, flowTails ) ) );

      int count = 0;

      for( Pipe pipe : pipes )
        {
        if( pipe instanceof Checkpoint )
          count++;
        }

      if( count == 0 )
        throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );

      if( count > 1 )
        throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
      }
    }

  /**
   * Verifies none of the given taps is also used as a source or a sink.
   *
   * @param role the role name of the given taps, used in the exception message
   */
  private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
    {
    Collection<Tap> sourceTaps = sources.values();
    Collection<Tap> sinkTaps = sinks.values();

    for( Tap tap : taps.values() )
      {
      if( sourceTaps.contains( tap ) )
        throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );

      if( sinkTaps.contains( tap ) )
        throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
      }
    }

  /**
   * If there are rules for a given {@link cascading.flow.planner.rule.ProcessLevel} on the current platform
   * there must be sub-graphs partitioned at that level.
   *
   * @param ruleResult the result to verify
   * @return the exception raised by verification, or {@code null} if the result is valid
   */
  public Exception verifyResult( RuleResult ruleResult )
    {
    try
      {
      verifyResultInternal( ruleResult );
      }
    catch( Exception exception )
      {
      // returned rather than thrown so the caller decides how to handle it
      return exception;
      }

    return null;
    }

  /**
   * Method verifyResultInternal verifies, for each {@link ProcessLevel} declared by the result
   * registry, that the corresponding graphs exist and are complete.
   *
   * @param ruleResult the result to verify
   * @throws PlannerException if a level has no graphs, has duplicate graphs, or its graphs do not
   *                          cover every element of their parent graph
   */
  protected void verifyResultInternal( RuleResult ruleResult )
    {
    Set<ProcessLevel> processLevels = getReverseOrderedProcessLevels( ruleResult );

    for( ProcessLevel processLevel : processLevels )
      {
      String registryName = ruleResult.getRegistry().getName();

      switch( processLevel )
        {
        case Assembly:

          FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();

          if( finalFlowElementGraph.vertexSet().isEmpty() )
            throw new PlannerException( "final assembly graph is empty: " + registryName );

          break;

        case Step:

          verifyPartitions( ruleResult.getAssemblyToStepGraphMap(), registryName, "step", "assembly", "flow" );

          break;

        case Node:

          verifyPartitions( ruleResult.getStepToNodeGraphMap(), registryName, "node", "step", "step" );

          break;

        case Pipeline:

          // all nodes are partitioned into pipelines, but if partitioned all elements should be represented
          verifyPartitions( ruleResult.getNodeToPipelineGraphMap(), registryName, "pipeline", "node", "node" );

          break;
        }
      }
    }

  /**
   * Verifies that every parent graph was partitioned into at least one child graph, that no child
   * graph is listed twice, and that the children together contain every element of their parent.
   *
   * @param parentToChildren the parent graph to child graphs map to verify
   * @param registryName     the registry name appended to every message
   * @param childName        the singular child level name used in messages, e.g. "step"
   * @param parentName       the singular parent level name used in messages, e.g. "assembly"
   * @param duplicateScope   the scope name used in the duplicate child message, e.g. "flow"
   */
  private void verifyPartitions( Map<ElementGraph, List<? extends ElementGraph>> parentToChildren, String registryName, String childName, String parentName, String duplicateScope )
    {
    if( parentToChildren.isEmpty() )
      throw new PlannerException( "no " + childName + "s partitioned: " + registryName );

    // iterate entries instead of keySet()/get() to avoid a second lookup per parent
    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : parentToChildren.entrySet() )
      {
      ElementGraph parent = entry.getKey();
      List<? extends ElementGraph> children = entry.getValue();

      if( children.isEmpty() )
        throw new PlannerException( "no " + childName + "s partitioned from " + parentName + ": " + registryName, parent );

      Set<ElementGraph> childSet = new HashSet<>( children.size() );

      for( ElementGraph child : children )
        {
        if( !childSet.add( child ) )
          throw new PlannerException( "found duplicate " + childName + " in " + duplicateScope + ": " + registryName, child );
        }

      // elements are compared by identity, not equality
      Set<FlowElement> elements = createIdentitySet();

      for( ElementGraph child : children )
        elements.addAll( child.vertexSet() );

      Set<FlowElement> missing = differenceIdentity( parent.vertexSet(), elements );

      if( !missing.isEmpty() )
        {
        String message = "union of " + childName + "s have " + missing.size() + " fewer elements than parent " + parentName + ": " + registryName + ", missing: [" + formatTraces( missing, ", " ) + "]";
        throw new PlannerException( message, parent );
        }
      }
    }

  /**
   * Method handleExceptionDuringPlanning converts any exception raised during planning into a
   * {@link PlannerException} carrying the current element graph.
   * <p>
   * A PlannerException is returned as is, after receiving the given graph if it has none. An
   * {@link ElementGraphException} is unwrapped to its cause, if any. Any other exception is wrapped.
   *
   * @param flowDef          the flow definition being planned, its name prefixes the message
   * @param exception        the exception raised
   * @param flowElementGraph the current element graph, may be {@code null}
   * @return the PlannerException to throw
   */
  protected PlannerException handleExceptionDuringPlanning( FlowDef flowDef, Exception exception, FlowElementGraph flowElementGraph )
    {
    if( exception instanceof PlannerException )
      {
      PlannerException plannerException = (PlannerException) exception;

      if( plannerException.elementGraph == null )
        plannerException.elementGraph = flowElementGraph;

      return plannerException;
      }

    if( exception instanceof ElementGraphException )
      {
      Throwable cause = exception.getCause();

      if( cause == null )
        cause = exception;

      String message = makePlanningFailureMessage( flowDef, cause );

      // operator and tap exceptions are reported without the pipe of the wrapping exception
      if( cause instanceof OperatorException || cause instanceof TapException )
        return new PlannerException( message, cause, flowElementGraph );

      return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, flowElementGraph );
      }

    return new PlannerException( makePlanningFailureMessage( flowDef, exception ), exception, flowElementGraph );
    }

  /**
   * Creates the planning failure message, prefixed with the flow name truncated to 25 characters
   * and, when available, suffixed with the message of the given cause.
   */
  private static String makePlanningFailureMessage( FlowDef flowDef, Throwable cause )
    {
    // captures pipegraph for debugging
    // forward message in case cause or trace is lost
    String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );

    if( cause.getMessage() != null )
      message = String.format( "%s: [%s]", message, cause.getMessage() );

    return message;
    }

  /**
   * Class IdentityElementFactgory creates an {@link Each} applying {@link Identity} to all fields
   * of the given pipe.
   */
  public class IdentityElementFactgory extends IntermediatePipeElementFactory
    {
    @Override
    public FlowElement create( ElementGraph graph, FlowElement flowElement )
      {
      return new Each( (Pipe) flowElement, Fields.ALL, new Identity( Fields.ALL ), Fields.RESULTS );
      }
    }

  /**
   * Class TempTapElementFactory creates a temporary tap for a given {@link Pipe}, or decorates a
   * given {@link Tap}, optionally using a default decorator class name.
   */
  public class TempTapElementFactory extends IntermediateTapElementFactory
    {
    /** Decorator class name used when no decorator property is set, may be null. */
    private String defaultDecoratorClassName;

    public TempTapElementFactory()
      {
      }

    public TempTapElementFactory( String defaultDecoratorClassName )
      {
      this.defaultDecoratorClassName = defaultDecoratorClassName;
      }

    @Override
    public FlowElement create( ElementGraph graph, FlowElement flowElement )
      {
      if( flowElement instanceof Pipe )
        return makeTempTap( (FlowElementGraph) graph, (Pipe) flowElement, defaultDecoratorClassName );

      if( flowElement instanceof Tap )
        return decorateTap( null, (Tap) flowElement, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS, defaultDecoratorClassName );

      throw new IllegalStateException( "unknown flow element type: " + flowElement );
      }
    }

  /**
   * Returns the tap to insert after the given pipe: the declared checkpoint tap for the pipe name
   * if any, else a new checkpoint tap if the pipe is a {@link Checkpoint}, else a plain temporary
   * tap. The result is wrapped by the temporary tap decorator, if one is configured.
   */
  private Tap makeTempTap( FlowElementGraph graph, Pipe pipe, String defaultDecoratorClassName )
    {
    Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );

    if( checkpointTap != null )
      {
      LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
      checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );
      }

    if( checkpointTap == null )
      {
      // only restart from a checkpoint pipe or checkpoint tap below
      if( pipe instanceof Checkpoint )
        {
        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );
        // mark as an anonymous checkpoint
        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
        }
      else
        {
        checkpointTap = makeTempTap( pipe.getName() );
        }
      }

    return decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS, defaultDecoratorClassName );
    }

  /**
   * Wraps the given tap in a new instance of the decorator class named by the given property,
   * falling back to the given default class name. Returns the tap unchanged if neither is set.
   *
   * @param pipe may be null
   */
  private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClassProp, String defaultDecoratorClassName )
    {
    String decoratorClassName = PropertyUtil.getProperty( defaultProperties, pipe, decoratorClassProp );

    if( Util.isEmpty( decoratorClassName ) )
      decoratorClassName = defaultDecoratorClassName;

    if( Util.isEmpty( decoratorClassName ) )
      return tempTap;

    LOG.info( "found decorator property: {}, with value: {}, wrapping tap: {}", decoratorClassProp, decoratorClassName, tempTap );

    tempTap = Util.newInstance( decoratorClassName, tempTap );

    return tempTap;
    }

  /**
   * Method makeTempTap creates a temporary tap with the given name and no prefix.
   *
   * @param name the tap name
   * @return a new temporary Tap
   */
  protected Tap makeTempTap( String name )
    {
    return makeTempTap( null, name );
    }

  /**
   * Method getDebugLevel returns the debug level of the given FlowDef, or the planner default if
   * the FlowDef declares none.
   *
   * @param flowDef the flow definition
   * @return the effective DebugLevel
   */
  protected DebugLevel getDebugLevel( FlowDef flowDef )
    {
    return flowDef.getDebugLevel() == null ? this.defaultDebugLevel : flowDef.getDebugLevel();
    }

  /**
   * Method getAssertionLevel returns the assertion level of the given FlowDef, or the planner
   * default if the FlowDef declares none.
   *
   * @param flowDef the flow definition
   * @return the effective AssertionLevel
   */
  protected AssertionLevel getAssertionLevel( FlowDef flowDef )
    {
    return flowDef.getAssertionLevel() == null ? this.defaultAssertionLevel : flowDef.getAssertionLevel();
    }

  protected abstract Tap makeTempTap( String prefix, String name );

  /** Returns the process levels of the result registry sorted in reverse natural order. */
  private Set<ProcessLevel> getReverseOrderedProcessLevels( RuleResult ruleResult )
    {
    Set<ProcessLevel> ordered = new TreeSet<>( Collections.reverseOrder() );

    ordered.addAll( ruleResult.getRegistry().getProcessLevels() );

    return ordered;
    }
  }