/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import cascading.flow.FlowException;
import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import cascading.stats.FlowStepStats;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class BaseHadoopStepStats is a base class to Hadoop specific statistics and methods to underlying Hadoop facilities. */
public abstract class BaseHadoopStepStats extends FlowStepStats
  {
  /** Name of the property holding the counter fetch timeout, in seconds. Defaults to 5 when unset. */
  public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.step.counter.timeout";

  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopStepStats.class );

  /** Number of counter fetch timeouts after which un-forced requests only return the cached counters. */
  public static final int TIMEOUT_MAX = 3;

  /** Field numMapTasks */
  int numMapTasks;
  /** Field numReducerTasks */
  int numReduceTasks;

  /** Field cachedCounters holds the most recently fetched counters, null until a fetch succeeds */
  private Counters cachedCounters = null;

  /** Field timeouts counts the counter fetches that have timed out so far */
  private int timeouts;

  /** Field taskStats */
  Map<String, HadoopSliceStats> taskStats = Collections.emptyMap();

  protected BaseHadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );
    }

  /**
   * Method getTaskStats returns the taskStats of this HadoopStepStats object.
   *
   * @return the taskStats (type {@code Map<String, HadoopSliceStats>}) of this HadoopStepStats object.
   */
  public Map<String, HadoopSliceStats> getTaskStats()
    {
    return taskStats;
    }

  protected void setTaskStats( Map<String, HadoopSliceStats> taskStats )
    {
    this.taskStats = taskStats;
    }

  /**
   * Method getNumMapTasks returns the numMapTasks from the Hadoop job file.
   *
   * @return the numMapTasks (type int) of this HadoopStepStats object.
   */
  public int getNumMapTasks()
    {
    return numMapTasks;
    }

  void setNumMapTasks( int numMapTasks )
    {
    this.numMapTasks = numMapTasks;
    }

  /**
   * Method getNumReduceTasks returns the numReducerTasks from the Hadoop job file.
   *
   * @return the numReducerTasks (type int) of this HadoopStepStats object.
   */
  public int getNumReduceTasks()
    {
    return numReduceTasks;
    }

  void setNumReduceTasks( int numReduceTasks )
    {
    this.numReduceTasks = numReduceTasks;
    }

  /**
   * Method getJobID returns the Hadoop running job JobID.
   *
   * @return the jobID (type String) of this HadoopStepStats object, or null if there is no running job.
   */
  public String getJobID()
    {
    // read once so the null check and the dereference see the same instance
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return null;

    return runningJob.getJobID();
    }

  /**
   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
   *
   * @return the jobClient (type JobClient) of this HadoopStepStats object.
   */
  public abstract JobClient getJobClient();

  /**
   * Method getRunningJob returns the Hadoop {@link RunningJob} managing this Hadoop job.
   *
   * @return the runningJob (type RunningJob) of this HadoopStepStats object.
   */
  public abstract RunningJob getRunningJob();

  /**
   * Method getCounterGroups returns all of the Hadoop counter groups.
   *
   * @return the counterGroups (type {@code Collection<String>}) of this HadoopStepStats object, empty if no
   *         counters are available.
   */
  @Override
  public Collection<String> getCounterGroups()
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    return Collections.unmodifiableCollection( counters.getGroupNames() );
    }

  /**
   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return {@code Collection<String>}, empty if no counters are available
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Set<String> results = new HashSet<String>();

    for( String groupName : counters.getGroupNames() )
      {
      if( groupName.matches( regex ) )
        results.add( groupName );
      }

    return Collections.unmodifiableCollection( results );
    }

  /**
   * Method getCountersFor returns the Hadoop counters for the given group.
   *
   * @param group of String
   * @return {@code Collection<String>}, empty if no counters are available
   */
  @Override
  public Collection<String> getCountersFor( String group )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Set<String> results = new HashSet<String>();

    for( Counters.Counter counter : counters.getGroup( group ) )
      results.add( counter.getName() );

    return Collections.unmodifiableCollection( results );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long, 0 if no counters are available
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    return counters.getCounter( counter );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long, 0 if no counters are available or the group or counter cannot be resolved
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    Counters.Group counterGroup = counters.getGroup( group );

    // guard the lookup result, not the argument that was already used for the lookup
    if( counterGroup == null )
      return 0;

    // getCounter actually searches the display name, so look up by name instead
    // in theory this is lazily created if does not exist, but don't rely on it
    Counters.Counter counterValue = counterGroup.getCounterForName( counter );

    if( counterValue == null )
      return 0;

    return counterValue.getValue();
    }

  /**
   * Method cachedCounters returns the counters without forcing a fetch, see {@link #cachedCounters(boolean)}.
   *
   * @return the most recent Counters, possibly null
   */
  protected Counters cachedCounters()
    {
    return cachedCounters( false );
    }

  /**
   * Method cachedCounters attempts to fetch the current counters from the running job, waiting at most
   * {@link #COUNTER_TIMEOUT_PROPERTY} seconds, and falls back to the last successfully fetched value.
   * <p/>
   * Unless {@code force} is true, no fetch is attempted once the step is finished or {@link #TIMEOUT_MAX}
   * fetches have timed out.
   *
   * @param force true to attempt a fetch regardless of finished state or prior timeouts
   * @return the most recent Counters, null if none have ever been fetched
   * @throws FlowException if the fetch fails and there are no cached counters to return
   */
  protected synchronized Counters cachedCounters( boolean force )
    {
    if( !force && ( isFinished() || timeouts >= TIMEOUT_MAX ) )
      return cachedCounters;

    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return cachedCounters;

    Future<Counters> future = runFuture( runningJob );

    int timeout = ( (JobConf) getFlowStep().getConfig() ).getInt( COUNTER_TIMEOUT_PROPERTY, 5 );

    try
      {
      Counters fetched = future.get( timeout, TimeUnit.SECONDS );

      // never replace a good cached value with null
      if( fetched != null )
        cachedCounters = fetched;
      }
    catch( InterruptedException exception )
      {
      LOG.warn( "fetching counters was interrupted" );

      // restore the interrupt status so callers further up the stack can observe it
      Thread.currentThread().interrupt();
      }
    catch( ExecutionException exception )
      {
      Throwable cause = exception.getCause();

      if( cachedCounters != null )
        {
        LOG.error( "unable to get remote counters, returning cached values", cause );

        return cachedCounters;
        }

      LOG.error( "unable to get remote counters, no cached values, throwing exception", cause );

      if( cause instanceof FlowException )
        throw (FlowException) cause;

      throw new FlowException( cause );
      }
    catch( TimeoutException exception )
      {
      timeouts++;

      if( timeouts >= TIMEOUT_MAX )
        LOG.warn( "fetching counters timed out after: {} seconds, final attempt: {}", timeout, timeouts );
      else
        LOG.warn( "fetching counters timed out after: {} seconds, attempts: {}", timeout, timeouts );
      }

    return cachedCounters;
    }

  // hardcoded at one thread to force serialization across all requesters in the jvm
  // this likely prevents the deadlocks the futures are safeguards against
  private static final ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory()
    {
    @Override
    public Thread newThread( Runnable runnable )
      {
      Thread thread = new Thread( runnable, "stats-futures" );

      thread.setDaemon( true );

      return thread;
      }
    } );

  /** Submits a task to the shared single threaded pool that fetches the counters from the given running job. */
  private Future<Counters> runFuture( final RunningJob runningJob )
    {
    Callable<Counters> task = new Callable<Counters>()
      {
      @Override
      public Counters call() throws Exception
        {
        try
          {
          return runningJob.getCounters();
          }
        catch( IOException exception )
          {
          // keep the underlying failure as the cause so it is not lost to the caller
          throw new FlowException( "unable to get remote counter values", exception );
          }
        }
      };

    return futuresPool.submit( task );
    }

  /**
   * Returns the underlying Map tasks progress percentage.
   * <p/>
   * This method is experimental.
   *
   * @return float, 0 if there is no running job
   * @throws FlowException if the progress cannot be retrieved
   */
  public float getMapProgress()
    {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.mapProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the underlying Reduce tasks progress percentage.
   * <p/>
   * This method is experimental.
   *
   * @return float, 0 if there is no running job
   * @throws FlowException if the progress cannot be retrieved
   */
  public float getReduceProgress()
    {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.reduceProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Method getStatusURL returns the tracking URL of the running job.
   *
   * @return the tracking URL (type String), or null if there is no running job
   */
  public String getStatusURL()
    {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return null;

    return runningJob.getTrackingURL();
    }

  /**
   * Method getChildren returns the children of this HadoopStepStats object.
   *
   * @return the children (type Collection) of this HadoopStepStats object.
   */
  @Override
  public Collection getChildren()
    {
    return getTaskStats().values();
    }

  /**
   * Method getChildIDs returns the keys of the current task stats.
   *
   * @return the child ids (type {@code Set<String>}) of this HadoopStepStats object.
   */
  public Set<String> getChildIDs()
    {
    return getTaskStats().keySet();
    }

  /** Synchronized to prevent state changes mid record, #stop may be called out of band */
  @Override
  public synchronized void recordChildStats()
    {
    try
      {
      cachedCounters( true );
      }
    catch( Exception exception )
      {
      // best effort only, a failed counter refresh must not prevent recording the slice stats
      LOG.debug( "unable to refresh counters before recording child stats", exception );
      }

    // if null instance don't bother capturing detail
    if( !clientState.isEnabled() )
      return;

    captureDetail();

    try
      {
      for( Map.Entry<String, HadoopSliceStats> entry : taskStats.entrySet() )
        clientState.record( entry.getKey(), entry.getValue() );
      }
    catch( Exception exception )
      {
      LOG.error( "unable to record slice stats", exception );
      }
    }

  /** Method captureDetail captures statistics task details and completion events. */
  @Override
  public synchronized void captureDetail()
    {
    captureDetail( true );
    }

  /**
   * Method captureDetail rebuilds the task stats for the mapper and reducer tasks and replaces the current
   * task stats on success. Does nothing if there is no job client or running job.
   *
   * @param captureAttempts passed through to {@link #addAttemptsToTaskStats(Map, boolean)}
   */
  public void captureDetail( boolean captureAttempts )
    {
    JobClient jobClient = getJobClient();
    RunningJob runningJob = getRunningJob();

    if( jobClient == null || runningJob == null )
      return;

    Map<String, HadoopSliceStats> newStats = new HashMap<String, HadoopSliceStats>();

    numMapTasks = 0;
    numReduceTasks = 0;

    try
      {
      // cleanup/setup tasks have no useful info so far.
//      addTaskStats( newStats, HadoopSliceStats.Kind.SETUP, false );
//      addTaskStats( newStats, HadoopSliceStats.Kind.CLEANUP, false );
      addTaskStats( newStats, HadoopSliceStats.Kind.MAPPER, false );
      addTaskStats( newStats, HadoopSliceStats.Kind.REDUCER, false );

      addAttemptsToTaskStats( newStats, captureAttempts );

      setTaskStats( newStats );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get task stats", exception );
      }
    }

  /** Returns true if the flow step declares at least one group. */
  boolean stepHasReducers()
    {
    return !getFlowStep().getGroups().isEmpty();
    }

  /** Increments the map or reduce task count for the given kind, setup and cleanup kinds are not counted. */
  void incrementKind( HadoopSliceStats.Kind kind )
    {
    switch( kind )
      {
      case SETUP:
        break;
      case MAPPER:
        numMapTasks++;
        break;
      case REDUCER:
        numReduceTasks++;
        break;
      case CLEANUP:
        break;
      }
    }

  protected abstract void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, boolean skipLast ) throws IOException;

  protected abstract void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, boolean captureAttempts );
  }