/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.flow.hadoop.util.HadoopUtil;
import cascading.stats.CascadingStats;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskReport;

/**
 * A {@link CounterCache} for a single {@link FlowNodeStats} instance.
 * <p>
 * The cached counter structure is a map of counter group name to a map of counter name to value. Values are
 * built by summing the task counters found on the {@link TaskReport} of every child slice of the node.
 * <p>
 * The maximum age of the cached values is read from the {@link #NODE_COUNTER_MAX_AGE_PROPERTY} configuration
 * property, falling back to {@link #DEFAULT_NODE_CACHED_AGE_MAX}.
 */
public class HadoopNodeCounterCache extends CounterCache<FlowNodeStats, Map<String, Map<String, Long>>>
  {
  /** Configuration property naming the max age used for cached node counters. */
  public static final String NODE_COUNTER_MAX_AGE_PROPERTY = "cascading.node.counter.age.max.seconds";
  /** Value used when {@link #NODE_COUNTER_MAX_AGE_PROPERTY} is not set. */
  public static final int DEFAULT_NODE_CACHED_AGE_MAX = 30; // don't re-fetch task reports for 30 seconds

  /** The node stats the counters are collected from, also handed back as the "job status client". */
  private FlowNodeStats flowNodeStats;

  /**
   * Creates a cache bound to the given node stats.
   *
   * @param flowNodeStats the node stats whose slice level counters are aggregated
   * @param configuration the configuration the max cache age is read from
   */
  protected HadoopNodeCounterCache( FlowNodeStats flowNodeStats, Configuration configuration )
    {
    super( flowNodeStats, configuration );

    this.flowNodeStats = flowNodeStats;

    // age matters here since we are aggregating task reports vs getting a pre-aggregated value at the node level
    this.maxAge = configuration.getInt( NODE_COUNTER_MAX_AGE_PROPERTY, DEFAULT_NODE_CACHED_AGE_MAX );
    }

  /**
   * @return the {@link FlowNodeStats} instance this cache was created with
   */
  @Override
  protected FlowNodeStats getJobStatusClient()
    {
    return flowNodeStats;
    }

  /**
   * Reports counters as available unless {@link HadoopUtil#isLocal} is true for the configuration of the
   * flow step owning the given node.
   *
   * @param runningJob the node stats to test
   * @return false if the owning step configuration is local, true otherwise
   */
  @Override
  protected boolean areCountersAvailable( FlowNodeStats runningJob )
    {
    Configuration stepConfig = (Configuration) runningJob.getFlowNode().getFlowStep().getConfig();

    return !HadoopUtil.isLocal( stepConfig );
    }

  /**
   * Sums the task counters of every child slice of the given node, keyed by group name then counter name.
   *
   * @param flowNodeStats the node stats whose children are aggregated
   * @return a new map of group name to counter name to summed value
   * @throws IOException declared by the method signature
   */
  protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException
    {
    // will use final or cached remote stats
    flowNodeStats.captureDetail( CascadingStats.Type.SLICE );

    Map<String, Map<String, Long>> totalsByGroup = new HashMap<>();

    for( FlowSliceStats child : flowNodeStats.getChildren() )
      {
      TaskReport report = ( (HadoopSliceStats) child ).getTaskReport();
      Counters taskCounters = report.getTaskCounters();

      for( CounterGroup counterGroup : taskCounters )
        {
        String groupName = counterGroup.getName();
        Map<String, Long> groupTotals = totalsByGroup.get( groupName );

        // first time this group is seen across all slices
        if( groupTotals == null )
          {
          groupTotals = new HashMap<>();
          totalsByGroup.put( groupName, groupTotals );
          }

        for( Counter taskCounter : counterGroup )
          {
          String counterName = taskCounter.getName();
          Long runningTotal = groupTotals.get( counterName );

          // a counter not seen before starts from the current task value
          long updatedTotal = runningTotal == null ? taskCounter.getValue() : runningTotal + taskCounter.getValue();

          groupTotals.put( counterName, updatedTotal );
          }
        }
      }

    return totalsByGroup;
    }

  /**
   * @param groups the aggregated counters
   * @return the key set of the given map, that is, the counter group names
   */
  protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups )
    {
    return groups.keySet();
    }

  /**
   * Returns a copy of the counter names registered under the given group.
   *
   * @param counters the aggregated counters
   * @param group    the counter group name to look up
   * @return a new set holding the counter names of the group, empty if the group is not present
   */
  protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group )
    {
    Map<String, Long> groupCounters = counters.get( group );

    if( groupCounters == null )
      return new HashSet<>();

    return new HashSet<>( groupCounters.keySet() );
    }

  /**
   * Looks up a counter using the declaring class name of the enum as the group name and the enum constant
   * name as the counter name.
   *
   * @param counters the aggregated counters
   * @param counter  the enum constant identifying the counter
   * @return the counter value, or 0 if the group or counter is not present
   */
  protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter )
    {
    String groupName = counter.getDeclaringClass().getName();

    return getCounterValue( counters, groupName, counter.name() );
    }

  /**
   * Looks up a counter by group name and counter name.
   *
   * @param counters    the aggregated counters
   * @param groupName   the counter group name
   * @param counterName the counter name within the group
   * @return the counter value, or 0 if the group or counter is not present
   */
  protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName )
    {
    Map<String, Long> counterGroup = counters.get( groupName );
    Long counterValue = counterGroup == null ? null : counterGroup.get( counterName );

    return counterValue == null ? 0L : counterValue;
    }
  }