001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.ThreadFactory;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036
037import cascading.flow.FlowException;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import static cascading.util.Util.formatDurationFromMillis;
042import static java.lang.System.currentTimeMillis;
043
044/**
045 *
046 */
047public abstract class CounterCache<Config, JobStatus, Counters>
048  {
049  public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.counter.timeout.seconds";
050  public static final String COUNTER_FETCH_RETRIES_PROPERTY = "cascading.counter.fetch.retries";
051  public static final String COUNTER_MAX_AGE_PROPERTY = "cascading.counter.age.max.seconds";
052  public static final String NODE_COUNTER_MAX_AGE_PROPERTY = "cascading.node.counter.age.max.seconds";
053
054  public static final int DEFAULT_TIMEOUT_TIMEOUT_SEC = 0; // zero means making the call synchronously
055  public static final int DEFAULT_FETCH_RETRIES = 3;
056  public static final int DEFAULT_CACHED_AGE_MAX = 0; // rely on client interface caching
057  public static final int DEFAULT_NODE_CACHED_AGE_MAX = 30; // don't re-fetch task reports for 30 seconds
058
059  private static final Logger LOG = LoggerFactory.getLogger( CounterCache.class );
060
061  // hardcoded at one thread to force serialization across all requesters in the jvm
062  // this likely prevents the deadlocks the futures are safeguards against
063  private static ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory()
064  {
065  @Override
066  public Thread newThread( Runnable runnable )
067    {
068    Thread thread = new Thread( runnable, "stats-counter-future" );
069
070    thread.setDaemon( true );
071
072    return thread;
073    }
074  } );
075
076  private CascadingStats stats;
077  private boolean hasCapturedFinalCounters;
078  private boolean hasAvailableCounters = true;
079  private Counters cachedCounters = null;
080  private long lastFetch = -1;
081  private boolean warnedStale = false;
082
083  protected int maxFetchAttempts;
084  protected int fetchAttempts;
085  protected int timeout;
086  protected int maxAge;
087
088  protected final Config configuration;
089
090  protected CounterCache( CascadingStats stats, Config configuration )
091    {
092    this.stats = stats;
093    this.configuration = configuration;
094
095    this.timeout = getIntProperty( COUNTER_TIMEOUT_PROPERTY, DEFAULT_TIMEOUT_TIMEOUT_SEC );
096    this.maxFetchAttempts = getIntProperty( COUNTER_FETCH_RETRIES_PROPERTY, DEFAULT_FETCH_RETRIES );
097
098    if( stats.getType() == CascadingStats.Type.NODE )
099      this.maxAge = getIntProperty( NODE_COUNTER_MAX_AGE_PROPERTY, DEFAULT_NODE_CACHED_AGE_MAX );
100    else
101      this.maxAge = getIntProperty( COUNTER_MAX_AGE_PROPERTY, DEFAULT_CACHED_AGE_MAX );
102    }
103
104  protected abstract int getIntProperty( String property, int defaultValue );
105
106  public long getLastSuccessfulFetch()
107    {
108    return lastFetch;
109    }
110
111  protected abstract JobStatus getJobStatusClient();
112
113  protected abstract boolean areCountersAvailable( JobStatus runningJob );
114
115  protected abstract Counters getCounters( JobStatus runningJob ) throws IOException;
116
117  protected abstract Collection<String> getGroupNames( Counters counters );
118
119  protected abstract Set<String> getCountersFor( Counters counters, String group );
120
121  protected abstract long getCounterValue( Counters counters, Enum counter );
122
123  protected abstract long getCounterValue( Counters counters, String group, String counter );
124
125  public Collection<String> getCounterGroups()
126    {
127    Counters counters = cachedCounters();
128
129    if( counters == null )
130      return Collections.emptySet();
131
132    return Collections.unmodifiableCollection( getGroupNames( counters ) );
133    }
134
135  public Collection<String> getCounterGroupsMatching( String regex )
136    {
137    Counters counters = cachedCounters();
138
139    if( counters == null )
140      return Collections.emptySet();
141
142    Set<String> results = new HashSet<String>();
143
144    for( String counter : getGroupNames( counters ) )
145      {
146      if( counter.matches( regex ) )
147        results.add( counter );
148      }
149
150    return Collections.unmodifiableCollection( results );
151    }
152
153  public Collection<String> getCountersFor( String group )
154    {
155    Counters counters = cachedCounters();
156
157    if( counters == null )
158      return Collections.emptySet();
159
160    Set<String> results = getCountersFor( counters, group );
161
162    return Collections.unmodifiableCollection( results );
163    }
164
165  public long getCounterValue( Enum counter )
166    {
167    Counters counters = cachedCounters();
168
169    if( counters == null )
170      return 0;
171
172    return getCounterValue( counters, counter );
173    }
174
175  public long getCounterValue( String group, String counter )
176    {
177    Counters counters = cachedCounters();
178
179    if( counters == null )
180      return 0;
181
182    return getCounterValue( counters, group, counter );
183    }
184
185  public Counters cachedCounters()
186    {
187    return cachedCounters( false );
188    }
189
190  public synchronized Counters cachedCounters( boolean force )
191    {
192    if( !hasAvailableCounters )
193      return cachedCounters;
194
195    // no point in trying again
196    if( fetchAttempts >= maxFetchAttempts )
197      {
198      if( !hasCapturedFinalCounters && !warnedStale )
199        {
200        if( cachedCounters == null )
201          LOG.warn( "no counters fetched, max num consecutive retries reached: {}, type: {}, status: {}", maxFetchAttempts, stats.getType(), stats.getStatus() );
202        else
203          LOG.warn( "stale counters being returned, max num consecutive retries reached, age: {}, type: {}, status: {}", formatDurationFromMillis( currentTimeMillis() - lastFetch ), stats.getType(), stats.getStatus() );
204
205        warnedStale = true;
206        }
207
208      return cachedCounters;
209      }
210
211    boolean isProcessFinished = stats.isFinished();
212
213    // ignore force, no reason to refresh completed stats
214    if( isProcessFinished && hasCapturedFinalCounters )
215      return cachedCounters;
216
217    // have not capturedFinalCounters - force it
218    if( !force && isProcessFinished )
219      force = true;
220
221    int currentAge = (int) ( ( lastFetch - currentTimeMillis() ) / 1000 );
222
223    boolean isStale = currentAge >= maxAge;
224
225    // if we have counters, aren't being forced to update, and values aren't considered stale, return them
226    if( cachedCounters != null && !force && !isStale )
227      return cachedCounters;
228
229    JobStatus runningJob = getJobStatusClient();
230
231    if( runningJob == null )
232      return cachedCounters;
233
234    if( !areCountersAvailable( runningJob ) )
235      {
236      hasAvailableCounters = false;
237      return cachedCounters;
238      }
239
240    boolean success = false;
241
242    try
243      {
244      Counters fetched = fetchCounters( runningJob );
245
246      success = fetched != null;
247
248      if( success )
249        {
250        cachedCounters = fetched;
251        lastFetch = currentTimeMillis();
252        fetchAttempts = 0; // reset attempt counter, mitigates for transient non-consecutive failures
253        }
254      }
255    catch( InterruptedException exception )
256      {
257      LOG.warn( "fetching counters was interrupted" );
258      }
259    catch( ExecutionException exception )
260      {
261      fetchAttempts++;
262
263      if( fetchAttempts >= maxFetchAttempts )
264        LOG.error( "fetching counters failed, was final consecutive attempt: {}, type: {}, status: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause() );
265      else
266        LOG.warn( "fetching counters failed, consecutive attempts: {}, type: {}, status: {}, message: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause().getMessage() );
267
268      if( cachedCounters != null )
269        {
270        LOG.error( "returning cached values" );
271
272        return cachedCounters;
273        }
274
275      LOG.error( "unable to get remote counters, no cached values, rethrowing exception", exception.getCause() );
276
277      if( exception.getCause() instanceof FlowException )
278        throw (FlowException) exception.getCause();
279
280      throw new FlowException( exception.getCause() );
281      }
282    catch( TimeoutException exception )
283      {
284      fetchAttempts++;
285
286      if( fetchAttempts >= maxFetchAttempts )
287        LOG.warn( "fetching counters timed out after: {} seconds, was final consecutive attempt: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() );
288      else
289        LOG.warn( "fetching counters timed out after: {} seconds, consecutive attempts: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() );
290      }
291
292    hasCapturedFinalCounters = isProcessFinished && success;
293
294    return cachedCounters;
295    }
296
297  private Counters fetchCounters( JobStatus runningJob ) throws InterruptedException, ExecutionException, TimeoutException
298    {
299    // if timeout greater than zero, perform async call
300    if( timeout > 0 )
301      return runFuture( runningJob ).get( timeout, TimeUnit.SECONDS );
302
303    try
304      {
305      return getCounters( runningJob );
306      }
307    catch( IOException exception )
308      {
309      throw new FlowException( "unable to get remote counter values", exception );
310      }
311    }
312
313  private Future<Counters> runFuture( final JobStatus jobStatus )
314    {
315    Callable<Counters> task = new Callable<Counters>()
316    {
317    @Override
318    public Counters call() throws Exception
319      {
320      try
321        {
322        return getCounters( jobStatus );
323        }
324      catch( IOException exception )
325        {
326        throw new FlowException( "unable to get remote counter values", exception );
327        }
328      }
329    };
330
331    return futuresPool.submit( task );
332    }
333  }