/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.rule;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.planner.PlannerContext;
import cascading.flow.planner.PlannerException;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.BoundedElementMultiGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.graph.ElementMultiGraph;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.graph.IgnoreAnnotationsHashSet;
import cascading.flow.planner.iso.GraphResult;
import cascading.flow.planner.iso.assertion.Asserted;
import cascading.flow.planner.iso.assertion.GraphAssert;
import cascading.flow.planner.iso.subgraph.Partitions;
import cascading.flow.planner.iso.transformer.GraphTransformer;
import cascading.flow.planner.iso.transformer.Transformed;
import cascading.flow.planner.rule.util.TraceWriter;
import cascading.util.EnumMultiMap;
import cascading.util.ProcessLogger;

import static cascading.util.Util.createIdentitySet;
import static cascading.util.Util.formatDurationFromMillis;
import static java.lang.String.format;

/**
 * Class RuleExec applies the rules held by a {@link RuleRegistry} to a {@link FlowElementGraph},
 * one {@link PlanPhase} at a time, and collects the outcome and timings in a {@link RuleResult}.
 * <p>
 * Each phase either resolves the assembly graph or executes the rules registered for that phase.
 * Rule phases run in one of two modes: mutation (transformers and assertions) or partitioning.
 * Trace output is delegated to the given {@link TraceWriter}.
 */
public class RuleExec
  {
  /** Graphs with at least this many vertices have planner progress logged at info instead of debug. */
  private static final int ELEMENT_THRESHOLD = 600;

  final TraceWriter traceWriter;
  final RuleRegistry registry;

  /**
   * Creates a new executor.
   *
   * @param traceWriter the writer used for plan and transform traces
   * @param registry    the registry supplying the rules for each phase
   */
  public RuleExec( TraceWriter traceWriter, RuleRegistry registry )
    {
    this.traceWriter = traceWriter;
    this.registry = registry;
    }

  /**
   * Runs every planner phase against the given graph.
   * <p>
   * Any {@link Exception} thrown while planning is not propagated; it is stored on the returned
   * result via {@code setPlannerException}. The total duration is always recorded on the result.
   *
   * @param plannerContext   the current planner context, also the source of the logger
   * @param flowElementGraph the graph to plan
   * @return the result holding the planner outcome, durations and any captured exception
   */
  public RuleResult exec( PlannerContext plannerContext, FlowElementGraph flowElementGraph )
    {
    RuleResult ruleResult = new RuleResult( registry, flowElementGraph );

    ProcessLogger logger = plannerContext.getLogger();
    int size = flowElementGraph.vertexSet().size();
    boolean logAsInfo = size >= ELEMENT_THRESHOLD;

    if( logAsInfo )
      logger.logInfo( "elements in graph: {}, info logging threshold: {}, logging planner execution status", size, ELEMENT_THRESHOLD );

    long beginExec = System.currentTimeMillis();

    try
      {
      planPhases( plannerContext, logAsInfo, ruleResult );
      }
    catch( Exception exception )
      {
      ruleResult.setPlannerException( exception );
      }
    finally
      {
      long endExec = System.currentTimeMillis();

      ruleResult.setDuration( beginExec, endExec );

      RuleResult.ResultStatus status = ruleResult.getResultStatus();
      String duration = formatDurationFromMillis( endExec - beginExec );
      logPhase( logger, logAsInfo, "rule registry completed: {}, with status: {}, and duration: {}", registry.getName(), status, duration );
      }

    return ruleResult;
    }

  /**
   * Iterates all {@link PlanPhase} values in declaration order and performs the action of each.
   * The duration of every phase is recorded on the result, even when the phase fails.
   *
   * @param plannerContext the current planner context
   * @param logAsInfo      true if phase progress should be logged at info level, debug otherwise
   * @param ruleResult     the result being populated
   */
  protected void planPhases( PlannerContext plannerContext, boolean logAsInfo, RuleResult ruleResult )
    {
    ProcessLogger logger = plannerContext.getLogger();

    for( PlanPhase phase : PlanPhase.values() ) // iterate in order, all planner phases
      {
      long beginPhase = System.currentTimeMillis();

      logPhase( logger, logAsInfo, "starting rule phase: {}", phase );

      try
        {
        switch( phase.getAction() )
          {
          case Resolve:
            resolveElements( ruleResult );
            break;

          case Rule:
            executeRulePhase( phase, plannerContext, ruleResult );
            break;
          }
        }
      finally
        {
        long endPhase = System.currentTimeMillis();

        ruleResult.setPhaseDuration( phase, beginPhase, endPhase );

        logPhase( logger, logAsInfo, "ending rule phase: {}, duration: {}", phase, formatDurationFromMillis( endPhase - beginPhase ) );
        }
      }
    }

  /**
   * Resolves fields on a copy of the current assembly graph and stores the copy as the
   * {@code Assembly} level result. Does nothing if the registry has element resolution disabled.
   *
   * @param ruleResult the result being populated
   */
  private void resolveElements( RuleResult ruleResult )
    {
    if( !registry.enabledResolveElements() )
      return;

    FlowElementGraph elementGraph = ruleResult.getAssemblyGraph();

    // resolve against a copy, the graph held by the result is replaced below
    elementGraph = (FlowElementGraph) elementGraph.copyElementGraph();

    ScopeResolver.resolveFields( elementGraph );

    ruleResult.setLevelResults( ProcessLevel.Assembly, ruleResult.initialAssembly, elementGraph );
    }

  /**
   * Executes, in declared order, all rules the registry holds for the given phase.
   * <p>
   * The duration of each rule is recorded on the result. The phase result plan is written
   * whether or not the phase completes normally.
   *
   * @param phase          the phase to execute
   * @param plannerContext the current planner context
   * @param ruleResult     the result being populated
   * @return the given ruleResult
   * @throws UnsupportedPlanException if a rule throws one, re-wrapped with the failing rule
   * @throws PlannerException         if a rule throws one (rethrown as is), or if a rule throws
   *                                  any other exception (wrapped with registry, phase and rule)
   */
  public RuleResult executeRulePhase( PlanPhase phase, PlannerContext plannerContext, RuleResult ruleResult )
    {
    ProcessLogger logger = plannerContext.getLogger();

    logger.logDebug( "executing plan phase: {}", phase );

    LinkedList<Rule> rules = registry.getRulesFor( phase );

    writePhaseInitPlan( phase, ruleResult );

    try
      {
      // within this phase, execute all rules in declared order
      for( Rule rule : rules )
        {
        logger.logDebug( "executing rule: {}", rule );

        long begin = System.currentTimeMillis();

        try
          {
          switch( phase.getMode() )
            {
            case Mutate:
              performMutation( plannerContext, ruleResult, phase, rule );
              break;

            case Partition:
              performPartition( plannerContext, ruleResult, phase, rule );
              break;
            }
          }
        catch( UnsupportedPlanException exception )
          {
          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );

          throw new UnsupportedPlanException( rule, exception );
          }
        catch( PlannerException exception )
          {
          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );

          throw exception; // rethrow
          }
        catch( Exception exception )
          {
          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );

          throw new PlannerException( registry, phase, rule, exception );
          }
        finally
          {
          long end = System.currentTimeMillis();

          ruleResult.setRuleDuration( rule, begin, end );

          logger.logDebug( "completed rule: {}", rule );
          }
        }

      return ruleResult;
      }
    finally
      {
      logger.logDebug( "completed plan phase: {}", phase );
      writePhaseResultPlan( phase, ruleResult );
      }
    }

  /**
   * Applies a mutating rule, which must be either a {@link GraphTransformer} or a {@link GraphAssert}.
   *
   * @param plannerContext the current planner context
   * @param ruleResult     the result being populated
   * @param phase          the current phase
   * @param rule           the rule to apply
   * @throws PlannerException if the rule is neither a transformer nor an assertion
   */
  protected void performMutation( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule )
    {
    if( rule instanceof GraphTransformer )
      performTransform( plannerContext, ruleResult, phase, (GraphTransformer) rule );
    else if( rule instanceof GraphAssert )
      performAssertion( plannerContext, ruleResult, phase, (GraphAssert) rule );
    else
      throw new PlannerException( "unexpected rule: " + rule.getRuleName() );
    }

  /**
   * Applies a partitioning rule, dispatching on its declared partition source.
   *
   * @param plannerContext the current planner context
   * @param ruleResult     the result being populated
   * @param phase          the current phase
   * @param rule           the rule to apply, must be a {@link RulePartitioner}
   * @throws PlannerException      if the rule is not a {@link RulePartitioner}
   * @throws IllegalStateException if the partition source is neither parent nor current
   */
  private void performPartition( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule )
    {
    if( !( rule instanceof RulePartitioner ) )
      throw new PlannerException( "unexpected rule: " + rule.getRuleName() );

    RulePartitioner partitioner = (RulePartitioner) rule;

    if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionParent )
      handleParentPartitioning( plannerContext, ruleResult, phase, partitioner );
    else if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionCurrent )
      handleCurrentPartitioning( plannerContext, ruleResult, phase, partitioner );
    else
      throw new IllegalStateException( "unknown partitioning type: " + partitioner.getPartitionSource() );
    }

  /**
   * Re-partitions each prior result (child) of the current level individually.
   * <p>
   * A child that yields one or more partitions is replaced by them; a child that yields none
   * is left in place. Remaining prior children that equal a new partition (ignoring
   * annotations) are only logged, not treated as failures.
   *
   * @param plannerContext the current planner context
   * @param ruleResult     the result being populated
   * @param phase          the current phase
   * @param partitioner    the partitioner to apply
   * @throws PlannerException if the partitioner fails, or creates duplicate graphs for one child
   */
  private void handleCurrentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner )
    {
    Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() );

    Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>();

    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() )
      {
      ElementGraph parent = entry.getKey();
      List<? extends ElementGraph> priors = entry.getValue();

      // working copy, priors itself is iterated below and must not be modified
      List<ElementGraph> resultChildren = new ArrayList<>( priors );

      Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() );

      for( ElementGraph child : priors )
        {
        ElementGraph priorAnnotated = annotateWithPriors( child, priors );

        Partitions partitions;

        try
          {
          partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions );
          }
        catch( Throwable throwable )
          {
          throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable );
          }

        writeTransformTrace( ruleResult, phase, partitioner, parent, child, partitions );

        List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() );

        if( results.isEmpty() )
          continue;

        // ignore annotations on equality, but replace a newer graph with prior
        IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results );

        if( uniques.size() != results.size() )
          throw new PlannerException( "rule created duplicate element graphs" );

        // replace child with partitioned results
        resultChildren.remove( child );

        for( ElementGraph prior : resultChildren )
          {
          if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates
            plannerContext.getLogger().logDebug( "re-partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() );
          }

        // order no longer preserved
        resultChildren = uniques.asList();
        }

      subGraphs.put( parent, resultChildren );
      }

    ruleResult.setLevelResults( phase.getLevel(), subGraphs );
    }

  /**
   * Partitions each parent graph of the current level, merging the new partitions with the
   * prior results of that parent.
   * <p>
   * Priors that equal a new partition (ignoring annotations) are only logged, not treated as
   * failures.
   *
   * @param plannerContext the current planner context
   * @param ruleResult     the result being populated
   * @param phase          the current phase
   * @param partitioner    the partitioner to apply
   * @throws PlannerException if the partitioner fails, or creates duplicate graphs for one parent
   */
  private void handleParentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner )
    {
    Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() );

    Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>();

    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() )
      {
      ElementGraph parent = entry.getKey();
      List<? extends ElementGraph> priors = entry.getValue();

      Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() );
      ElementGraph priorAnnotated = annotateWithPriors( parent, priors );

      Partitions partitions;

      try
        {
        partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions );
        }
      catch( Throwable throwable )
        {
        throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable );
        }

      writeTransformTrace( ruleResult, phase, partitioner, parent, null, partitions );

      List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() );

      // ignore annotations on equality, but replace a newer graph with prior
      IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results );

      if( uniques.size() != results.size() )
        throw new PlannerException( "rule created duplicate element graphs" );

      for( ElementGraph prior : priors )
        {
        if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates
          plannerContext.getLogger().logDebug( "partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() );
        }

      // order no longer preserved
      subGraphs.put( parent, uniques.asList() );
      }

    ruleResult.setLevelResults( phase.getLevel(), subGraphs );
    }

  /**
   * Applies the given assertion to every child graph of the current level.
   * <p>
   * An assertion result without a first anchor is considered a pass.
   *
   * @param plannerContext the current planner context
   * @param ruleResult     the result being read
   * @param phase          the current phase
   * @param asserter       the assertion to apply
   * @throws UnsupportedPlanException if the assertion matched and is of type {@code Unsupported}
   * @throws PlannerException         if the assertion matched with any other type, or the
   *                                  assertion itself failed to execute
   */
  private void performAssertion( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphAssert asserter )
    {
    plannerContext.getLogger().logDebug( "applying assertion: {}", ( (Rule) asserter ).getRuleName() );

    Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() );

    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() )
      {
      ElementGraph parent = entry.getKey(); // null for root case
      List<? extends ElementGraph> children = entry.getValue();

      for( ElementGraph child : children )
        {
        Asserted asserted;

        try
          {
          asserted = asserter.assertion( plannerContext, child );
          }
        catch( Throwable throwable )
          {
          throw new PlannerException( registry, phase, (Rule) asserter, child, throwable );
          }

        writeTransformTrace( ruleResult, phase, (Rule) asserter, parent, child, asserted );

        FlowElement primary = asserted.getFirstAnchor();

        // no anchor, nothing matched the assertion
        if( primary == null )
          continue;

        if( asserted.getAssertionType() == GraphAssert.AssertionType.Unsupported )
          throw new UnsupportedPlanException( primary, asserted.getMessage() );
        else // only two options
          throw new PlannerException( primary, asserted.getMessage() );
        }
      }
    }

  /**
   * Applies the given transformer to every child graph of the current level and stores the
   * transformed children as the new level results of their parent.
   * <p>
   * A child whose transform has no end graph is kept as is, a child whose end graph is empty
   * is dropped, otherwise the child is replaced by the end graph.
   *
   * @param plannerContext the current planner context
   * @param ruleResult     the result being populated
   * @param phase          the current phase
   * @param transformer    the transformer to apply
   * @throws PlannerException if the transformer fails
   */
  private void performTransform( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphTransformer transformer )
    {
    plannerContext.getLogger().logDebug( "applying transform: {}", ( (Rule) transformer ).getRuleName() );

    Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() );

    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() )
      {
      ElementGraph parent = entry.getKey(); // null for root case
      List<? extends ElementGraph> children = entry.getValue();

      List<ElementGraph> results = new ArrayList<>();

      for( ElementGraph child : children )
        {
        Transformed transformed;

        try
          {
          transformed = transformer.transform( plannerContext, child );
          }
        catch( TransformException exception )
          {
          // trace the partial transform carried by the exception before failing
          writeTransformTrace( ruleResult, phase, (Rule) transformer, parent, child, exception.getTransformed() );

          throw new PlannerException( registry, phase, (Rule) transformer, child, exception.getCause() );
          }
        catch( Throwable throwable )
          {
          throw new PlannerException( registry, phase, (Rule) transformer, child, throwable );
          }

        writeTransformTrace( ruleResult, phase, (Rule) transformer, parent, child, transformed );

        ElementGraph endGraph = transformed.getEndGraph();

        if( endGraph == null )
          results.add( child );
        else if( !ElementGraphs.isEmpty( endGraph ) )
          results.add( endGraph );
        }

      ruleResult.setLevelResults( phase.getLevel(), parent, results );
      }
    }

  /**
   * Returns a copy of the given graph carrying the annotations of all annotated prior results.
   *
   * @param elementGraph the graph to annotate
   * @param priorResults the prior results to collect annotations from, may be null
   * @return the given elementGraph if priorResults is null, otherwise a new annotated graph
   */
  private ElementGraph annotateWithPriors( ElementGraph elementGraph, List<? extends ElementGraph> priorResults )
    {
    if( priorResults == null )
      return elementGraph;

    // the results are sub-graphs of the elementGraph, so guaranteed to exist in graph
    AnnotatedGraph resultGraph = new ElementMultiGraph( elementGraph );

    for( ElementGraph result : priorResults )
      {
      if( !( result instanceof AnnotatedGraph ) || !( (AnnotatedGraph) result ).hasAnnotations() )
        continue;

      EnumMultiMap<FlowElement> annotations = ( (AnnotatedGraph) result ).getAnnotations();

      resultGraph.getAnnotations().addAll( annotations );
      }

    return (ElementGraph) resultGraph;
    }

  /**
   * Collects, by identity, all elements of the given graphs that are annotated with any of
   * the given annotations.
   *
   * @param elementGraphs      the graphs to inspect, may be null
   * @param annotationExcludes the annotations marking elements to exclude
   * @return null if elementGraphs is null, otherwise the possibly empty set of annotated elements
   */
  private Set<FlowElement> getExclusions( List<? extends ElementGraph> elementGraphs, Enum[] annotationExcludes )
    {
    if( elementGraphs == null )
      return null;

    Set<FlowElement> exclusions = createIdentitySet();

    for( ElementGraph elementGraph : elementGraphs )
      {
      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
        continue;

      for( Enum annotationExclude : annotationExcludes )
        {
        Set<FlowElement> flowElements = ( (AnnotatedGraph) elementGraph ).getAnnotations().getValues( annotationExclude );

        if( flowElements != null )
          exclusions.addAll( flowElements );
        }
      }

    return exclusions;
    }

  /**
   * Wraps every given sub-graph, with its annotations, in a {@link BoundedElementMultiGraph}
   * bounded on the given graph.
   *
   * @param currentElementGraph the graph the sub-graphs are bounded on
   * @param subGraphs           the sub-graphs mapped to their annotations
   * @return one bounded graph per entry, in the iteration order of the given map
   */
  // use the final assembly graph so we can get Scopes for heads and tails
  private List<ElementGraph> makeBoundedOn( ElementGraph currentElementGraph, Map<ElementGraph, EnumMultiMap> subGraphs )
    {
    List<ElementGraph> results = new ArrayList<>( subGraphs.size() );

    // walk the entries directly, avoids a second lookup per sub-graph
    for( Map.Entry<ElementGraph, EnumMultiMap> entry : subGraphs.entrySet() )
      results.add( new BoundedElementMultiGraph( currentElementGraph, entry.getKey(), entry.getValue() ) );

    return results;
    }

  /**
   * Writes the initial plan of a phase. Only {@code Assembly} level phases produce output.
   *
   * @param phase      the phase about to start
   * @param ruleResult the result holding the current assembly graph
   */
  private void writePhaseInitPlan( PlanPhase phase, RuleResult ruleResult )
    {
    switch( phase.getLevel() )
      {
      case Assembly:
        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-init.dot", phase.ordinal(), phase ) );
        break;
      case Step:
        break;
      case Node:
        break;
      case Pipeline:
        break;
      }
    }

  /**
   * Writes the resulting plan of a phase, the graphs written depend on the level of the phase.
   *
   * @param phase      the phase that has ended
   * @param ruleResult the result holding the graphs to write
   */
  private void writePhaseResultPlan( PlanPhase phase, RuleResult ruleResult )
    {
    switch( phase.getLevel() )
      {
      case Assembly:
        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-result.dot", phase.ordinal(), phase ) );
        break;
      case Step:
        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyToStepGraphMap().get( ruleResult.getAssemblyGraph() ), phase, "result" );
        break;
      case Node:
        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), phase, "result" );
        break;
      case Pipeline:
        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), ruleResult.getNodeToPipelineGraphMap(), phase, "result" );
        break;
      }
    }

  /**
   * Logs the given message at info level if logAsInfo is true, at debug level otherwise.
   *
   * @param logger    the logger to write to
   * @param logAsInfo true to log at info level
   * @param message   the message pattern
   * @param items     the values for the message pattern
   */
  private void logPhase( ProcessLogger logger, boolean logAsInfo, String message, Object... items )
    {
    if( logAsInfo )
      logger.logInfo( message, items );
    else
      logger.logDebug( message, items );
    }

  /**
   * Writes the trace of a single rule application, unless transform tracing is disabled.
   *
   * @param ruleResult the result used to look up the path of the graphs
   * @param phase      the current phase
   * @param rule       the rule that was applied
   * @param parent     the parent graph
   * @param child      the child graph the rule was applied to, null if applied to the parent
   * @param result     the outcome of the rule
   */
  private void writeTransformTrace( RuleResult ruleResult, PlanPhase phase, Rule rule, ElementGraph parent, ElementGraph child, GraphResult result )
    {
    if( traceWriter.isTransformTraceDisabled() )
      return;

    int[] path = child != null ? ruleResult.getPathFor( parent, child ) : ruleResult.getPathFor( parent );

    traceWriter.writeTransformPlan( registry.getName(), phase, rule, path, result );
    }
  }