001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.stats;
023
024import java.io.Serializable;
025import java.util.Collection;
026import java.util.LinkedHashSet;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.atomic.AtomicLong;
030
031import cascading.flow.Flow;
032import cascading.management.state.ClientState;
033import cascading.util.ProcessLogger;
034
035/**
036 * Class CascadingStats is the base class for all Cascading statistics gathering. It also reports the status of
037 * core elements that have state.
038 * <p>
039 * There are eight states the stats object reports; PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, and FAILED.
040 * <ul>
041 * <li>{@code pending} - when the Flow or Cascade has yet to start.</li>
042 * <li>{@code skipped} - when the Flow was skipped by the parent Cascade.</li>
043 * <li>{@code started} - when {@link cascading.flow.Flow#start()} was called.</li>
044 * <li>{@code submitted} - when the Step was submitted to the underlying platform for work.</li>
045 * <li>{@code running} - when the Flow or Cascade is executing a workload.</li>
046 * <li>{@code stopped} - when the user calls {@link cascading.flow.Flow#stop()} on the Flow or Cascade.</li>
047 * <li>{@code failed} - when the Flow or Cascade threw an error and failed to finish the workload.</li>
048 * <li>{@code successful} - when the Flow or Cascade naturally completed its workload without failure.</li>
049 * </ul>
050 * <p>
051 * CascadingStats also reports four unique timestamps.
052 * <ul>
053 * <li>{@code startTime} - when the {@code start()} method was called.</li>
054 * <li>{@code submitTime} - when the unit of work was actually submitted for execution. Not supported by all sub-classes.</li>
055 * <li>{@code runTime} - when the unit of work actually began to execute work. This value may be affected by any "polling interval" in place.</li>
056 * <li>{@code finishedTime} - when all work has completed successfully, failed, or stopped.</li>
057 * </ul>
058 * <p>
059 * A unit of work is considered {@code finished} when the Flow or Cascade is no longer processing a workload and {@code successful},
060 * {@code skipped}, {@code failed}, or {@code stopped} is true.
061 * <p>
 * It is important to note that all the timestamps are client side observations, not values reported by the
 * underlying platform. That said, the transitions are seen by polling the client interface to the underlying
 * platform, so they are affected by the {@link cascading.flow.FlowProps#getJobPollingInterval()} value.
065 *
066 * @see CascadeStats
067 * @see FlowStats
068 * @see FlowStepStats
069 */
070public abstract class CascadingStats<Child> implements ProvidesCounters, Serializable
071  {
072  public static final String STATS_STORE_INTERVAL = "cascading.stats.store.interval";
073  public static final String STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION = "cascading.stats.complete_child_details.block.duration";
074
075  /**
076   * Method setStatsStoreInterval sets the interval time between store operations against the underlying
077   * document storage services. This affects the rate at which metrics and status information is updated.
078   *
079   * @param properties of type Properties
080   * @param intervalMs milliseconds between storage calls
081   */
082  public static void setStatsStoreInterval( Map<Object, Object> properties, long intervalMs )
083    {
084    if( intervalMs <= 0 )
085      throw new IllegalArgumentException( "interval must be greater than zero, got: " + intervalMs );
086
087    properties.put( STATS_STORE_INTERVAL, Long.toString( intervalMs ) );
088    }
089
090  public enum Type
091    {
092      CASCADE, FLOW, STEP, NODE, SLICE, ATTEMPT;
093
094    public boolean isChild( Type type )
095      {
096      return ordinal() < type.ordinal();
097      }
098    }
099
100  public enum Status
101    {
102      PENDING( false ), SKIPPED( true ), STARTED( false ), SUBMITTED( false ), RUNNING( false ), SUCCESSFUL( true ), STOPPED( true ), FAILED( true );
103
104    boolean isFinished = false; // is this a completed state
105
106    Status( boolean isFinished )
107      {
108      this.isFinished = isFinished;
109      }
110
111    public boolean isFinished()
112      {
113      return isFinished;
114      }
115    }
116
117  private transient String prefixID; // cached sub-string
118
119  /** Field name */
120  protected final String name;
121  protected final ClientState clientState;
122
123  /** Field status */
124  protected Status status = Status.PENDING;
125
126  protected Set<StatsListener> listeners;
127
128  /** Field pendingTime */
129  protected long pendingTime;
130  /** Field startTime */
131  protected long startTime;
132  /** Field submitTime */
133  protected long submitTime;
134  /** Field runTime */
135  protected long runTime;
136  /** Field finishedTime */
137  protected long finishedTime;
138  /** Field throwable */
139  protected Throwable throwable;
140  /** Field throwableTrace */
141  protected String[] throwableTrace;
142
143  protected AtomicLong lastCaptureDetail = new AtomicLong( 0 );
144
145  protected CascadingStats( String name, ClientState clientState )
146    {
147    this.name = name;
148    this.clientState = clientState;
149    }
150
151  /** Method prepare initializes this instance. */
152  public void prepare()
153    {
154    clientState.startService();
155    }
156
157  /** Method cleanup destroys any resources allocated by this instance. */
158  public void cleanup()
159    {
160    clientState.stopService();
161    }
162
163  /**
164   * Method getID returns the ID of this CascadingStats object.
165   *
166   * @return the ID (type Object) of this CascadingStats object.
167   */
168  public abstract String getID();
169
170  /**
171   * Method getName returns the name of this CascadingStats object.
172   *
173   * @return the name (type String) of this CascadingStats object.
174   */
175  public String getName()
176    {
177    return name;
178    }
179
180  public abstract Type getType();
181
182  /**
183   * Method getThrowable returns the throwable of this CascadingStats object.
184   *
185   * @return the throwable (type Throwable) of this CascadingStats object.
186   */
187  public Throwable getThrowable()
188    {
189    return throwable;
190    }
191
192  /**
193   * Method getThrowableTrace returns the throwableTrace of this CascadingStats object.
194   * <p>
195   * Will return null if not set.
196   *
197   * @return the throwableTrace (type String[]) of this CascadingStats object.
198   */
199  public String[] getThrowableTrace()
200    {
201    return throwableTrace;
202    }
203
204  /**
205   * Method isPending returns true if no work has been submitted.
206   *
207   * @return the pending (type boolean) of this CascadingStats object.
208   */
209  public boolean isPending()
210    {
211    return status == Status.PENDING;
212    }
213
214  /**
215   * Method isSkipped returns true when the works was skipped.
216   * <p>
217   * Flows are skipped if the appropriate {@link cascading.flow.FlowSkipStrategy#skipFlow(Flow)}
218   * returns {@code true};
219   *
220   * @return the skipped (type boolean) of this CascadingStats object.
221   */
222  public boolean isSkipped()
223    {
224    return status == Status.SKIPPED;
225    }
226
227  /**
228   * Method isStarted returns true when work has started.
229   *
230   * @return the started (type boolean) of this CascadingStats object.
231   */
232  public boolean isStarted()
233    {
234    return status == Status.STARTED;
235    }
236
237  /**
238   * Method isSubmitted returns true if no work has started.
239   *
240   * @return the submitted (type boolean) of this CascadingStats object.
241   */
242  public boolean isSubmitted()
243    {
244    return status == Status.SUBMITTED;
245    }
246
247  /**
248   * Method isRunning returns true when work has begun.
249   *
250   * @return the running (type boolean) of this CascadingStats object.
251   */
252  public boolean isRunning()
253    {
254    return status == Status.RUNNING;
255    }
256
257  /**
258   * Method isEngaged returns true when there is work being executed, if
259   * {@link #isStarted()}, {@link #isSubmitted()}, or {@link #isRunning()} returns true;
260   *
261   * @return the engaged (type boolean) of this CascadingStats object.
262   */
263  public boolean isEngaged()
264    {
265    return isStarted() || isSubmitted() || isRunning();
266    }
267
268  /**
269   * Method isSuccessful returns true when work has completed successfully.
270   *
271   * @return the completed (type boolean) of this CascadingStats object.
272   */
273  public boolean isSuccessful()
274    {
275    return status == Status.SUCCESSFUL;
276    }
277
278  /**
279   * Method isFailed returns true when the work ended with an error.
280   *
281   * @return the failed (type boolean) of this CascadingStats object.
282   */
283  public boolean isFailed()
284    {
285    return status == Status.FAILED;
286    }
287
288  /**
289   * Method isStopped returns true when the user stopped the work.
290   *
291   * @return the stopped (type boolean) of this CascadingStats object.
292   */
293  public boolean isStopped()
294    {
295    return status == Status.STOPPED;
296    }
297
298  /**
299   * Method isFinished returns true if the current status shows no work currently being executed,
300   * if {@link #isSkipped()}, {@link #isSuccessful()}, {@link #isFailed()}, or {@link #isStopped()} returns true.
301   *
302   * @return the finished (type boolean) of this CascadingStats object.
303   */
304  public boolean isFinished()
305    {
306    return status == Status.SUCCESSFUL || status == Status.FAILED || status == Status.STOPPED || status == Status.SKIPPED;
307    }
308
309  /**
310   * Method getStatus returns the {@link Status} of this CascadingStats object.
311   *
312   * @return the status (type Status) of this CascadingStats object.
313   */
314  public Status getStatus()
315    {
316    return status;
317    }
318
319  /** Method recordStats forces recording of current status information. */
320  public void recordStats()
321    {
322    clientState.recordStats( this );
323    }
324
325  public abstract void recordInfo();
326
327  /** Method markPending sets the status to {@link Status#PENDING}. */
328  public synchronized void markPending()
329    {
330    markPendingTime();
331
332    fireListeners( null, Status.PENDING );
333
334    recordStats();
335    recordInfo();
336    }
337
338  protected void markPendingTime()
339    {
340    if( pendingTime == 0 )
341      pendingTime = System.currentTimeMillis();
342    }
343
344  /**
345   * Method markStartedThenRunning consecutively marks the status as {@link Status#STARTED} then {@link Status#RUNNING}
346   * and forces the start and running time to be equals.
347   */
348  public synchronized void markStartedThenRunning()
349    {
350    if( status != Status.PENDING )
351      throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );
352
353    markStartToRunTime();
354    markStarted();
355    markRunning();
356    }
357
358  protected void markStartToRunTime()
359    {
360    startTime = submitTime = runTime = System.currentTimeMillis();
361    }
362
363  /** Method markStarted sets the status to {@link Status#STARTED}. */
364  public synchronized void markStarted()
365    {
366    if( status != Status.PENDING )
367      throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );
368
369    Status priorStatus = status;
370    status = Status.STARTED;
371    markStartTime();
372
373    fireListeners( priorStatus, status );
374
375    clientState.start( startTime );
376    clientState.setStatus( status, startTime );
377    recordStats();
378    recordInfo();
379    }
380
381  protected void markStartTime()
382    {
383    if( startTime == 0 )
384      startTime = System.currentTimeMillis();
385    }
386
387  /** Method markSubmitted sets the status to {@link Status#SUBMITTED}. */
388  public synchronized void markSubmitted()
389    {
390    if( status == Status.SUBMITTED )
391      return;
392
393    if( status != Status.STARTED )
394      throw new IllegalStateException( "may not mark as " + Status.SUBMITTED + ", is already " + status );
395
396    Status priorStatus = status;
397    status = Status.SUBMITTED;
398    markSubmitTime();
399
400    fireListeners( priorStatus, status );
401
402    clientState.submit( submitTime );
403    clientState.setStatus( status, submitTime );
404    recordStats();
405    recordInfo();
406    }
407
408  protected void markSubmitTime()
409    {
410    if( submitTime == 0 )
411      submitTime = System.currentTimeMillis();
412    }
413
414  /** Method markRunning sets the status to {@link Status#RUNNING}. */
415  public synchronized void markRunning()
416    {
417    if( status == Status.RUNNING )
418      return;
419
420    if( status != Status.STARTED && status != Status.SUBMITTED )
421      throw new IllegalStateException( "may not mark as " + Status.RUNNING + ", is already " + status );
422
423    Status priorStatus = status;
424    status = Status.RUNNING;
425    markRunTime();
426
427    fireListeners( priorStatus, status );
428
429    clientState.run( runTime );
430    clientState.setStatus( status, runTime );
431    recordStats();
432    recordInfo();
433    }
434
435  protected void markRunTime()
436    {
437    if( runTime == 0 )
438      runTime = System.currentTimeMillis();
439    }
440
441  /** Method markSuccessful sets the status to {@link Status#SUCCESSFUL}. */
442  public synchronized void markSuccessful()
443    {
444    if( status != Status.RUNNING && status != Status.SUBMITTED )
445      throw new IllegalStateException( "may not mark as " + Status.SUCCESSFUL + ", is already " + status );
446
447    Status priorStatus = status;
448    status = Status.SUCCESSFUL;
449    markFinishedTime();
450
451    fireListeners( priorStatus, status );
452
453    clientState.setStatus( status, finishedTime );
454    clientState.stop( finishedTime );
455    recordStats();
456    recordInfo();
457    }
458
459  protected void markFinishedTime()
460    {
461    finishedTime = System.currentTimeMillis();
462    }
463
464  /**
465   * Method markFailed sets the status to {@link Status#FAILED}.
466   */
467  public void markFailed()
468    {
469    markFailed( null, null );
470    }
471
472  /**
473   * Method markFailed sets the status to {@link Status#FAILED}.
474   *
475   * @param throwable of type Throwable
476   */
477  public synchronized void markFailed( Throwable throwable )
478    {
479    markFailed( throwable, null );
480    }
481
482  /**
483   * Method markFailed sets the status to {@link Status#FAILED}.
484   *
485   * @param throwableTrace of type String[]
486   */
487  public synchronized void markFailed( String[] throwableTrace )
488    {
489    markFailed( null, throwableTrace );
490    }
491
492  protected synchronized void markFailed( Throwable throwable, String[] throwableTrace )
493    {
494    if( status != Status.STARTED && status != Status.RUNNING && status != Status.SUBMITTED )
495      throw new IllegalStateException( "may not mark as " + Status.FAILED + ", is already " + status );
496
497    Status priorStatus = status;
498    status = Status.FAILED;
499    markFinishedTime();
500    this.throwable = throwable;
501    this.throwableTrace = throwableTrace;
502
503    fireListeners( priorStatus, status );
504
505    clientState.setStatus( status, finishedTime );
506    clientState.stop( finishedTime );
507    recordStats();
508    recordInfo();
509    }
510
511  /** Method markStopped sets the status to {@link Status#STOPPED}. */
512  public synchronized void markStopped()
513    {
514    if( status != Status.PENDING && status != Status.STARTED && status != Status.SUBMITTED && status != Status.RUNNING )
515      throw new IllegalStateException( "may not mark as " + Status.STOPPED + ", is already " + status );
516
517    Status priorStatus = status;
518    status = Status.STOPPED;
519    markFinishedTime();
520
521    fireListeners( priorStatus, status );
522
523    clientState.setStatus( status, finishedTime );
524    recordStats();
525    recordInfo();
526    clientState.stop( finishedTime );
527    }
528
529  /** Method markSkipped sets the status to {@link Status#SKIPPED}. */
530  public synchronized void markSkipped()
531    {
532    if( status != Status.PENDING )
533      throw new IllegalStateException( "may not mark as " + Status.SKIPPED + ", is already " + status );
534
535    Status priorStatus = status;
536    status = Status.SKIPPED;
537
538    fireListeners( priorStatus, status );
539
540    clientState.setStatus( status, System.currentTimeMillis() );
541    recordStats();
542    recordInfo();
543    }
544
545  /**
546   * Method getPendingTime returns the pendingTime of this CascadingStats object.
547   *
548   * @return the pendingTime (type long) of this CascadingStats object.
549   */
550  public long getPendingTime()
551    {
552    return pendingTime;
553    }
554
555  /**
556   * Method getStartTime returns the startTime of this CascadingStats object.
557   *
558   * @return the startTime (type long) of this CascadingStats object.
559   */
560  public long getStartTime()
561    {
562    return startTime;
563    }
564
565  /**
566   * Method getSubmitTime returns the submitTime of this CascadingStats object.
567   *
568   * @return the submitTime (type long) of this CascadingStats object.
569   */
570  public long getSubmitTime()
571    {
572    return submitTime;
573    }
574
575  /**
576   * Method getRunTime returns the runTime of this CascadingStats object.
577   *
578   * @return the runTime (type long) of this CascadingStats object.
579   */
580  public long getRunTime()
581    {
582    return runTime;
583    }
584
585  /**
586   * Method getFinishedTime returns the finishedTime of this CascadingStats object.
587   *
588   * @return the finishedTime (type long) of this CascadingStats object.
589   */
590  public long getFinishedTime()
591    {
592    return finishedTime;
593    }
594
595  /**
596   * Method getDuration returns the duration the work executed before being finished.
597   * <p>
598   * This method will return zero until the work is finished. See {@link #getCurrentDuration()}
599   * if you wish to poll for the current duration value.
600   * <p>
601   * Duration is calculated as {@code finishedTime - startTime}.
602   *
603   * @return the duration (type long) of this CascadingStats object.
604   */
605  public long getDuration()
606    {
607    if( finishedTime != 0 )
608      return finishedTime - startTime;
609    else
610      return 0;
611    }
612
613  /**
614   * Method getCurrentDuration returns the current duration of the current work whether or not
615   * the work is finished. When finished, the return value will be the same as {@link #getDuration()}.
616   * <p>
617   * Duration is calculated as {@code finishedTime - startTime}.
618   *
619   * @return the currentDuration (type long) of this CascadingStats object.
620   */
621  public long getCurrentDuration()
622    {
623    if( finishedTime != 0 )
624      return finishedTime - startTime;
625    else
626      return System.currentTimeMillis() - startTime;
627    }
628
629  @Override
630  public Collection<String> getCountersFor( Class<? extends Enum> group )
631    {
632    return getCountersFor( group.getName() );
633    }
634
635  /**
636   * Method getCounterGroupsMatching returns all the available counter group names that match
637   * the given regular expression.
638   *
639   * @param regex of type String
640   * @return Collection
641   */
642  public abstract Collection<String> getCounterGroupsMatching( String regex );
643
644  /**
645   * Method captureDetail will recursively capture details about nested systems. Use this method to persist
646   * statistics about a given Cascade, Flow, FlowStep, or FlowNode.
647   * <p>
648   * Each CascadingStats object must be individually inspected for any system specific details.
649   * <p>
650   * Each call to this method will refresh the internal cache unless the current Stats object is marked finished. One
651   * additional refresh will happen after this instance is marked finished.
652   */
653  public void captureDetail()
654    {
655    captureDetail( Type.ATTEMPT );
656    }
657
658  public abstract void captureDetail( Type depth );
659
660  /**
661   * For rate limiting access to the backend.
662   * <p>
663   * Currently used at the Step and below.
664   */
665  protected boolean isDetailStale()
666    {
667    return ( System.currentTimeMillis() - lastCaptureDetail.get() ) > 500;
668    }
669
670  protected void markDetailCaptured()
671    {
672    lastCaptureDetail.set( System.currentTimeMillis() );
673    }
674
675  /**
676   * Method getChildren returns any relevant child statistics instances. They may not be of type CascadingStats, but
677   * instead platform specific.
678   *
679   * @return a Collection of child statistics
680   */
681  public abstract Collection<Child> getChildren();
682
683  /**
684   * Method getChildWith returns a child stats instance with the given ID value.
685   *
686   * @param id the id of a child instance
687   * @return the child stats instance or null if not found
688   */
689  public abstract Child getChildWith( String id );
690
691  public synchronized void addListener( StatsListener statsListener )
692    {
693    if( listeners == null )
694      listeners = new LinkedHashSet<>();
695
696    listeners.add( statsListener );
697    }
698
699  public synchronized boolean removeListener( StatsListener statsListener )
700    {
701    return listeners != null && listeners.remove( statsListener );
702    }
703
704  protected synchronized void fireListeners( CascadingStats.Status fromStatus, CascadingStats.Status toStatus )
705    {
706    if( listeners == null )
707      return;
708
709    for( StatsListener listener : listeners )
710      {
711      try
712        {
713        listener.notify( this, fromStatus, toStatus );
714        }
715      catch( Throwable throwable )
716        {
717        logWarn( "error during listener notification, continuing with remaining listener notification", throwable );
718        }
719      }
720    }
721
722  protected abstract ProcessLogger getProcessLogger();
723
724  protected String getStatsString()
725    {
726    String string = "status=" + status + ", startTime=" + startTime;
727
728    if( finishedTime != 0 )
729      string += ", duration=" + ( finishedTime - startTime );
730
731    return string;
732    }
733
734  @Override
735  public String toString()
736    {
737    return "Cascading{" + getStatsString() + '}';
738    }
739
740  protected void logInfo( String message, Object... arguments )
741    {
742    getProcessLogger().logInfo( getPrefix() + message, arguments );
743    }
744
745  protected void logDebug( String message, Object... arguments )
746    {
747    getProcessLogger().logDebug( getPrefix() + message, arguments );
748    }
749
750  protected void logWarn( String message, Object... arguments )
751    {
752    getProcessLogger().logWarn( getPrefix() + message, arguments );
753    }
754
755  protected void logError( String message, Object... arguments )
756    {
757    getProcessLogger().logError( getPrefix() + message, arguments );
758    }
759
760  protected void logError( String message, Throwable throwable )
761    {
762    getProcessLogger().logError( getPrefix() + message, throwable );
763    }
764
765  protected String getPrefix()
766    {
767    if( prefixID == null )
768      prefixID = "[" + getType().name().toLowerCase() + ":" + getID().substring( 0, 5 ) + "] ";
769
770    return prefixID;
771    }
772  }