001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.stats.hadoop; 022 023 import java.io.IOException; 024 import java.util.HashMap; 025 import java.util.Map; 026 027 import cascading.CascadingException; 028 import cascading.flow.FlowStep; 029 import cascading.management.state.ClientState; 030 import cascading.util.Util; 031 import org.apache.hadoop.mapred.JobClient; 032 import org.apache.hadoop.mapred.JobConf; 033 import org.apache.hadoop.mapred.RunningJob; 034 import org.apache.hadoop.mapred.TaskCompletionEvent; 035 import org.apache.hadoop.mapred.TaskID; 036 import org.apache.hadoop.mapred.TaskReport; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * Hadoop 1 specific sub-class of BaseHadoopStats using the mapred API. 042 */ 043 public abstract class HadoopStepStats extends BaseHadoopStepStats 044 { 045 /** logger. */ 046 private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopStepStats.class ); 047 048 private Map<TaskID, String> idCache = new HashMap<TaskID, String>( 4999 ); // nearest prime, caching for ids 049 050 protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState ) 051 { 052 super( flowStep, clientState ); 053 } 054 055 @Override 056 protected void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, boolean skipLast ) throws IOException 057 { 058 TaskReport[] taskReports = retrieveTaskReports( kind ); 059 060 for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ ) 061 { 062 TaskReport taskReport = taskReports[ i ]; 063 064 if( taskReport == null ) 065 { 066 LOG.warn( "found empty task report" ); 067 continue; 068 } 069 070 String id = getIDFor( taskReport.getTaskID() ); 071 taskStats.put( id, new HadoopSliceStats( id, getStatus(), kind, stepHasReducers(), taskReport ) ); 072 073 incrementKind( kind ); 074 } 075 } 076 077 /** 078 * Retrieves the TaskReports via the mapred API. 079 */ 080 private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException 081 { 082 JobClient jobClient = getJobClient(); 083 RunningJob runningJob = getRunningJob(); 084 085 if( jobClient == null || runningJob == null ) 086 return new TaskReport[ 0 ]; 087 088 switch( kind ) 089 { 090 case MAPPER: 091 return jobClient.getMapTaskReports( runningJob.getID() ); 092 case REDUCER: 093 return jobClient.getReduceTaskReports( runningJob.getID() ); 094 case CLEANUP: 095 return jobClient.getCleanupTaskReports( runningJob.getID() ); 096 case SETUP: 097 return jobClient.getSetupTaskReports( runningJob.getID() ); 098 default: 099 return new TaskReport[ 0 ]; 100 } 101 } 102 103 @Override 104 protected void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, boolean captureAttempts ) 105 { 106 RunningJob runningJob = getRunningJob(); 107 108 if( runningJob == null ) 109 return; 110 111 int count = 0; 112 113 while( captureAttempts ) 114 { 115 try 116 { 117 TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count ); 118 119 if( events.length == 0 ) 120 break; 121 122 for( TaskCompletionEvent event : events ) 123 { 124 if( event == null ) 125 { 126 LOG.warn( "found empty completion event" ); 127 continue; 128 } 129 130 // this will return a housekeeping task, which we are not tracking 131 HadoopSliceStats stats = taskStats.get( getIDFor( event.getTaskAttemptId().getTaskID() ) ); 132 133 if( stats != null ) 134 stats.addAttempt( event ); 135 } 136 137 count += events.length; 138 } 139 catch( IOException exception ) 140 { 141 throw new CascadingException( exception ); 142 } 143 } 144 } 145 146 private String getIDFor( TaskID taskID ) 147 { 148 // using taskID instance as #toString is quite painful 149 String id = idCache.get( taskID ); 150 151 if( id == null ) 152 { 153 id = Util.createUniqueID(); 154 idCache.put( taskID, id ); 155 } 156 157 return id; 158 } 159 }