/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import cascading.flow.FlowException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.formatDurationFromMillis;
import static java.lang.System.currentTimeMillis;

/**
 * Class CounterCache fetches counters through a job status client and caches the most recent
 * successfully fetched value.
 * <p>
 * Fetching is bounded in three ways, each read once in the constructor via {@link #getIntProperty(String, int)}:
 * <ul>
 * <li>{@link #COUNTER_TIMEOUT_PROPERTY} - seconds to wait for a fetch; zero performs the fetch synchronously on the calling thread</li>
 * <li>{@link #COUNTER_FETCH_RETRIES_PROPERTY} - number of consecutive failed attempts after which no further fetch is tried</li>
 * <li>{@link #COUNTER_MAX_AGE_PROPERTY} / {@link #NODE_COUNTER_MAX_AGE_PROPERTY} - age in seconds at which a cached value
 * is considered stale and re-fetched</li>
 * </ul>
 *
 * @param <Config>    type of the configuration object retained in {@link #configuration}
 * @param <JobStatus> type of the client handle returned by {@link #getJobStatusClient()}
 * @param <Counters>  type of the counters container returned by {@link #getCounters(Object)}
 */
public abstract class CounterCache<Config, JobStatus, Counters>
  {
  public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.counter.timeout.seconds";
  public static final String COUNTER_FETCH_RETRIES_PROPERTY = "cascading.counter.fetch.retries";
  public static final String COUNTER_MAX_AGE_PROPERTY = "cascading.counter.age.max.seconds";
  public static final String NODE_COUNTER_MAX_AGE_PROPERTY = "cascading.node.counter.age.max.seconds";

  public static final int DEFAULT_TIMEOUT_TIMEOUT_SEC = 0; // zero means making the call synchronously
  public static final int DEFAULT_FETCH_RETRIES = 3;
  public static final int DEFAULT_CACHED_AGE_MAX = 0; // rely on client interface caching
  public static final int DEFAULT_NODE_CACHED_AGE_MAX = 30; // don't re-fetch task reports for 30 seconds

  private static final Logger LOG = LoggerFactory.getLogger( CounterCache.class );

  // hardcoded at one thread to force serialization across all requesters in the jvm
  // this likely prevents the deadlocks the futures are safeguards against
  private static final ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory()
    {
    @Override
    public Thread newThread( Runnable runnable )
      {
      Thread thread = new Thread( runnable, "stats-counter-future" );

      thread.setDaemon( true );

      return thread;
      }
    } );

  private final CascadingStats stats;
  /** set once counters were fetched successfully while the process reported finished */
  private boolean hasCapturedFinalCounters;
  /** cleared when {@link #areCountersAvailable(Object)} returns false; no fetch is attempted afterwards */
  private boolean hasAvailableCounters = true;
  private Counters cachedCounters = null;
  /** wall clock millis of the last successful fetch, -1 if there has been none */
  private long lastFetch = -1;
  /** guards the max-retries warning so it is logged only once */
  private boolean warnedStale = false;

  protected int maxFetchAttempts;
  protected int fetchAttempts;
  protected int timeout;
  protected int maxAge;

  protected final Config configuration;

  /**
   * Constructor CounterCache creates a new instance and reads the timeout, retry and max age settings.
   * <p>
   * Note {@link #getIntProperty(String, int)} is invoked from this constructor, after {@link #configuration}
   * has been assigned but before any sub-class constructor body runs, so implementations must not depend
   * on sub-class fields.
   *
   * @param stats         the stats instance whose type, status and finished state are consulted
   * @param configuration the configuration object made available to sub-classes
   */
  protected CounterCache( CascadingStats stats, Config configuration )
    {
    this.stats = stats;
    this.configuration = configuration;

    this.timeout = getIntProperty( COUNTER_TIMEOUT_PROPERTY, DEFAULT_TIMEOUT_TIMEOUT_SEC );
    this.maxFetchAttempts = getIntProperty( COUNTER_FETCH_RETRIES_PROPERTY, DEFAULT_FETCH_RETRIES );

    // node level stats use their own max age property and default
    if( stats.getType() == CascadingStats.Type.NODE )
      this.maxAge = getIntProperty( NODE_COUNTER_MAX_AGE_PROPERTY, DEFAULT_NODE_CACHED_AGE_MAX );
    else
      this.maxAge = getIntProperty( COUNTER_MAX_AGE_PROPERTY, DEFAULT_CACHED_AGE_MAX );
    }

  /**
   * Method getIntProperty returns the int value of the given property, or the default value.
   *
   * @param property     the property name
   * @param defaultValue the value to use as the default
   * @return the resolved value
   */
  protected abstract int getIntProperty( String property, int defaultValue );

  /**
   * Method getLastSuccessfulFetch returns the time of the last successful counter fetch.
   *
   * @return millis as returned by {@link System#currentTimeMillis()} at the last successful fetch, or -1 if none
   */
  public long getLastSuccessfulFetch()
    {
    return lastFetch;
    }

  /**
   * Method getJobStatusClient returns the client handle used to fetch counters.
   *
   * @return the client, or null; when null no fetch is attempted
   */
  protected abstract JobStatus getJobStatusClient();

  /**
   * Method areCountersAvailable reports if counters can be fetched from the given client. Once this
   * returns false, this cache stops attempting fetches.
   */
  protected abstract boolean areCountersAvailable( JobStatus runningJob );

  /**
   * Method getCounters performs the actual fetch. A null result is treated as an unsuccessful fetch
   * that does not count against the retry limit.
   *
   * @throws IOException if the fetch fails; it is rethrown wrapped in a {@link FlowException}
   */
  protected abstract Counters getCounters( JobStatus runningJob ) throws IOException;

  /** Method getGroupNames returns the group names in the given counters. */
  protected abstract Collection<String> getGroupNames( Counters counters );

  /** Method getCountersFor returns the counter names in the given group of the given counters. */
  protected abstract Set<String> getCountersFor( Counters counters, String group );

  /** Method getCounterValue returns the value of the given Enum counter in the given counters. */
  protected abstract long getCounterValue( Counters counters, Enum counter );

  /** Method getCounterValue returns the value of the given group and counter in the given counters. */
  protected abstract long getCounterValue( Counters counters, String group, String counter );

  /**
   * Method getCounterGroups returns all known counter group names.
   *
   * @return an unmodifiable collection, empty if no counters are cached
   */
  public Collection<String> getCounterGroups()
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    return Collections.unmodifiableCollection( getGroupNames( counters ) );
    }

  /**
   * Method getCounterGroupsMatching returns the counter group names that fully match the given regular expression.
   *
   * @param regex the expression applied via {@link String#matches(String)}
   * @return an unmodifiable collection, empty if no counters are cached
   */
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Set<String> results = new HashSet<String>();

    for( String group : getGroupNames( counters ) )
      {
      if( group.matches( regex ) )
        results.add( group );
      }

    return Collections.unmodifiableCollection( results );
    }

  /**
   * Method getCountersFor returns the counter names within the given group.
   *
   * @param group the group name
   * @return an unmodifiable collection, empty if no counters are cached
   */
  public Collection<String> getCountersFor( String group )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Set<String> results = getCountersFor( counters, group );

    return Collections.unmodifiableCollection( results );
    }

  /**
   * Method getCounterValue returns the value of the given Enum counter.
   *
   * @return the counter value, or zero if no counters are cached
   */
  public long getCounterValue( Enum counter )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    return getCounterValue( counters, counter );
    }

  /**
   * Method getCounterValue returns the value of the given group and counter.
   *
   * @return the counter value, or zero if no counters are cached
   */
  public long getCounterValue( String group, String counter )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    return getCounterValue( counters, group, counter );
    }

  /**
   * Method cachedCounters returns the cached counters, refreshing them only if required.
   * Same as calling {@code cachedCounters( false )}.
   */
  public Counters cachedCounters()
    {
    return cachedCounters( false );
    }

  /**
   * Method cachedCounters returns the cached counters, fetching fresh values when there are none cached,
   * when the cached values have reached {@link #maxAge} seconds, when {@code force} is true, or when the
   * process has finished and its final counters have not been captured yet.
   * <p>
   * No fetch is attempted if counters were reported unavailable, if {@link #maxFetchAttempts} consecutive
   * attempts have failed, or if the final counters have already been captured.
   * <p>
   * When {@link #timeout} is zero the fetch is synchronous and a failure surfaces directly as a
   * {@link FlowException} without being counted as a failed attempt. When it is greater than zero,
   * failures and timeouts are counted, and a failure is only rethrown if there is no cached value to return.
   *
   * @param force true to fetch even if the cached values are not stale
   * @return the most recently fetched counters, may be null
   * @throws FlowException if fetching fails, subject to the conditions above
   */
  public synchronized Counters cachedCounters( boolean force )
    {
    if( !hasAvailableCounters )
      return cachedCounters;

    // no point in trying again
    if( fetchAttempts >= maxFetchAttempts )
      {
      if( !hasCapturedFinalCounters && !warnedStale )
        {
        if( cachedCounters == null )
          LOG.warn( "no counters fetched, max num consecutive retries reached: {}, type: {}, status: {}", maxFetchAttempts, stats.getType(), stats.getStatus() );
        else
          LOG.warn( "stale counters being returned, max num consecutive retries reached, age: {}, type: {}, status: {}", formatDurationFromMillis( currentTimeMillis() - lastFetch ), stats.getType(), stats.getStatus() );

        warnedStale = true;
        }

      return cachedCounters;
      }

    boolean isProcessFinished = stats.isFinished();

    // ignore force, no reason to refresh completed stats
    if( isProcessFinished && hasCapturedFinalCounters )
      return cachedCounters;

    // have not capturedFinalCounters - force it
    if( !force && isProcessFinished )
      force = true;

    // age is now minus last fetch; kept as a long so the never-fetched case (lastFetch == -1) cannot overflow
    long currentAgeSec = ( currentTimeMillis() - lastFetch ) / 1000;

    boolean isStale = currentAgeSec >= maxAge;

    // if we have counters, aren't being forced to update, and values aren't considered stale, return them
    if( cachedCounters != null && !force && !isStale )
      return cachedCounters;

    JobStatus runningJob = getJobStatusClient();

    if( runningJob == null )
      return cachedCounters;

    if( !areCountersAvailable( runningJob ) )
      {
      hasAvailableCounters = false;
      return cachedCounters;
      }

    boolean success = false;

    try
      {
      Counters fetched = fetchCounters( runningJob );

      success = fetched != null;

      if( success )
        {
        cachedCounters = fetched;
        lastFetch = currentTimeMillis();
        fetchAttempts = 0; // reset attempt counter, mitigates for transient non-consecutive failures
        }
      }
    catch( InterruptedException exception )
      {
      LOG.warn( "fetching counters was interrupted" );

      // restore the interrupt status so callers further up the stack can observe it
      Thread.currentThread().interrupt();
      }
    catch( ExecutionException exception )
      {
      fetchAttempts++;

      if( fetchAttempts >= maxFetchAttempts )
        LOG.error( "fetching counters failed, was final consecutive attempt: {}, type: {}, status: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause() );
      else
        LOG.warn( "fetching counters failed, consecutive attempts: {}, type: {}, status: {}, message: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause().getMessage() );

      if( cachedCounters != null )
        {
        LOG.error( "returning cached values" );

        return cachedCounters;
        }

      LOG.error( "unable to get remote counters, no cached values, rethrowing exception", exception.getCause() );

      if( exception.getCause() instanceof FlowException )
        throw (FlowException) exception.getCause();

      throw new FlowException( exception.getCause() );
      }
    catch( TimeoutException exception )
      {
      fetchAttempts++;

      if( fetchAttempts >= maxFetchAttempts )
        LOG.warn( "fetching counters timed out after: {} seconds, was final consecutive attempt: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() );
      else
        LOG.warn( "fetching counters timed out after: {} seconds, consecutive attempts: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() );
      }

    hasCapturedFinalCounters = isProcessFinished && success;

    return cachedCounters;
    }

  /**
   * Method fetchCounters fetches counters on the shared single thread pool when {@link #timeout} is greater
   * than zero, waiting at most that many seconds, otherwise directly on the calling thread.
   *
   * @throws FlowException if the synchronous fetch throws an IOException
   */
  private Counters fetchCounters( JobStatus runningJob ) throws InterruptedException, ExecutionException, TimeoutException
    {
    // if timeout greater than zero, perform async call
    if( timeout > 0 )
      {
      Future<Counters> pending = runFuture( runningJob );

      try
        {
        return pending.get( timeout, TimeUnit.SECONDS );
        }
      catch( TimeoutException exception )
        {
        // the result will never be read; drop the task if it is still queued behind another request,
        // but do not interrupt a remote call that is already running
        pending.cancel( false );

        throw exception;
        }
      }

    try
      {
      return getCounters( runningJob );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get remote counter values", exception );
      }
    }

  /**
   * Method runFuture submits a fetch to the shared pool. An IOException from the fetch is wrapped in a
   * {@link FlowException}, which the returned future reports as the cause of an {@link ExecutionException}.
   */
  private Future<Counters> runFuture( final JobStatus jobStatus )
    {
    Callable<Counters> task = new Callable<Counters>()
      {
      @Override
      public Counters call() throws Exception
        {
        try
          {
          return getCounters( jobStatus );
          }
        catch( IOException exception )
          {
          throw new FlowException( "unable to get remote counter values", exception );
          }
        }
      };

    return futuresPool.submit( task );
    }
  }