001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats;
022
023import java.util.Collection;
024import java.util.Map;
025
026import cascading.flow.FlowStep;
027import cascading.management.state.ClientState;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 *
033 */
034public abstract class BaseCachedStepStats<Configuration, JobStatusClient, Counters> extends FlowStepStats
035  {
036  private static final Logger LOG = LoggerFactory.getLogger( BaseCachedStepStats.class );
037
038  protected CounterCache<Configuration, JobStatusClient, Counters> counterCache;
039
040  public BaseCachedStepStats( FlowStep flowStep, ClientState clientState )
041    {
042    super( flowStep, clientState );
043    }
044
045  /**
046   * Method getRunningJob returns the Hadoop {@link org.apache.hadoop.mapred.RunningJob} managing this Hadoop job.
047   *
048   * @return the runningJob (type RunningJob) of this HadoopStepStats object.
049   */
050  public abstract JobStatusClient getJobStatusClient();
051
052  @Override
053  public long getLastSuccessfulCounterFetchTime()
054    {
055    if( counterCache != null )
056      return counterCache.getLastSuccessfulFetch();
057
058    return -1;
059    }
060
061  /**
062   * Method getCounterGroups returns all of the Hadoop counter groups.
063   *
064   * @return the counterGroups (type Collection<String>) of this HadoopStepStats object.
065   */
066  @Override
067  public Collection<String> getCounterGroups()
068    {
069    return counterCache.getCounterGroups();
070    }
071
072  /**
073   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern.
074   *
075   * @param regex of String
076   * @return Collection<String>
077   */
078  @Override
079  public Collection<String> getCounterGroupsMatching( String regex )
080    {
081    return counterCache.getCounterGroupsMatching( regex );
082    }
083
084  /**
085   * Method getCountersFor returns the Hadoop counters for the given group.
086   *
087   * @param group of String
088   * @return Collection<String>
089   */
090  @Override
091  public Collection<String> getCountersFor( String group )
092    {
093    return counterCache.getCountersFor( group );
094    }
095
096  /**
097   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
098   *
099   * @param counter of Enum
100   * @return long
101   */
102  @Override
103  public long getCounterValue( Enum counter )
104    {
105    return counterCache.getCounterValue( counter );
106    }
107
108  /**
109   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
110   *
111   * @param group   of String
112   * @param counter of String
113   * @return long
114   */
115  @Override
116  public long getCounterValue( String group, String counter )
117    {
118    return counterCache.getCounterValue( group, counter );
119    }
120
121  protected synchronized Counters cachedCounters( boolean force )
122    {
123    return counterCache.cachedCounters( force );
124    }
125
126  /** Synchronized to prevent state changes mid record, #stop may be called out of band */
127  @Override
128  public synchronized void recordChildStats()
129    {
130    try
131      {
132      cachedCounters( true );
133      }
134    catch( Exception exception )
135      {
136      // do nothing
137      }
138
139    if( !clientState.isEnabled() )
140      return;
141
142    captureDetail( Type.ATTEMPT );
143
144    try
145      {
146      for( Map.Entry<String, FlowNodeStats> entry : getFlowNodeStatsMap().entrySet() )
147        {
148        entry.getValue().recordStats();
149        entry.getValue().recordChildStats();
150        }
151      }
152    catch( Exception exception )
153      {
154      LOG.error( "unable to record node stats", exception );
155      }
156    }
157  }