001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.stats;
023
024import java.util.Collection;
025import java.util.Map;
026
027import cascading.flow.FlowStep;
028import cascading.management.state.ClientState;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 *
034 */
035public abstract class BaseCachedStepStats<Configuration, JobStatusClient, Counters> extends FlowStepStats
036  {
037  private static final Logger LOG = LoggerFactory.getLogger( BaseCachedStepStats.class );
038
039  protected CounterCache<Configuration, JobStatusClient, Counters> counterCache;
040
041  public BaseCachedStepStats( FlowStep flowStep, ClientState clientState )
042    {
043    super( flowStep, clientState );
044    }
045
046  /**
047   * Method getRunningJob returns the Hadoop {@link org.apache.hadoop.mapred.RunningJob} managing this Hadoop job.
048   *
049   * @return the runningJob (type RunningJob) of this HadoopStepStats object.
050   */
051  public abstract JobStatusClient getJobStatusClient();
052
053  @Override
054  public long getLastSuccessfulCounterFetchTime()
055    {
056    if( counterCache != null )
057      return counterCache.getLastSuccessfulFetch();
058
059    return -1;
060    }
061
062  /**
063   * Method getCounterGroups returns all of the Hadoop counter groups.
064   *
065   * @return the counterGroups (type Collection) of this HadoopStepStats object.
066   */
067  @Override
068  public Collection<String> getCounterGroups()
069    {
070    return counterCache.getCounterGroups();
071    }
072
073  /**
074   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern.
075   *
076   * @param regex of String
077   * @return Collection
078   */
079  @Override
080  public Collection<String> getCounterGroupsMatching( String regex )
081    {
082    return counterCache.getCounterGroupsMatching( regex );
083    }
084
085  /**
086   * Method getCountersFor returns the Hadoop counters for the given group.
087   *
088   * @param group of String
089   * @return Collection
090   */
091  @Override
092  public Collection<String> getCountersFor( String group )
093    {
094    return counterCache.getCountersFor( group );
095    }
096
097  /**
098   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
099   *
100   * @param counter of Enum
101   * @return long
102   */
103  @Override
104  public long getCounterValue( Enum counter )
105    {
106    return counterCache.getCounterValue( counter );
107    }
108
109  /**
110   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
111   *
112   * @param group   of String
113   * @param counter of String
114   * @return long
115   */
116  @Override
117  public long getCounterValue( String group, String counter )
118    {
119    return counterCache.getCounterValue( group, counter );
120    }
121
122  protected synchronized Counters cachedCounters( boolean force )
123    {
124    return counterCache.cachedCounters( force );
125    }
126
127  /** Synchronized to prevent state changes mid record, #stop may be called out of band */
128  @Override
129  public synchronized void recordChildStats()
130    {
131    try
132      {
133      cachedCounters( true );
134      }
135    catch( Exception exception )
136      {
137      // do nothing
138      }
139
140    if( !clientState.isEnabled() )
141      return;
142
143    captureDetail( Type.ATTEMPT );
144
145    try
146      {
147      for( Map.Entry<String, FlowNodeStats> entry : getFlowNodeStatsMap().entrySet() )
148        {
149        entry.getValue().recordStats();
150        entry.getValue().recordChildStats();
151        }
152      }
153    catch( Exception exception )
154      {
155      LOG.error( "unable to record node stats", exception );
156      }
157    }
158  }