001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.stats;
023
024import java.util.Collection;
025import java.util.Collections;
026import java.util.LinkedHashMap;
027import java.util.Map;
028
029import cascading.flow.FlowNode;
030import cascading.management.state.ClientState;
031
032/**
033 *
034 */
035public abstract class BaseCachedNodeStats<Config, JobStatus, Counters> extends FlowNodeStats
036  {
037  protected final Map<String, FlowSliceStats> sliceStatsMap = new LinkedHashMap<>();
038  protected CounterCache<Config, JobStatus, Counters> counterCache;
039
040  protected boolean allChildrenFinished;
041
042  /**
043   * Constructor BaseHadoopNodeStats creates a new BaseHadoopNodeStats instance.
044   *
045   * @param flowNode
046   * @param clientState
047   */
048  protected BaseCachedNodeStats( FlowNode flowNode, ClientState clientState )
049    {
050    super( flowNode, clientState );
051    }
052
053  @Override
054  public long getLastSuccessfulCounterFetchTime()
055    {
056    if( counterCache != null )
057      return counterCache.getLastSuccessfulFetch();
058
059    return -1;
060    }
061
062  public boolean isAllChildrenFinished()
063    {
064    return allChildrenFinished;
065    }
066
067  /**
068   * Method getCounterGroups returns all of the Hadoop counter groups.
069   *
070   * @return the counterGroups (type Collection) of this HadoopStepStats object.
071   */
072  @Override
073  public Collection<String> getCounterGroups()
074    {
075    return counterCache.getCounterGroups();
076    }
077
078  /**
079   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern.
080   *
081   * @param regex of String
082   * @return Collection
083   */
084  @Override
085  public Collection<String> getCounterGroupsMatching( String regex )
086    {
087    return counterCache.getCounterGroupsMatching( regex );
088    }
089
090  /**
091   * Method getCountersFor returns the Hadoop counters for the given group.
092   *
093   * @param group of String
094   * @return Collection
095   */
096  @Override
097  public Collection<String> getCountersFor( String group )
098    {
099    return counterCache.getCountersFor( group );
100    }
101
102  /**
103   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
104   *
105   * @param counter of Enum
106   * @return long
107   */
108  @Override
109  public long getCounterValue( Enum counter )
110    {
111    return counterCache.getCounterValue( counter );
112    }
113
114  /**
115   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
116   *
117   * @param group   of String
118   * @param counter of String
119   * @return long
120   */
121  @Override
122  public long getCounterValue( String group, String counter )
123    {
124    return counterCache.getCounterValue( group, counter );
125    }
126
127  protected synchronized Counters cachedCounters( boolean force )
128    {
129    return counterCache.cachedCounters( force );
130    }
131
132  @Override
133  public Collection<FlowSliceStats> getChildren()
134    {
135    synchronized( sliceStatsMap )
136      {
137      return Collections.unmodifiableCollection( sliceStatsMap.values() );
138      }
139    }
140
141  @Override
142  public FlowSliceStats getChildWith( String id )
143    {
144    return sliceStatsMap.get( id );
145    }
146
147  @Override
148  public final void captureDetail( Type depth )
149    {
150    boolean finished = isFinished();
151
152    if( finished && hasCapturedFinalDetail() )
153      return;
154
155    synchronized( this )
156      {
157      if( !getType().isChild( depth ) || !isDetailStale() )
158        return;
159
160      boolean success = captureChildDetailInternal();
161
162      markDetailCaptured(); // always mark to prevent double calls
163
164      if( success )
165        logDebug( "captured remote node statistic details" );
166
167      hasCapturedFinalDetail = finished && success && allChildrenFinished;
168
169      if( allChildrenFinished )
170        logInfo( "all {} children are in finished state, have captured final details: {}", sliceStatsMap.size(), hasCapturedFinalDetail() );
171      }
172    }
173
174  /**
175   * Returns true if was able to capture/refresh the internal child stats cache.
176   *
177   * @return true if successful
178   */
179  protected abstract boolean captureChildDetailInternal();
180
181  /** Synchronized to prevent state changes mid record, #stop may be called out of band */
182  @Override
183  public synchronized void recordChildStats()
184    {
185    try
186      {
187      cachedCounters( true );
188      }
189    catch( Exception exception )
190      {
191      // do nothing
192      }
193
194    if( !clientState.isEnabled() )
195      return;
196
197    captureDetail( Type.ATTEMPT );
198
199    // FlowSliceStats are not full blown Stats types, but implementation specific
200    // so we can't call recordStats/recordChildStats
201    try
202      {
203      // must use the local ID as the stored id, not task id
204      for( FlowSliceStats value : sliceStatsMap.values() )
205        clientState.record( value.getID(), value );
206      }
207    catch( Exception exception )
208      {
209      logError( "unable to record node stats", exception );
210      }
211    }
212  }