/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats;

import java.io.Serializable;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import cascading.flow.Flow;
import cascading.management.state.ClientState;
import cascading.util.ProcessLogger;

/**
 * Class CascadingStats is the base class for all Cascading statistics gathering. It also reports the status of
 * core elements that have state.
 * <p>
 * There are eight states the stats object reports; PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, and FAILED.
 * <ul>
 * <li>{@code pending} - when the Flow or Cascade has yet to start.</li>
 * <li>{@code skipped} - when the Flow was skipped by the parent Cascade.</li>
 * <li>{@code started} - when {@link cascading.flow.Flow#start()} was called.</li>
 * <li>{@code submitted} - when the Step was submitted to the underlying platform for work.</li>
 * <li>{@code running} - when the Flow or Cascade is executing a workload.</li>
 * <li>{@code stopped} - when the user calls {@link cascading.flow.Flow#stop()} on the Flow or Cascade.</li>
 * <li>{@code failed} - when the Flow or Cascade threw an error and failed to finish the workload.</li>
 * <li>{@code successful} - when the Flow or Cascade naturally completed its workload without failure.</li>
 * </ul>
 * <p>
 * CascadingStats also reports four unique timestamps.
 * <ul>
 * <li>{@code startTime} - when the {@code start()} method was called.</li>
 * <li>{@code submitTime} - when the unit of work was actually submitted for execution. Not supported by all sub-classes.</li>
 * <li>{@code runTime} - when the unit of work actually began to execute work. This value may be affected by any "polling interval" in place.</li>
 * <li>{@code finishedTime} - when all work has completed successfully, failed, or stopped.</li>
 * </ul>
 * <p>
 * A unit of work is considered {@code finished} when the Flow or Cascade is no longer processing a workload and {@code successful},
 * {@code skipped}, {@code failed}, or {@code stopped} is true.
 * <p>
 * It is important to note all the timestamps are client side observations. Not values reported by the underlying
 * platform. That said, the transitions are seen by polling the client interface to the underlying platform so are
 * affected by the {@link cascading.flow.FlowProps#getJobPollingInterval()} value.
 *
 * @see CascadeStats
 * @see FlowStats
 * @see FlowStepStats
 */
public abstract class CascadingStats<Child> implements ProvidesCounters, Serializable
  {
  public static final String STATS_STORE_INTERVAL = "cascading.stats.store.interval";
  public static final String STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION = "cascading.stats.complete_child_details.block.duration";

  /**
   * Method setStatsStoreInterval sets the interval time between store operations against the underlying
   * document storage services. This affects the rate at which metrics and status information is updated.
   * <p>
   * The value is stored under {@link #STATS_STORE_INTERVAL} as a decimal String.
   *
   * @param properties of type Map, the properties to update
   * @param intervalMs milliseconds between storage calls, must be greater than zero
   * @throws IllegalArgumentException if {@code intervalMs} is zero or negative
   */
  public static void setStatsStoreInterval( Map<Object, Object> properties, long intervalMs )
    {
    if( intervalMs <= 0 )
      throw new IllegalArgumentException( "interval must be greater than zero, got: " + intervalMs );

    properties.put( STATS_STORE_INTERVAL, Long.toString( intervalMs ) );
    }

  /** Enum Type names the kinds of element a stats instance may describe, declared from coarsest to finest. */
  public enum Type
    {
      CASCADE, FLOW, STEP, NODE, SLICE, ATTEMPT;

    /**
     * Method isChild returns true if this type is declared before the given type.
     *
     * @param type of type Type
     * @return true if this type's ordinal is less than the given type's ordinal
     */
    public boolean isChild( Type type )
      {
      return ordinal() < type.ordinal();
      }
    }

  /** Enum Status names the states a stats instance may report. */
  public enum Status
    {
      PENDING( false ), SKIPPED( true ), STARTED( false ), SUBMITTED( false ), RUNNING( false ), SUCCESSFUL( true ), STOPPED( true ), FAILED( true );

    final boolean isFinished; // is this a completed state

    Status( boolean isFinished )
      {
      this.isFinished = isFinished;
      }

    /**
     * Method isFinished returns true if this status is a completed state.
     *
     * @return true for SKIPPED, SUCCESSFUL, STOPPED, and FAILED
     */
    public boolean isFinished()
      {
      return isFinished;
      }
    }

  private transient String prefixID; // cached log prefix, see getPrefix()

  /** Field name */
  protected final String name;
  protected final ClientState clientState;

  /** Field status */
  protected Status status = Status.PENDING;

  protected Set<StatsListener> listeners;

  /** Field pendingTime */
  protected long pendingTime;
  /** Field startTime */
  protected long startTime;
  /** Field submitTime */
  protected long submitTime;
  /** Field runTime */
  protected long runTime;
  /** Field finishedTime */
  protected long finishedTime;
  /** Field throwable */
  protected Throwable throwable;
  /** Field throwableTrace */
  protected String[] throwableTrace;

  /** Field lastCaptureDetail holds the time, in milliseconds, of the last call to {@link #markDetailCaptured()}. */
  protected AtomicLong lastCaptureDetail = new AtomicLong( 0 );

  protected CascadingStats( String name, ClientState clientState )
    {
    this.name = name;
    this.clientState = clientState;
    }

  /** Method prepare initializes this instance. */
  public void prepare()
    {
    clientState.startService();
    }

  /** Method cleanup destroys any resources allocated by this instance. */
  public void cleanup()
    {
    clientState.stopService();
    }

  /**
   * Method getID returns the ID of this CascadingStats object.
   *
   * @return the ID (type String) of this CascadingStats object.
   */
  public abstract String getID();

  /**
   * Method getName returns the name of this CascadingStats object.
   *
   * @return the name (type String) of this CascadingStats object.
   */
  public String getName()
    {
    return name;
    }

  /**
   * Method getType returns the {@link Type} of this CascadingStats object.
   *
   * @return the type (type Type) of this CascadingStats object.
   */
  public abstract Type getType();

  /**
   * Method getThrowable returns the throwable of this CascadingStats object.
   *
   * @return the throwable (type Throwable) of this CascadingStats object.
   */
  public Throwable getThrowable()
    {
    return throwable;
    }

  /**
   * Method getThrowableTrace returns the throwableTrace of this CascadingStats object.
   * <p>
   * Will return null if not set.
   *
   * @return the throwableTrace (type String[]) of this CascadingStats object.
   */
  public String[] getThrowableTrace()
    {
    return throwableTrace;
    }

  /**
   * Method isPending returns true if no work has been submitted.
   *
   * @return the pending (type boolean) of this CascadingStats object.
   */
  public boolean isPending()
    {
    return status == Status.PENDING;
    }

  /**
   * Method isSkipped returns true when the work was skipped.
   * <p>
   * Flows are skipped if the appropriate {@link cascading.flow.FlowSkipStrategy#skipFlow(Flow)}
   * returns {@code true};
   *
   * @return the skipped (type boolean) of this CascadingStats object.
   */
  public boolean isSkipped()
    {
    return status == Status.SKIPPED;
    }

  /**
   * Method isStarted returns true when work has started.
   *
   * @return the started (type boolean) of this CascadingStats object.
   */
  public boolean isStarted()
    {
    return status == Status.STARTED;
    }

  /**
   * Method isSubmitted returns true when the work has been submitted but has not yet begun running.
   *
   * @return the submitted (type boolean) of this CascadingStats object.
   */
  public boolean isSubmitted()
    {
    return status == Status.SUBMITTED;
    }

  /**
   * Method isRunning returns true when work has begun.
   *
   * @return the running (type boolean) of this CascadingStats object.
   */
  public boolean isRunning()
    {
    return status == Status.RUNNING;
    }

  /**
   * Method isEngaged returns true when there is work being executed, if
   * {@link #isStarted()}, {@link #isSubmitted()}, or {@link #isRunning()} returns true;
   *
   * @return the engaged (type boolean) of this CascadingStats object.
   */
  public boolean isEngaged()
    {
    return isStarted() || isSubmitted() || isRunning();
    }

  /**
   * Method isSuccessful returns true when work has completed successfully.
   *
   * @return the completed (type boolean) of this CascadingStats object.
   */
  public boolean isSuccessful()
    {
    return status == Status.SUCCESSFUL;
    }

  /**
   * Method isFailed returns true when the work ended with an error.
   *
   * @return the failed (type boolean) of this CascadingStats object.
   */
  public boolean isFailed()
    {
    return status == Status.FAILED;
    }

  /**
   * Method isStopped returns true when the user stopped the work.
   *
   * @return the stopped (type boolean) of this CascadingStats object.
   */
  public boolean isStopped()
    {
    return status == Status.STOPPED;
    }

  /**
   * Method isFinished returns true if the current status shows no work currently being executed,
   * if {@link #isSkipped()}, {@link #isSuccessful()}, {@link #isFailed()}, or {@link #isStopped()} returns true.
   *
   * @return the finished (type boolean) of this CascadingStats object.
   */
  public boolean isFinished()
    {
    // delegate to the enum so the set of terminal states is declared in exactly one place
    return status.isFinished();
    }

  /**
   * Method getStatus returns the {@link Status} of this CascadingStats object.
   *
   * @return the status (type Status) of this CascadingStats object.
   */
  public Status getStatus()
    {
    return status;
    }

  /** Method recordStats forces recording of current status information. */
  public void recordStats()
    {
    clientState.recordStats( this );
    }

  public abstract void recordInfo();

  /**
   * Method markPending records the pending time and notifies listeners of {@link Status#PENDING}.
   * <p>
   * The status field itself is not assigned here, new instances are already {@link Status#PENDING}.
   */
  public synchronized void markPending()
    {
    markPendingTime();

    fireListeners( null, Status.PENDING );

    recordStats();
    recordInfo();
    }

  protected void markPendingTime()
    {
    if( pendingTime == 0 )
      pendingTime = System.currentTimeMillis();
    }

  /**
   * Method markStartedThenRunning consecutively marks the status as {@link Status#STARTED} then {@link Status#RUNNING}
   * and forces the start, submit, and running times to be equal.
   *
   * @throws IllegalStateException if the current status is not {@link Status#PENDING}
   */
  public synchronized void markStartedThenRunning()
    {
    if( status != Status.PENDING )
      throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );

    markStartToRunTime();
    markStarted();
    markRunning();
    }

  protected void markStartToRunTime()
    {
    startTime = submitTime = runTime = System.currentTimeMillis();
    }

  /**
   * Method markStarted sets the status to {@link Status#STARTED}.
   *
   * @throws IllegalStateException if the current status is not {@link Status#PENDING}
   */
  public synchronized void markStarted()
    {
    if( status != Status.PENDING )
      throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );

    Status priorStatus = status;
    status = Status.STARTED;
    markStartTime();

    fireListeners( priorStatus, status );

    clientState.start( startTime );
    clientState.setStatus( status, startTime );
    recordStats();
    recordInfo();
    }

  protected void markStartTime()
    {
    if( startTime == 0 )
      startTime = System.currentTimeMillis();
    }

  /**
   * Method markSubmitted sets the status to {@link Status#SUBMITTED}.
   * <p>
   * Does nothing if the status is already {@link Status#SUBMITTED}.
   *
   * @throws IllegalStateException if the current status is neither {@link Status#SUBMITTED} nor {@link Status#STARTED}
   */
  public synchronized void markSubmitted()
    {
    if( status == Status.SUBMITTED )
      return;

    if( status != Status.STARTED )
      throw new IllegalStateException( "may not mark as " + Status.SUBMITTED + ", is already " + status );

    Status priorStatus = status;
    status = Status.SUBMITTED;
    markSubmitTime();

    fireListeners( priorStatus, status );

    clientState.submit( submitTime );
    clientState.setStatus( status, submitTime );
    recordStats();
    recordInfo();
    }

  protected void markSubmitTime()
    {
    if( submitTime == 0 )
      submitTime = System.currentTimeMillis();
    }

  /**
   * Method markRunning sets the status to {@link Status#RUNNING}.
   * <p>
   * Does nothing if the status is already {@link Status#RUNNING}.
   *
   * @throws IllegalStateException if the current status is not one of {@link Status#RUNNING},
   *                               {@link Status#STARTED}, or {@link Status#SUBMITTED}
   */
  public synchronized void markRunning()
    {
    if( status == Status.RUNNING )
      return;

    if( status != Status.STARTED && status != Status.SUBMITTED )
      throw new IllegalStateException( "may not mark as " + Status.RUNNING + ", is already " + status );

    Status priorStatus = status;
    status = Status.RUNNING;
    markRunTime();

    fireListeners( priorStatus, status );

    clientState.run( runTime );
    clientState.setStatus( status, runTime );
    recordStats();
    recordInfo();
    }

  protected void markRunTime()
    {
    if( runTime == 0 )
      runTime = System.currentTimeMillis();
    }

  /**
   * Method markSuccessful sets the status to {@link Status#SUCCESSFUL}.
   *
   * @throws IllegalStateException if the current status is neither {@link Status#RUNNING} nor {@link Status#SUBMITTED}
   */
  public synchronized void markSuccessful()
    {
    if( status != Status.RUNNING && status != Status.SUBMITTED )
      throw new IllegalStateException( "may not mark as " + Status.SUCCESSFUL + ", is already " + status );

    Status priorStatus = status;
    status = Status.SUCCESSFUL;
    markFinishedTime();

    fireListeners( priorStatus, status );

    clientState.setStatus( status, finishedTime );
    clientState.stop( finishedTime );
    recordStats();
    recordInfo();
    }

  protected void markFinishedTime()
    {
    finishedTime = System.currentTimeMillis();
    }

  /**
   * Method markFailed sets the status to {@link Status#FAILED}.
   *
   * @throws IllegalStateException if the current status is not one of {@link Status#STARTED},
   *                               {@link Status#SUBMITTED}, or {@link Status#RUNNING}
   */
  public void markFailed()
    {
    markFailed( null, null );
    }

  /**
   * Method markFailed sets the status to {@link Status#FAILED}.
   *
   * @param throwable of type Throwable
   * @throws IllegalStateException if the current status is not one of {@link Status#STARTED},
   *                               {@link Status#SUBMITTED}, or {@link Status#RUNNING}
   */
  public synchronized void markFailed( Throwable throwable )
    {
    markFailed( throwable, null );
    }

  /**
   * Method markFailed sets the status to {@link Status#FAILED}.
   *
   * @param throwableTrace of type String[]
   * @throws IllegalStateException if the current status is not one of {@link Status#STARTED},
   *                               {@link Status#SUBMITTED}, or {@link Status#RUNNING}
   */
  public synchronized void markFailed( String[] throwableTrace )
    {
    markFailed( null, throwableTrace );
    }

  /**
   * Method markFailed sets the status to {@link Status#FAILED} and stores the given failure details,
   * either of which may be null.
   *
   * @param throwable      of type Throwable
   * @param throwableTrace of type String[]
   * @throws IllegalStateException if the current status is not one of {@link Status#STARTED},
   *                               {@link Status#SUBMITTED}, or {@link Status#RUNNING}
   */
  protected synchronized void markFailed( Throwable throwable, String[] throwableTrace )
    {
    if( status != Status.STARTED && status != Status.RUNNING && status != Status.SUBMITTED )
      throw new IllegalStateException( "may not mark as " + Status.FAILED + ", is already " + status );

    Status priorStatus = status;
    status = Status.FAILED;
    markFinishedTime();
    this.throwable = throwable;
    this.throwableTrace = throwableTrace;

    fireListeners( priorStatus, status );

    clientState.setStatus( status, finishedTime );
    clientState.stop( finishedTime );
    recordStats();
    recordInfo();
    }

  /**
   * Method markStopped sets the status to {@link Status#STOPPED}.
   *
   * @throws IllegalStateException if the current status is already a finished status
   */
  public synchronized void markStopped()
    {
    if( status != Status.PENDING && status != Status.STARTED && status != Status.SUBMITTED && status != Status.RUNNING )
      throw new IllegalStateException( "may not mark as " + Status.STOPPED + ", is already " + status );

    Status priorStatus = status;
    status = Status.STOPPED;
    markFinishedTime();

    fireListeners( priorStatus, status );

    clientState.setStatus( status, finishedTime );
    recordStats();
    recordInfo();
    clientState.stop( finishedTime );
    }

  /**
   * Method markSkipped sets the status to {@link Status#SKIPPED}.
   * <p>
   * Note the finished time is not updated by this method.
   *
   * @throws IllegalStateException if the current status is not {@link Status#PENDING}
   */
  public synchronized void markSkipped()
    {
    if( status != Status.PENDING )
      throw new IllegalStateException( "may not mark as " + Status.SKIPPED + ", is already " + status );

    Status priorStatus = status;
    status = Status.SKIPPED;

    fireListeners( priorStatus, status );

    clientState.setStatus( status, System.currentTimeMillis() );
    recordStats();
    recordInfo();
    }

  /**
   * Method getPendingTime returns the pendingTime of this CascadingStats object.
   *
   * @return the pendingTime (type long) of this CascadingStats object.
   */
  public long getPendingTime()
    {
    return pendingTime;
    }

  /**
   * Method getStartTime returns the startTime of this CascadingStats object.
   *
   * @return the startTime (type long) of this CascadingStats object.
   */
  public long getStartTime()
    {
    return startTime;
    }

  /**
   * Method getSubmitTime returns the submitTime of this CascadingStats object.
   *
   * @return the submitTime (type long) of this CascadingStats object.
   */
  public long getSubmitTime()
    {
    return submitTime;
    }

  /**
   * Method getRunTime returns the runTime of this CascadingStats object.
   *
   * @return the runTime (type long) of this CascadingStats object.
   */
  public long getRunTime()
    {
    return runTime;
    }

  /**
   * Method getFinishedTime returns the finishedTime of this CascadingStats object.
   *
   * @return the finishedTime (type long) of this CascadingStats object.
   */
  public long getFinishedTime()
    {
    return finishedTime;
    }

  /**
   * Method getDuration returns the duration the work executed before being finished.
   * <p>
   * This method will return zero until the work is finished. See {@link #getCurrentDuration()}
   * if you wish to poll for the current duration value.
   * <p>
   * Duration is calculated as {@code finishedTime - startTime}. No check is made that the start time
   * was ever set, if it is still zero the finished time itself is returned.
   *
   * @return the duration (type long) of this CascadingStats object.
   */
  public long getDuration()
    {
    if( finishedTime != 0 )
      return finishedTime - startTime;
    else
      return 0;
    }

  /**
   * Method getCurrentDuration returns the current duration of the current work whether or not
   * the work is finished. When finished, the return value will be the same as {@link #getDuration()}.
   * <p>
   * Duration is calculated as {@code finishedTime - startTime} once the finished time is set, and as
   * {@code System.currentTimeMillis() - startTime} before that. No check is made that the start time
   * was ever set.
   *
   * @return the currentDuration (type long) of this CascadingStats object.
   */
  public long getCurrentDuration()
    {
    if( finishedTime != 0 )
      return finishedTime - startTime;
    else
      return System.currentTimeMillis() - startTime;
    }

  @Override
  public Collection<String> getCountersFor( Class<? extends Enum> group )
    {
    return getCountersFor( group.getName() );
    }

  /**
   * Method getCounterGroupsMatching returns all the available counter group names that match
   * the given regular expression.
   *
   * @param regex of type String
   * @return Collection
   */
  public abstract Collection<String> getCounterGroupsMatching( String regex );

  /**
   * Method captureDetail will recursively capture details about nested systems. Use this method to persist
   * statistics about a given Cascade, Flow, FlowStep, or FlowNode.
   * <p>
   * Each CascadingStats object must be individually inspected for any system specific details.
   * <p>
   * Each call to this method will refresh the internal cache unless the current Stats object is marked finished. One
   * additional refresh will happen after this instance is marked finished.
   */
  public void captureDetail()
    {
    captureDetail( Type.ATTEMPT );
    }

  public abstract void captureDetail( Type depth );

  /**
   * For rate limiting access to the backend.
   * <p>
   * Currently used at the Step and below.
   *
   * @return true if more than 500 milliseconds have passed since {@link #markDetailCaptured()} was last called
   */
  protected boolean isDetailStale()
    {
    return ( System.currentTimeMillis() - lastCaptureDetail.get() ) > 500;
    }

  protected void markDetailCaptured()
    {
    lastCaptureDetail.set( System.currentTimeMillis() );
    }

  /**
   * Method getChildren returns any relevant child statistics instances. They may not be of type CascadingStats, but
   * instead platform specific.
   *
   * @return a Collection of child statistics
   */
  public abstract Collection<Child> getChildren();

  /**
   * Method getChildWith returns a child stats instance with the given ID value.
   *
   * @param id the id of a child instance
   * @return the child stats instance or null if not found
   */
  public abstract Child getChildWith( String id );

  /**
   * Method addListener registers the given listener to be notified of status changes.
   *
   * @param statsListener of type StatsListener
   */
  public synchronized void addListener( StatsListener statsListener )
    {
    if( listeners == null )
      listeners = new LinkedHashSet<>();

    listeners.add( statsListener );
    }

  /**
   * Method removeListener removes the given listener.
   *
   * @param statsListener of type StatsListener
   * @return true if the listener was registered and has been removed
   */
  public synchronized boolean removeListener( StatsListener statsListener )
    {
    return listeners != null && listeners.remove( statsListener );
    }

  /**
   * Method fireListeners notifies every registered listener of the given transition.
   * <p>
   * Anything thrown by a listener is logged as a warning and the remaining listeners are still notified.
   *
   * @param fromStatus the prior status, may be null
   * @param toStatus   the new status
   */
  protected synchronized void fireListeners( CascadingStats.Status fromStatus, CascadingStats.Status toStatus )
    {
    if( listeners == null )
      return;

    for( StatsListener listener : listeners )
      {
      try
        {
        listener.notify( this, fromStatus, toStatus );
        }
      catch( Throwable throwable )
        {
        logWarn( "error during listener notification, continuing with remaining listener notification", throwable );
        }
      }
    }

  protected abstract ProcessLogger getProcessLogger();

  protected String getStatsString()
    {
    String string = "status=" + status + ", startTime=" + startTime;

    if( finishedTime != 0 )
      string += ", duration=" + ( finishedTime - startTime );

    return string;
    }

  @Override
  public String toString()
    {
    return "Cascading{" + getStatsString() + '}';
    }

  protected void logInfo( String message, Object... arguments )
    {
    getProcessLogger().logInfo( getPrefix() + message, arguments );
    }

  protected void logDebug( String message, Object... arguments )
    {
    getProcessLogger().logDebug( getPrefix() + message, arguments );
    }

  protected void logWarn( String message, Object... arguments )
    {
    getProcessLogger().logWarn( getPrefix() + message, arguments );
    }

  protected void logError( String message, Object... arguments )
    {
    getProcessLogger().logError( getPrefix() + message, arguments );
    }

  protected void logError( String message, Throwable throwable )
    {
    getProcessLogger().logError( getPrefix() + message, throwable );
    }

  /**
   * Method getPrefix returns the prefix prepended to every log message, of the form
   * {@code "[type:id] "} where {@code type} is the lower-cased {@link #getType()} name and {@code id}
   * is at most the first five characters of {@link #getID()}.
   * <p>
   * The prefix is cached once an ID is available. If {@link #getID()} returns null the ID portion is
   * left empty and nothing is cached, so a later call can pick up the ID.
   *
   * @return the log prefix (type String) of this CascadingStats object.
   */
  protected String getPrefix()
    {
    if( prefixID != null )
      return prefixID;

    // Locale.ROOT keeps the type name stable regardless of the default locale
    String typeName = getType().name().toLowerCase( Locale.ROOT );
    String id = getID();

    if( id == null )
      return "[" + typeName + ":] ";

    // clamp so IDs shorter than five characters do not throw from substring
    prefixID = "[" + typeName + ":" + id.substring( 0, Math.min( 5, id.length() ) ) + "] ";

    return prefixID;
    }
  }