/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;

import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.management.state.ClientState;
import cascading.stats.BaseCachedStepStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities.
 * <p>
 * A step is modeled as exactly one mapper node and at most one reducer node; the constructor
 * rejects any other flow node graph shape with an {@link IllegalStateException}.
 */
public abstract class HadoopStepStats extends BaseCachedStepStats<Configuration, RunningJob, Counters>
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  /** Stats for the single node with no incoming edges; never null after construction. */
  private HadoopNodeStats mapperNodeStats;
  /** Stats for the single node with incoming edges; null when the step has no such node. */
  private HadoopNodeStats reducerNodeStats;

  /**
   * Creates the step stats and registers one {@link HadoopNodeStats} per node in the step's flow node graph.
   *
   * @param flowStep    the step being tracked; cast to {@link BaseFlowStep} to reach its flow node graph
   * @param clientState the client state handed to the super class and to each node stats instance
   * @throws IllegalStateException if the graph has more than one root node, more than one non-root node,
   *                               or no root node at all
   */
  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );

    BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep;

    // don't rely on the iterator topological sort to identify mapper or reducer
    for( FlowNode current : step.getFlowNodeGraph().vertexSet() )
      {
      // a node with no incoming edges is the mapper, any other node is the reducer
      if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 )
        {
        if( mapperNodeStats != null )
          throw new IllegalStateException( "mapper node already found" );

        mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState );
        addNodeStats( mapperNodeStats );
        }
      else
        {
        if( reducerNodeStats != null )
          throw new IllegalStateException( "reducer node already found" );

        reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState );
        addNodeStats( reducerNodeStats );
        }
      }

    if( mapperNodeStats == null )
      throw new IllegalStateException( "mapper node not found" );

    // the cache resolves the job status client through this instance on every call,
    // so it observes whatever the subclass currently returns
    counterCache = new HadoopStepCounterCache( this, getConfig() )
      {
      @Override
      protected RunningJob getJobStatusClient()
        {
        return HadoopStepStats.this.getJobStatusClient();
        }
      };
    }

  /**
   * Returns the configuration of the underlying flow step, viewed as a Hadoop {@link Configuration}.
   *
   * @return the flow step configuration
   */
  private Configuration getConfig()
    {
    return (Configuration) this.getFlowStep().getConfig();
    }

  /**
   * Method getNumMapTasks returns the number of child stats currently held by the mapper node stats.
   * <p>
   * NOTE(review): presumably one child per map task, populated during detail capture -- confirm
   * against {@link HadoopNodeStats}.
   *
   * @return the numMapTasks (type int) of this HadoopStepStats object.
   */
  public int getNumMapTasks()
    {
    return mapperNodeStats.getChildren().size();
    }

  /**
   * Method getNumReduceTasks returns the number of child stats currently held by the reducer node stats.
   *
   * @return the numReducerTasks (type int) of this HadoopStepStats object, {@code 0} if this step
   * has no reducer node.
   */
  public int getNumReduceTasks()
    {
    return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size();
    }

  /**
   * Method getProcessStepID returns the Hadoop running job JobID.
   *
   * @return the jobID (type String) of this HadoopStepStats object, or {@code null} if no job
   * status client is available.
   */
  @Override
  public String getProcessStepID()
    {
    // read the client once so the null check and the dereference see the same instance
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return null;

    return runningJob.getID().toString();
    }

  /**
   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
   *
   * @return the jobClient (type JobClient) of this HadoopStepStats object.
   */
  public abstract JobClient getJobClient();

  /**
   * Returns the underlying Map tasks progress percentage.
   * <p>
   * This method is experimental.
   *
   * @return the value reported by {@link RunningJob#mapProgress()}, or {@code 0} if no job status
   * client is available
   * @throws FlowException if the progress cannot be retrieved; the originating
   *                       {@link IOException} is attached as the cause
   */
  public float getMapProgress()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.mapProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the underlying Reduce tasks progress percentage.
   * <p>
   * This method is experimental.
   *
   * @return the value reported by {@link RunningJob#reduceProgress()}, or {@code 0} if no job status
   * client is available
   * @throws FlowException if the progress cannot be retrieved; the originating
   *                       {@link IOException} is attached as the cause
   */
  public float getReduceProgress()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.reduceProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the tracking URL reported by the running job.
   *
   * @return the tracking URL, or {@code null} if no job status client is available
   */
  public String getStatusURL()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return null;

    return runningJob.getTrackingURL();
    }

  /**
   * Reports whether the flow step has more than one flow node.
   *
   * @return true if the step has more than one flow node
   */
  private boolean stepHasReducers()
    {
    return getFlowStep().getNumFlowNodes() > 1;
    }

  /**
   * Method captureDetail captures statistics task details and completion events.
   * <p>
   * Does nothing when the requested depth is not a child of this stats type, when the detail
   * is not stale, or when either the job client or the job status client is unavailable.
   * An {@link IOException} raised while fetching is logged at warn level and not rethrown; in
   * that case the detail is not marked as captured.
   * <p>
   * This method is synchronized on this instance.
   *
   * @param depth the level of detail to capture; task completion events are only fetched
   *              for {@code Type.ATTEMPT}
   */
  @Override
  public synchronized void captureDetail( Type depth )
    {
    if( !getType().isChild( depth ) || !isDetailStale() )
      return;

    JobClient jobClient = getJobClient();
    RunningJob runningJob = getJobStatusClient();

    if( jobClient == null || runningJob == null )
      return;

    try
      {
      mapperNodeStats.captureDetail( depth );

      if( reducerNodeStats != null )
        reducerNodeStats.captureDetail( depth );

      // number of completion events consumed so far, used as the offset of the next request
      int count = 0;

      while( depth == Type.ATTEMPT )
        {
        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );

        // an empty (or missing) batch means there is nothing further to consume
        if( events == null || events.length == 0 )
          break;

        addAttemptsToTaskStats( events );
        count += events.length;
        }

      markDetailCaptured();
      }
    catch( IOException exception )
      {
      // best effort: stats capture must not fail the caller
      LOG.warn( "unable to get task stats", exception );
      }
    }

  /**
   * Routes each completion event to the mapper or reducer node stats.
   * <p>
   * Null entries are logged and skipped.
   *
   * @param events the completion events to record
   */
  private void addAttemptsToTaskStats( TaskCompletionEvent[] events )
    {
    for( TaskCompletionEvent event : events )
      {
      if( event == null )
        {
        LOG.warn( "found empty completion event" );
        continue;
        }

      // in hadoop 1, isMapTask may be false when it is indeed a map task
      // this may only manifest in mini-cluster tests -- included to be safe
      if( event.isMapTask() || reducerNodeStats == null )
        mapperNodeStats.addAttempt( event );
      else
        reducerNodeStats.addAttempt( event );
      }
    }
  }