/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import cascading.flow.FlowException;
import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import cascading.stats.FlowStepStats;
import cascading.util.Util;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities. */
public abstract class HadoopStepStats extends FlowStepStats
{
  /** Name of the configuration property holding the counter fetch timeout, in seconds. */
  public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.step.counter.timeout";

  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  /** Number of counter fetch timeouts after which un-forced fetches are no longer attempted. */
  public static final int TIMEOUT_MAX = 3;

  /** Maps Hadoop task ids to the unique ids generated for them, so an id is stable across captures. */
  private final Map<TaskID, String> idCache = new HashMap<TaskID, String>( 4999 ); // nearest prime, caching for ids

  /** Field numMapTasks */
  int numMapTasks;
  /** Field numReduceTasks */
  int numReduceTasks;

  /** Field cachedCounters holds the most recently fetched counters, null until a fetch succeeds. */
  private Counters cachedCounters = null;

  /** Field timeouts counts how many counter fetches have timed out so far. */
  private int timeouts;

  /** Field taskStats */
  Map<String, HadoopSliceStats> taskStats = Collections.emptyMap();

  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
  {
    super( flowStep, clientState );
  }

  /**
   * Method getTaskStats returns the taskStats of this HadoopStepStats object.
   *
   * @return the taskStats (type Map<String, HadoopSliceStats>) of this HadoopStepStats object, keyed by generated id.
   */
  public Map<String, HadoopSliceStats> getTaskStats()
  {
    return taskStats;
  }

  protected void setTaskStats( Map<String, HadoopSliceStats> taskStats )
  {
    this.taskStats = taskStats;
  }

  /**
   * Method getNumMapTasks returns the number of map task reports counted by the last detail capture.
   *
   * @return the numMapTasks (type int) of this HadoopStepStats object.
   */
  public int getNumMapTasks()
  {
    return numMapTasks;
  }

  void setNumMapTasks( int numMapTasks )
  {
    this.numMapTasks = numMapTasks;
  }

  /**
   * Method getNumReduceTasks returns the number of reduce task reports counted by the last detail capture.
   *
   * @return the numReduceTasks (type int) of this HadoopStepStats object.
   */
  public int getNumReduceTasks()
  {
    return numReduceTasks;
  }

  void setNumReduceTasks( int numReduceTasks )
  {
    this.numReduceTasks = numReduceTasks;
  }

  /**
   * Method getJobID returns the Hadoop running job JobID.
   *
   * @return the jobID (type String) of this HadoopStepStats object, or null if there is no running job.
   */
  public String getJobID()
  {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return null;

    return runningJob.getJobID();
  }

  /**
   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
   *
   * @return the jobClient (type JobClient) of this HadoopStepStats object.
   */
  public abstract JobClient getJobClient();

  /**
   * Method getRunningJob returns the Hadoop {@link RunningJob} managing this Hadoop job.
   *
   * @return the runningJob (type RunningJob) of this HadoopStepStats object.
   */
  public abstract RunningJob getRunningJob();

  /**
   * Method getCounterGroups returns all of the Hadoop counter groups.
   *
   * @return the counterGroups (type Collection<String>) of this HadoopStepStats object, empty if no counters
   *         are available.
   */
  @Override
  public Collection<String> getCounterGroups()
  {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    return Collections.unmodifiableCollection( counters.getGroupNames() );
  }

  /**
   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return Collection<String>, empty if no counters are available
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
  {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Set<String> results = new HashSet<String>();

    for( String groupName : counters.getGroupNames() )
    {
      if( groupName.matches( regex ) )
        results.add( groupName );
    }

    return Collections.unmodifiableCollection( results );
  }

  /**
   * Method getCountersFor returns the names of the Hadoop counters in the given group.
   *
   * @param group of String
   * @return Collection<String>, empty if no counters or no such group are available
   */
  @Override
  public Collection<String> getCountersFor( String group )
  {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Counters.Group counterGroup = counters.getGroup( group );

    // defensive, iterating a missing group would otherwise fail with a NullPointerException
    if( counterGroup == null )
      return Collections.emptySet();

    Set<String> results = new HashSet<String>();

    for( Counters.Counter counter : counterGroup )
      results.add( counter.getName() );

    return Collections.unmodifiableCollection( results );
  }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long, zero if no counters are available
   */
  @Override
  public long getCounterValue( Enum counter )
  {
    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    return counters.getCounter( counter );
  }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long, zero if no counters are available or the group or counter cannot be resolved
   */
  @Override
  public long getCounterValue( String group, String counter )
  {
    if( group == null || counter == null )
      return 0;

    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    Counters.Group counterGroup = counters.getGroup( group );

    // test the lookup result, not the argument
    if( counterGroup == null )
      return 0;

    // getCounter actually searches the display name, so look the counter up by name instead
    // in theory this is lazily created if it does not exist, but don't rely on it
    Counters.Counter counterValue = counterGroup.getCounterForName( counter );

    if( counterValue == null )
      return 0;

    return counterValue.getValue();
  }

  /**
   * Method cachedCounters returns the counters without forcing a remote fetch.
   *
   * @return the counters, possibly null if none have been fetched
   * @see #cachedCounters(boolean)
   */
  protected Counters cachedCounters()
  {
    return cachedCounters( false );
  }

  /**
   * Method cachedCounters returns the job counters, fetching them from the running job when appropriate.
   * <p/>
   * Unless {@code force} is true, the cached value is returned as is once this step is finished or once
   * {@link #TIMEOUT_MAX} fetches have timed out. Otherwise the counters are fetched on the shared futures
   * pool, waiting at most {@link #COUNTER_TIMEOUT_PROPERTY} seconds (default 5). A successful non-null
   * result replaces the cached value.
   *
   * @param force of boolean, true to attempt a fetch regardless of the finished state and prior timeouts
   * @return the most recent counters, possibly null if none have ever been fetched
   * @throws FlowException if the fetch fails and there are no cached counters to fall back on
   */
  protected synchronized Counters cachedCounters( boolean force )
  {
    if( !force && ( isFinished() || timeouts >= TIMEOUT_MAX ) )
      return cachedCounters;

    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return cachedCounters;

    Future<Counters> future = runFuture( runningJob );

    int timeout = ( (JobConf) getFlowStep().getConfig() ).getInt( COUNTER_TIMEOUT_PROPERTY, 5 );

    try
    {
      Counters fetched = future.get( timeout, TimeUnit.SECONDS );

      if( fetched != null )
        cachedCounters = fetched;
    }
    catch( InterruptedException exception )
    {
      LOG.warn( "fetching counters was interrupted" );

      // the result will never be read, keep it from lingering in the single threaded queue
      future.cancel( false );

      // restore the interrupt status so callers further up the stack can observe it
      Thread.currentThread().interrupt();
    }
    catch( ExecutionException exception )
    {
      Throwable cause = exception.getCause();

      if( cachedCounters != null )
      {
        LOG.error( "unable to get remote counters, returning cached values", cause );

        return cachedCounters;
      }

      LOG.error( "unable to get remote counters, no cached values, throwing exception", cause );

      if( cause instanceof FlowException )
        throw (FlowException) cause;

      throw new FlowException( cause );
    }
    catch( TimeoutException exception )
    {
      // drop the abandoned request if it has not started, an in-flight call is left to finish undisturbed
      future.cancel( false );

      timeouts++;

      if( timeouts >= TIMEOUT_MAX )
        LOG.warn( "fetching counters timed out after: {} seconds, final attempt: {}", timeout, timeouts );
      else
        LOG.warn( "fetching counters timed out after: {} seconds, attempts: {}", timeout, timeouts );
    }

    return cachedCounters;
  }

  // hardcoded at one thread to force serialization across all requesters in the jvm
  // this likely prevents the deadlocks the futures are safeguards against
  private static final ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory()
  {
    @Override
    public Thread newThread( Runnable runnable )
    {
      Thread thread = new Thread( runnable, "stats-futures" );

      thread.setDaemon( true );

      return thread;
    }
  } );

  /**
   * Submits a task to the shared futures pool that reads the counters from the given running job.
   *
   * @param runningJob the job to read the counters from
   * @return a Future that yields the counters, or fails with a {@link FlowException} wrapping the IOException
   */
  private Future<Counters> runFuture( final RunningJob runningJob )
  {
    Callable<Counters> task = new Callable<Counters>()
    {
      @Override
      public Counters call() throws Exception
      {
        try
        {
          return runningJob.getCounters();
        }
        catch( IOException exception )
        {
          // keep the cause so the root failure is visible to whoever logs or rethrows it
          throw new FlowException( "unable to get remote counter values", exception );
        }
      }
    };

    return futuresPool.submit( task );
  }

  /**
   * Returns the underlying Map tasks progress percentage.
   * <p/>
   * This method is experimental.
   *
   * @return float, zero if there is no running job
   * @throws FlowException if the progress cannot be read from the running job
   */
  public float getMapProgress()
  {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return 0;

    try
    {
      return runningJob.mapProgress();
    }
    catch( IOException exception )
    {
      throw new FlowException( "unable to get progress", exception );
    }
  }

  /**
   * Returns the underlying Reduce tasks progress percentage.
   * <p/>
   * This method is experimental.
   *
   * @return float, zero if there is no running job
   * @throws FlowException if the progress cannot be read from the running job
   */
  public float getReduceProgress()
  {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return 0;

    try
    {
      return runningJob.reduceProgress();
    }
    catch( IOException exception )
    {
      throw new FlowException( "unable to get progress", exception );
    }
  }

  /**
   * Method getStatusURL returns the tracking URL of the running job.
   *
   * @return the tracking URL, or null if there is no running job
   */
  public String getStatusURL()
  {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return null;

    return runningJob.getTrackingURL();
  }

  /**
   * Method getChildren returns the children of this HadoopStepStats object.
   *
   * @return the children (type Collection) of this HadoopStepStats object.
   */
  @Override
  public Collection getChildren()
  {
    return getTaskStats().values();
  }

  /**
   * Method getChildIDs returns the ids of the captured task stats.
   *
   * @return the key set of the current task stats map
   */
  public Set<String> getChildIDs()
  {
    return getTaskStats().keySet();
  }

  /** Synchronized to prevent state changes mid record, #stop may be called out of band */
  @Override
  public synchronized void recordChildStats()
  {
    try
    {
      cachedCounters( true );
    }
    catch( Exception exception )
    {
      // best effort, a failed counter refresh must not prevent the slice stats from being recorded
      LOG.debug( "unable to refresh counters before recording child stats", exception );
    }

    // if null instance don't bother capturing detail
    if( !clientState.isEnabled() )
      return;

    captureDetail();

    try
    {
      for( Map.Entry<String, HadoopSliceStats> entry : taskStats.entrySet() )
        clientState.record( entry.getKey(), entry.getValue() );
    }
    catch( Exception exception )
    {
      LOG.error( "unable to record slice stats", exception );
    }
  }

  /** Method captureDetail captures statistics task details and completion events. */
  @Override
  public synchronized void captureDetail()
  {
    captureDetail( true );
  }

  /**
   * Method captureDetail rebuilds the task stats from the map and reduce task reports of the running job,
   * optionally attaching the task completion events as attempts.
   * <p/>
   * Does nothing if either the job client or the running job is unavailable. If the reports cannot be read,
   * the previous task stats and task counts are left in place.
   *
   * @param captureAttempts of boolean, true to also fetch and attach the task completion events
   */
  public void captureDetail( boolean captureAttempts )
  {
    HashMap<String, HadoopSliceStats> newStats = new HashMap<String, HadoopSliceStats>();

    JobClient jobClient = getJobClient();
    RunningJob runningJob = getRunningJob();

    if( jobClient == null || runningJob == null )
      return;

    // remembered so a failed capture does not leave zeroed or partial counts next to the old task stats
    int previousMapTasks = numMapTasks;
    int previousReduceTasks = numReduceTasks;

    numMapTasks = 0;
    numReduceTasks = 0;

    try
    {
      // cleanup/setup tasks have no useful info so far.
      // addTaskStats( newStats, HadoopTaskStats.Kind.SETUP, jobClient.getSetupTaskReports( runningJob.getID() ), false );
      // addTaskStats( newStats, HadoopTaskStats.Kind.CLEANUP, jobClient.getCleanupTaskReports( runningJob.getID() ), false );
      addTaskStats( newStats, HadoopSliceStats.Kind.MAPPER, jobClient.getMapTaskReports( runningJob.getID() ), false );
      addTaskStats( newStats, HadoopSliceStats.Kind.REDUCER, jobClient.getReduceTaskReports( runningJob.getID() ), false );

      int count = 0;

      while( captureAttempts )
      {
        // events are paged, each call starts at the number of events already consumed
        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );

        if( events == null || events.length == 0 )
          break;

        addAttemptsToTaskStats( newStats, events );
        count += events.length;
      }

      setTaskStats( newStats );
    }
    catch( IOException exception )
    {
      numMapTasks = previousMapTasks;
      numReduceTasks = previousReduceTasks;

      LOG.warn( "unable to get task stats", exception );
    }
  }

  /**
   * Adds a HadoopSliceStats entry for every non-null task report and increments the matching task count.
   *
   * @param taskStats   the map to add the new entries to
   * @param kind        the kind of task the reports describe
   * @param taskReports the reports to convert
   * @param skipLast    true to ignore the last report in the array
   */
  private void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, TaskReport[] taskReports, boolean skipLast )
  {
    int limit = taskReports.length - ( skipLast ? 1 : 0 );

    for( int i = 0; i < limit; i++ )
    {
      TaskReport taskReport = taskReports[ i ];

      if( taskReport == null )
      {
        LOG.warn( "found empty task report" );
        continue;
      }

      String id = getIDFor( taskReport.getTaskID() );
      taskStats.put( id, new HadoopSliceStats( id, getStatus(), kind, stepHasReducers(), taskReport ) );

      incrementKind( kind );
    }
  }

  /** Returns true if the flow step declares at least one group. */
  private boolean stepHasReducers()
  {
    return !getFlowStep().getGroups().isEmpty();
  }

  /** Increments the map or reduce task count for the given kind, setup and cleanup tasks are not counted. */
  private void incrementKind( HadoopSliceStats.Kind kind )
  {
    switch( kind )
    {
      case SETUP:
        break;
      case MAPPER:
        numMapTasks++;
        break;
      case REDUCER:
        numReduceTasks++;
        break;
      case CLEANUP:
        break;
    }
  }

  /**
   * Attaches each non-null completion event to the slice stats of the task it belongs to.
   *
   * @param taskStats the slice stats keyed by generated id
   * @param events    the completion events to attach
   */
  private void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, TaskCompletionEvent[] events )
  {
    for( TaskCompletionEvent event : events )
    {
      if( event == null )
      {
        LOG.warn( "found empty completion event" );
        continue;
      }

      // events may belong to housekeeping tasks, which are not tracked and so have no entry
      HadoopSliceStats stats = taskStats.get( getIDFor( event.getTaskAttemptId().getTaskID() ) );

      if( stats != null )
        stats.addAttempt( event );
    }
  }

  /**
   * Returns the unique id associated with the given task id, creating and caching one on first use.
   *
   * @param taskID the Hadoop task id
   * @return the generated id for the task
   */
  private String getIDFor( TaskID taskID )
  {
    // using taskID instance as #toString is quite painful
    String id = idCache.get( taskID );

    if( id == null )
    {
      id = Util.createUniqueID();
      idCache.put( taskID, id );
    }

    return id;
  }
}