001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.hadoop; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.flow.hadoop.util.HadoopUtil; 031import cascading.stats.CascadingStats; 032import cascading.stats.FlowNodeStats; 033import cascading.stats.FlowSliceStats; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.mapreduce.Counter; 036import org.apache.hadoop.mapreduce.CounterGroup; 037import org.apache.hadoop.mapreduce.Counters; 038import org.apache.hadoop.mapreduce.TaskReport; 039 040/** 041 * 042 */ 043public class HadoopNodeCounterCache extends HadoopCounterCache<FlowNodeStats, Map<String, Map<String, Long>>> 044 { 045 private FlowNodeStats flowNodeStats; 046 047 private Configuration configuration; 048 049 protected HadoopNodeCounterCache( FlowNodeStats flowNodeStats, Configuration configuration ) 050 { 051 super( flowNodeStats, configuration ); 052 053 this.flowNodeStats = flowNodeStats; 054 this.configuration = configuration; 055 } 056 057 @Override 058 protected FlowNodeStats getJobStatusClient() 059 { 060 return flowNodeStats; 061 } 062 063 @Override 064 protected boolean areCountersAvailable( FlowNodeStats runningJob ) 065 { 066 return !HadoopUtil.isLocal( this.configuration ); 067 } 068 069 protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException 070 { 071 // will use final or cached remote stats 072 flowNodeStats.captureDetail( CascadingStats.Type.SLICE ); 073 074 Map<String, Map<String, Long>> allCounters = new HashMap<>(); 075 076 Collection<FlowSliceStats> children = flowNodeStats.getChildren(); 077 078 for( FlowSliceStats sliceStats : children ) 079 { 080 TaskReport taskReport = ( (HadoopSliceStats) sliceStats ).getTaskReport(); 081 082 Counters counters = taskReport.getTaskCounters(); 083 084 for( CounterGroup group : counters ) 085 { 086 Map<String, Long> values = allCounters.get( group.getName() ); 087 088 if( values == null ) 089 { 090 values = new HashMap<>(); 091 allCounters.put( group.getName(), values ); 092 } 093 094 for( Counter counter : group ) 095 { 096 Long value = values.get( counter.getName() ); 097 098 if( value == null ) 099 value = 0L; 100 101 value += counter.getValue(); 102 103 values.put( counter.getName(), value ); 104 } 105 } 106 } 107 108 return allCounters; 109 } 110 111 protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups ) 112 { 113 return groups.keySet(); 114 } 115 116 protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group ) 117 { 118 Set<String> results = new HashSet<>(); 119 120 Map<String, Long> map = counters.get( group ); 121 122 if( map != null ) 123 results.addAll( map.keySet() ); 124 125 return results; 126 } 127 128 protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter ) 129 { 130 return getCounterValue( counters, counter.getDeclaringClass().getName(), counter.name() ); 131 } 132 133 protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName ) 134 { 135 Map<String, Long> counterGroup = counters.get( groupName ); 136 137 if( counterGroup == null ) 138 return 0; 139 140 Long counterValue = counterGroup.get( counterName ); 141 142 if( counterValue == null ) 143 return 0; 144 145 return counterValue; 146 } 147 }