/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.CounterCache;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;

import static cascading.util.Util.formatDurationFromMillis;
/**
 * Tracks the slice (task) level statistics for a single node within a Hadoop MapReduce
 * based flow, populated from Hadoop task reports and task completion events.
 */
public class HadoopNodeStats extends BaseHadoopNodeStats<FlowNodeStats, Map<String, Map<String, Long>>>
  {
  private Map<TaskID, String> sliceIDCache = new HashMap<TaskID, String>( 4999 ); // caching for ids

  private HadoopStepStats parentStepStats;
  private HadoopSliceStats.Kind kind;
  /**
   * Constructor HadoopNodeStats creates a new HadoopNodeStats instance.
   *
   * @param parentStepStats the owning HadoopStepStats instance
   * @param configuration   the current Hadoop Configuration
   * @param kind            the kind of task slices tracked by this node
   * @param flowNode        the associated FlowNode
   * @param clientState     the current ClientState
   */
  protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    this.parentStepStats = parentStepStats;
    this.kind = kind;

    this.counterCache = new HadoopNodeCounterCache( this, configuration );
    }

  @Override
  public String getKind()
    {
    if( kind == null )
      return null;

    return kind.name();
    }

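  /** Returns the current status of the parent step, applied to slices as they are created or updated. */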
  private Status getParentStatus()
    {
    return parentStepStats.getStatus();
    }

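  /** Returns the RunningJob status client held by the parent step stats. */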
  private RunningJob getJobStatusClient()
    {
    return parentStepStats.getJobStatusClient();
    }

  /**
   * Retrieves the TaskReports via the mapreduce API.
   *
   * @param kind The kind of TaskReport to retrieve.
   * @return An array of TaskReports, but never <code>null</code>.
   * @throws IOException          if the task reports cannot be retrieved
   * @throws InterruptedException if the report retrieval is interrupted
   */
  private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException, InterruptedException
    {
    Job job = HadoopStepStats.getJob( getJobStatusClient() );

    if( job == null )
      return new TaskReport[ 0 ];

    switch( kind )
      {
      case MAPPER:
        return job.getTaskReports( TaskType.MAP );
      case REDUCER:
        return job.getTaskReports( TaskType.REDUCE );
      case SETUP:
        return job.getTaskReports( TaskType.JOB_SETUP );
      case CLEANUP:
        return job.getTaskReports( TaskType.JOB_CLEANUP );
      default:
        return new TaskReport[ 0 ];
      }
    }

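  /**
   * Captures the child slice details by retrieving the current task reports for this node's
   * task kind, returning true if all children have already finished or the reports were
   * successfully applied, and false if the job or reports were unavailable or the fetch timed out.
   */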
  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    Job job = HadoopStepStats.getJob( getJobStatusClient() );

    if( job == null )
      return false;

    try
      {
      TaskReport[] taskReports = retrieveTaskReports( kind );

      if( taskReports.length == 0 )
        return false;

      addTaskStats( taskReports, false );

      return true;
      }
    catch( IOException exception )
      {
      logWarn( "unable to retrieve slice stats via task reports", exception );
      }
    catch( InterruptedException exception )
      {
      logWarn( "retrieving task reports timed out, consider increasing timeout delay in CounterCache via: '{}', message: {}", CounterCache.COUNTER_TIMEOUT_PROPERTY, exception.getMessage() );
      }

    return false;
    }

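  /**
   * Merges the given task reports into the slice stats map, updating slices already seen and
   * adding new ones, optionally skipping the last report. Also records whether every fetched
   * slice has finished, allowing subsequent capture calls to short circuit.
   */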
  protected void addTaskStats( TaskReport[] taskReports, boolean skipLast )
    {
    logInfo( "retrieved task reports: {}", taskReports.length );

    long lastFetch = System.currentTimeMillis();
    boolean fetchedAreFinished = true;

    synchronized( sliceStatsMap )
      {
      int added = 0;
      int updated = 0;

      for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
        {
        TaskReport taskReport = taskReports[ i ];

        if( taskReport == null )
          {
          logWarn( "found null task report" );
          continue;
          }

        String id = getSliceIDFor( taskReport.getTaskID() );
        HadoopSliceStats sliceStats = (HadoopSliceStats) sliceStatsMap.get( id );

        if( sliceStats != null )
          {
          sliceStats.update( getParentStatus(), kind, taskReport, lastFetch );
          updated++;
          }
        else
          {
          sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch );
          sliceStatsMap.put( id, sliceStats );
          added++;
          }

        if( !sliceStats.getStatus().isFinished() )
          fetchedAreFinished = false;
        }

      int total = sliceStatsMap.size();
      String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch );

      logInfo( "added: {}, updated: {} slices, with duration: {}, total tracked: {}", added, updated, duration, total );
      }

    allChildrenFinished = taskReports.length != 0 && fetchedAreFinished;
    }

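  /**
   * Applies the given task attempt completion event to the slice stats tracking the
   * originating task, ignoring events for tasks not currently tracked.
   */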
  protected void addAttempt( TaskCompletionEvent event )
    {
    // the event could be a housekeeping task, which we are not tracking
    String sliceID = sliceIDCache.get( event.getTaskAttemptId().getTaskID() );

    if( sliceID == null )
      return;

    FlowSliceStats stats;

    synchronized( sliceStatsMap )
      {
      stats = sliceStatsMap.get( sliceID );
      }

    if( stats == null )
      return;

    ( (HadoopSliceStats) stats ).addAttempt( event );
    }

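  /**
   * Returns the unique slice ID assigned to the given TaskID, creating and caching a new ID
   * on first sight.
   */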
  private String getSliceIDFor( TaskID taskID )
    {
    // key on the TaskID instance, as TaskID#toString is expensive to call
    String id = sliceIDCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      sliceIDCache.put( taskID, id );
      }

    return id;
    }
  }