001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.planner; 022 023 import java.util.ArrayList; 024 import java.util.Arrays; 025 import java.util.Collection; 026 import java.util.Collections; 027 import java.util.HashSet; 028 import java.util.List; 029 import java.util.Map; 030 import java.util.Set; 031 032 import cascading.flow.AssemblyPlanner; 033 import cascading.flow.Flow; 034 import cascading.flow.FlowConnector; 035 import cascading.flow.FlowConnectorProps; 036 import cascading.flow.FlowDef; 037 import cascading.flow.FlowElement; 038 import cascading.operation.AssertionLevel; 039 import cascading.operation.DebugLevel; 040 import cascading.pipe.Checkpoint; 041 import cascading.pipe.CoGroup; 042 import cascading.pipe.Each; 043 import cascading.pipe.Every; 044 import cascading.pipe.Group; 045 import cascading.pipe.GroupBy; 046 import cascading.pipe.HashJoin; 047 import cascading.pipe.Merge; 048 import cascading.pipe.OperatorException; 049 import cascading.pipe.Pipe; 050 import cascading.pipe.Splice; 051 import cascading.pipe.SubAssembly; 052 import cascading.property.ConfigDef; 053 import cascading.property.PropertyUtil; 054 import cascading.scheme.Scheme; 055 import cascading.tap.DecoratorTap; 056 import cascading.tap.Tap; 057 import cascading.tap.TapException; 058 import cascading.tuple.Fields; 059 import cascading.util.Util; 060 import org.jgrapht.GraphPath; 061 import org.jgrapht.Graphs; 062 import org.slf4j.Logger; 063 import org.slf4j.LoggerFactory; 064 065 import static cascading.flow.planner.ElementGraphs.*; 066 import static java.util.Arrays.asList; 067 068 /** Class FlowPlanner is the base class for all planner implementations. */ 069 public abstract class FlowPlanner<F extends Flow, Config> 070 { 071 /** Field LOG */ 072 private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class ); 073 074 /** Field properties */ 075 protected Map<Object, Object> properties; 076 077 protected String checkpointRootPath = null; 078 079 /** Field assertionLevel */ 080 protected AssertionLevel assertionLevel; 081 /** Field debugLevel */ 082 protected DebugLevel debugLevel; 083 084 /** 085 * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}. 086 * 087 * @param properties of type Map<Object, Object> 088 * @return AssertionLevel the configured AssertionLevel 089 */ 090 static AssertionLevel getAssertionLevel( Map<Object, Object> properties ) 091 { 092 String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() ); 093 094 return AssertionLevel.valueOf( assertionLevel ); 095 } 096 097 /** 098 * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}. 099 * 100 * @param properties of type Map<Object, Object> 101 * @return DebugLevel the configured DebugLevel 102 */ 103 static DebugLevel getDebugLevel( Map<Object, Object> properties ) 104 { 105 String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() ); 106 107 return DebugLevel.valueOf( debugLevel ); 108 } 109 110 public Map<Object, Object> getProperties() 111 { 112 return properties; 113 } 114 115 public abstract Config getConfig(); 116 117 public abstract PlatformInfo getPlatformInfo(); 118 119 public void initialize( FlowConnector flowConnector, Map<Object, Object> properties ) 120 { 121 this.properties = properties; 122 this.assertionLevel = getAssertionLevel( properties ); 123 this.debugLevel = getDebugLevel( properties ); 124 } 125 126 protected abstract Flow createFlow( FlowDef flowDef ); 127 128 /** 129 * Method buildFlow renders the actual Flow instance. 130 * 131 * @param flowDef 132 * @return Flow 133 */ 134 public abstract F buildFlow( FlowDef flowDef ); 135 136 protected Pipe[] resolveTails( FlowDef flowDef, Flow<Config> flow ) 137 { 138 Pipe[] tails = flowDef.getTailsArray(); 139 140 tails = resolveAssemblyPlanners( flowDef, flow, tails ); 141 142 return tails; 143 } 144 145 protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes ) 146 { 147 List<Pipe> tails = Arrays.asList( pipes ); 148 149 List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners(); 150 151 for( AssemblyPlanner assemblyPlanner : assemblyPlanners ) 152 { 153 tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) ); 154 155 if( tails.isEmpty() ) 156 throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" ); 157 158 tails = Collections.unmodifiableList( tails ); 159 } 160 161 return tails.toArray( new Pipe[ tails.size() ] ); 162 } 163 164 protected void verifyAssembly( FlowDef flowDef, Pipe[] tails ) 165 { 166 verifyPipeAssemblyEndPoints( flowDef, tails ); 167 verifyTraps( flowDef, tails ); 168 verifyCheckpoints( flowDef, tails ); 169 } 170 171 protected void verifyAllTaps( FlowDef flowDef ) 172 { 173 verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() ); 174 175 verifyTaps( flowDef.getSources(), true, true ); 176 verifyTaps( flowDef.getSinks(), false, true ); 177 verifyTaps( flowDef.getTraps(), false, false ); 178 179 // are both sources and sinks 180 verifyTaps( flowDef.getCheckpoints(), true, false ); 181 verifyTaps( flowDef.getCheckpoints(), false, false ); 182 } 183 184 protected ElementGraph createElementGraph( FlowDef flowDef, Pipe[] flowTails ) 185 { 186 Map<String, Tap> sources = flowDef.getSourcesCopy(); 187 Map<String, Tap> sinks = flowDef.getSinksCopy(); 188 Map<String, Tap> traps = flowDef.getTrapsCopy(); 189 Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy(); 190 191 AssertionLevel assertionLevel = flowDef.getAssertionLevel() == null ? this.assertionLevel : flowDef.getAssertionLevel(); 192 DebugLevel debugLevel = flowDef.getDebugLevel() == null ? this.debugLevel : flowDef.getDebugLevel(); 193 194 checkpointRootPath = makeCheckpointRootPath( flowDef ); 195 196 return new ElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointRootPath != null, assertionLevel, debugLevel ); 197 } 198 199 private String makeCheckpointRootPath( FlowDef flowDef ) 200 { 201 String flowName = flowDef.getName(); 202 String runID = flowDef.getRunID(); 203 204 if( runID == null ) 205 return null; 206 207 if( flowName == null ) 208 throw new PlannerException( "flow name is required when providing a run id" ); 209 210 return flowName + "/" + runID; 211 } 212 213 protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks ) 214 { 215 Collection<Tap> sourcesSet = sources.values(); 216 217 for( Tap tap : sinks.values() ) 218 { 219 if( sourcesSet.contains( tap ) ) 220 throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap ); 221 } 222 } 223 224 /** 225 * Method verifyTaps ... 226 * 227 * @param taps of type Map<String, Tap> 228 * @param areSources of type boolean 229 * @param mayNotBeEmpty of type boolean 230 */ 231 protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty ) 232 { 233 if( mayNotBeEmpty && taps.isEmpty() ) 234 throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" ); 235 236 for( String tapName : taps.keySet() ) 237 { 238 if( areSources && !taps.get( tapName ).isSource() ) 239 throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) ); 240 else if( !areSources && !taps.get( tapName ).isSink() ) 241 throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) ); 242 } 243 } 244 245 /** 246 * Method verifyEndPoints verifies 247 * <p/> 248 * there aren't dupe names in heads or tails. 249 * all the sink and source tap names match up with tail and head pipes 250 */ 251 // todo: force dupe names to throw exceptions 252 protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails ) 253 { 254 Set<String> tapNames = new HashSet<String>(); 255 256 tapNames.addAll( flowDef.getSources().keySet() ); 257 tapNames.addAll( flowDef.getSinks().keySet() ); 258 259 // handle tails 260 Set<Pipe> tails = new HashSet<Pipe>(); 261 Set<String> tailNames = new HashSet<String>(); 262 263 for( Pipe pipe : flowTails ) 264 { 265 if( pipe instanceof SubAssembly ) 266 { 267 for( Pipe tail : ( (SubAssembly) pipe ).getTails() ) 268 { 269 String tailName = tail.getName(); 270 271 if( !tapNames.contains( tailName ) ) 272 throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" ); 273 274 if( tailNames.contains( tailName ) && !tails.contains( tail ) ) 275 LOG.warn( "duplicate tail name found: '{}'", tailName ); 276 // throw new PlannerException( pipe, "duplicate tail name found: " + tailName ); 277 278 tailNames.add( tailName ); 279 tails.add( tail ); 280 } 281 } 282 else 283 { 284 String tailName = pipe.getName(); 285 286 if( !tapNames.contains( tailName ) ) 287 throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" ); 288 289 if( tailNames.contains( tailName ) && !tails.contains( pipe ) ) 290 LOG.warn( "duplicate tail name found: '{}'", tailName ); 291 // throw new PlannerException( pipe, "duplicate tail name found: " + tailName ); 292 293 tailNames.add( tailName ); 294 tails.add( pipe ); 295 } 296 } 297 298 // Set<String> allTailNames = new HashSet<String>( tailNames ); 299 tailNames.removeAll( flowDef.getSinks().keySet() ); 300 Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() ); 301 remainingSinks.removeAll( tailNames ); 302 303 if( tailNames.size() != 0 ) 304 throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + Util.join( Util.quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" ); 305 306 // unlike heads, pipes can input to another pipe and simultaneously be a sink 307 // so there is no way to know all the intentional tails, so they aren't listed below in the exception 308 remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() ); 309 remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) ); 310 311 if( remainingSinks.size() != 0 ) 312 throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" ); 313 314 // handle heads 315 Set<Pipe> heads = new HashSet<Pipe>(); 316 Set<String> headNames = new HashSet<String>(); 317 318 for( Pipe pipe : flowTails ) 319 { 320 for( Pipe head : pipe.getHeads() ) 321 { 322 String headName = head.getName(); 323 324 if( !tapNames.contains( headName ) ) 325 throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" ); 326 327 if( headNames.contains( headName ) && !heads.contains( head ) ) 328 LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName ); 329 // throw new PlannerException( pipe, "duplicate head name found: " + headName ); 330 331 headNames.add( headName ); 332 heads.add( head ); 333 } 334 } 335 336 Set<String> allHeadNames = new HashSet<String>( headNames ); 337 headNames.removeAll( flowDef.getSources().keySet() ); 338 Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() ); 339 remainingSources.removeAll( headNames ); 340 341 if( headNames.size() != 0 ) 342 throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "]" ); 343 344 remainingSources = new HashSet<String>( flowDef.getSources().keySet() ); 345 remainingSources.removeAll( allHeadNames ); 346 347 if( remainingSources.size() != 0 ) 348 throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "]" ); 349 350 } 351 352 protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails ) 353 { 354 verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" ); 355 356 Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) ); 357 358 for( String name : flowDef.getTraps().keySet() ) 359 { 360 if( !names.contains( name ) ) 361 throw new PlannerException( "trap name not found in assembly: '" + name + "'" ); 362 } 363 } 364 365 protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails ) 366 { 367 verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" ); 368 369 for( Tap checkpointTap : flowDef.getCheckpoints().values() ) 370 { 371 Scheme scheme = checkpointTap.getScheme(); 372 373 if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) ) 374 continue; 375 376 throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() ); 377 } 378 379 Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) ); 380 381 for( String name : flowDef.getCheckpoints().keySet() ) 382 { 383 if( !names.contains( name ) ) 384 throw new PlannerException( "checkpoint name not found in assembly: '" + name + "'" ); 385 386 Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) ); 387 388 int count = 0; 389 390 for( Pipe pipe : pipes ) 391 { 392 if( pipe instanceof Checkpoint ) 393 count++; 394 } 395 396 if( count == 0 ) 397 throw new PlannerException( "no checkpoint with name found in assembly: '" + name + "'" ); 398 399 if( count > 1 ) 400 throw new PlannerException( "more than one checkpoint with name found in assembly: '" + name + "'" ); 401 } 402 } 403 404 private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role ) 405 { 406 Collection<Tap> sourceTaps = sources.values(); 407 Collection<Tap> sinkTaps = sinks.values(); 408 409 for( Tap tap : taps.values() ) 410 { 411 if( sourceTaps.contains( tap ) ) 412 throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap ); 413 414 if( sinkTaps.contains( tap ) ) 415 throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap ); 416 } 417 } 418 419 /** 420 * Verifies that there are not only GroupAssertions following any given Group instance. This will adversely 421 * affect the stream entering any subsequent Tap of Each instances. 422 */ 423 protected void failOnLoneGroupAssertion( ElementGraph elementGraph ) 424 { 425 List<Group> groups = elementGraph.findAllGroups(); 426 427 // walk Every instances after Group 428 for( Group group : groups ) 429 { 430 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsFrom( group ) ) 431 { 432 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is tail 433 434 int everies = 0; 435 int assertions = 0; 436 437 for( FlowElement flowElement : flowElements ) 438 { 439 if( flowElement instanceof Group ) 440 continue; 441 442 if( !( flowElement instanceof Every ) ) 443 break; 444 445 everies++; 446 447 Every every = (Every) flowElement; 448 449 if( every.getPlannerLevel() != null ) 450 assertions++; 451 } 452 453 if( everies != 0 && everies == assertions ) 454 throw new PlannerException( "group assertions must be accompanied by aggregator operations" ); 455 } 456 } 457 } 458 459 protected void failOnMissingGroup( ElementGraph elementGraph ) 460 { 461 List<Every> everies = elementGraph.findAllEveries(); 462 463 // walk Every instances after Group 464 for( Every every : everies ) 465 { 466 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) ) 467 { 468 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every 469 Collections.reverse( flowElements ); // first element is every 470 471 for( FlowElement flowElement : flowElements ) 472 { 473 if( flowElement instanceof Every || flowElement.getClass() == Pipe.class ) 474 continue; 475 476 if( flowElement instanceof GroupBy || flowElement instanceof CoGroup ) 477 break; 478 479 throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a Group pipe, found: " + flowElement ); 480 } 481 } 482 } 483 } 484 485 protected void failOnMisusedBuffer( ElementGraph elementGraph ) 486 { 487 List<Every> everies = elementGraph.findAllEveries(); 488 489 // walk Every instances after Group 490 for( Every every : everies ) 491 { 492 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) ) 493 { 494 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every 495 Collections.reverse( flowElements ); // first element is every 496 497 Every last = null; 498 boolean foundBuffer = false; 499 int foundEveries = -1; 500 501 for( FlowElement flowElement : flowElements ) 502 { 503 if( flowElement instanceof Each ) 504 throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a GroupBy or CoGroup pipe, found: " + flowElement ); 505 506 if( flowElement instanceof Every ) 507 { 508 foundEveries++; 509 510 boolean isBuffer = ( (Every) flowElement ).isBuffer(); 511 512 if( foundEveries != 0 && ( isBuffer || foundBuffer ) ) 513 throw new PlannerException( (Pipe) flowElement, "Only one Every with a Buffer may follow a GroupBy or CoGroup pipe, no other Every instances are allowed immediately before or after, found: " + flowElement + " before: " + last ); 514 515 if( !foundBuffer ) 516 foundBuffer = isBuffer; 517 518 last = (Every) flowElement; 519 } 520 521 if( flowElement instanceof Group ) 522 break; 523 } 524 } 525 } 526 } 527 528 protected void failOnGroupEverySplit( ElementGraph elementGraph ) 529 { 530 List<Group> groups = new ArrayList<Group>(); 531 532 elementGraph.findAllOfType( 1, 2, Group.class, groups ); 533 534 for( Group group : groups ) 535 { 536 Set<FlowElement> children = elementGraph.getAllChildrenNotExactlyType( group, Pipe.class ); 537 538 for( FlowElement flowElement : children ) 539 { 540 if( flowElement instanceof Every ) 541 throw new PlannerException( (Every) flowElement, "Every instances may not split after a GroupBy or CoGroup pipe, found: " + flowElement + " after: " + group ); 542 } 543 } 544 } 545 546 protected PlannerException handleExceptionDuringPlanning( Exception exception, ElementGraph elementGraph ) 547 { 548 if( exception instanceof PlannerException ) 549 { 550 ( (PlannerException) exception ).elementGraph = elementGraph; 551 552 return (PlannerException) exception; 553 } 554 else if( exception instanceof ElementGraphException ) 555 { 556 Throwable cause = exception.getCause(); 557 558 if( cause == null ) 559 cause = exception; 560 561 // captures pipegraph for debugging 562 // forward message in case cause or trace is lost 563 String message = String.format( "could not build flow from assembly: [%s]", cause.getMessage() ); 564 565 if( cause instanceof OperatorException ) 566 return new PlannerException( message, cause, elementGraph ); 567 568 if( cause instanceof TapException ) 569 return new PlannerException( message, cause, elementGraph ); 570 571 return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, elementGraph ); 572 } 573 else 574 { 575 // captures pipegraph for debugging 576 // forward message in case cause or trace is lost 577 String message = String.format( "could not build flow from assembly: [%s]", exception.getMessage() ); 578 return new PlannerException( message, exception, elementGraph ); 579 } 580 } 581 582 protected void handleNonSafeOperations( ElementGraph elementGraph ) 583 { 584 // if there was a graph change, iterate paths again. 585 while( !internalNonSafeOperations( elementGraph ) ) 586 ; 587 } 588 589 private boolean internalNonSafeOperations( ElementGraph elementGraph ) 590 { 591 Set<Pipe> tapInsertions = new HashSet<Pipe>(); 592 593 List<Pipe> splits = elementGraph.findAllPipeSplits(); 594 595 // if any predecessor is unsafe, insert temp 596 for( Pipe split : splits ) 597 { 598 List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsTo( split ); 599 600 for( GraphPath<FlowElement, Scope> path : paths ) 601 { 602 List<FlowElement> elements = Graphs.getPathVertexList( path ); 603 Collections.reverse( elements ); 604 605 for( FlowElement element : elements ) 606 { 607 if( !( element instanceof Each ) && element.getClass() != Pipe.class ) 608 break; 609 610 if( element.getClass() == Pipe.class ) 611 continue; 612 613 if( !( (Each) element ).getOperation().isSafe() ) 614 { 615 tapInsertions.add( split ); 616 break; 617 } 618 } 619 } 620 } 621 622 for( Pipe pipe : tapInsertions ) 623 insertTempTapAfter( elementGraph, pipe ); 624 625 return tapInsertions.isEmpty(); 626 } 627 628 /** 629 * Method insertTapAfter ... 630 * 631 * @param graph of type PipeGraph 632 * @param pipe of type Pipe 633 */ 634 protected void insertTempTapAfter( ElementGraph graph, Pipe pipe ) 635 { 636 LOG.debug( "inserting tap after: {}", pipe ); 637 638 Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() ); 639 640 if( checkpointTap != null ) 641 { 642 LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap ); 643 checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS ); 644 } 645 646 if( checkpointTap == null ) 647 { 648 // only restart from a checkpoint pipe or checkpoint tap below 649 if( pipe instanceof Checkpoint ) 650 { 651 checkpointTap = makeTempTap( checkpointRootPath, pipe.getName() ); 652 checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS ); 653 // mark as an anonymous checkpoint 654 checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" ); 655 } 656 else 657 { 658 checkpointTap = makeTempTap( pipe.getName() ); 659 } 660 } 661 662 checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS ); 663 664 graph.insertFlowElementAfter( pipe, checkpointTap ); 665 } 666 667 private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass ) 668 { 669 String decoratorClassName = PropertyUtil.getProperty( properties, pipe, decoratorClass ); 670 671 if( Util.isEmpty( decoratorClassName ) ) 672 return tempTap; 673 674 LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClass, tempTap ); 675 676 tempTap = Util.newInstance( decoratorClassName, tempTap ); 677 678 return tempTap; 679 } 680 681 protected Tap makeTempTap( String name ) 682 { 683 return makeTempTap( null, name ); 684 } 685 686 protected abstract Tap makeTempTap( String prefix, String name ); 687 688 /** 689 * Inserts a temporary Tap between logical MR jobs. 690 * <p/> 691 * Since all joins are at groups or splices, depth first search is safe 692 * <p/> 693 * todo: refactor so that rules are applied to path segments bounded by taps 694 * todo: this would allow balancing of operations within paths instead of pushing 695 * todo: all operations up. may allow for consolidation of rules 696 * 697 * @param elementGraph of type PipeGraph 698 */ 699 protected void handleJobPartitioning( ElementGraph elementGraph ) 700 { 701 // if there was a graph change, iterate paths again. prevents many temp taps from being inserted in front of a group 702 while( !internalJobPartitioning( elementGraph ) ) 703 ; 704 } 705 706 private boolean internalJobPartitioning( ElementGraph elementGraph ) 707 { 708 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetweenExtents() ) 709 { 710 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 711 List<Pipe> tapInsertions = new ArrayList<Pipe>(); 712 713 boolean foundGroup = false; 714 715 for( int i = 0; i < flowElements.size(); i++ ) 716 { 717 FlowElement flowElement = flowElements.get( i ); 718 719 if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail 720 continue; 721 else if( flowElement instanceof Tap && flowElements.get( i - 1 ) instanceof ElementGraph.Extent ) // is a source tap 722 continue; 723 724 if( flowElement instanceof Group && !foundGroup ) 725 { 726 foundGroup = true; 727 } 728 else if( flowElement instanceof Splice && foundGroup ) // add tap between groups, push joins/merge map side 729 { 730 tapInsertions.add( (Pipe) flowElements.get( i - 1 ) ); 731 732 if( !( flowElement instanceof Group ) ) 733 foundGroup = false; 734 } 735 else if( flowElement instanceof Checkpoint ) // add tap after checkpoint 736 { 737 if( flowElements.get( i + 1 ) instanceof Tap ) // don't keep inserting 738 continue; 739 740 tapInsertions.add( (Pipe) flowElement ); 741 foundGroup = false; 742 } 743 else if( flowElement instanceof Tap ) 744 { 745 foundGroup = false; 746 } 747 } 748 749 for( Pipe pipe : tapInsertions ) 750 insertTempTapAfter( elementGraph, pipe ); 751 752 if( !tapInsertions.isEmpty() ) 753 return false; 754 } 755 756 return true; 757 } 758 759 /** 760 * Prevent leftmost sources from sourcing a downstream join on the rightmost side intra-task by inserting a 761 * temp tap between the left-sourced join and right-sourced join. 762 * 763 * @param elementGraph 764 */ 765 protected void handleJoins( ElementGraph elementGraph ) 766 { 767 while( !internalJoins( elementGraph ) ) 768 ; 769 } 770 771 private boolean internalJoins( ElementGraph elementGraph ) 772 { 773 List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents(); 774 775 // large to small 776 Collections.reverse( paths ); 777 778 for( GraphPath<FlowElement, Scope> path : paths ) 779 { 780 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 781 List<Pipe> tapInsertions = new ArrayList<Pipe>(); 782 List<HashJoin> joins = new ArrayList<HashJoin>(); 783 List<Merge> merges = new ArrayList<Merge>(); 784 785 FlowElement lastSourceElement = null; 786 787 for( int i = 0; i < flowElements.size(); i++ ) 788 { 789 FlowElement flowElement = flowElements.get( i ); 790 791 if( flowElement instanceof Merge ) 792 { 793 merges.add( (Merge) flowElement ); 794 } 795 else if( flowElement instanceof HashJoin ) 796 { 797 HashJoin join = (HashJoin) flowElement; 798 799 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true ); 800 801 // is this path streamed 802 int pathPosition = pathPositionInto( path, join ); 803 boolean thisPathIsStreamed = pathPosition == 0; 804 805 boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths 806 int pathCount = countPaths( pathCounts ); 807 808 int priorJoins = countTypesBetween( elementGraph, lastSourceElement, join, HashJoin.class ); 809 810 if( priorJoins == 0 ) 811 { 812 // if same source is leading into the hashjoin, insert tap on the accumulated side 813 if( pathCount == 2 && isAccumulatedAndStreamed && !thisPathIsStreamed ) 814 { 815 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 816 break; 817 } 818 819 // if more than one path into streamed and accumulated branches, insert tap on streamed side 820 if( pathCount > 2 && isAccumulatedAndStreamed && thisPathIsStreamed ) 821 { 822 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 823 break; 824 } 825 } 826 827 if( !merges.isEmpty() ) 828 { 829 // if a Merge is prior to a HashJoin, and its an accumulated path, force Merge results to disk 830 int joinPos = flowElements.indexOf( join ); 831 int mergePos = nearest( flowElements, joinPos, merges ); 832 833 if( mergePos != -1 && joinPos > mergePos ) 834 { 835 // if all paths are accumulated and streamed, insert 836 // else if just if this path is accumulated 837 if( ( isAccumulatedAndStreamed && thisPathIsStreamed ) || !thisPathIsStreamed ) 838 { 839 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 840 break; 841 } 842 } 843 } 844 845 joins.add( (HashJoin) flowElement ); 846 } 847 else if( flowElement instanceof Tap || flowElement instanceof Group ) 848 { 849 // added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy where Merge hides streamed nature of path 850 if( flowElement instanceof Group && !joins.isEmpty() ) 851 { 852 List<Splice> splices = new ArrayList<Splice>(); 853 854 splices.addAll( merges ); 855 splices.add( (Splice) flowElement ); 856 857 Collections.reverse( splices ); 858 859 for( Splice splice : splices ) 860 { 861 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true ); 862 863 if( isBothAccumulatedAndStreamedPath( pathCounts ) ) 864 { 865 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) ); 866 break; 867 } 868 } 869 870 if( !tapInsertions.isEmpty() ) 871 break; 872 } 873 874 for( int j = 0; j < joins.size(); j++ ) 875 { 876 HashJoin join = joins.get( j ); 877 878 int pathPosition = pathPositionInto( path, join ); 879 boolean thisPathIsStreamed = pathPosition == 0; 880 881 Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true ); 882 883 boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths 884 int pathCount = countPaths( pathCounts ); 885 886 if( pathCount >= 2 && isAccumulatedAndStreamed && thisPathIsStreamed ) 887 { 888 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 889 break; 890 } 891 892 if( thisPathIsStreamed ) 893 continue; 894 895 if( j == 0 ) // is accumulated on first join 896 break; 897 898 // prevent a streamed path from being accumulated by injecting a tap before the 899 // current HashJoin 900 tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) ); 901 break; 902 } 903 904 if( !tapInsertions.isEmpty() ) 905 break; 906 907 lastSourceElement = flowElement; 908 merges.clear(); 909 joins.clear(); 910 } 911 } 912 913 for( Pipe pipe : tapInsertions ) 914 insertTempTapAfter( elementGraph, pipe ); 915 916 if( !tapInsertions.isEmpty() ) 917 return false; 918 } 919 920 return true; 921 } 922 923 private int nearest( List<FlowElement> flowElements, int index, List<Merge> merges ) 924 { 925 List<Merge> reversed = new ArrayList<Merge>( merges ); 926 Collections.reverse( reversed ); 927 928 for( Merge merge : reversed ) 929 { 930 int pos = flowElements.indexOf( merge ); 931 if( pos < index ) 932 return pos; 933 } 934 935 return -1; 936 } 937 }