/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;

import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.management.state.ClientState;
import cascading.stats.BaseCachedStepStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities.
 * <p>
 * One {@link HadoopNodeStats} is created for the flow node with no incoming edges (the mapper) and,
 * if the step has a second node, one for it (the reducer). A step with no reducer node leaves the
 * reducer stats {@code null}.
 */
public abstract class HadoopStepStats extends BaseCachedStepStats<Configuration, RunningJob, Counters>
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  /** Stats for the node with in-degree zero; always set once the constructor returns. */
  private HadoopNodeStats mapperNodeStats;
  /** Stats for the remaining node, or {@code null} when the step has only a mapper node. */
  private HadoopNodeStats reducerNodeStats;

  /**
   * Extracts the {@link Job} held in the {@code job} field of the given {@link RunningJob}.
   *
   * @param runningJob the job status client, may be {@code null} if the job has not been submitted
   * @return the underlying Job, or {@code null} if {@code runningJob} is null or the field lookup yields nothing
   */
  protected static Job getJob( RunningJob runningJob )
    {
    if( runningJob == null ) // if null, job hasn't been submitted
      return null;

    Job job = Util.returnInstanceFieldIfExistsSafe( runningJob, "job" );

    if( job == null )
      LOG.warn( "unable to get underlying org.apache.hadoop.mapreduce.Job from org.apache.hadoop.mapred.RunningJob, task level task counters will be unavailable" );

    return job;
    }

  /**
   * Creates the step stats, registering a node stats instance per flow node and installing the counter cache.
   *
   * @param flowStep    the step these stats describe; must be a {@link BaseFlowStep}
   * @param clientState the client state handed to the parent and to each node stats instance
   * @throws IllegalStateException if more than one mapper or reducer node is found, or no mapper node is found
   */
  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );

    BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep;

    // don't rely on the iterator topological sort to identify mapper or reducer
    for( FlowNode current : step.getFlowNodeGraph().vertexSet() )
      {
      if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 )
        {
        if( mapperNodeStats != null )
          throw new IllegalStateException( "mapper node already found" );

        mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState );
        addNodeStats( mapperNodeStats );
        }
      else
        {
        if( reducerNodeStats != null )
          throw new IllegalStateException( "reducer node already found" );

        reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState );
        addNodeStats( reducerNodeStats );
        }
      }

    if( mapperNodeStats == null )
      throw new IllegalStateException( "mapper node not found" );

    // the cache resolves its job status client through this stats instance
    counterCache = new HadoopStepCounterCache( this, getConfig() )
      {
      @Override
      protected RunningJob getJobStatusClient()
        {
        return HadoopStepStats.this.getJobStatusClient();
        }
      };
    }

  /** Returns the flow step configuration cast to a Hadoop {@link Configuration}. */
  private Configuration getConfig()
    {
    return (Configuration) this.getFlowStep().getConfig();
    }

  /**
   * Method getNumMapTasks returns the number of child stats currently held by the mapper node stats.
   *
   * @return the numMapTasks (type int) of this HadoopStepStats object.
   */
  public int getNumMapTasks()
    {
    return mapperNodeStats.getChildren().size();
    }

  /**
   * Method getNumReduceTasks returns the number of child stats currently held by the reducer node stats.
   *
   * @return the numReducerTasks (type int) of this HadoopStepStats object, zero if there is no reducer node.
   */
  public int getNumReduceTasks()
    {
    return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size();
    }

  /**
   * Returns the job id reported by the job status client.
   *
   * @return the job id as a String, or {@code null} if no job status client is available
   */
  @Override
  public String getProcessStepID()
    {
    // read the client once so the null check and the dereference see the same instance
    RunningJob statusClient = getJobStatusClient();

    if( statusClient == null )
      return null;

    return statusClient.getJobID().toString();
    }

  /**
   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
   *
   * @return the jobClient (type JobClient) of this HadoopStepStats object.
   */
  public abstract JobClient getJobClient();

  /**
   * Returns the underlying Map tasks progress percentage.
   * <p>
   * This method is experimental.
   *
   * @return float, zero if the underlying job is not available
   * @throws FlowException if the progress cannot be retrieved; the originating IOException is attached as the cause
   */
  public float getMapProgress()
    {
    Job runningJob = getJob( getJobStatusClient() );

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.mapProgress();
      }
    catch( IOException exception )
      {
      // keep the cause so the underlying failure is diagnosable
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the underlying Reduce tasks progress percentage.
   * <p>
   * This method is experimental.
   *
   * @return float, zero if the underlying job is not available
   * @throws FlowException if the progress cannot be retrieved; the originating IOException is attached as the cause
   */
  public float getReduceProgress()
    {
    Job runningJob = getJob( getJobStatusClient() );

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.reduceProgress();
      }
    catch( IOException exception )
      {
      // keep the cause so the underlying failure is diagnosable
      throw new FlowException( "unable to get progress", exception );
      }
    }

  @Override
  public String getProcessStatusURL()
    {
    return getStatusURL();
    }

  /**
   * Returns the tracking URL of the underlying job, or {@code null} if the job is not available.
   *
   * @deprecated see {@link #getProcessStatusURL()}
   */
  @Deprecated
  public String getStatusURL()
    {
    Job runningJob = getJob( getJobStatusClient() );

    if( runningJob == null )
      return null;

    return runningJob.getTrackingURL();
    }

  /**
   * Method captureDetail captures statistics task details and completion events.
   * <p>
   * Does nothing if {@code depth} is not a child of this stats type, if the detail is not stale, or if the
   * underlying job is not available. An IOException raised while capturing is logged and the detail is
   * not marked as captured.
   *
   * @param depth the level of detail to capture; completion events are only fetched for {@code Type.ATTEMPT}
   */
  @Override
  public synchronized void captureDetail( Type depth )
    {
    if( !getType().isChild( depth ) || !isDetailStale() )
      return;

    Job runningJob = getJob( getJobStatusClient() );

    if( runningJob == null )
      return;

    try
      {
      mapperNodeStats.captureDetail( depth );

      if( reducerNodeStats != null )
        reducerNodeStats.captureDetail( depth );

      int count = 0;

      // page through completion events, starting each request after the events already seen
      while( depth == Type.ATTEMPT )
        {
        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );

        if( events.length == 0 )
          break;

        addAttemptsToTaskStats( events );
        count += events.length;
        }

      markDetailCaptured();
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get task stats", exception );
      }
    }

  /**
   * Routes each completion event to the mapper or reducer node stats based on {@code isMapTask()}.
   * Null events are logged and skipped, as are non-map events when the step has no reducer node.
   */
  private void addAttemptsToTaskStats( TaskCompletionEvent[] events )
    {
    for( TaskCompletionEvent event : events )
      {
      if( event == null )
        {
        LOG.warn( "found empty completion event" );
        continue;
        }

      if( event.isMapTask() )
        mapperNodeStats.addAttempt( event );
      else if( reducerNodeStats != null )
        reducerNodeStats.addAttempt( event );
      else
        LOG.warn( "found non-map completion event, but step has no reducer node" ); // avoid NPE on map-only steps
      }
    }
  }