/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hadoop 1 specific sub-class of {@link BaseHadoopStepStats} using the mapred API.
 */
public abstract class HadoopStepStats extends BaseHadoopStepStats
  {
  /** logger. */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  private Map<TaskID, String> idCache = new HashMap<TaskID, String>( 4999 ); // nearest prime to 5000, caches a generated id per TaskID

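  /**
   * Constructor, called by platform specific sub-classes.
   *
   * @param flowStep    the FlowStep instance this stats instance reports on
   * @param clientState the current ClientState instance
   */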
  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );
    }

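  /**
   * Adds a {@link HadoopSliceStats} entry to the given map for every TaskReport of the
   * given kind, optionally skipping the last report.
   */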
  @Override
  protected void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, boolean skipLast ) throws IOException
    {
    TaskReport[] taskReports = retrieveTaskReports( kind );

    // when skipLast is set, the trailing report is ignored
    for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
      {
      TaskReport taskReport = taskReports[ i ];

      if( taskReport == null )
        {
        LOG.warn( "found empty task report" );
        continue;
        }

      String id = getIDFor( taskReport.getTaskID() );
      taskStats.put( id, new HadoopSliceStats( id, getStatus(), kind, stepHasReducers(), taskReport ) );

      incrementKind( kind );
      }
    }

  /**
   * Retrieves the TaskReports for the given kind via the mapred API.
   *
   * @param kind the kind of task to report on
   * @return an array of TaskReport instances, empty if the JobClient or RunningJob are unavailable
   */
  private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException
    {
    JobClient jobClient = getJobClient();
    RunningJob runningJob = getRunningJob();

    if( jobClient == null || runningJob == null )
      return new TaskReport[ 0 ];

    switch( kind )
      {
      case MAPPER:
        return jobClient.getMapTaskReports( runningJob.getID() );
      case REDUCER:
        return jobClient.getReduceTaskReports( runningJob.getID() );
      case CLEANUP:
        return jobClient.getCleanupTaskReports( runningJob.getID() );
      case SETUP:
        return jobClient.getSetupTaskReports( runningJob.getID() );
      default:
        return new TaskReport[ 0 ];
      }
    }

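  /**
   * Adds all task attempt completion events to the previously collected task stats,
   * when captureAttempts is true.
   */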
  @Override
  protected void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, boolean captureAttempts )
    {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return;

    int count = 0;

    // when captureAttempts is false, the loop is skipped entirely; otherwise completion
    // events are polled in batches, from the current offset, until an empty batch is returned
    while( captureAttempts )
      {
      try
        {
        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );

        if( events.length == 0 )
          break;

        for( TaskCompletionEvent event : events )
          {
          if( event == null )
            {
            LOG.warn( "found empty completion event" );
            continue;
            }

          // the lookup returns null for a housekeeping task, which we are not tracking
          HadoopSliceStats stats = taskStats.get( getIDFor( event.getTaskAttemptId().getTaskID() ) );

          if( stats != null )
            stats.addAttempt( event );
          }

        count += events.length;
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }
    }

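  /**
   * Returns a stable unique id for the given TaskID, creating and caching a new one
   * on first use.
   */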
  private String getIDFor( TaskID taskID )
    {
    // keyed on the TaskID instance, since TaskID#toString is quite expensive
    String id = idCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      idCache.put( taskID, id );
      }

    return id;
    }
  }