001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.stats;
022    
023    import java.io.Serializable;
024    import java.util.Collection;
025    import java.util.Map;
026    
027    import cascading.flow.Flow;
028    import cascading.management.state.ClientState;
029    
030    /**
031     * Class CascadingStats is the base class for all Cascading statistics gathering. It also reports the status of
032     * core elements that have state.
033     * <p/>
034     * There are eight states the stats object reports; PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, and FAILED.
035     * <ul>
036     * <li>{@code pending} - when the Flow or Cascade has yet to start.</li>
037     * <li>{@code skipped} - when the Flow was skipped by the parent Cascade.</li>
038     * <li>{@code started} - when {@link cascading.flow.Flow#start()} was called.</li>
039     * <li>{@code submitted} - when the Step was submitted to the underlying platform for work.</li>
040     * <li>{@code running} - when the Flow or Cascade is executing a workload.</li>
041     * <li>{@code stopped} - when the user calls {@link cascading.flow.Flow#stop()} on the Flow or Cascade.</li>
042     * <li>{@code failed} - when the Flow or Cascade threw an error and failed to finish the workload.</li>
043     * <li>{@code successful} - when the Flow or Cascade naturally completed its workload without failure.</li>
044     * </ul>
045     * <p/>
046     * CascadingStats also reports four unique timestamps.
047     * <ul>
048     * <li>{@code startTime} - when the {@code start()} method was called.</li>
049     * <li>{@code submitTime} - when the unit of work was actually submitted for execution. Not supported by all sub-classes.</li>
050     * <li>{@code runTime} - when the unit of work actually began to execute work. This value may be affected by any "polling interval" in place.</li>
051     * <li>{@code finishedTime} - when all work has completed successfully, failed, or stopped.</li>
052     * </ul>
053     * <p/>
054     * A unit of work is considered {@code finished} when the Flow or Cascade is no longer processing a workload and {@code successful},
055     * {@code failed}, or {@code stopped} is true.
056     *
057     * @see CascadeStats
058     * @see FlowStats
059     * @see FlowStepStats
060     */
061    public abstract class CascadingStats implements Serializable
062      {
063      public static final String STATS_STORE_INTERVAL = "cascading.stats.store.interval";
064    
065      /**
066       * Method setStatsStoreInterval sets the interval time between store operations against the underlying
067       * document storage services. This affects the rate at which metrics and status information is updated.
068       *
069       * @param properties of type Properties
070       * @param intervalMs milliseconds between storage calls
071       */
072      public static void setStatsStoreInterval( Map<Object, Object> properties, long intervalMs )
073        {
074        if( intervalMs <= 0 )
075          throw new IllegalArgumentException( "interval must be greater than zero, got: " + intervalMs );
076    
077        properties.put( STATS_STORE_INTERVAL, Long.toString( intervalMs ) );
078        }
079    
080      public enum Status
081        {
082          PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, FAILED
083        }
084    
085      /** Field name */
086      final String name;
087      protected final ClientState clientState;
088    
089      /** Field status */
090      Status status = Status.PENDING;
091    
092      /** Field pendingTime */
093      long pendingTime;
094      /** Field startTime */
095      long startTime;
096      /** Field submitTime */
097      long submitTime;
098      /** Field runTime */
099      long runTime;
100      /** Field finishedTime */
101      long finishedTime;
102      /** Field throwable */
103      Throwable throwable;
104    
105      protected CascadingStats( String name, ClientState clientState )
106        {
107        this.name = name;
108        this.clientState = clientState;
109        }
110    
111      /** Method prepare initializes this instance. */
112      public void prepare()
113        {
114        clientState.startService();
115        }
116    
117      /** Method cleanup destroys any resources allocated by this instance. */
118      public void cleanup()
119        {
120        clientState.stopService();
121        }
122    
123      /**
124       * Method getID returns the ID of this CascadingStats object.
125       *
126       * @return the ID (type Object) of this CascadingStats object.
127       */
128      public abstract String getID();
129    
130      /**
131       * Method getName returns the name of this CascadingStats object.
132       *
133       * @return the name (type String) of this CascadingStats object.
134       */
135      public String getName()
136        {
137        return name;
138        }
139    
140      /**
141       * Method getThrowable returns the throwable of this CascadingStats object.
142       *
143       * @return the throwable (type Throwable) of this CascadingStats object.
144       */
145      public Throwable getThrowable()
146        {
147        return throwable;
148        }
149    
150      /**
151       * Method isPending returns true if no work has been submitted.
152       *
153       * @return the pending (type boolean) of this CascadingStats object.
154       */
155      public boolean isPending()
156        {
157        return status == Status.PENDING;
158        }
159    
160      /**
161       * Method isSkipped returns true when the works was skipped.
162       * <p/>
163       * Flows are skipped if the appropriate {@link cascading.flow.FlowSkipStrategy#skipFlow(Flow)}
164       * returns {@code true};
165       *
166       * @return the skipped (type boolean) of this CascadingStats object.
167       */
168      public boolean isSkipped()
169        {
170        return status == Status.SKIPPED;
171        }
172    
173      /**
174       * Method isStarted returns true when work has started.
175       *
176       * @return the started (type boolean) of this CascadingStats object.
177       */
178      public boolean isStarted()
179        {
180        return status == Status.STARTED;
181        }
182    
183      /**
184       * Method isSubmitted returns true if no work has started.
185       *
186       * @return the submitted (type boolean) of this CascadingStats object.
187       */
188      public boolean isSubmitted()
189        {
190        return status == Status.SUBMITTED;
191        }
192    
193      /**
194       * Method isRunning returns true when work has begun.
195       *
196       * @return the running (type boolean) of this CascadingStats object.
197       */
198      public boolean isRunning()
199        {
200        return status == Status.RUNNING;
201        }
202    
203      /**
204       * Method isEngaged returns true when there is work being executed, if
205       * {@link #isStarted()}, {@link #isSubmitted()}, or {@link #isRunning()} returns true;
206       *
207       * @return the engaged (type boolean) of this CascadingStats object.
208       */
209      public boolean isEngaged()
210        {
211        return isStarted() || isSubmitted() || isRunning();
212        }
213    
214      /**
215       * Method isSuccessful returns true when work has completed successfully.
216       *
217       * @return the completed (type boolean) of this CascadingStats object.
218       */
219      public boolean isSuccessful()
220        {
221        return status == Status.SUCCESSFUL;
222        }
223    
224      /**
225       * Method isFailed returns true when the work ended with an error.
226       *
227       * @return the failed (type boolean) of this CascadingStats object.
228       */
229      public boolean isFailed()
230        {
231        return status == Status.FAILED;
232        }
233    
234      /**
235       * Method isStopped returns true when the user stopped the work.
236       *
237       * @return the stopped (type boolean) of this CascadingStats object.
238       */
239      public boolean isStopped()
240        {
241        return status == Status.STOPPED;
242        }
243    
244      /**
245       * Method isFinished returns true if the current status shows no work currently being executed,
246       * if {@link #isSkipped()}, {@link #isSuccessful()}, {@link #isFailed()}, or {@link #isStopped()} returns true.
247       *
248       * @return the finished (type boolean) of this CascadingStats object.
249       */
250      public boolean isFinished()
251        {
252        return status == Status.SUCCESSFUL || status == Status.FAILED || status == Status.STOPPED || status == Status.SKIPPED;
253        }
254    
255      /**
256       * Method getStatus returns the {@link Status} of this CascadingStats object.
257       *
258       * @return the status (type Status) of this CascadingStats object.
259       */
260      public Status getStatus()
261        {
262        return status;
263        }
264    
265      /** Method recordStats forces recording of current status information. */
266      public void recordStats()
267        {
268        this.clientState.recordStats( this );
269        }
270    
271      public abstract void recordInfo();
272    
273      /** Method markPending sets the status to {@link Status#PENDING}. */
274      public synchronized void markPending()
275        {
276        markPendingTime();
277        recordStats();
278        recordInfo();
279        }
280    
281      protected void markPendingTime()
282        {
283        if( pendingTime == 0 )
284          pendingTime = System.currentTimeMillis();
285        }
286    
287      /**
288       * Method markStartedThenRunning consecutively marks the status as {@link Status#STARTED} then {@link Status#RUNNING}
289       * and forces the start and running time to be equals.
290       */
291      public synchronized void markStartedThenRunning()
292        {
293        if( status != Status.PENDING )
294          throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );
295    
296        markStartToRunTime();
297        markStarted();
298        markRunning();
299        }
300    
301      protected void markStartToRunTime()
302        {
303        startTime = submitTime = runTime = System.currentTimeMillis();
304        }
305    
306      /** Method markStarted sets the status to {@link Status#STARTED}. */
307      public synchronized void markStarted()
308        {
309        if( status != Status.PENDING )
310          throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );
311    
312        status = Status.STARTED;
313        markStartTime();
314    
315        clientState.start( startTime );
316        clientState.setStatus( status, startTime );
317        recordStats();
318        }
319    
320      protected void markStartTime()
321        {
322        if( startTime == 0 )
323          startTime = System.currentTimeMillis();
324        }
325    
326      /** Method markSubmitted sets the status to {@link Status#SUBMITTED}. */
327      public synchronized void markSubmitted()
328        {
329        if( status == Status.SUBMITTED )
330          return;
331    
332        if( status != Status.STARTED )
333          throw new IllegalStateException( "may not mark as " + Status.SUBMITTED + ", is already " + status );
334    
335        status = Status.SUBMITTED;
336        markSubmitTime();
337    
338        clientState.submit( submitTime );
339        clientState.setStatus( status, submitTime );
340        recordStats();
341        recordInfo();
342        }
343    
344      protected void markSubmitTime()
345        {
346        if( submitTime == 0 )
347          submitTime = System.currentTimeMillis();
348        }
349    
350      /** Method markRunning sets the status to {@link Status#RUNNING}. */
351      public synchronized void markRunning()
352        {
353        if( status == Status.RUNNING )
354          return;
355    
356        if( status != Status.STARTED && status != Status.SUBMITTED )
357          throw new IllegalStateException( "may not mark as " + Status.RUNNING + ", is already " + status );
358    
359        status = Status.RUNNING;
360        markRunTime();
361    
362        clientState.run( runTime );
363        clientState.setStatus( status, runTime );
364        recordStats();
365        }
366    
367      protected void markRunTime()
368        {
369        if( runTime == 0 )
370          runTime = System.currentTimeMillis();
371        }
372    
373      /** Method markSuccessful sets the status to {@link Status#SUCCESSFUL}. */
374      public synchronized void markSuccessful()
375        {
376        if( status != Status.RUNNING && status != Status.SUBMITTED )
377          throw new IllegalStateException( "may not mark as " + Status.SUCCESSFUL + ", is already " + status );
378    
379        status = Status.SUCCESSFUL;
380        markFinishedTime();
381    
382        clientState.setStatus( status, finishedTime );
383        clientState.stop( finishedTime );
384        recordStats();
385        recordInfo();
386        }
387    
388      private void markFinishedTime()
389        {
390        finishedTime = System.currentTimeMillis();
391        }
392    
393      /**
394       * Method markFailed sets the status to {@link Status#FAILED}.
395       *
396       * @param throwable of type Throwable
397       */
398      public synchronized void markFailed( Throwable throwable )
399        {
400        if( status != Status.STARTED && status != Status.RUNNING && status != Status.SUBMITTED )
401          throw new IllegalStateException( "may not mark as " + Status.FAILED + ", is already " + status );
402    
403        status = Status.FAILED;
404        markFinishedTime();
405        this.throwable = throwable;
406    
407        clientState.setStatus( status, finishedTime );
408        clientState.stop( finishedTime );
409        recordStats();
410        recordInfo();
411        }
412    
413      /** Method markStopped sets the status to {@link Status#STOPPED}. */
414      public synchronized void markStopped()
415        {
416        if( status != Status.PENDING && status != Status.STARTED && status != Status.SUBMITTED && status != Status.RUNNING )
417          throw new IllegalStateException( "may not mark as " + Status.STOPPED + ", is already " + status );
418    
419        status = Status.STOPPED;
420        markFinishedTime();
421    
422        clientState.setStatus( status, finishedTime );
423        recordStats();
424        recordInfo();
425        clientState.stop( finishedTime );
426        }
427    
428      /** Method markSkipped sets the status to {@link Status#SKIPPED}. */
429      public synchronized void markSkipped()
430        {
431        if( status != Status.PENDING )
432          throw new IllegalStateException( "may not mark as " + Status.SKIPPED + ", is already " + status );
433    
434        status = Status.SKIPPED;
435    
436        clientState.setStatus( status, System.currentTimeMillis() );
437        recordStats();
438        }
439    
440      /**
441       * Method getPendingTime returns the pendingTime of this CascadingStats object.
442       *
443       * @return the pendingTime (type long) of this CascadingStats object.
444       */
445      public long getPendingTime()
446        {
447        return pendingTime;
448        }
449    
450      /**
451       * Method getStartTime returns the startTime of this CascadingStats object.
452       *
453       * @return the startTime (type long) of this CascadingStats object.
454       */
455      public long getStartTime()
456        {
457        return startTime;
458        }
459    
460      /**
461       * Method getSubmitTime returns the submitTime of this CascadingStats object.
462       *
463       * @return the submitTime (type long) of this CascadingStats object.
464       */
465      public long getSubmitTime()
466        {
467        return submitTime;
468        }
469    
470      /**
471       * Method getRunTime returns the runTime of this CascadingStats object.
472       *
473       * @return the runTime (type long) of this CascadingStats object.
474       */
475      public long getRunTime()
476        {
477        return runTime;
478        }
479    
480      /**
481       * Method getFinishedTime returns the finishedTime of this CascadingStats object.
482       *
483       * @return the finishedTime (type long) of this CascadingStats object.
484       */
485      public long getFinishedTime()
486        {
487        return finishedTime;
488        }
489    
490      /**
491       * Method getDuration returns the duration the work executed before being finished.
492       * <p/>
493       * This method will return zero until the work is finished. See {@link #getCurrentDuration()}
494       * if you wish to poll for the current duration value.
495       * <p/>
496       * Duration is calculated as {@code finishedTime - startTime}.
497       *
498       * @return the duration (type long) of this CascadingStats object.
499       */
500      public long getDuration()
501        {
502        if( finishedTime != 0 )
503          return finishedTime - startTime;
504        else
505          return 0;
506        }
507    
508      /**
509       * Method getCurrentDuration returns the current duration of the current work whether or not
510       * the work is finished. When finished, the return value will be the same as {@link #getDuration()}.
511       * <p/>
512       * Duration is calculated as {@code finishedTime - startTime}.
513       *
514       * @return the currentDuration (type long) of this CascadingStats object.
515       */
516      public long getCurrentDuration()
517        {
518        if( finishedTime != 0 )
519          return finishedTime - startTime;
520        else
521          return System.currentTimeMillis() - startTime;
522        }
523    
524      /**
525       * Method getCounterGroups returns all the available counter group names.
526       *
527       * @return the counterGroups (type Collection<String>) of this CascadingStats object.
528       */
529      public abstract Collection<String> getCounterGroups();
530    
531      /**
532       * Method getCounterGroupsMatching returns all the available counter group names that match
533       * the given regular expression.
534       *
535       * @param regex of type String
536       * @return Collection<String>
537       */
538      public abstract Collection<String> getCounterGroupsMatching( String regex );
539    
540      /**
541       * Method getCountersFor returns all the counter names for the give group name.
542       *
543       * @param group
544       * @return Collection<String>
545       */
546      public abstract Collection<String> getCountersFor( String group );
547    
548      /**
549       * Method getCountersFor returns all the counter names for the counter Enum.
550       *
551       * @param group
552       * @return Collection<String>
553       */
554      public Collection<String> getCountersFor( Class<? extends Enum> group )
555        {
556        return getCountersFor( group.getName() );
557        }
558    
559      /**
560       * Method getCounter returns the current value for the given counter Enum.
561       *
562       * @param counter of type Enum
563       * @return the current counter value
564       */
565      public abstract long getCounterValue( Enum counter );
566    
567      /**
568       * Method getCounter returns the current value for the given group and counter.
569       *
570       * @param group   of type String
571       * @param counter of type String
572       * @return the current counter value
573       */
574      public abstract long getCounterValue( String group, String counter );
575    
576      /**
577       * Method captureDetail will recursively capture details about nested systems. Use this method to persist
578       * statistics about a given Cascade, Flow, or FlowStep.
579       * <p/>
580       * Each CascadingStats object must be individually inspected for any system specific details.
581       */
582      public abstract void captureDetail();
583    
584      /**
585       * Method getChildren returns any relevant child statistics instances. They may not be of type CascadingStats, but
586       * instead platform specific.
587       *
588       * @return a Collection of child statistics
589       */
590      public abstract Collection getChildren();
591    
592      protected String getStatsString()
593        {
594        String string = "status=" + status + ", startTime=" + startTime;
595    
596        if( finishedTime != 0 )
597          string += ", duration=" + ( finishedTime - startTime );
598    
599        return string;
600        }
601    
602      @Override
603      public String toString()
604        {
605        return "Cascading{" + getStatsString() + '}';
606        }
607      }