001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.flow.hadoop.util.HadoopUtil;
031import cascading.stats.CascadingStats;
032import cascading.stats.FlowNodeStats;
033import cascading.stats.FlowSliceStats;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.mapred.Counters;
036import org.apache.hadoop.mapred.TaskReport;
037
038/**
039 *
040 */
041public class HadoopNodeCounterCache extends HadoopCounterCache<FlowNodeStats, Map<String, Map<String, Long>>>
042  {
043  private FlowNodeStats flowNodeStats;
044
045  protected HadoopNodeCounterCache( FlowNodeStats flowNodeStats, Configuration configuration )
046    {
047    super( flowNodeStats, configuration );
048
049    this.flowNodeStats = flowNodeStats;
050    }
051
052  @Override
053  protected FlowNodeStats getJobStatusClient()
054    {
055    return flowNodeStats;
056    }
057
058  @Override
059  protected boolean areCountersAvailable( FlowNodeStats runningJob )
060    {
061    return !HadoopUtil.isLocal( (Configuration) runningJob.getFlowNode().getFlowStep().getConfig() );
062    }
063
064  protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException
065    {
066    // will use final or cached remote stats
067    flowNodeStats.captureDetail( CascadingStats.Type.SLICE );
068
069    Map<String, Map<String, Long>> allCounters = new HashMap<>();
070
071    Collection<FlowSliceStats> children = flowNodeStats.getChildren();
072
073    for( FlowSliceStats sliceStats : children )
074      {
075      TaskReport taskReport = ( (HadoopSliceStats) sliceStats ).getTaskReport();
076
077      Counters counters = taskReport.getCounters();
078
079      for( Counters.Group group : counters )
080        {
081        Map<String, Long> values = allCounters.get( group.getName() );
082
083        if( values == null )
084          {
085          values = new HashMap<>();
086          allCounters.put( group.getName(), values );
087          }
088
089        for( Counters.Counter counter : group )
090          {
091          Long value = values.get( counter.getName() );
092
093          if( value == null )
094            value = 0L;
095
096          value += counter.getCounter();
097
098          values.put( counter.getName(), value );
099          }
100        }
101      }
102
103    return allCounters;
104    }
105
106  protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups )
107    {
108    return groups.keySet();
109    }
110
111  protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group )
112    {
113    Set<String> results = new HashSet<>();
114
115    Map<String, Long> map = counters.get( group );
116
117    if( map != null )
118      results.addAll( map.keySet() );
119
120    return results;
121    }
122
123  protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter )
124    {
125    return getCounterValue( counters, counter.getDeclaringClass().getName(), counter.name() );
126    }
127
128  protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName )
129    {
130    Map<String, Long> counterGroup = counters.get( groupName );
131
132    if( counterGroup == null )
133      return 0;
134
135    Long counterValue = counterGroup.get( counterName );
136
137    if( counterValue == null )
138      return 0;
139
140    return counterValue;
141    }
142  }