001/* 002 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner; 023 024import java.io.IOException; 025import java.io.Serializable; 026import java.lang.reflect.Type; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Objects; 038import java.util.Set; 039import java.util.stream.Collectors; 040 041import cascading.flow.Flow; 042import cascading.flow.FlowElement; 043import cascading.flow.FlowException; 044import cascading.flow.FlowNode; 045import cascading.flow.FlowProcess; 046import cascading.flow.FlowStep; 047import cascading.flow.FlowStepListener; 048import cascading.flow.planner.graph.AnnotatedGraph; 049import cascading.flow.planner.graph.ElementGraph; 050import cascading.flow.planner.graph.ElementGraphs; 051import cascading.flow.planner.process.FlowNodeGraph; 052import cascading.flow.stream.annotations.StreamMode; 053import cascading.management.CascadingServices; 054import cascading.management.state.ClientState; 055import cascading.operation.Operation; 056import cascading.pipe.Group; 057import cascading.pipe.Operator; 058import cascading.pipe.Pipe; 059import cascading.pipe.Splice; 060import cascading.pipe.SubAssembly; 061import cascading.property.ConfigDef; 062import cascading.stats.FlowStepStats; 063import cascading.tap.Tap; 064import cascading.tuple.Fields; 065import cascading.tuple.type.SerializableType; 066import cascading.util.EnumMultiMap; 067import cascading.util.ProcessLogger; 068import cascading.util.Util; 069 070import static cascading.flow.planner.graph.ElementGraphs.findAllGroups; 071import static cascading.util.Util.narrowIdentitySet; 072 073/** 074 * Class FlowStep is an internal representation of a given Job to be executed on a remote cluster. During 075 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class. 076 * <p> 077 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all 078 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which 079 * all steps will be submitted for execution. The default submit priority is 5. 080 * <p> 081 * This class is for internal use, there are no stable public methods. 082 */ 083public abstract class BaseFlowStep<Config> implements FlowStep<Config>, ProcessLogger, Serializable 084 { 085 /** Field flow */ 086 private transient Flow<Config> flow; 087 /** Field flowName */ 088 private String flowName; 089 /** Field flowID */ 090 private String flowID; 091 092 private transient Config flowStepConf; 093 094 /** Field submitPriority */ 095 private int submitPriority = 5; 096 097 /** Field name */ 098 String name; 099 private String id; 100 private int ordinal; 101 private Map<String, String> processAnnotations; 102 103 /** Field step listeners */ 104 private List<SafeFlowStepListener> listeners; 105 106 /** Field elementGraph */ 107 protected ElementGraph elementGraph; 108 /** Field flowNodeGraph */ 109 protected FlowNodeGraph flowNodeGraph; 110 111 /** Field sources */ 112 protected final Map<Tap, Set<String>> sources = new HashMap<>(); // all sources 113 /** Field sink */ 114 protected final Map<Tap, Set<String>> sinks = new HashMap<>(); // all sinks 115 /** Field traps */ 116 private final Map<String, Tap> traps = new HashMap<>(); 117 118 /** Field tempSink */ 119 protected Tap tempSink; // used if we need to bypass the filesystem 120 121 /** Field groups */ 122 private final List<Group> groups = new ArrayList<Group>(); 123 124 protected transient FlowStepStats flowStepStats; 125 126 private transient FlowStepJob<Config> flowStepJob; 127 128 /** optional metadata about the FlowStep */ 129 private Map<String, String> flowStepDescriptor = Collections.emptyMap(); 130 131 protected BaseFlowStep( String name, int ordinal ) 132 { 133 this( name, ordinal, null ); 134 } 135 136 protected BaseFlowStep( String name, int ordinal, Map<String, String> flowStepDescriptor ) 137 { 138 this( name, ordinal, null, flowStepDescriptor ); 139 } 140 141 protected BaseFlowStep( String name, int ordinal, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor ) 142 { 143 this(); 144 setName( name ); 145 this.ordinal = ordinal; 146 147 this.elementGraph = null; 148 this.flowNodeGraph = flowNodeGraph; 149 150 setFlowStepDescriptor( flowStepDescriptor ); 151 } 152 153 protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph ) 154 { 155 this( elementStepGraph, flowNodeGraph, null ); 156 } 157 158 protected BaseFlowStep( ElementGraph elementStepGraph, FlowNodeGraph flowNodeGraph, Map<String, String> flowStepDescriptor ) 159 { 160 this(); 161 this.elementGraph = elementStepGraph; 162 this.flowNodeGraph = flowNodeGraph; // TODO: verify no missing elements in the union of the node graphs 163 164 setFlowStepDescriptor( flowStepDescriptor ); 165 166 configure(); 167 } 168 169 protected BaseFlowStep() 170 { 171 this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number 172 } 173 174 protected void configure() 175 { 176 addSources( this, getElementGraph(), getFlowNodeGraph().getSourceTaps() ); 177 addSinks( this, getElementGraph(), getFlowNodeGraph().getSinkTaps() ); 178 179 addAllGroups(); 180 181 traps.putAll( getFlowNodeGraph().getTrapsMap() ); 182 } 183 184 protected void addAllGroups() 185 { 186 addGroups( findAllGroups( getElementGraph() ) ); 187 } 188 189 @Override 190 public String getID() 191 { 192 return id; 193 } 194 195 public void setOrdinal( int ordinal ) 196 { 197 this.ordinal = ordinal; 198 } 199 200 @Override 201 public int getOrdinal() 202 { 203 return ordinal; 204 } 205 206 @Override 207 public String getName() 208 { 209 return name; 210 } 211 212 public void setName( String name ) 213 { 214 if( name == null || name.isEmpty() ) 215 throw new IllegalArgumentException( "step name may not be null or empty" ); 216 217 this.name = name; 218 } 219 220 @Override 221 public Map<String, String> getFlowStepDescriptor() 222 { 223 return Collections.unmodifiableMap( flowStepDescriptor ); 224 } 225 226 protected void setFlowStepDescriptor( Map<String, String> flowStepDescriptor ) 227 { 228 if( flowStepDescriptor != null ) 229 this.flowStepDescriptor = flowStepDescriptor; 230 } 231 232 @Override 233 public Map<String, String> getProcessAnnotations() 234 { 235 if( processAnnotations == null ) 236 return Collections.emptyMap(); 237 238 return Collections.unmodifiableMap( processAnnotations ); 239 } 240 241 @Override 242 public void addProcessAnnotation( Enum annotation ) 243 { 244 if( annotation == null ) 245 return; 246 247 addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() ); 248 } 249 250 @Override 251 public void addProcessAnnotation( String key, String value ) 252 { 253 if( processAnnotations == null ) 254 processAnnotations = new HashMap<>(); 255 256 processAnnotations.put( key, value ); 257 } 258 259 public void setFlow( Flow<Config> flow ) 260 { 261 this.flow = flow; 262 this.flowID = flow.getID(); 263 this.flowName = flow.getName(); 264 } 265 266 @Override 267 public Flow<Config> getFlow() 268 { 269 return flow; 270 } 271 272 @Override 273 public String getFlowID() 274 { 275 return flowID; 276 } 277 278 @Override 279 public String getFlowName() 280 { 281 return flowName; 282 } 283 284 protected void setFlowName( String flowName ) 285 { 286 this.flowName = flowName; 287 } 288 289 @Override 290 public Config getConfig() 291 { 292 return flowStepConf; 293 } 294 295 @Override 296 public Map<Object, Object> getConfigAsProperties() 297 { 298 return Collections.emptyMap(); 299 } 300 301 /** 302 * Set the initialized flowStepConf Config instance 303 * 304 * @param flowStepConf of type Config 305 */ 306 protected void setConfig( Config flowStepConf ) 307 { 308 this.flowStepConf = flowStepConf; 309 } 310 311 @Override 312 public String getStepDisplayName() 313 { 314 return getStepDisplayName( Util.ID_LENGTH ); 315 } 316 317 protected String getStepDisplayName( int idLength ) 318 { 319 if( idLength < 0 || idLength > Util.ID_LENGTH ) 320 idLength = Util.ID_LENGTH; 321 322 if( idLength == 0 ) 323 return String.format( "%s/%s", getFlowName(), getName() ); 324 325 String flowID = getFlowID().substring( 0, idLength ); 326 String stepID = getID().substring( 0, idLength ); 327 328 return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() ); 329 } 330 331 protected String getNodeDisplayName( FlowNode flowNode, int idLength ) 332 { 333 if( idLength > Util.ID_LENGTH ) 334 idLength = Util.ID_LENGTH; 335 336 String flowID = getFlowID().substring( 0, idLength ); 337 String stepID = getID().substring( 0, idLength ); 338 String nodeID = flowNode.getID().substring( 0, idLength ); 339 340 return String.format( "[%s/%s/%s] %s/%s", flowID, stepID, nodeID, getFlowName(), getName() ); 341 } 342 343 @Override 344 public int getSubmitPriority() 345 { 346 return submitPriority; 347 } 348 349 @Override 350 public void setSubmitPriority( int submitPriority ) 351 { 352 if( submitPriority < 1 || submitPriority > 10 ) 353 throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority ); 354 355 this.submitPriority = submitPriority; 356 } 357 358 @Override 359 public void setFlowStepStats( FlowStepStats flowStepStats ) 360 { 361 this.flowStepStats = flowStepStats; 362 } 363 364 @Override 365 public FlowStepStats getFlowStepStats() 366 { 367 return flowStepStats; 368 } 369 370 @Override 371 public ElementGraph getElementGraph() 372 { 373 return elementGraph; 374 } 375 376 protected EnumMultiMap getAnnotations() 377 { 378 return ( (AnnotatedGraph) elementGraph ).getAnnotations(); 379 } 380 381 @Override 382 public FlowNodeGraph getFlowNodeGraph() 383 { 384 return flowNodeGraph; 385 } 386 387 @Override 388 public int getNumFlowNodes() 389 { 390 return flowNodeGraph.vertexSet().size(); 391 } 392 393 public Set<FlowElement> getSourceElements() 394 { 395 return ElementGraphs.findSources( getElementGraph(), FlowElement.class ); 396 } 397 398 public Set<FlowElement> getSinkElements() 399 { 400 return ElementGraphs.findSinks( getElementGraph(), FlowElement.class ); 401 } 402 403 @Override 404 public Group getGroup() 405 { 406 if( groups.isEmpty() ) 407 return null; 408 409 if( groups.size() > 1 ) 410 throw new IllegalStateException( "more than one group" ); 411 412 return groups.get( 0 ); 413 } 414 415 @Override 416 public Collection<Group> getGroups() 417 { 418 return groups; 419 } 420 421 public void addGroups( Collection<Group> groups ) 422 { 423 for( Group group : groups ) 424 addGroup( group ); 425 } 426 427 public void addGroup( Group group ) 428 { 429 if( !groups.contains( group ) ) 430 groups.add( group ); 431 } 432 433 /** 434 * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Accumulated}. 435 * 436 * @return Set of accumulated Tap instances 437 */ 438 public Set<Tap> getAllAccumulatedSources() 439 { 440 return narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Accumulated ) ); 441 } 442 443 /** 444 * Returns all source Tap instances annotated by the planner as being {@link StreamMode#Streamed}. 445 * 446 * @return Set of streamed Tap instances 447 */ 448 public Set<Tap> getAllStreamedSources() 449 { 450 return narrowIdentitySet( Tap.class, getFlowNodeGraph().getFlowElementsFor( StreamMode.Streamed ) ); 451 } 452 453 public void addSource( String name, Tap source ) 454 { 455 if( !sources.containsKey( source ) ) 456 sources.put( source, new HashSet<String>() ); 457 458 sources.get( source ).add( name ); 459 } 460 461 public void addSink( String name, Tap sink ) 462 { 463 if( !sinks.containsKey( sink ) ) 464 sinks.put( sink, new HashSet<String>() ); 465 466 sinks.get( sink ).add( name ); 467 } 468 469 @Override 470 public Set<Tap> getSourceTaps() 471 { 472 return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) ); 473 } 474 475 @Override 476 public Set<Tap> getSinkTaps() 477 { 478 return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) ); 479 } 480 481 @Override 482 public Tap getSink() 483 { 484 if( sinks.size() == 0 ) 485 return null; 486 487 if( sinks.size() > 1 ) 488 throw new IllegalStateException( "more than one sink" ); 489 490 return sinks.keySet().iterator().next(); 491 } 492 493 @Override 494 public Set<String> getSourceName( Tap source ) 495 { 496 return Collections.unmodifiableSet( sources.get( source ) ); 497 } 498 499 @Override 500 public Set<String> getSinkName( Tap sink ) 501 { 502 return Collections.unmodifiableSet( sinks.get( sink ) ); 503 } 504 505 @Override 506 public Tap getSourceWith( String identifier ) 507 { 508 if( Util.isEmpty( identifier ) ) 509 return null; 510 511 for( Tap tap : sources.keySet() ) 512 { 513 if( identifier.equalsIgnoreCase( tap.getIdentifier() ) ) 514 return tap; 515 } 516 517 return null; 518 } 519 520 @Override 521 public Tap getSinkWith( String identifier ) 522 { 523 if( Util.isEmpty( identifier ) ) 524 return null; 525 526 for( Tap tap : sinks.keySet() ) 527 { 528 if( identifier.equalsIgnoreCase( tap.getIdentifier() ) ) 529 return tap; 530 } 531 532 return null; 533 } 534 535 @Override 536 public Map<String, Tap> getTrapMap() 537 { 538 return traps; 539 } 540 541 @Override 542 public Set<Tap> getTraps() 543 { 544 return Collections.unmodifiableSet( new HashSet<Tap>( traps.values() ) ); 545 } 546 547 public Tap getTrap( String name ) 548 { 549 return getTrapMap().get( name ); 550 } 551 552 boolean allSourcesExist() throws IOException 553 { 554 for( Tap tap : sources.keySet() ) 555 { 556 if( !tap.resourceExists( getConfig() ) ) 557 return false; 558 } 559 560 return true; 561 } 562 563 boolean areSourcesNewer( long sinkModified ) throws IOException 564 { 565 Config config = getConfig(); 566 Iterator<Tap> values = sources.keySet().iterator(); 567 568 long sourceModified = 0; 569 570 try 571 { 572 sourceModified = Util.getSourceModified( config, values, sinkModified ); 573 574 if( sinkModified < sourceModified ) 575 return true; 576 577 return false; 578 } 579 finally 580 { 581 if( isInfoEnabled() ) 582 logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all 583 } 584 } 585 586 long getSinkModified() throws IOException 587 { 588 long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() ); 589 590 if( isInfoEnabled() ) 591 { 592 if( sinkModified == -1L ) 593 logInfo( "at least one sink is marked for delete" ); 594 if( sinkModified == 0L ) 595 logInfo( "at least one sink does not exist" ); 596 else 597 logInfo( "sink oldest modified date: " + new Date( sinkModified ) ); 598 } 599 600 return sinkModified; 601 } 602 603 protected Throwable prepareResources() 604 { 605 Throwable throwable = prepareResources( getSourceTaps(), false ); 606 607 if( throwable == null ) 608 throwable = prepareResources( getSinkTaps(), true ); 609 610 if( throwable == null ) 611 throwable = prepareResources( getTraps(), true ); 612 613 return throwable; 614 } 615 616 private Throwable prepareResources( Collection<Tap> taps, boolean forWrite ) 617 { 618 Throwable throwable = null; 619 620 for( Tap tap : taps ) 621 { 622 throwable = prepareResource( tap, forWrite ); 623 624 if( throwable != null ) 625 break; 626 } 627 628 return throwable; 629 } 630 631 private Throwable prepareResource( Tap tap, boolean forWrite ) 632 { 633 Throwable throwable = null; 634 635 try 636 { 637 boolean result; 638 639 if( forWrite ) 640 result = tap.prepareResourceForWrite( getConfig() ); 641 else 642 result = tap.prepareResourceForRead( getConfig() ); 643 644 if( !result ) 645 { 646 String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) ); 647 648 logError( message ); 649 650 throwable = new FlowException( message ); 651 } 652 } 653 catch( Throwable exception ) 654 { 655 String message = String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) ); 656 657 logError( message, exception ); 658 659 throwable = new FlowException( message, exception ); 660 } 661 662 return throwable; 663 } 664 665 protected Throwable commitSinks() 666 { 667 Throwable throwable = null; 668 669 for( Tap tap : sinks.keySet() ) 670 { 671 if( throwable != null ) 672 rollbackResource( tap ); 673 else 674 throwable = commitResource( tap ); 675 } 676 677 return throwable; 678 } 679 680 private Throwable commitResource( Tap tap ) 681 { 682 Throwable throwable = null; 683 684 try 685 { 686 if( !tap.commitResource( getConfig() ) ) 687 { 688 String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() ); 689 690 logError( message ); 691 692 throwable = new FlowException( message ); 693 } 694 } 695 catch( Throwable exception ) 696 { 697 String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() ); 698 699 logError( message, exception ); 700 701 throwable = new FlowException( message, exception ); 702 } 703 704 return throwable; 705 } 706 707 private Throwable rollbackResource( Tap tap ) 708 { 709 Throwable throwable = null; 710 711 try 712 { 713 if( !tap.rollbackResource( getConfig() ) ) 714 { 715 String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() ); 716 717 logError( message ); 718 719 throwable = new FlowException( message ); 720 } 721 } 722 catch( Throwable exception ) 723 { 724 String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() ); 725 726 logError( message, exception ); 727 728 throwable = new FlowException( message, exception ); 729 } 730 731 return throwable; 732 } 733 734 protected Throwable rollbackSinks() 735 { 736 Throwable throwable = null; 737 738 for( Tap tap : sinks.keySet() ) 739 { 740 if( throwable != null ) 741 rollbackResource( tap ); 742 else 743 throwable = rollbackResource( tap ); 744 } 745 746 return throwable; 747 } 748 749 /** 750 * Public for testing. 751 * 752 * @param flowProcess 753 * @param parentConfig 754 * @return 755 */ 756 public abstract Config createInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig ); 757 758 protected Set<String> getFieldDeclaredSerializations( Class base ) 759 { 760 Collection<SerializableType> serializableTypes = findAllSerializableTypes(); 761 762 return serializableTypes.stream() 763 .map( type -> type.getSerializer( base ) ) 764 .filter( Objects::nonNull ) 765 .map( Class::getName ) 766 .collect( Collectors.toSet() ); 767 } 768 769 protected Collection<SerializableType> findAllSerializableTypes() 770 { 771 Set<FlowElement> elements = Util.createIdentitySet(); 772 773 //todo mark FlowElements as relying on serialization w/ an annotation 774 elements.addAll( narrowIdentitySet( Tap.class, elementGraph.vertexSet() ) ); 775 elements.addAll( narrowIdentitySet( Splice.class, elementGraph.vertexSet() ) ); 776 777 Set<SerializableType> types = new HashSet<>(); 778 779 for( FlowElement element : elements ) 780 { 781 Fields fields = elementGraph.outgoingEdgesOf( element ).iterator().next().getOutValuesFields(); 782 783 Type[] fieldTypes = fields.getTypes(); 784 785 if( fieldTypes == null ) 786 continue; 787 788 for( Type type : fieldTypes ) 789 { 790 if( type instanceof SerializableType ) 791 types.add( (SerializableType) type ); 792 } 793 } 794 795 return types; 796 } 797 798 /** 799 * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup), 800 * there will be more than one instance. 801 * 802 * @param flowElement of type FlowElement 803 * @return Set 804 */ 805 public Set<Scope> getPreviousScopes( FlowElement flowElement ) 806 { 807 return getElementGraph().incomingEdgesOf( flowElement ); 808 } 809 810 /** 811 * Method getNextScope returns the next Scope instance in the graph. There will always only be one next. 812 * 813 * @param flowElement of type FlowElement 814 * @return Scope 815 */ 816 public Scope getNextScope( FlowElement flowElement ) 817 { 818 Set<Scope> set = getElementGraph().outgoingEdgesOf( flowElement ); 819 820 if( set.size() != 1 ) 821 throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() ); 822 823 return set.iterator().next(); 824 } 825 826 public FlowElement getNextFlowElement( Scope scope ) 827 { 828 return getElementGraph().getEdgeTarget( scope ); 829 } 830 831 public Collection<Operation> getAllOperations() 832 { 833 Set<FlowElement> vertices = getElementGraph().vertexSet(); 834 List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same 835 836 for( FlowElement vertex : vertices ) 837 { 838 if( vertex instanceof Operator ) 839 operations.add( ( (Operator) vertex ).getOperation() ); 840 } 841 842 return operations; 843 } 844 845 @Override 846 public boolean containsPipeNamed( String pipeName ) 847 { 848 Set<FlowElement> vertices = getElementGraph().vertexSet(); 849 850 for( FlowElement vertex : vertices ) 851 { 852 if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) ) 853 return true; 854 } 855 856 return false; 857 } 858 859 public void clean() 860 { 861 // use step config by default 862 clean( getConfig() ); 863 } 864 865 public abstract void clean( Config config ); 866 867 List<SafeFlowStepListener> getListeners() 868 { 869 if( listeners == null ) 870 listeners = new LinkedList<SafeFlowStepListener>(); 871 872 return listeners; 873 } 874 875 @Override 876 public boolean hasListeners() 877 { 878 return listeners != null && !listeners.isEmpty(); 879 } 880 881 @Override 882 public void addListener( FlowStepListener flowStepListener ) 883 { 884 getListeners().add( new SafeFlowStepListener( flowStepListener ) ); 885 } 886 887 @Override 888 public boolean removeListener( FlowStepListener flowStepListener ) 889 { 890 return getListeners().remove( new SafeFlowStepListener( flowStepListener ) ); 891 } 892 893 protected void fireOnCompleted() 894 { 895 if( hasListeners() ) 896 { 897 if( isDebugEnabled() ) 898 logDebug( "firing onCompleted event: " + getListeners().size() ); 899 900 for( Object flowStepListener : getListeners() ) 901 ( (FlowStepListener) flowStepListener ).onStepCompleted( this ); 902 } 903 } 904 905 protected void fireOnThrowable( Throwable throwable ) 906 { 907 if( hasListeners() ) 908 { 909 if( isDebugEnabled() ) 910 logDebug( "firing onThrowable event: " + getListeners().size() ); 911 912 for( Object flowStepListener : getListeners() ) 913 ( (FlowStepListener) flowStepListener ).onStepThrowable( this, throwable ); 914 } 915 } 916 917 protected void fireOnStopping() 918 { 919 if( hasListeners() ) 920 { 921 if( isDebugEnabled() ) 922 logDebug( "firing onStopping event: " + getListeners() ); 923 924 for( Object flowStepListener : getListeners() ) 925 ( (FlowStepListener) flowStepListener ).onStepStopping( this ); 926 } 927 } 928 929 protected void fireOnStarting() 930 { 931 if( hasListeners() ) 932 { 933 if( isDebugEnabled() ) 934 logDebug( "firing onStarting event: " + getListeners().size() ); 935 936 for( Object flowStepListener : getListeners() ) 937 ( (FlowStepListener) flowStepListener ).onStepStarting( this ); 938 } 939 } 940 941 protected void fireOnRunning() 942 { 943 if( hasListeners() ) 944 { 945 if( isDebugEnabled() ) 946 logDebug( "firing onRunning event: " + getListeners().size() ); 947 948 for( Object flowStepListener : getListeners() ) 949 ( (FlowStepListener) flowStepListener ).onStepRunning( this ); 950 } 951 } 952 953 protected ClientState createClientState( FlowProcess flowProcess ) 954 { 955 CascadingServices services = flowProcess.getCurrentSession().getCascadingServices(); 956 957 if( services == null ) 958 return ClientState.NULL; 959 960 return services.createClientState( getID() ); 961 } 962 963 public FlowStepJob<Config> getFlowStepJob() 964 { 965 return flowStepJob; 966 } 967 968 public FlowStepJob<Config> getCreateFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig ) 969 { 970 if( flowStepJob != null ) 971 return flowStepJob; 972 973 if( flowProcess == null ) 974 return null; 975 976 Config initializedConfig = createInitializedConfig( flowProcess, parentConfig ); 977 978 setConfig( initializedConfig ); 979 980 ClientState clientState = createClientState( flowProcess ); 981 982 flowStepJob = createFlowStepJob( clientState, flowProcess, initializedConfig ); 983 984 return flowStepJob; 985 } 986 987 protected abstract FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<Config> flowProcess, Config initializedStepConfig ); 988 989 protected void initConfFromNodeConfigDef( ElementGraph nodeElementGraph, ConfigDef.Setter setter ) 990 { 991 nodeElementGraph = ElementGraphs.asExtentMaskedSubGraph( nodeElementGraph ); 992 993 ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() ); 994 995 // applies each mode in order, topologically 996 for( ConfigDef.Mode mode : ConfigDef.Mode.values() ) 997 { 998 Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( nodeElementGraph ); 999 1000 while( iterator.hasNext() ) 1001 { 1002 FlowElement element = iterator.next(); 1003 1004 while( element != null ) 1005 { 1006 // intentionally skip any element that spans downstream nodes, like a GroupBy 1007 // this way GroupBy is applied on the inbound side (where partitioning happens) 1008 // not the outbound side. 1009 // parent sub-assemblies (like Unique) will be applied if they have leading Pipes to the current spanning Pipe 1010 if( elementSpansDownStream( stepElementGraph, nodeElementGraph, element ) ) 1011 { 1012 element = null; 1013 continue; 1014 } 1015 1016 if( element instanceof ScopedElement && ( (ScopedElement) element ).hasNodeConfigDef() ) 1017 ( (ScopedElement) element ).getNodeConfigDef().apply( mode, setter ); 1018 1019 // walk up the sub-assembly parent hierarchy 1020 if( element instanceof Pipe ) 1021 element = ( (Pipe) element ).getParent(); 1022 else 1023 element = null; 1024 } 1025 } 1026 } 1027 } 1028 1029 private boolean elementSpansDownStream( ElementGraph stepElementGraph, ElementGraph nodeElementGraph, FlowElement element ) 1030 { 1031 boolean spansNodes = !( element instanceof SubAssembly ); 1032 1033 if( spansNodes ) 1034 spansNodes = nodeElementGraph.outDegreeOf( element ) == 0 && stepElementGraph.outDegreeOf( element ) > 0; 1035 1036 return spansNodes; 1037 } 1038 1039 protected void initConfFromStepConfigDef( ConfigDef.Setter setter ) 1040 { 1041 ElementGraph stepElementGraph = ElementGraphs.asExtentMaskedSubGraph( getElementGraph() ); 1042 1043 // applies each mode in order, topologically 1044 for( ConfigDef.Mode mode : ConfigDef.Mode.values() ) 1045 { 1046 Iterator<FlowElement> iterator = ElementGraphs.getTopologicalIterator( stepElementGraph ); 1047 1048 while( iterator.hasNext() ) 1049 { 1050 FlowElement element = iterator.next(); 1051 1052 while( element != null ) 1053 { 1054 if( element instanceof ScopedElement && ( (ScopedElement) element ).hasStepConfigDef() ) 1055 ( (ScopedElement) element ).getStepConfigDef().apply( mode, setter ); 1056 1057 // walk up the sub-assembly parent hierarchy 1058 if( element instanceof Pipe ) 1059 element = ( (Pipe) element ).getParent(); 1060 else 1061 element = null; 1062 } 1063 } 1064 } 1065 } 1066 1067 protected static void addSources( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sources ) 1068 { 1069 for( Tap tap : sources ) 1070 { 1071 for( Scope scope : elementGraph.outgoingEdgesOf( tap ) ) 1072 flowStep.addSource( scope.getName(), tap ); 1073 } 1074 } 1075 1076 protected static void addSinks( BaseFlowStep flowStep, ElementGraph elementGraph, Set<Tap> sinks ) 1077 { 1078 for( Tap tap : sinks ) 1079 { 1080 for( Scope scope : elementGraph.incomingEdgesOf( tap ) ) 1081 flowStep.addSink( scope.getName(), tap ); 1082 } 1083 } 1084 1085 @Override 1086 public boolean equals( Object object ) 1087 { 1088 if( this == object ) 1089 return true; 1090 if( object == null || getClass() != object.getClass() ) 1091 return false; 1092 1093 BaseFlowStep flowStep = (BaseFlowStep) object; 1094 1095 if( id != null ? !id.equals( flowStep.id ) : flowStep.id != null ) 1096 return false; 1097 1098 return true; 1099 } 1100 1101 @Override 1102 public int hashCode() 1103 { 1104 return id != null ? id.hashCode() : 0; 1105 } 1106 1107 @Override 1108 public String toString() 1109 { 1110 StringBuffer buffer = new StringBuffer(); 1111 1112 buffer.append( getClass().getSimpleName() ); 1113 buffer.append( "[name: " ).append( getName() ).append( "]" ); 1114 1115 return buffer.toString(); 1116 } 1117 1118 @Override 1119 public final boolean isInfoEnabled() 1120 { 1121 return getLogger().isInfoEnabled(); 1122 } 1123 1124 private ProcessLogger getLogger() 1125 { 1126 if( flow != null && flow instanceof ProcessLogger ) 1127 return (ProcessLogger) flow; 1128 1129 return ProcessLogger.NULL; 1130 } 1131 1132 @Override 1133 public final boolean isDebugEnabled() 1134 { 1135 return ( getLogger() ).isDebugEnabled(); 1136 } 1137 1138 @Override 1139 public void logDebug( String message, Object... arguments ) 1140 { 1141 getLogger().logDebug( message, arguments ); 1142 } 1143 1144 @Override 1145 public void logInfo( String message, Object... arguments ) 1146 { 1147 getLogger().logInfo( message, arguments ); 1148 } 1149 1150 @Override 1151 public void logWarn( String message ) 1152 { 1153 getLogger().logWarn( message ); 1154 } 1155 1156 @Override 1157 public void logWarn( String message, Throwable throwable ) 1158 { 1159 getLogger().logWarn( message, throwable ); 1160 } 1161 1162 @Override 1163 public void logWarn( String message, Object... arguments ) 1164 { 1165 getLogger().logWarn( message, arguments ); 1166 } 1167 1168 @Override 1169 public void logError( String message, Object... arguments ) 1170 { 1171 getLogger().logError( message, arguments ); 1172 } 1173 1174 @Override 1175 public void logError( String message, Throwable throwable ) 1176 { 1177 getLogger().logError( message, throwable ); 1178 } 1179 1180 /** 1181 * Class SafeFlowStepListener safely calls a wrapped FlowStepListener. 1182 * <p> 1183 * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener 1184 * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method 1185 * which in turn is run in a new Thread. 1186 */ 1187 private class SafeFlowStepListener implements FlowStepListener 1188 { 1189 /** Field flowListener */ 1190 final FlowStepListener flowStepListener; 1191 /** Field throwable */ 1192 Throwable throwable; 1193 1194 private SafeFlowStepListener( FlowStepListener flowStepListener ) 1195 { 1196 this.flowStepListener = flowStepListener; 1197 } 1198 1199 public void onStepStarting( FlowStep flowStep ) 1200 { 1201 try 1202 { 1203 flowStepListener.onStepStarting( flowStep ); 1204 } 1205 catch( Throwable throwable ) 1206 { 1207 handleThrowable( throwable ); 1208 } 1209 } 1210 1211 public void onStepStopping( FlowStep flowStep ) 1212 { 1213 try 1214 { 1215 flowStepListener.onStepStopping( flowStep ); 1216 } 1217 catch( Throwable throwable ) 1218 { 1219 handleThrowable( throwable ); 1220 } 1221 } 1222 1223 public void onStepCompleted( FlowStep flowStep ) 1224 { 1225 try 1226 { 1227 flowStepListener.onStepCompleted( flowStep ); 1228 } 1229 catch( Throwable throwable ) 1230 { 1231 handleThrowable( throwable ); 1232 } 1233 } 1234 1235 public void onStepRunning( FlowStep flowStep ) 1236 { 1237 try 1238 { 1239 flowStepListener.onStepRunning( flowStep ); 1240 } 1241 catch( Throwable throwable ) 1242 { 1243 handleThrowable( throwable ); 1244 } 1245 } 1246 1247 public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable ) 1248 { 1249 try 1250 { 1251 return flowStepListener.onStepThrowable( flowStep, flowStepThrowable ); 1252 } 1253 catch( Throwable throwable ) 1254 { 1255 handleThrowable( throwable ); 1256 } 1257 1258 return false; 1259 } 1260 1261 private void handleThrowable( Throwable throwable ) 1262 { 1263 this.throwable = throwable; 1264 1265 logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable ); 1266 } 1267 1268 public boolean equals( Object object ) 1269 { 1270 if( object instanceof BaseFlowStep.SafeFlowStepListener ) 1271 return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener ); 1272 1273 return flowStepListener.equals( object ); 1274 } 1275 1276 public int hashCode() 1277 { 1278 return flowStepListener.hashCode(); 1279 } 1280 } 1281 }