001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.io.IOException;
024
025import cascading.flow.FlowException;
026import cascading.flow.FlowNode;
027import cascading.flow.FlowStep;
028import cascading.flow.planner.BaseFlowStep;
029import cascading.management.state.ClientState;
030import cascading.util.Util;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.mapred.JobClient;
033import org.apache.hadoop.mapred.JobConf;
034import org.apache.hadoop.mapred.RunningJob;
035import org.apache.hadoop.mapreduce.Counters;
036import org.apache.hadoop.mapreduce.Job;
037import org.apache.hadoop.mapreduce.TaskCompletionEvent;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/** Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities. */
042public abstract class HadoopStepStats extends BaseHadoopStepStats<RunningJob, Counters>
043  {
044  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );
045
046  private HadoopNodeStats mapperNodeStats;
047  private HadoopNodeStats reducerNodeStats;
048
049  protected static Job getJob( RunningJob runningJob )
050    {
051    if( runningJob == null ) // if null, job hasn't been submitted
052      return null;
053
054    Job job = Util.returnInstanceFieldIfExistsSafe( runningJob, "job" );
055
056    if( job == null )
057      {
058      LOG.warn( "unable to get underlying org.apache.hadoop.mapreduce.Job from org.apache.hadoop.mapred.RunningJob, task level task counters will be unavailable" );
059      return null;
060      }
061
062    return job;
063    }
064
065  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
066    {
067    super( flowStep, clientState );
068
069    BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep;
070
071    // don't rely on the iterator topological sort to identify mapper or reducer
072    for( FlowNode current : step.getFlowNodeGraph().vertexSet() )
073      {
074      if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 )
075        {
076        if( mapperNodeStats != null )
077          throw new IllegalStateException( "mapper node already found" );
078
079        mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState );
080        addNodeStats( mapperNodeStats );
081        }
082      else
083        {
084        if( reducerNodeStats != null )
085          throw new IllegalStateException( "reducer node already found" );
086
087        reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState );
088        addNodeStats( reducerNodeStats );
089        }
090      }
091
092    if( mapperNodeStats == null )
093      throw new IllegalStateException( "mapper node not found" );
094
095    counterCache = new HadoopStepCounterCache( this, (Configuration) getConfig() )
096    {
097    @Override
098    protected RunningJob getJobStatusClient()
099      {
100      return HadoopStepStats.this.getJobStatusClient();
101      }
102    };
103    }
104
105  private Configuration getConfig()
106    {
107    return (Configuration) this.getFlowStep().getConfig();
108    }
109
110  /**
111   * Method getNumMapTasks returns the numMapTasks from the Hadoop job file.
112   *
113   * @return the numMapTasks (type int) of this HadoopStepStats object.
114   */
115  public int getNumMapTasks()
116    {
117    return mapperNodeStats.getChildren().size();
118    }
119
120  /**
121   * Method getNumReduceTasks returns the numReducerTasks from the Hadoop job file.
122   *
123   * @return the numReducerTasks (type int) of this HadoopStepStats object.
124   */
125  public int getNumReduceTasks()
126    {
127    return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size();
128    }
129
130  @Override
131  public String getProcessStepID()
132    {
133    if( getJobStatusClient() == null )
134      return null;
135
136    return getJobStatusClient().getJobID().toString();
137    }
138
139  /**
140   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
141   *
142   * @return the jobClient (type JobClient) of this HadoopStepStats object.
143   */
144  public abstract JobClient getJobClient();
145
146  /**
147   * Returns the underlying Map tasks progress percentage.
148   * <p/>
149   * This method is experimental.
150   *
151   * @return float
152   */
153  public float getMapProgress()
154    {
155    Job runningJob = getJob( getJobStatusClient() );
156
157    if( runningJob == null )
158      return 0;
159
160    try
161      {
162      return runningJob.mapProgress();
163      }
164    catch( IOException exception )
165      {
166      throw new FlowException( "unable to get progress" );
167      }
168    }
169
170  /**
171   * Returns the underlying Reduce tasks progress percentage.
172   * <p/>
173   * This method is experimental.
174   *
175   * @return float
176   */
177  public float getReduceProgress()
178    {
179    Job runningJob = getJob( getJobStatusClient() );
180
181    if( runningJob == null )
182      return 0;
183
184    try
185      {
186      return runningJob.reduceProgress();
187      }
188    catch( IOException exception )
189      {
190      throw new FlowException( "unable to get progress" );
191      }
192    }
193
194  @Override
195  public String getProcessStatusURL()
196    {
197    return getStatusURL();
198    }
199
200  /**
201   * @deprecated see {@link #getProcessStatusURL()}
202   */
203  @Deprecated
204  public String getStatusURL()
205    {
206    Job runningJob = getJob( getJobStatusClient() );
207
208    if( runningJob == null )
209      return null;
210
211    return runningJob.getTrackingURL();
212    }
213
214  /** Method captureDetail captures statistics task details and completion events. */
215  @Override
216  public synchronized void captureDetail( Type depth )
217    {
218    if( !getType().isChild( depth ) || !isDetailStale() )
219      return;
220
221    Job runningJob = getJob( getJobStatusClient() );
222
223    if( runningJob == null )
224      return;
225
226    try
227      {
228      mapperNodeStats.captureDetail( depth );
229
230      if( reducerNodeStats != null )
231        reducerNodeStats.captureDetail( depth );
232
233      int count = 0;
234
235      while( depth == Type.ATTEMPT )
236        {
237        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );
238
239        if( events.length == 0 )
240          break;
241
242        addAttemptsToTaskStats( events );
243        count += events.length;
244        }
245
246      markDetailCaptured();
247      }
248    catch( IOException exception )
249      {
250      LOG.warn( "unable to get task stats", exception );
251      }
252    }
253
254  private void addAttemptsToTaskStats( TaskCompletionEvent[] events )
255    {
256    for( TaskCompletionEvent event : events )
257      {
258      if( event == null )
259        {
260        LOG.warn( "found empty completion event" );
261        continue;
262        }
263
264      if( event.isMapTask() )
265        mapperNodeStats.addAttempt( event );
266      else
267        reducerNodeStats.addAttempt( event );
268      }
269    }
270  }