/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import cascading.flow.FlowException;
import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import cascading.stats.FlowStepStats;
import cascading.util.Util;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities. */
public abstract class HadoopStepStats extends FlowStepStats
  {
  public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.step.counter.timeout";
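
  // Illustrative usage sketch (hypothetical `properties` object handed to the flow connector):
  // the counter fetch timeout, in seconds, may be tuned before the job is submitted, e.g.
  //
  //   properties.setProperty( HadoopStepStats.COUNTER_TIMEOUT_PROPERTY, "10" );
  //
  // the default used below is 5 seconds.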

  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );
  public static final int TIMEOUT_MAX = 3;

  private Map<TaskID, String> idCache = new HashMap<TaskID, String>( 4999 ); // nearest prime; cache for generated task ids

  /** Field numMapTasks */
  int numMapTasks;
  /** Field numReduceTasks */
  int numReduceTasks;

  /** Field cachedCounters */
  private Counters cachedCounters = null;

  /** Field timeouts */
  private int timeouts;

  /** Field taskStats */
  Map<String, HadoopSliceStats> taskStats = Collections.emptyMap();

  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );
    }

  /**
   * Method getTaskStats returns the taskStats of this HadoopStepStats object.
   *
   * @return the taskStats (type Map<String, HadoopSliceStats>) of this HadoopStepStats object.
   */
  public Map<String, HadoopSliceStats> getTaskStats()
    {
    return taskStats;
    }

  protected void setTaskStats( Map<String, HadoopSliceStats> taskStats )
    {
    this.taskStats = taskStats;
    }

  /**
   * Method getNumMapTasks returns the numMapTasks from the Hadoop job file.
   *
   * @return the numMapTasks (type int) of this HadoopStepStats object.
   */
  public int getNumMapTasks()
    {
    return numMapTasks;
    }

  void setNumMapTasks( int numMapTasks )
    {
    this.numMapTasks = numMapTasks;
    }

  /**
   * Method getNumReduceTasks returns the numReduceTasks from the Hadoop job file.
   *
   * @return the numReduceTasks (type int) of this HadoopStepStats object.
   */
  public int getNumReduceTasks()
    {
    return numReduceTasks;
    }

  void setNumReduceTasks( int numReduceTasks )
    {
    this.numReduceTasks = numReduceTasks;
    }

  /**
   * Method getJobID returns the Hadoop running job JobID.
   *
   * @return the jobID (type String) of this HadoopStepStats object.
   */
  public String getJobID()
    {
    if( getRunningJob() == null )
      return null;

    return getRunningJob().getJobID();
    }

  /**
   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
   *
   * @return the jobClient (type JobClient) of this HadoopStepStats object.
   */
  public abstract JobClient getJobClient();

  /**
   * Method getRunningJob returns the Hadoop {@link RunningJob} managing this Hadoop job.
   *
   * @return the runningJob (type RunningJob) of this HadoopStepStats object.
   */
  public abstract RunningJob getRunningJob();

  /**
   * Method getCounterGroups returns all of the Hadoop counter groups.
   *
   * @return the counterGroups (type Collection<String>) of this HadoopStepStats object.
   */
  @Override
  public Collection<String> getCounterGroups()
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    return Collections.unmodifiableCollection( counters.getGroupNames() );
    }

  /**
   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return Collection<String>
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Set<String> results = new HashSet<String>();

    for( String counter : counters.getGroupNames() )
      {
      if( counter.matches( regex ) )
        results.add( counter );
      }

    return Collections.unmodifiableCollection( results );
    }
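
  // Illustrative usage sketch (the pattern and group names are examples only; actual names
  // depend on the Hadoop version and the job):
  //
  //   for( String group : stepStats.getCounterGroupsMatching( ".*StepCounters.*" ) )
  //     System.out.println( group + " -> " + stepStats.getCountersFor( group ) );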

  /**
   * Method getCountersFor returns the names of the Hadoop counters in the given group.
   *
   * @param group of String
   * @return Collection<String>
   */
  @Override
  public Collection<String> getCountersFor( String group )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return Collections.emptySet();

    Set<String> results = new HashSet<String>();

    for( Counters.Counter counter : counters.getGroup( group ) )
      results.add( counter.getName() );

    return Collections.unmodifiableCollection( results );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    return counters.getCounter( counter );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    Counters counters = cachedCounters();

    if( counters == null )
      return 0;

    Counters.Group counterGroup = counters.getGroup( group );

    if( counterGroup == null )
      return 0;

    // getCounter actually searches the display name
    // in theory the counter is lazily created if it does not exist, but don't rely on that
    Counters.Counter counterValue = counterGroup.getCounterForName( counter );

    if( counterValue == null )
      return 0;

    return counterValue.getValue();
    }
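
  // Illustrative usage sketch (hypothetical group/counter names; actual names depend on the job):
  //
  //   long records = stepStats.getCounterValue( "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" );
  //
  // prefer the Enum variant above when the counter enum is available on the classpath.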

  protected Counters cachedCounters()
    {
    return cachedCounters( false );
    }

  protected synchronized Counters cachedCounters( boolean force )
    {
    if( !force && ( isFinished() || timeouts >= TIMEOUT_MAX ) )
      return cachedCounters;

    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return cachedCounters;

    Future<Counters> future = runFuture( runningJob );

    int timeout = ( (JobConf) getFlowStep().getConfig() ).getInt( COUNTER_TIMEOUT_PROPERTY, 5 );

    try
      {
      Counters fetched = future.get( timeout, TimeUnit.SECONDS );

      if( fetched != null )
        cachedCounters = fetched;
      }
    catch( InterruptedException exception )
      {
      LOG.warn( "fetching counters was interrupted" );
      }
    catch( ExecutionException exception )
      {
      if( cachedCounters != null )
        {
        LOG.error( "unable to get remote counters, returning cached values", exception.getCause() );

        return cachedCounters;
        }

      LOG.error( "unable to get remote counters, no cached values, throwing exception", exception.getCause() );

      if( exception.getCause() instanceof FlowException )
        throw (FlowException) exception.getCause();

      throw new FlowException( exception.getCause() );
      }
    catch( TimeoutException exception )
      {
      timeouts++;

      if( timeouts >= TIMEOUT_MAX )
        LOG.warn( "fetching counters timed out after: {} seconds, final attempt: {}", timeout, timeouts );
      else
        LOG.warn( "fetching counters timed out after: {} seconds, attempts: {}", timeout, timeouts );
      }

    return cachedCounters;
    }
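
  // Note: once TIMEOUT_MAX consecutive timeouts are seen, subsequent calls return the cached
  // counters without contacting the cluster, unless the fetch is forced (see recordChildStats).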

  // hardcoded at one thread to force serialization across all requesters in the jvm
  // this likely prevents the deadlocks the futures are safeguards against
  private static ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory()
  {
  @Override
  public Thread newThread( Runnable runnable )
    {
    Thread thread = new Thread( runnable, "stats-futures" );

    thread.setDaemon( true );

    return thread;
    }
  } );

  private Future<Counters> runFuture( final RunningJob runningJob )
    {
    Callable<Counters> task = new Callable<Counters>()
    {
    @Override
    public Counters call() throws Exception
      {
      try
        {
        return runningJob.getCounters();
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to get remote counter values", exception );
        }
      }
    };

    return futuresPool.submit( task );
    }

  /**
   * Returns the underlying Map tasks progress, a value between 0.0 and 1.0.
   * <p/>
   * This method is experimental.
   *
   * @return float
   */
  public float getMapProgress()
    {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.mapProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the underlying Reduce tasks progress, a value between 0.0 and 1.0.
   * <p/>
   * This method is experimental.
   *
   * @return float
   */
  public float getReduceProgress()
    {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.reduceProgress();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get progress", exception );
      }
    }
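
  // Illustrative sketch: an overall progress value could be derived from the two methods above,
  // weighting the map and reduce phases equally (an assumption, not something this API defines):
  //
  //   float overall = 0.5f * stepStats.getMapProgress() + 0.5f * stepStats.getReduceProgress();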

  public String getStatusURL()
    {
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return null;

    return runningJob.getTrackingURL();
    }

  /**
   * Method getChildren returns the children of this HadoopStepStats object.
   *
   * @return the children (type Collection) of this HadoopStepStats object.
   */
  @Override
  public Collection getChildren()
    {
    return getTaskStats().values();
    }

  public Set<String> getChildIDs()
    {
    return getTaskStats().keySet();
    }
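
  // Illustrative usage sketch: the children are the per-task HadoopSliceStats captured by
  // #captureDetail, keyed by a generated unique id (see getIDFor below), e.g.
  //
  //   for( Map.Entry<String, HadoopSliceStats> entry : stepStats.getTaskStats().entrySet() )
  //     System.out.println( entry.getKey() + " -> " + entry.getValue() );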

  /** Synchronized to prevent state changes mid-record, #stop may be called out of band */
  @Override
  public synchronized void recordChildStats()
    {
    try
      {
      cachedCounters( true );
      }
    catch( Exception exception )
      {
      // do nothing
      }

    // if the client state is a null instance, don't bother capturing detail
    if( !clientState.isEnabled() )
      return;

    captureDetail();

    try
      {
      for( String id : taskStats.keySet() )
        clientState.record( id, taskStats.get( id ) );
      }
    catch( Exception exception )
      {
      LOG.error( "unable to record slice stats", exception );
      }
    }

  /** Method captureDetail captures statistics task details and completion events. */
  @Override
  public synchronized void captureDetail()
    {
    captureDetail( true );
    }

  public void captureDetail( boolean captureAttempts )
    {
    HashMap<String, HadoopSliceStats> newStats = new HashMap<String, HadoopSliceStats>();

    JobClient jobClient = getJobClient();
    RunningJob runningJob = getRunningJob();

    if( jobClient == null || runningJob == null )
      return;

    numMapTasks = 0;
    numReduceTasks = 0;

    try
      {
      // cleanup/setup tasks have no useful info so far.
//      addTaskStats( newStats, HadoopTaskStats.Kind.SETUP, jobClient.getSetupTaskReports( runningJob.getID() ), false );
//      addTaskStats( newStats, HadoopTaskStats.Kind.CLEANUP, jobClient.getCleanupTaskReports( runningJob.getID() ), false );
      addTaskStats( newStats, HadoopSliceStats.Kind.MAPPER, jobClient.getMapTaskReports( runningJob.getID() ), false );
      addTaskStats( newStats, HadoopSliceStats.Kind.REDUCER, jobClient.getReduceTaskReports( runningJob.getID() ), false );

      int count = 0;

      while( captureAttempts )
        {
        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );

        if( events.length == 0 )
          break;

        addAttemptsToTaskStats( newStats, events );
        count += events.length;
        }

      setTaskStats( newStats );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get task stats", exception );
      }
    }

  private void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, TaskReport[] taskReports, boolean skipLast )
    {
    for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
      {
      TaskReport taskReport = taskReports[ i ];

      if( taskReport == null )
        {
        LOG.warn( "found empty task report" );
        continue;
        }

      String id = getIDFor( taskReport.getTaskID() );
      taskStats.put( id, new HadoopSliceStats( id, getStatus(), kind, stepHasReducers(), taskReport ) );

      incrementKind( kind );
      }
    }

  private boolean stepHasReducers()
    {
    return !getFlowStep().getGroups().isEmpty();
    }

  private void incrementKind( HadoopSliceStats.Kind kind )
    {
    switch( kind )
      {
      case SETUP:
        break;
      case MAPPER:
        numMapTasks++;
        break;
      case REDUCER:
        numReduceTasks++;
        break;
      case CLEANUP:
        break;
      }
    }

  private void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, TaskCompletionEvent[] events )
    {
    for( TaskCompletionEvent event : events )
      {
      if( event == null )
        {
        LOG.warn( "found empty completion event" );
        continue;
        }

      // the lookup may miss for a housekeeping task, which we are not tracking
      HadoopSliceStats stats = taskStats.get( getIDFor( event.getTaskAttemptId().getTaskID() ) );

      if( stats != null )
        stats.addAttempt( event );
      }
    }

  private String getIDFor( TaskID taskID )
    {
    // key on the TaskID instance since calling #toString on it is quite expensive
    String id = idCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      idCache.put( taskID, id );
      }

    return id;
    }
  }