/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;

import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.management.state.ClientState;
import cascading.stats.BaseCachedStepStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class HadoopStepStats provides Hadoop-specific statistics and access to the underlying Hadoop facilities for a single flow step. */
public abstract class HadoopStepStats extends BaseCachedStepStats<Configuration, RunningJob, Counters>
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  private HadoopNodeStats mapperNodeStats;
  private HadoopNodeStats reducerNodeStats;

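  /*
   * A minimal usage sketch (illustrative only, not part of the original source;
   * assumes a Hadoop-based 'flow' whose step stats are backed by this class):
   *
   *   for( FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats() )
   *     {
   *     HadoopStepStats hadoopStats = (HadoopStepStats) stepStats;
   *     System.out.println( hadoopStats.getStatusURL() );
   *     }
   */
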
  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );

    BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep;

    // don't rely on the iterator topological sort to identify mapper or reducer
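    // instead, a node with no incoming edges in the node graph is the mapper; any remaining node is the reducer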
    for( FlowNode current : step.getFlowNodeGraph().vertexSet() )
      {
      if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 )
        {
        if( mapperNodeStats != null )
          throw new IllegalStateException( "mapper node already found" );

        mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState );
        addNodeStats( mapperNodeStats );
        }
      else
        {
        if( reducerNodeStats != null )
          throw new IllegalStateException( "reducer node already found" );

        reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState );
        addNodeStats( reducerNodeStats );
        }
      }

    if( mapperNodeStats == null )
      throw new IllegalStateException( "mapper node not found" );

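    // the counter cache retrieves Counters through the RunningJob and caches them,
    // so repeated counter reads don't re-query the cluster on every call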
    counterCache = new HadoopStepCounterCache( this, getConfig() )
      {
      @Override
      protected RunningJob getJobStatusClient()
        {
        return HadoopStepStats.this.getJobStatusClient();
        }
      };
    }

  private Configuration getConfig()
    {
    return (Configuration) this.getFlowStep().getConfig();
    }

  /**
   * Method getNumMapTasks returns the number of map tasks captured for the mapper node of this step.
   *
   * @return the numMapTasks (type int) of this HadoopStepStats object.
   */
  public int getNumMapTasks()
    {
    return mapperNodeStats.getChildren().size();
    }

  /**
   * Method getNumReduceTasks returns the number of reduce tasks captured for the reducer node of this step,
   * or zero if the step has no reducer.
   *
   * @return the numReduceTasks (type int) of this HadoopStepStats object.
   */
  public int getNumReduceTasks()
    {
    return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size();
    }

  /**
   * Method getProcessStepID returns the Hadoop running job JobID.
   *
   * @return the jobID (type String) of this HadoopStepStats object.
   */
  @Override
  public String getProcessStepID()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return null;

    return runningJob.getID().toString();
    }

  /**
   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
   *
   * @return the jobClient (type JobClient) of this HadoopStepStats object.
   */
  public abstract JobClient getJobClient();
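
  /*
   * A minimal usage sketch (illustrative only, not part of the original source;
   * assumes the job has been submitted and getProcessStepID() is non-null):
   *
   *   JobClient jobClient = stepStats.getJobClient();
   *   RunningJob runningJob = jobClient.getJob( JobID.forName( stepStats.getProcessStepID() ) );
   */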

  /**
   * Returns the underlying map tasks progress, as a float between 0.0 and 1.0.
   * <p>
   * This method is experimental.
   *
   * @return float
   */
  public float getMapProgress()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.mapProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the underlying reduce tasks progress, as a float between 0.0 and 1.0.
   * <p>
   * This method is experimental.
   *
   * @return float
   */
  public float getReduceProgress()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.reduceProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }

  public String getStatusURL()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return null;

    return runningJob.getTrackingURL();
    }
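
  /*
   * A minimal monitoring sketch (illustrative only, not part of the original source;
   * assumes 'stepStats' is a concrete HadoopStepStats instance for a submitted job):
   *
   *   while( !stepStats.isFinished() )
   *     {
   *     LOG.info( "map: {}%, reduce: {}%, status: {}",
   *       stepStats.getMapProgress() * 100, stepStats.getReduceProgress() * 100, stepStats.getStatusURL() );
   *     Thread.sleep( 5000 );
   *     }
   */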

  private boolean stepHasReducers()
    {
    return getFlowStep().getNumFlowNodes() > 1;
    }

  /** Method captureDetail captures task detail statistics and completion events for this step. */
  @Override
  public synchronized void captureDetail( Type depth )
    {
    if( !getType().isChild( depth ) || !isDetailStale() )
      return;

    JobClient jobClient = getJobClient();
    RunningJob runningJob = getJobStatusClient();

    if( jobClient == null || runningJob == null )
      return;

    try
      {
      mapperNodeStats.captureDetail( depth );

      if( reducerNodeStats != null )
        reducerNodeStats.captureDetail( depth );

      int count = 0;

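      // getTaskCompletionEvents( count ) returns the next batch of completion events
      // starting at the given offset, and an empty array once all events are consumed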
      while( depth == Type.ATTEMPT )
        {
        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );

        if( events.length == 0 )
          break;

        addAttemptsToTaskStats( events );
        count += events.length;
        }

      markDetailCaptured();
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get task stats", exception );
      }
    }

  private void addAttemptsToTaskStats( TaskCompletionEvent[] events )
    {
    for( TaskCompletionEvent event : events )
      {
      if( event == null )
        {
        LOG.warn( "found null completion event" );
        continue;
        }

      // in hadoop 1, isMapTask may be false when it is indeed a map task
      // this may only manifest in mini-cluster tests -- included to be safe
      if( event.isMapTask() || reducerNodeStats == null )
        mapperNodeStats.addAttempt( event );
      else
        reducerNodeStats.addAttempt( event );
      }
    }
  }