/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

import cascading.flow.Flow;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepListener;
import cascading.management.CascadingServices;
import cascading.management.state.ClientState;
import cascading.operation.Operation;
import cascading.pipe.Group;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Operator;
import cascading.pipe.Pipe;
import cascading.property.ConfigDef;
import cascading.stats.FlowStepStats;
import cascading.tap.Tap;
import cascading.util.Util;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.KShortestPaths;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class BaseFlowStep is an internal representation of a given Job to be executed on a remote cluster. During
 * planning, pipe assemblies are broken down into "steps" and encapsulated in this class.
 * <p/>
 * FlowSteps are submitted in order of dependency. If two or more steps do not share the same dependencies and all
 * can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines the order in which
 * all steps will be submitted for execution. The default submit priority is 5.
 * <p/>
 * This class is for internal use, there are no stable public methods.
 */
public abstract class BaseFlowStep<Config> implements Serializable, FlowStep<Config>
{
  /** Field LOG */
  // NOTE(review): the logger is registered under FlowStep, not BaseFlowStep -- kept as-is so existing
  // logging configuration keeps matching; confirm whether that is intentional.
  private static final Logger LOG = LoggerFactory.getLogger( FlowStep.class );

  /** Field flow */
  private transient Flow<Config> flow;
  /** Field flowName */
  private String flowName;
  /** Field flowID */
  private String flowID;

  /** Field conf */
  private transient Config conf;

  /** Field submitPriority */
  private int submitPriority = 5;

  /** Field name */
  String name;
  /** Field id, lazily assigned by {@link #getID()} */
  private String id;
  /** Field stepNum */
  private final int stepNum;

  /** Field step listeners, lazily created by {@link #getListeners()} */
  private List<SafeFlowStepListener> listeners;

  /** Field graph */
  private final SimpleDirectedGraph<FlowElement, Scope> graph = new SimpleDirectedGraph<FlowElement, Scope>( Scope.class );

  /** Field sources */
  protected final Map<Tap, Set<String>> sources = new HashMap<Tap, Set<String>>(); // all sources
  /** Field sink */
  protected final Map<Tap, Set<String>> sinks = new HashMap<Tap, Set<String>>(); // all sinks

  /** Field tempSink */
  protected Tap tempSink; // used if we need to bypass the filesystem

  /** Field groups */
  private final List<Group> groups = new ArrayList<Group>();

  // sources streamed into join - not necessarily all sources
  protected final Map<HashJoin, Tap> streamedSourceByJoin = new LinkedHashMap<HashJoin, Tap>();
  // sources accumulated by join
  protected final Map<HashJoin, Set<Tap>> accumulatedSourcesByJoin = new LinkedHashMap<HashJoin, Set<Tap>>();

  /** Field flowStepJob, lazily created by {@link #getFlowStepJob(FlowProcess, Object)} */
  private transient FlowStepJob<Config> flowStepJob;

  /**
   * Constructor BaseFlowStep creates a new step with the given name and ordinal.
   *
   * @param name    of type String, may not be null or empty
   * @param stepNum of type int
   * @throws IllegalArgumentException if name is null or empty
   */
  protected BaseFlowStep( String name, int stepNum )
  {
    setName( name );
    this.stepNum = stepNum;
  }

  /**
   * Method getID returns the id of this step, creating one via {@link Util#createUniqueID()} on first call.
   *
   * @return String
   */
  @Override
  public String getID()
  {
    if( id == null )
      id = Util.createUniqueID();

    return id;
  }

  @Override
  public int getStepNum()
  {
    return stepNum;
  }

  @Override
  public String getName()
  {
    return name;
  }

  /**
   * Method setName sets the name of this step.
   *
   * @param name of type String
   * @throws IllegalArgumentException if name is null or empty
   */
  void setName( String name )
  {
    if( name == null || name.isEmpty() )
      throw new IllegalArgumentException( "step name may not be null or empty" );

    this.name = name;
  }

  /**
   * Method setFlow binds this step to the given Flow and caches the Flow's id and name.
   *
   * @param flow of type Flow, dereferenced immediately so it must not be null
   */
  public void setFlow( Flow<Config> flow )
  {
    this.flow = flow;
    this.flowID = flow.getID();
    this.flowName = flow.getName();
  }

  @Override
  public Flow<Config> getFlow()
  {
    return flow;
  }

  @Override
  public String getFlowID()
  {
    return flowID;
  }

  @Override
  public String getFlowName()
  {
    return flowName;
  }

  protected void setFlowName( String flowName )
  {
    this.flowName = flowName;
  }

  @Override
  public Config getConfig()
  {
    return conf;
  }

  protected void setConf( Config conf )
  {
    this.conf = conf;
  }

  @Override
  public String getStepDisplayName()
  {
    return getStepDisplayName( Util.ID_LENGTH );
  }

  /**
   * Method getStepDisplayName returns a display string of the form
   * {@code [flowID/stepID] flowName/stepName}, where both ids are truncated to idLength characters.
   *
   * @param idLength of type int, values larger than {@link Util#ID_LENGTH} are capped to it
   * @return String
   */
  protected String getStepDisplayName( int idLength )
  {
    if( idLength > Util.ID_LENGTH )
      idLength = Util.ID_LENGTH;

    String flowID = getFlowID().substring( 0, idLength );
    String stepID = getID().substring( 0, idLength );

    return String.format( "[%s/%s] %s/%s", flowID, stepID, getFlowName(), getName() );
  }

  @Override
  public int getSubmitPriority()
  {
    return submitPriority;
  }

  /**
   * Method setSubmitPriority sets the submit priority of this step.
   *
   * @param submitPriority of type int, between 1 and 10 inclusive
   * @throws IllegalArgumentException if the value is outside of 1 to 10
   */
  @Override
  public void setSubmitPriority( int submitPriority )
  {
    if( submitPriority < 1 || submitPriority > 10 )
      throw new IllegalArgumentException( "submitPriority must be between 1 and 10 inclusive, was: " + submitPriority );

    this.submitPriority = submitPriority;
  }

  /**
   * Method getFlowStepStats returns the stats of the current {@link FlowStepJob}.
   * <p/>
   * The job field is only assigned by {@link #getFlowStepJob(FlowProcess, Object)} and is dereferenced
   * here without a null check.
   *
   * @return FlowStepStats
   */
  @Override
  public FlowStepStats getFlowStepStats()
  {
    return flowStepJob.getStepStats();
  }

  public SimpleDirectedGraph<FlowElement, Scope> getGraph()
  {
    return graph;
  }

  /**
   * Method getGroup returns the single Group of this step, or null if there are none.
   *
   * @return Group
   * @throws IllegalStateException if more than one group was added
   */
  @Override
  public Group getGroup()
  {
    if( groups.isEmpty() )
      return null;

    if( groups.size() > 1 )
      throw new IllegalStateException( "more than one group" );

    return groups.get( 0 );
  }

  /**
   * Method getGroups returns the internal, mutable list of groups.
   *
   * @return List<Group>
   */
  @Override
  public List<Group> getGroups()
  {
    return groups;
  }

  /**
   * Method addGroup adds the given Group unless an equal Group is already present.
   *
   * @param group of type Group
   */
  public void addGroup( Group group )
  {
    if( !groups.contains( group ) )
      groups.add( group );
  }

  @Override
  public Map<HashJoin, Tap> getStreamedSourceByJoin()
  {
    return streamedSourceByJoin;
  }

  public void addStreamedSourceFor( HashJoin join, Tap streamedSource )
  {
    streamedSourceByJoin.put( join, streamedSource );
  }

  /**
   * Method getAllAccumulatedSources returns a new set holding the union of the accumulated sources of all joins.
   *
   * @return Set<Tap>
   */
  @Override
  public Set<Tap> getAllAccumulatedSources()
  {
    Set<Tap> allTaps = new HashSet<Tap>();

    for( Set<Tap> taps : accumulatedSourcesByJoin.values() )
      allTaps.addAll( taps );

    return allTaps;
  }

  public void addAccumulatedSourceFor( HashJoin join, Tap accumulatedSource )
  {
    addValue( accumulatedSourcesByJoin, join, accumulatedSource );
  }

  /**
   * Method addSource registers the given source Tap under the given name. A Tap may be registered under many names.
   *
   * @param name   of type String
   * @param source of type Tap
   */
  public void addSource( String name, Tap source )
  {
    addValue( sources, source, name );
  }

  /**
   * Method addSink registers the given sink Tap under the given name. A Tap may be registered under many names.
   *
   * @param name of type String
   * @param sink of type Tap
   */
  public void addSink( String name, Tap sink )
  {
    addValue( sinks, sink, name );
  }

  /** Adds value to the set stored under key, creating a new HashSet if the key has no set yet. */
  private static <K, V> void addValue( Map<K, Set<V>> map, K key, V value )
  {
    Set<V> values = map.get( key );

    if( values == null )
    {
      values = new HashSet<V>();
      map.put( key, values );
    }

    values.add( value );
  }

  /**
   * Method getSources returns an unmodifiable snapshot of all source Taps.
   *
   * @return Set<Tap>
   */
  @Override
  public Set<Tap> getSources()
  {
    return Collections.unmodifiableSet( new HashSet<Tap>( sources.keySet() ) );
  }

  /**
   * Method getSinks returns an unmodifiable snapshot of all sink Taps.
   *
   * @return Set<Tap>
   */
  @Override
  public Set<Tap> getSinks()
  {
    return Collections.unmodifiableSet( new HashSet<Tap>( sinks.keySet() ) );
  }

  /**
   * Method getSink returns the only sink Tap of this step.
   *
   * @return Tap
   * @throws IllegalStateException if this step does not have exactly one sink
   */
  @Override
  public Tap getSink()
  {
    // previously an empty sink map was also reported as "more than one sink"
    if( sinks.isEmpty() )
      throw new IllegalStateException( "no sinks" );

    if( sinks.size() != 1 )
      throw new IllegalStateException( "more than one sink" );

    return sinks.keySet().iterator().next();
  }

  /**
   * Method getSourceName returns an unmodifiable view of the names the given source was registered under.
   * <p/>
   * The lookup result is wrapped without a null check, so a Tap that was never added results in
   * a NullPointerException.
   *
   * @param source of type Tap
   * @return Set<String>
   */
  @Override
  public Set<String> getSourceName( Tap source )
  {
    return Collections.unmodifiableSet( sources.get( source ) );
  }

  /**
   * Method getSinkName returns an unmodifiable view of the names the given sink was registered under.
   * <p/>
   * The lookup result is wrapped without a null check, so a Tap that was never added results in
   * a NullPointerException.
   *
   * @param sink of type Tap
   * @return Set<String>
   */
  @Override
  public Set<String> getSinkName( Tap sink )
  {
    return Collections.unmodifiableSet( sinks.get( sink ) );
  }

  /**
   * Method getSourceWith returns the first source Tap whose identifier matches, ignoring case, or null.
   *
   * @param identifier of type String
   * @return Tap
   */
  @Override
  public Tap getSourceWith( String identifier )
  {
    return findTapWith( sources.keySet(), identifier );
  }

  /**
   * Method getSinkWith returns the first sink Tap whose identifier matches, ignoring case, or null.
   *
   * @param identifier of type String
   * @return Tap
   */
  @Override
  public Tap getSinkWith( String identifier )
  {
    return findTapWith( sinks.keySet(), identifier );
  }

  /** Returns the first Tap whose identifier equals the given one ignoring case, or null if none matches. */
  private static Tap findTapWith( Collection<Tap> taps, String identifier )
  {
    for( Tap tap : taps )
    {
      if( tap.getIdentifier().equalsIgnoreCase( identifier ) )
        return tap;
    }

    return null;
  }

  /**
   * Method allSourcesExist returns true if every source Tap reports that its resource exists.
   *
   * @return boolean
   * @throws IOException when thrown by a Tap
   */
  boolean allSourcesExist() throws IOException
  {
    for( Tap tap : sources.keySet() )
    {
      if( !tap.resourceExists( getConfig() ) )
        return false;
    }

    return true;
  }

  /**
   * Method areSourcesNewer returns true if the value returned by
   * {@link Util#getSourceModified(Object, Iterator, long)} is greater than the given sink modified time.
   *
   * @param sinkModified of type long
   * @return boolean
   * @throws IOException when thrown while reading source modification times
   */
  boolean areSourcesNewer( long sinkModified ) throws IOException
  {
    Config config = getConfig();
    Iterator<Tap> values = sources.keySet().iterator();

    long sourceModified = 0;

    try
    {
      sourceModified = Util.getSourceModified( config, values, sinkModified );

      return sinkModified < sourceModified;
    }
    finally
    {
      // logged on both normal and exceptional exit; stays at 0 if the lookup threw
      if( LOG.isInfoEnabled() )
        logInfo( "source modification date at: " + new Date( sourceModified ) ); // not oldest, we didnt check them all
    }
  }

  /**
   * Method getSinkModified returns the value of {@link Util#getSinkModified(Object, Collection)} for all sinks
   * and logs how that value was interpreted.
   *
   * @return long
   * @throws IOException when thrown while reading sink modification times
   */
  long getSinkModified() throws IOException
  {
    long sinkModified = Util.getSinkModified( getConfig(), sinks.keySet() );

    if( LOG.isInfoEnabled() )
    {
      // the -1 and 0 sentinels are mutually exclusive with a real date; previously a missing else
      // caused -1 to additionally be logged as a modified date
      if( sinkModified == -1L )
        logInfo( "at least one sink is marked for delete" );
      else if( sinkModified == 0L )
        logInfo( "at least one sink does not exist" );
      else
        logInfo( "sink oldest modified date: " + new Date( sinkModified ) );
    }

    return sinkModified;
  }

  /**
   * Method prepareResources prepares all sources for read, then all sinks and traps for write, stopping at
   * the first failure.
   *
   * @return the first failure, or null if all resources were prepared
   */
  protected Throwable prepareResources()
  {
    Throwable throwable = prepareResources( getSources(), false );

    if( throwable == null )
      throwable = prepareResources( getSinks(), true );

    if( throwable == null )
      throwable = prepareResources( getTraps(), true );

    return throwable;
  }

  /** Prepares each Tap in iteration order, returning the first failure or null if none failed. */
  private Throwable prepareResources( Collection<Tap> taps, boolean forWrite )
  {
    for( Tap tap : taps )
    {
      Throwable throwable = prepareResource( tap, forWrite );

      if( throwable != null )
        return throwable;
    }

    return null;
  }

  /** Prepares a single Tap; a false result or any thrown Throwable is logged and returned as a FlowException. */
  private Throwable prepareResource( Tap tap, boolean forWrite )
  {
    try
    {
      boolean result;

      if( forWrite )
        result = tap.prepareResourceForWrite( getConfig() );
      else
        result = tap.prepareResourceForRead( getConfig() );

      if( result )
        return null;

      String message = prepareFailedMessage( tap, forWrite );

      logError( message, null );

      return new FlowException( message );
    }
    catch( Throwable exception )
    {
      String message = prepareFailedMessage( tap, forWrite );

      logError( message, exception );

      return new FlowException( message, exception );
    }
  }

  /** Builds the failure message shared by both failure paths of prepareResource. */
  private String prepareFailedMessage( Tap tap, boolean forWrite )
  {
    return String.format( "unable to prepare tap for %s: %s", forWrite ? "write" : "read", tap.getFullIdentifier( getConfig() ) );
  }

  /**
   * Method commitSinks commits each sink in turn. Once a commit fails, every remaining sink is rolled back
   * instead of committed; sinks committed before the failure are not touched again.
   *
   * @return the first commit failure, or null if all sinks were committed
   */
  protected Throwable commitSinks()
  {
    Throwable throwable = null;

    for( Tap tap : sinks.keySet() )
    {
      if( throwable != null )
        rollbackResource( tap );
      else
        throwable = commitResource( tap );
    }

    return throwable;
  }

  /** Commits a single sink; a false result or any thrown Throwable is logged and returned as a FlowException. */
  private Throwable commitResource( Tap tap )
  {
    try
    {
      if( tap.commitResource( getConfig() ) )
        return null;

      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, null );

      return new FlowException( message );
    }
    catch( Throwable exception )
    {
      String message = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, exception );

      return new FlowException( message, exception );
    }
  }

  /** Rolls back a single sink; a false result or any thrown Throwable is logged and returned as a FlowException. */
  private Throwable rollbackResource( Tap tap )
  {
    try
    {
      if( tap.rollbackResource( getConfig() ) )
        return null;

      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, null );

      return new FlowException( message );
    }
    catch( Throwable exception )
    {
      String message = "unable to rollback sink: " + tap.getFullIdentifier( getConfig() );

      logError( message, exception );

      return new FlowException( message, exception );
    }
  }

  /**
   * Method rollbackSinks rolls back every sink, continuing after failures.
   *
   * @return the first rollback failure, or null if all sinks were rolled back
   */
  protected Throwable rollbackSinks()
  {
    Throwable throwable = null;

    for( Tap tap : sinks.keySet() )
    {
      // every sink is rolled back; only the first failure is retained
      Throwable current = rollbackResource( tap );

      if( throwable == null )
        throwable = current;
    }

    return throwable;
  }

  protected abstract Config getInitializedConfig( FlowProcess<Config> flowProcess, Config parentConfig );

  /**
   * Method getPreviousScopes returns the previous Scope instances. If the flowElement is a Group (specifically a CoGroup),
   * there will be more than one instance.
   *
   * @param flowElement of type FlowElement
   * @return Set<Scope>
   */
  public Set<Scope> getPreviousScopes( FlowElement flowElement )
  {
    return getGraph().incomingEdgesOf( flowElement );
  }

  /**
   * Method getNextScope returns the next Scope instance in the graph. There will always only be one next.
   *
   * @param flowElement of type FlowElement
   * @return Scope
   * @throws IllegalStateException if the element does not have exactly one outgoing Scope
   */
  public Scope getNextScope( FlowElement flowElement )
  {
    Set<Scope> set = getGraph().outgoingEdgesOf( flowElement );

    if( set.size() != 1 )
      throw new IllegalStateException( "should only be one scope after current flow element: " + flowElement + " found: " + set.size() );

    return set.iterator().next();
  }

  public Scope getScopeFor( FlowElement sourceElement, FlowElement targetElement )
  {
    return getGraph().getEdge( sourceElement, targetElement );
  }

  public Set<Scope> getNextScopes( FlowElement flowElement )
  {
    return getGraph().outgoingEdgesOf( flowElement );
  }

  public FlowElement getNextFlowElement( Scope scope )
  {
    return getGraph().getEdgeTarget( scope );
  }

  public TopologicalOrderIterator<FlowElement, Scope> getTopologicalOrderIterator()
  {
    return new TopologicalOrderIterator<FlowElement, Scope>( graph );
  }

  public List<FlowElement> getSuccessors( FlowElement element )
  {
    return Graphs.successorListOf( graph, element );
  }

  /**
   * Method getJoinTributariesBetween returns every source Tap that has at least one path to a HashJoin found on
   * a path between the given elements, where that path does not pass through any Merge found between the
   * given elements.
   *
   * @param from of type FlowElement
   * @param to   of type FlowElement
   * @return Set<Tap>
   */
  public Set<Tap> getJoinTributariesBetween( FlowElement from, FlowElement to )
  {
    Set<HashJoin> joins = new HashSet<HashJoin>();
    Set<Merge> merges = new HashSet<Merge>();

    // collect every join and merge on any path between the two elements
    for( GraphPath<FlowElement, Scope> path : getPathsBetween( from, to ) )
    {
      for( FlowElement flowElement : Graphs.getPathVertexList( path ) )
      {
        if( flowElement instanceof HashJoin )
          joins.add( (HashJoin) flowElement );

        if( flowElement instanceof Merge )
          merges.add( (Merge) flowElement );
      }
    }

    Set<Tap> tributaries = new HashSet<Tap>();

    for( HashJoin join : joins )
    {
      for( Tap source : sources.keySet() )
      {
        if( hasPathAvoiding( source, join, merges ) )
          tributaries.add( source );
      }
    }

    return tributaries;
  }

  /** Returns true if at least one path between the given elements shares no vertex with the given merges. */
  private boolean hasPathAvoiding( FlowElement from, FlowElement to, Set<Merge> merges )
  {
    for( GraphPath<FlowElement, Scope> path : getPathsBetween( from, to ) )
    {
      if( Collections.disjoint( Graphs.getPathVertexList( path ), merges ) )
        return true;
    }

    return false;
  }

  /** Returns all paths found by KShortestPaths between the given elements, or an empty list if it returns null. */
  private List<GraphPath<FlowElement, Scope>> getPathsBetween( FlowElement from, FlowElement to )
  {
    KShortestPaths<FlowElement, Scope> paths = new KShortestPaths<FlowElement, Scope>( graph, from, Integer.MAX_VALUE );
    List<GraphPath<FlowElement, Scope>> results = paths.getPaths( to );

    if( results == null )
      return Collections.<GraphPath<FlowElement, Scope>>emptyList();

    return results;
  }

  /**
   * Method getAllOperations returns the Operation of every Operator in the step graph.
   *
   * @return Collection<Operation>
   */
  public Collection<Operation> getAllOperations()
  {
    List<Operation> operations = new ArrayList<Operation>(); // operations impl equals, so two instance may be the same

    for( FlowElement vertex : getGraph().vertexSet() )
    {
      if( vertex instanceof Operator )
        operations.add( ( (Operator) vertex ).getOperation() );
    }

    return operations;
  }

  /**
   * Method containsPipeNamed returns true if any Pipe in the step graph has exactly the given name.
   *
   * @param pipeName of type String
   * @return boolean
   */
  @Override
  public boolean containsPipeNamed( String pipeName )
  {
    for( FlowElement vertex : getGraph().vertexSet() )
    {
      if( vertex instanceof Pipe && ( (Pipe) vertex ).getName().equals( pipeName ) )
        return true;
    }

    return false;
  }

  public void clean()
  {
    // use step config by default
    clean( getConfig() );
  }

  public abstract void clean( Config config );

  /**
   * Method getListeners returns the internal listener list, creating it on first call.
   *
   * @return List<SafeFlowStepListener>
   */
  List<SafeFlowStepListener> getListeners()
  {
    if( listeners == null )
      listeners = new LinkedList<SafeFlowStepListener>();

    return listeners;
  }

  @Override
  public boolean hasListeners()
  {
    return listeners != null && !listeners.isEmpty();
  }

  @Override
  public void addListener( FlowStepListener flowStepListener )
  {
    getListeners().add( new SafeFlowStepListener( flowStepListener ) );
  }

  /**
   * Method removeListener removes the first registered wrapper whose wrapped listener equals the given one.
   *
   * @param flowStepListener of type FlowStepListener
   * @return true if a listener was removed
   */
  @Override
  public boolean removeListener( FlowStepListener flowStepListener )
  {
    // SafeFlowStepListener#equals delegates to the wrapped listener, so a fresh wrapper matches
    return getListeners().remove( new SafeFlowStepListener( flowStepListener ) );
  }

  protected void fireOnCompleted()
  {
    if( !hasListeners() )
      return;

    if( LOG.isDebugEnabled() )
      logDebug( "firing onCompleted event: " + getListeners().size() );

    for( SafeFlowStepListener listener : getListeners() )
      listener.onStepCompleted( this );
  }

  /**
   * Method fireOnThrowable notifies every listener of the given throwable. The boolean returned by each
   * listener is ignored here.
   *
   * @param throwable of type Throwable
   */
  protected void fireOnThrowable( Throwable throwable )
  {
    if( !hasListeners() )
      return;

    if( LOG.isDebugEnabled() )
      logDebug( "firing onThrowable event: " + getListeners().size() );

    for( SafeFlowStepListener listener : getListeners() )
      listener.onStepThrowable( this, throwable );
  }

  protected void fireOnStopping()
  {
    if( !hasListeners() )
      return;

    // log the listener count, consistent with the other fireOn* methods
    if( LOG.isDebugEnabled() )
      logDebug( "firing onStopping event: " + getListeners().size() );

    for( SafeFlowStepListener listener : getListeners() )
      listener.onStepStopping( this );
  }

  protected void fireOnStarting()
  {
    if( !hasListeners() )
      return;

    if( LOG.isDebugEnabled() )
      logDebug( "firing onStarting event: " + getListeners().size() );

    for( SafeFlowStepListener listener : getListeners() )
      listener.onStepStarting( this );
  }

  protected void fireOnRunning()
  {
    if( !hasListeners() )
      return;

    if( LOG.isDebugEnabled() )
      logDebug( "firing onRunning event: " + getListeners().size() );

    for( SafeFlowStepListener listener : getListeners() )
      listener.onStepRunning( this );
  }

  /**
   * Two steps are equal if they are of the same class and have equal names.
   *
   * @param object of type Object
   * @return boolean
   */
  @Override
  public boolean equals( Object object )
  {
    if( this == object )
      return true;

    if( object == null || getClass() != object.getClass() )
      return false;

    BaseFlowStep<?> flowStep = (BaseFlowStep<?>) object;

    if( name == null )
      return flowStep.name == null;

    return name.equals( flowStep.name );
  }

  protected ClientState createClientState( FlowProcess flowProcess )
  {
    CascadingServices services = flowProcess.getCurrentSession().getCascadingServices();
    return services.createClientState( getID() );
  }

  /**
   * Method getFlowStepJob returns the cached FlowStepJob, creating and caching one if none exists yet.
   *
   * @param flowProcess  of type FlowProcess, if null and no job is cached, null is returned
   * @param parentConfig of type Config
   * @return FlowStepJob
   */
  public FlowStepJob<Config> getFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig )
  {
    if( flowStepJob != null )
      return flowStepJob;

    if( flowProcess == null )
      return null;

    flowStepJob = createFlowStepJob( flowProcess, parentConfig );

    return flowStepJob;
  }

  protected abstract FlowStepJob createFlowStepJob( FlowProcess<Config> flowProcess, Config parentConfig );

  /**
   * Method initConfFromProcessConfigDef applies the step ConfigDef of every element in the step graph, and of
   * each Pipe's chain of parents, to the given setter.
   *
   * @param setter of type ConfigDef.Setter
   */
  protected void initConfFromProcessConfigDef( ConfigDef.Setter setter )
  {
    // applies each mode in order, topologically
    for( ConfigDef.Mode mode : ConfigDef.Mode.values() )
    {
      TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalOrderIterator();

      while( iterator.hasNext() )
      {
        FlowElement element = iterator.next();

        // walk from the element up through its parent pipes, if any
        while( element != null )
        {
          if( element.hasStepConfigDef() )
            element.getStepConfigDef().apply( mode, setter );

          if( element instanceof Pipe )
            element = ( (Pipe) element ).getParent();
          else
            element = null;
        }
      }
    }
  }

  @Override
  public int hashCode()
  {
    return name != null ? name.hashCode() : 0;
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "[name: " + getName() + "]";
  }

  public final boolean isInfoEnabled()
  {
    return LOG.isInfoEnabled();
  }

  public final boolean isDebugEnabled()
  {
    return LOG.isDebugEnabled();
  }

  public void logDebug( String message )
  {
    LOG.debug( prefixed( message ) );
  }

  public void logInfo( String message )
  {
    LOG.info( prefixed( message ) );
  }

  public void logWarn( String message )
  {
    LOG.warn( prefixed( message ) );
  }

  public void logWarn( String message, Throwable throwable )
  {
    LOG.warn( prefixed( message ), throwable );
  }

  public void logError( String message, Throwable throwable )
  {
    LOG.error( prefixed( message ), throwable );
  }

  /** Prepends the bracketed flow name, truncated via Util.truncate( name, 25 ), to the given log message. */
  private String prefixed( String message )
  {
    return "[" + Util.truncate( getFlowName(), 25 ) + "] " + message;
  }

  /**
   * Class SafeFlowStepListener safely calls a wrapped FlowStepListener.
   * <p/>
   * This is done for a few reasons, the primary reason is so exceptions thrown by the Listener
   * can be caught by the calling Thread. Since Flow is asynchronous, much of the work is done in the run() method
   * which in turn is run in a new Thread.
   */
  private class SafeFlowStepListener implements FlowStepListener
  {
    /** Field flowStepListener */
    final FlowStepListener flowStepListener;
    /** Field throwable, the last Throwable thrown by the wrapped listener */
    Throwable throwable;

    private SafeFlowStepListener( FlowStepListener flowStepListener )
    {
      this.flowStepListener = flowStepListener;
    }

    @Override
    public void onStepStarting( FlowStep flowStep )
    {
      try
      {
        flowStepListener.onStepStarting( flowStep );
      }
      catch( Throwable throwable )
      {
        handleThrowable( throwable );
      }
    }

    @Override
    public void onStepStopping( FlowStep flowStep )
    {
      try
      {
        flowStepListener.onStepStopping( flowStep );
      }
      catch( Throwable throwable )
      {
        handleThrowable( throwable );
      }
    }

    @Override
    public void onStepCompleted( FlowStep flowStep )
    {
      try
      {
        flowStepListener.onStepCompleted( flowStep );
      }
      catch( Throwable throwable )
      {
        handleThrowable( throwable );
      }
    }

    @Override
    public void onStepRunning( FlowStep flowStep )
    {
      try
      {
        flowStepListener.onStepRunning( flowStep );
      }
      catch( Throwable throwable )
      {
        handleThrowable( throwable );
      }
    }

    /** Delegates to the wrapped listener; returns false if the wrapped listener itself throws. */
    @Override
    public boolean onStepThrowable( FlowStep flowStep, Throwable flowStepThrowable )
    {
      try
      {
        return flowStepListener.onStepThrowable( flowStep, flowStepThrowable );
      }
      catch( Throwable throwable )
      {
        handleThrowable( throwable );
      }

      return false;
    }

    /** Remembers the given Throwable and logs it as a warning on the enclosing step. */
    private void handleThrowable( Throwable throwable )
    {
      this.throwable = throwable;

      logWarn( String.format( "flow step listener %s threw throwable", flowStepListener ), throwable );
    }

    /**
     * Compares the wrapped listeners when given another wrapper, otherwise compares the wrapped
     * listener against the given object directly.
     */
    @Override
    public boolean equals( Object object )
    {
      if( object instanceof BaseFlowStep.SafeFlowStepListener )
        return flowStepListener.equals( ( (BaseFlowStep.SafeFlowStepListener) object ).flowStepListener );

      return flowStepListener.equals( object );
    }

    @Override
    public int hashCode()
    {
      return flowStepListener.hashCode();
    }
  }
}