001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.planner; 022 023 import java.util.ArrayList; 024 import java.util.Arrays; 025 import java.util.Collection; 026 import java.util.Collections; 027 import java.util.HashSet; 028 import java.util.List; 029 import java.util.Map; 030 import java.util.Set; 031 032 import cascading.flow.AssemblyPlanner; 033 import cascading.flow.Flow; 034 import cascading.flow.FlowConnector; 035 import cascading.flow.FlowConnectorProps; 036 import cascading.flow.FlowDef; 037 import cascading.flow.FlowElement; 038 import cascading.operation.AssertionLevel; 039 import cascading.operation.DebugLevel; 040 import cascading.pipe.Checkpoint; 041 import cascading.pipe.CoGroup; 042 import cascading.pipe.Each; 043 import cascading.pipe.Every; 044 import cascading.pipe.Group; 045 import cascading.pipe.GroupBy; 046 import cascading.pipe.HashJoin; 047 import cascading.pipe.Merge; 048 import cascading.pipe.OperatorException; 049 import cascading.pipe.Pipe; 050 import cascading.pipe.Splice; 051 import cascading.pipe.SubAssembly; 052 import cascading.property.ConfigDef; 053 import cascading.property.PropertyUtil; 054 import cascading.scheme.Scheme; 055 import cascading.tap.DecoratorTap; 056 import cascading.tap.Tap; 057 import cascading.tap.TapException; 058 import cascading.tuple.Fields; 059 import cascading.util.Update; 060 import cascading.util.Util; 061 import org.jgrapht.GraphPath; 062 import org.jgrapht.Graphs; 063 import org.slf4j.Logger; 064 import org.slf4j.LoggerFactory; 065 066 import static cascading.flow.planner.ElementGraphs.*; 067 import static java.util.Arrays.asList; 068 069 /** Class FlowPlanner is the base class for all planner implementations. */ 070 public abstract class FlowPlanner<F extends Flow, Config> 071 { 072 /** Field LOG */ 073 private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class ); 074 075 /** Field properties */ 076 protected Map<Object, Object> properties; 077 078 protected String checkpointRootPath = null; 079 080 /** Field assertionLevel */ 081 protected AssertionLevel assertionLevel; 082 /** Field debugLevel */ 083 protected DebugLevel debugLevel; 084 085 /** 086 * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}. 087 * 088 * @param properties of type Map<Object, Object> 089 * @return AssertionLevel the configured AssertionLevel 090 */ 091 static AssertionLevel getAssertionLevel( Map<Object, Object> properties ) 092 { 093 String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() ); 094 095 return AssertionLevel.valueOf( assertionLevel ); 096 } 097 098 /** 099 * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}. 100 * 101 * @param properties of type Map<Object, Object> 102 * @return DebugLevel the configured DebugLevel 103 */ 104 static DebugLevel getDebugLevel( Map<Object, Object> properties ) 105 { 106 String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() ); 107 108 return DebugLevel.valueOf( debugLevel ); 109 } 110 111 { 112 Update.registerPlanner( getClass() ); 113 } 114 115 public Map<Object, Object> getProperties() 116 { 117 return properties; 118 } 119 120 public abstract Config getConfig(); 121 122 public abstract PlatformInfo getPlatformInfo(); 123 124 public void initialize( FlowConnector flowConnector, Map<Object, Object> properties ) 125 { 126 this.properties = properties; 127 this.assertionLevel = getAssertionLevel( properties ); 128 this.debugLevel = getDebugLevel( properties ); 129 } 130 131 protected abstract Flow createFlow( FlowDef flowDef ); 132 133 /** 134 * Method buildFlow renders the actual Flow instance. 135 * 136 * @param flowDef 137 * @return Flow 138 */ 139 public abstract F buildFlow( FlowDef flowDef ); 140 141 protected Pipe[] resolveTails( FlowDef flowDef, Flow<Config> flow ) 142 { 143 Pipe[] tails = flowDef.getTailsArray(); 144 145 tails = resolveAssemblyPlanners( flowDef, flow, tails ); 146 147 return tails; 148 } 149 150 protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes ) 151 { 152 List<Pipe> tails = Arrays.asList( pipes ); 153 154 List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners(); 155 156 for( AssemblyPlanner assemblyPlanner : assemblyPlanners ) 157 { 158 tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) ); 159 160 if( tails.isEmpty() ) 161 throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" ); 162 163 tails = Collections.unmodifiableList( tails ); 164 } 165 166 return tails.toArray( new Pipe[ tails.size() ] ); 167 } 168 169 protected void verifyAssembly( FlowDef flowDef, Pipe[] tails ) 170 { 171 verifyPipeAssemblyEndPoints( flowDef, tails ); 172 verifyTraps( flowDef, tails ); 173 verifyCheckpoints( flowDef, tails ); 174 } 175 176 protected void verifyAllTaps( FlowDef flowDef ) 177 { 178 verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() ); 179 180 verifyTaps( flowDef.getSources(), true, true ); 181 verifyTaps( flowDef.getSinks(), false, true ); 182 verifyTaps( flowDef.getTraps(), false, false ); 183 184 // are both sources and sinks 185 verifyTaps( flowDef.getCheckpoints(), true, false ); 186 verifyTaps( flowDef.getCheckpoints(), false, false ); 187 } 188 189 protected ElementGraph createElementGraph( FlowDef flowDef, Pipe[] flowTails ) 190 { 191 Map<String, Tap> sources = flowDef.getSourcesCopy(); 192 Map<String, Tap> sinks = flowDef.getSinksCopy(); 193 Map<String, Tap> traps = flowDef.getTrapsCopy(); 194 Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy(); 195 196 AssertionLevel assertionLevel = flowDef.getAssertionLevel() == null ? this.assertionLevel : flowDef.getAssertionLevel(); 197 DebugLevel debugLevel = flowDef.getDebugLevel() == null ? this.debugLevel : flowDef.getDebugLevel(); 198 199 checkpointRootPath = makeCheckpointRootPath( flowDef ); 200 201 return new ElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointRootPath != null, assertionLevel, debugLevel ); 202 } 203 204 private String makeCheckpointRootPath( FlowDef flowDef ) 205 { 206 String flowName = flowDef.getName(); 207 String runID = flowDef.getRunID(); 208 209 if( runID == null ) 210 return null; 211 212 if( flowName == null ) 213 throw new PlannerException( "flow name is required when providing a run id" ); 214 215 return flowName + "/" + runID; 216 } 217 218 protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks ) 219 { 220 Collection<Tap> sourcesSet = sources.values(); 221 222 for( Tap tap : sinks.values() ) 223 { 224 if( sourcesSet.contains( tap ) ) 225 throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap ); 226 } 227 } 228 229 /** 230 * Method verifyTaps ... 231 * 232 * @param taps of type Map<String, Tap> 233 * @param areSources of type boolean 234 * @param mayNotBeEmpty of type boolean 235 */ 236 protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty ) 237 { 238 if( mayNotBeEmpty && taps.isEmpty() ) 239 throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" ); 240 241 for( String tapName : taps.keySet() ) 242 { 243 if( areSources && !taps.get( tapName ).isSource() ) 244 throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) ); 245 else if( !areSources && !taps.get( tapName ).isSink() ) 246 throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) ); 247 } 248 } 249 250 /** 251 * Method verifyEndPoints verifies 252 * <p/> 253 * there aren't dupe names in heads or tails. 254 * all the sink and source tap names match up with tail and head pipes 255 */ 256 // todo: force dupe names to throw exceptions 257 protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails ) 258 { 259 Set<String> tapNames = new HashSet<String>(); 260 261 tapNames.addAll( flowDef.getSources().keySet() ); 262 tapNames.addAll( flowDef.getSinks().keySet() ); 263 264 // handle tails 265 Set<Pipe> tails = new HashSet<Pipe>(); 266 Set<String> tailNames = new HashSet<String>(); 267 268 for( Pipe pipe : flowTails ) 269 { 270 if( pipe instanceof SubAssembly ) 271 { 272 for( Pipe tail : ( (SubAssembly) pipe ).getTails() ) 273 { 274 String tailName = tail.getName(); 275 276 if( !tapNames.contains( tailName ) ) 277 throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" ); 278 279 if( tailNames.contains( tailName ) && !tails.contains( tail ) ) 280 LOG.warn( "duplicate tail name found: '{}'", tailName ); 281 // throw new PlannerException( pipe, "duplicate tail name found: " + tailName ); 282 283 tailNames.add( tailName ); 284 tails.add( tail ); 285 } 286 } 287 else 288 { 289 String tailName = pipe.getName(); 290 291 if( !tapNames.contains( tailName ) ) 292 throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" ); 293 294 if( tailNames.contains( tailName ) && !tails.contains( pipe ) ) 295 LOG.warn( "duplicate tail name found: '{}'", tailName ); 296 // throw new PlannerException( pipe, "duplicate tail name found: " + tailName ); 297 298 tailNames.add( tailName ); 299 tails.add( pipe ); 300 } 301 } 302 303 // Set<String> allTailNames = new HashSet<String>( tailNames ); 304 tailNames.removeAll( flowDef.getSinks().keySet() ); 305 Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() ); 306 remainingSinks.removeAll( tailNames ); 307 308 if( tailNames.size() != 0 ) 309 throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + Util.join( Util.quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" ); 310 311 // unlike heads, pipes can input to another pipe and simultaneously be a sink 312 // so there is no way to know all the intentional tails, so they aren't listed below in the exception 313 remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() ); 314 remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) ); 315 316 if( remainingSinks.size() != 0 ) 317 throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" ); 318 319 // handle heads 320 Set<Pipe> heads = new HashSet<Pipe>(); 321 Set<String> headNames = new HashSet<String>(); 322 323 for( Pipe pipe : flowTails ) 324 { 325 for( Pipe head : pipe.getHeads() ) 326 { 327 String headName = head.getName(); 328 329 if( !tapNames.contains( headName ) ) 330 throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" ); 331 332 if( headNames.contains( headName ) && !heads.contains( head ) ) 333 LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName ); 334 // throw new PlannerException( pipe, "duplicate head name found: " + headName ); 335 336 headNames.add( headName ); 337 heads.add( head ); 338 } 339 } 340 341 Set<String> allHeadNames = new HashSet<String>( headNames ); 342 headNames.removeAll( flowDef.getSources().keySet() ); 343 Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() ); 344 remainingSources.removeAll( headNames ); 345 346 if( headNames.size() != 0 ) 347 throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "]" ); 348 349 remainingSources = new HashSet<String>( flowDef.getSources().keySet() ); 350 remainingSources.removeAll( allHeadNames ); 351 352 if( remainingSources.size() != 0 ) 353 throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "]" ); 354 355 } 356 357 protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails ) 358 { 359 verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" ); 360 361 Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) ); 362 363 for( String name : flowDef.getTraps().keySet() ) 364 { 365 if( !names.contains( name ) ) 366 throw new PlannerException( "trap name not found in assembly: '" + name + "'" ); 367 } 368 } 369 370 protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails ) 371 { 372 verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" ); 373 374 for( Tap checkpointTap : flowDef.getCheckpoints().values() ) 375 { 376 Scheme scheme = checkpointTap.getScheme(); 377 378 if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) ) 379 continue; 380 381 throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() ); 382 } 383 384 Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) ); 385 386 for( String name : flowDef.getCheckpoints().keySet() ) 387 { 388 if( !names.contains( name ) ) 389 throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" ); 390 391 Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) ); 392 393 int count = 0; 394 395 for( Pipe pipe : pipes ) 396 { 397 if( pipe instanceof Checkpoint ) 398 count++; 399 } 400 401 if( count == 0 ) 402 throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" ); 403 404 if( count > 1 ) 405 throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" ); 406 } 407 } 408 409 private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role ) 410 { 411 Collection<Tap> sourceTaps = sources.values(); 412 Collection<Tap> sinkTaps = sinks.values(); 413 414 for( Tap tap : taps.values() ) 415 { 416 if( sourceTaps.contains( tap ) ) 417 throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap ); 418 419 if( sinkTaps.contains( tap ) ) 420 throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap ); 421 } 422 } 423 424 /** 425 * Verifies that there are not only GroupAssertions following any given Group instance. This will adversely 426 * affect the stream entering any subsequent Tap of Each instances. 427 */ 428 protected void failOnLoneGroupAssertion( ElementGraph elementGraph ) 429 { 430 List<Group> groups = elementGraph.findAllGroups(); 431 432 // walk Every instances after Group 433 for( Group group : groups ) 434 { 435 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsFrom( group ) ) 436 { 437 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is tail 438 439 int everies = 0; 440 int assertions = 0; 441 442 for( FlowElement flowElement : flowElements ) 443 { 444 if( flowElement instanceof Group ) 445 continue; 446 447 if( !( flowElement instanceof Every ) ) 448 break; 449 450 everies++; 451 452 Every every = (Every) flowElement; 453 454 if( every.getPlannerLevel() != null ) 455 assertions++; 456 } 457 458 if( everies != 0 && everies == assertions ) 459 throw new PlannerException( "group assertions must be accompanied by aggregator operations" ); 460 } 461 } 462 } 463 464 protected void failOnMissingGroup( ElementGraph elementGraph ) 465 { 466 List<Every> everies = elementGraph.findAllEveries(); 467 468 // walk Every instances after Group 469 for( Every every : everies ) 470 { 471 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) ) 472 { 473 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every 474 Collections.reverse( flowElements ); // first element is every 475 476 for( FlowElement flowElement : flowElements ) 477 { 478 if( flowElement instanceof Every || flowElement.getClass() == Pipe.class ) 479 continue; 480 481 if( flowElement instanceof GroupBy || flowElement instanceof CoGroup ) 482 break; 483 484 throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a Group pipe, found: " + flowElement ); 485 } 486 } 487 } 488 } 489 490 protected void failOnMisusedBuffer( ElementGraph elementGraph ) 491 { 492 List<Every> everies = elementGraph.findAllEveries(); 493 494 // walk Every instances after Group 495 for( Every every : everies ) 496 { 497 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) ) 498 { 499 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every 500 Collections.reverse( flowElements ); // first element is every 501 502 Every last = null; 503 boolean foundBuffer = false; 504 int foundEveries = -1; 505 506 for( FlowElement flowElement : flowElements ) 507 { 508 if( flowElement instanceof Each ) 509 throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a GroupBy or CoGroup pipe, found: " + flowElement ); 510 511 if( flowElement instanceof Every ) 512 { 513 foundEveries++; 514 515 boolean isBuffer = ( (Every) flowElement ).isBuffer(); 516 517 if( foundEveries != 0 && ( isBuffer || foundBuffer ) ) 518 throw new PlannerException( (Pipe) flowElement, "Only one Every with a Buffer may follow a GroupBy or CoGroup pipe, no other Every instances are allowed immediately before or after, found: " + flowElement + " before: " + last ); 519 520 if( !foundBuffer ) 521 foundBuffer = isBuffer; 522 523 last = (Every) flowElement; 524 } 525 526 if( flowElement instanceof Group ) 527 break; 528 } 529 } 530 } 531 } 532 533 protected void failOnGroupEverySplit( ElementGraph elementGraph ) 534 { 535 List<Group> groups = new ArrayList<Group>(); 536 537 elementGraph.findAllOfType( 1, 2, Group.class, groups ); 538 539 for( Group group : groups ) 540 { 541 Set<FlowElement> children = elementGraph.getAllChildrenNotExactlyType( group, Pipe.class ); 542 543 for( FlowElement flowElement : children ) 544 { 545 if( flowElement instanceof Every ) 546 throw new PlannerException( (Every) flowElement, "Every instances may not split after a GroupBy or CoGroup pipe, found: " + flowElement + " after: " + group ); 547 } 548 } 549 } 550 551 protected PlannerException handleExceptionDuringPlanning( Exception exception, ElementGraph elementGraph ) 552 { 553 if( exception instanceof PlannerException ) 554 { 555 ( (PlannerException) exception ).elementGraph = elementGraph; 556 557 return (PlannerException) exception; 558 } 559 else if( exception instanceof ElementGraphException ) 560 { 561 Throwable cause = exception.getCause(); 562 563 if( cause == null ) 564 cause = exception; 565 566 // captures pipegraph for debugging 567 // forward message in case cause or trace is lost 568 String message = "could not build flow from assembly"; 569 570 if( cause.getMessage() != null ) 571 message = String.format( "%s: [%s]", message, cause.getMessage() ); 572 573 if( cause instanceof OperatorException ) 574 return new PlannerException( message, cause, elementGraph ); 575 576 if( cause instanceof TapException ) 577 return new PlannerException( message, cause, elementGraph ); 578 579 return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, elementGraph ); 580 } 581 else 582 { 583 // captures pipegraph for debugging 584 // forward message in case cause or trace is lost 585 String message = "could not build flow from assembly"; 586 587 if( exception.getMessage() != null ) 588 message = String.format( "%s: [%s]", message, exception.getMessage() ); 589 590 return new PlannerException( message, exception, elementGraph ); 591 } 592 } 593 594 protected void handleNonSafeOperations( ElementGraph elementGraph ) 595 { 596 // if there was a graph change, iterate paths again. 597 while( !internalNonSafeOperations( elementGraph ) ) 598 ; 599 } 600 601 private boolean internalNonSafeOperations( ElementGraph elementGraph ) 602 { 603 Set<Pipe> tapInsertions = new HashSet<Pipe>(); 604 605 List<Pipe> splits = elementGraph.findAllPipeSplits(); 606 607 // if any predecessor is unsafe, insert temp 608 for( Pipe split : splits ) 609 { 610 List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsTo( split ); 611 612 for( GraphPath<FlowElement, Scope> path : paths ) 613 { 614 List<FlowElement> elements = Graphs.getPathVertexList( path ); 615 Collections.reverse( elements ); 616 617 for( FlowElement element : elements ) 618 { 619 if( !( element instanceof Each ) && element.getClass() != Pipe.class ) 620 break; 621 622 if( element.getClass() == Pipe.class ) 623 continue; 624 625 if( !( (Each) element ).getOperation().isSafe() ) 626 { 627 tapInsertions.add( split ); 628 break; 629 } 630 } 631 } 632 } 633 634 for( Pipe pipe : tapInsertions ) 635 insertTempTapAfter( elementGraph, pipe ); 636 637 return tapInsertions.isEmpty(); 638 } 639 640 /** 641 * Method insertTapAfter ... 642 * 643 * @param graph of type PipeGraph 644 * @param pipe of type Pipe 645 */ 646 protected void insertTempTapAfter( ElementGraph graph, Pipe pipe ) 647 { 648 LOG.debug( "inserting tap after: {}", pipe ); 649 650 Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() ); 651 652 if( checkpointTap != null ) 653 { 654 LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap ); 655 checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS ); 656 } 657 658 if( checkpointTap == null ) 659 { 660 // only restart from a checkpoint pipe or checkpoint tap below 661 if( pipe instanceof Checkpoint ) 662 { 663 checkpointTap = makeTempTap( checkpointRootPath, pipe.getName() ); 664 checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS ); 665 // mark as an anonymous checkpoint 666 checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" ); 667 } 668 else 669 { 670 checkpointTap = makeTempTap( pipe.getName() ); 671 } 672 } 673 674 checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS ); 675 676 graph.insertFlowElementAfter( pipe, checkpointTap ); 677 } 678 679 private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass ) 680 { 681 String decoratorClassName = PropertyUtil.getProperty( properties, pipe, decoratorClass ); 682 683 if( Util.isEmpty( decoratorClassName ) ) 684 return tempTap; 685 686 LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClass, tempTap ); 687 688 tempTap = Util.newInstance( decoratorClassName, tempTap ); 689 690 return tempTap; 691 } 692 693 protected Tap makeTempTap( String name ) 694 { 695 return makeTempTap( null, name ); 696 } 697 698 protected abstract Tap makeTempTap( String prefix, String name ); 699 700 /** 701 * Inserts a temporary Tap between logical MR jobs. 702 * <p/> 703 * Since all joins are at groups or splices, depth first search is safe 704 * <p/> 705 * todo: refactor so that rules are applied to path segments bounded by taps 706 * todo: this would allow balancing of operations within paths instead of pushing 707 * todo: all operations up. may allow for consolidation of rules 708 * 709 * @param elementGraph of type PipeGraph 710 */ 711 protected void handleJobPartitioning( ElementGraph elementGraph ) 712 { 713 // if there was a graph change, iterate paths again. prevents many temp taps from being inserted in front of a group 714 while( !internalJobPartitioning( elementGraph ) ) 715 ; 716 } 717 718 private boolean internalJobPartitioning( ElementGraph elementGraph ) 719 { 720 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetweenExtents() ) 721 { 722 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 723 List<Pipe> tapInsertions = new ArrayList<Pipe>(); 724 725 boolean foundGroup = false; 726 727 for( int i = 0; i < flowElements.size(); i++ ) 728 { 729 FlowElement flowElement = flowElements.get( i ); 730 731 if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail 732 continue; 733 else if( flowElement instanceof Tap && flowElements.get( i - 1 ) instanceof ElementGraph.Extent ) // is a source tap 734 continue; 735 736 if( flowElement instanceof Group && !foundGroup ) 737 { 738 foundGroup = true; 739 } 740 else if( flowElement instanceof Splice && foundGroup ) // add tap between groups, push joins/merge map side 741 { 742 tapInsertions.add( (Pipe) flowElements.get( i - 1 ) ); 743 744 if( !( flowElement instanceof Group ) ) 745 foundGroup = false; 746 } 747 else if( flowElement instanceof Checkpoint ) // add tap after checkpoint 748 { 749 if( flowElements.get( i + 1 ) instanceof Tap ) // don't keep inserting 750 continue; 751 752 tapInsertions.add( (Pipe) flowElement ); 753 foundGroup = false; 754 } 755 else if( flowElement instanceof Tap ) 756 { 757 foundGroup = false; 758 } 759 } 760 761 for( Pipe pipe : tapInsertions ) 762 insertTempTapAfter( elementGraph, pipe ); 763 764 if( !tapInsertions.isEmpty() ) 765 return false; 766 } 767 768 return true; 769 } 770 771 /** 772 * Prevent leftmost sources from sourcing a downstream join on the rightmost side intra-task by inserting a 773 * temp tap between the left-sourced join and right-sourced join. 774 * 775 * @param elementGraph 776 */ 777 protected void handleJoins( ElementGraph elementGraph ) 778 { 779 while( !internalJoins( elementGraph ) ) 780 ; 781 } 782 783 private boolean internalJoins( ElementGraph elementGraph ) 784 { 785 List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents(); 786 787 // large to small 788 Collections.reverse( paths ); 789 790 for( GraphPath<FlowElement, Scope> path : paths ) 791 { 792 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 793 List<Pipe> tapInsertions = new ArrayList<Pipe>(); 794 List<HashJoin> joins = new ArrayList<HashJoin>(); 795 List<Merge> merges = new ArrayList<Merge>(); 796 797 FlowElement lastSourceElement = null; 798 799 for( int i = 0; i < flowElements.size(); i++ ) 800 { 801 FlowElement flowElement = flowElements.get( i ); 802 803 if( flowElement instanceof Merge ) 804 { 805 merges.add( (Merge) flowElement ); 806 } 807 else if( flowElement instanceof HashJoin ) 808 { 809 HashJoin join = (HashJoin) flowElement; 810 811 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true ); 812 813 // is this path streamed 814 int pathPosition = pathPositionInto( path, join ); 815 boolean thisPathIsStreamed = pathPosition == 0; 816 817 boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths 818 int pathCount = countPaths( pathCounts ); 819 820 int priorJoins = countTypesBetween( elementGraph, lastSourceElement, join, HashJoin.class ); 821 822 if( priorJoins == 0 ) 823 { 824 // if same source is leading into the hashjoin, insert tap on the accumulated side 825 if( pathCount == 2 && isAccumulatedAndStreamed && !thisPathIsStreamed ) 826 { 827 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 828 break; 829 } 830 831 // if more than one path into streamed and accumulated branches, insert tap on streamed side 832 if( pathCount > 2 && isAccumulatedAndStreamed && thisPathIsStreamed ) 833 { 834 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 835 break; 836 } 837 } 838 839 if( !merges.isEmpty() ) 840 { 841 // if a Merge is prior to a HashJoin, and its an accumulated path, force Merge results to disk 842 int joinPos = flowElements.indexOf( join ); 843 int mergePos = nearest( flowElements, joinPos, merges ); 844 845 if( mergePos != -1 && joinPos > mergePos ) 846 { 847 // if all paths are accumulated and streamed, insert 848 // else if just if this path is accumulated 849 if( ( isAccumulatedAndStreamed && thisPathIsStreamed ) || !thisPathIsStreamed ) 850 { 851 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 852 break; 853 } 854 } 855 } 856 857 joins.add( (HashJoin) flowElement ); 858 } 859 else if( flowElement instanceof Tap || flowElement instanceof Group ) 860 { 861 // added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy where Merge hides streamed nature of path 862 if( flowElement instanceof Group && !joins.isEmpty() ) 863 { 864 List<Splice> splices = new ArrayList<Splice>(); 865 866 splices.addAll( merges ); 867 splices.add( (Splice) flowElement ); 868 869 Collections.reverse( splices ); 870 871 for( Splice splice : splices ) 872 { 873 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true ); 874 875 if( isBothAccumulatedAndStreamedPath( pathCounts ) ) 876 { 877 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) ); 878 break; 879 } 880 } 881 882 if( !tapInsertions.isEmpty() ) 883 break; 884 } 885 886 for( int j = 0; j < joins.size(); j++ ) 887 { 888 HashJoin join = joins.get( j ); 889 890 int pathPosition = pathPositionInto( path, join ); 891 boolean thisPathIsStreamed = pathPosition == 0; 892 893 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true ); 894 895 boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths 896 int pathCount = countPaths( pathCounts ); 897 898 if( pathCount >= 2 && isAccumulatedAndStreamed && thisPathIsStreamed ) 899 { 900 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 901 break; 902 } 903 904 if( thisPathIsStreamed ) 905 continue; 906 907 if( j == 0 ) // is accumulated on first join 908 break; 909 910 // prevent a streamed path from being accumulated by injecting a tap before the 911 // current HashJoin 912 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 913 break; 914 } 915 916 if( !tapInsertions.isEmpty() ) 917 break; 918 919 lastSourceElement = flowElement; 920 merges.clear(); 921 joins.clear(); 922 } 923 } 924 925 for( Pipe pipe : tapInsertions ) 926 insertTempTapAfter( elementGraph, pipe ); 927 928 if( !tapInsertions.isEmpty() ) 929 return false; 930 } 931 932 return true; 933 } 934 935 private int nearest( List<FlowElement> flowElements, int index, List<Merge> merges ) 936 { 937 List<Merge> reversed = new ArrayList<Merge>( merges ); 938 Collections.reverse( reversed ); 939 940 for( Merge merge : reversed ) 941 { 942 int pos = flowElements.indexOf( merge ); 943 if( pos < index ) 944 return pos; 945 } 946 947 return -1; 948 } 949 }