001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.hadoop; 022 023import java.io.IOException; 024 025import cascading.flow.FlowException; 026import cascading.flow.FlowNode; 027import cascading.flow.FlowStep; 028import cascading.flow.planner.BaseFlowStep; 029import cascading.management.state.ClientState; 030import cascading.util.Util; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.mapred.JobClient; 033import org.apache.hadoop.mapred.JobConf; 034import org.apache.hadoop.mapred.RunningJob; 035import org.apache.hadoop.mapreduce.Counters; 036import org.apache.hadoop.mapreduce.Job; 037import org.apache.hadoop.mapreduce.TaskCompletionEvent; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities. */ 042public abstract class HadoopStepStats extends BaseHadoopStepStats<RunningJob, Counters> 043 { 044 private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class ); 045 046 private HadoopNodeStats mapperNodeStats; 047 private HadoopNodeStats reducerNodeStats; 048 049 protected static Job getJob( RunningJob runningJob ) 050 { 051 if( runningJob == null ) // if null, job hasn't been submitted 052 return null; 053 054 Job job = Util.returnInstanceFieldIfExistsSafe( runningJob, "job" ); 055 056 if( job == null ) 057 { 058 LOG.warn( "unable to get underlying org.apache.hadoop.mapreduce.Job from org.apache.hadoop.mapred.RunningJob, task level task counters will be unavailable" ); 059 return null; 060 } 061 062 return job; 063 } 064 065 protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState ) 066 { 067 super( flowStep, clientState ); 068 069 BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep; 070 071 // don't rely on the iterator topological sort to identify mapper or reducer 072 for( FlowNode current : step.getFlowNodeGraph().vertexSet() ) 073 { 074 if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 ) 075 { 076 if( mapperNodeStats != null ) 077 throw new IllegalStateException( "mapper node already found" ); 078 079 mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState ); 080 addNodeStats( mapperNodeStats ); 081 } 082 else 083 { 084 if( reducerNodeStats != null ) 085 throw new IllegalStateException( "reducer node already found" ); 086 087 reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState ); 088 addNodeStats( reducerNodeStats ); 089 } 090 } 091 092 if( mapperNodeStats == null ) 093 throw new IllegalStateException( "mapper node not found" ); 094 095 counterCache = new HadoopStepCounterCache( this, (Configuration) getConfig() ) 096 { 097 @Override 098 protected RunningJob getJobStatusClient() 099 { 100 return HadoopStepStats.this.getJobStatusClient(); 101 } 102 }; 103 } 104 105 private Configuration getConfig() 106 { 107 return (Configuration) this.getFlowStep().getConfig(); 108 } 109 110 /** 111 * Method getNumMapTasks returns the numMapTasks from the Hadoop job file. 112 * 113 * @return the numMapTasks (type int) of this HadoopStepStats object. 114 */ 115 public int getNumMapTasks() 116 { 117 return mapperNodeStats.getChildren().size(); 118 } 119 120 /** 121 * Method getNumReduceTasks returns the numReducerTasks from the Hadoop job file. 122 * 123 * @return the numReducerTasks (type int) of this HadoopStepStats object. 124 */ 125 public int getNumReduceTasks() 126 { 127 return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size(); 128 } 129 130 @Override 131 public String getProcessStepID() 132 { 133 if( getJobStatusClient() == null ) 134 return null; 135 136 return getJobStatusClient().getJobID().toString(); 137 } 138 139 /** 140 * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job. 141 * 142 * @return the jobClient (type JobClient) of this HadoopStepStats object. 143 */ 144 public abstract JobClient getJobClient(); 145 146 /** 147 * Returns the underlying Map tasks progress percentage. 148 * <p/> 149 * This method is experimental. 150 * 151 * @return float 152 */ 153 public float getMapProgress() 154 { 155 Job runningJob = getJob( getJobStatusClient() ); 156 157 if( runningJob == null ) 158 return 0; 159 160 try 161 { 162 return runningJob.mapProgress(); 163 } 164 catch( IOException exception ) 165 { 166 throw new FlowException( "unable to get progress" ); 167 } 168 } 169 170 /** 171 * Returns the underlying Reduce tasks progress percentage. 172 * <p/> 173 * This method is experimental. 174 * 175 * @return float 176 */ 177 public float getReduceProgress() 178 { 179 Job runningJob = getJob( getJobStatusClient() ); 180 181 if( runningJob == null ) 182 return 0; 183 184 try 185 { 186 return runningJob.reduceProgress(); 187 } 188 catch( IOException exception ) 189 { 190 throw new FlowException( "unable to get progress" ); 191 } 192 } 193 194 @Override 195 public String getProcessStatusURL() 196 { 197 return getStatusURL(); 198 } 199 200 /** 201 * @deprecated see {@link #getProcessStatusURL()} 202 */ 203 @Deprecated 204 public String getStatusURL() 205 { 206 Job runningJob = getJob( getJobStatusClient() ); 207 208 if( runningJob == null ) 209 return null; 210 211 return runningJob.getTrackingURL(); 212 } 213 214 /** Method captureDetail captures statistics task details and completion events. */ 215 @Override 216 public synchronized void captureDetail( Type depth ) 217 { 218 if( !getType().isChild( depth ) || !isDetailStale() ) 219 return; 220 221 Job runningJob = getJob( getJobStatusClient() ); 222 223 if( runningJob == null ) 224 return; 225 226 try 227 { 228 mapperNodeStats.captureDetail( depth ); 229 230 if( reducerNodeStats != null ) 231 reducerNodeStats.captureDetail( depth ); 232 233 int count = 0; 234 235 while( depth == Type.ATTEMPT ) 236 { 237 TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count ); 238 239 if( events.length == 0 ) 240 break; 241 242 addAttemptsToTaskStats( events ); 243 count += events.length; 244 } 245 246 markDetailCaptured(); 247 } 248 catch( IOException exception ) 249 { 250 LOG.warn( "unable to get task stats", exception ); 251 } 252 } 253 254 private void addAttemptsToTaskStats( TaskCompletionEvent[] events ) 255 { 256 for( TaskCompletionEvent event : events ) 257 { 258 if( event == null ) 259 { 260 LOG.warn( "found empty completion event" ); 261 continue; 262 } 263 264 if( event.isMapTask() ) 265 mapperNodeStats.addAttempt( event ); 266 else 267 reducerNodeStats.addAttempt( event ); 268 } 269 } 270 }