001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.flow.hadoop.util.HadoopUtil;
031import cascading.stats.CascadingStats;
032import cascading.stats.FlowNodeStats;
033import cascading.stats.FlowSliceStats;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.mapreduce.Counter;
036import org.apache.hadoop.mapreduce.CounterGroup;
037import org.apache.hadoop.mapreduce.Counters;
038import org.apache.hadoop.mapreduce.TaskReport;
039
040/**
041 *
042 */
043public class HadoopNodeCounterCache extends CounterCache<FlowNodeStats, Map<String, Map<String, Long>>>
044  {
045  public static final String NODE_COUNTER_MAX_AGE_PROPERTY = "cascading.node.counter.age.max.seconds";
046  public static final int DEFAULT_NODE_CACHED_AGE_MAX = 30; // don't re-fetch task reports for 30 seconds
047
048  private FlowNodeStats flowNodeStats;
049
050  protected HadoopNodeCounterCache( FlowNodeStats flowNodeStats, Configuration configuration )
051    {
052    super( flowNodeStats, configuration );
053    this.flowNodeStats = flowNodeStats;
054
055    // age matters here since we are aggregating task reports vs getting a pre-aggregated value at the node level
056    this.maxAge = configuration.getInt( NODE_COUNTER_MAX_AGE_PROPERTY, DEFAULT_NODE_CACHED_AGE_MAX );
057    }
058
059  @Override
060  protected FlowNodeStats getJobStatusClient()
061    {
062    return flowNodeStats;
063    }
064
065  @Override
066  protected boolean areCountersAvailable( FlowNodeStats runningJob )
067    {
068    return !HadoopUtil.isLocal( (Configuration) runningJob.getFlowNode().getFlowStep().getConfig() );
069    }
070
071  protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException
072    {
073    // will use final or cached remote stats
074    flowNodeStats.captureDetail( CascadingStats.Type.SLICE );
075
076    Map<String, Map<String, Long>> allCounters = new HashMap<>();
077
078    Collection<FlowSliceStats> children = flowNodeStats.getChildren();
079
080    for( FlowSliceStats sliceStats : children )
081      {
082      TaskReport taskReport = ( (HadoopSliceStats) sliceStats ).getTaskReport();
083
084      Counters counters = taskReport.getTaskCounters();
085
086      for( CounterGroup group : counters )
087        {
088        Map<String, Long> values = allCounters.get( group.getName() );
089
090        if( values == null )
091          {
092          values = new HashMap<>();
093          allCounters.put( group.getName(), values );
094          }
095
096        for( Counter counter : group )
097          {
098          Long value = values.get( counter.getName() );
099
100          if( value == null )
101            value = 0L;
102
103          value += counter.getValue();
104
105          values.put( counter.getName(), value );
106          }
107        }
108      }
109
110    return allCounters;
111    }
112
113  protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups )
114    {
115    return groups.keySet();
116    }
117
118  protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group )
119    {
120    Set<String> results = new HashSet<>();
121
122    Map<String, Long> map = counters.get( group );
123
124    if( map != null )
125      results.addAll( map.keySet() );
126
127    return results;
128    }
129
130  protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter )
131    {
132    return getCounterValue( counters, counter.getDeclaringClass().getName(), counter.name() );
133    }
134
135  protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName )
136    {
137    Map<String, Long> counterGroup = counters.get( groupName );
138
139    if( counterGroup == null )
140      return 0;
141
142    Long counterValue = counterGroup.get( counterName );
143
144    if( counterValue == null )
145      return 0;
146
147    return counterValue;
148    }
149  }