001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.hadoop; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.flow.hadoop.util.HadoopUtil; 031import cascading.stats.CascadingStats; 032import cascading.stats.FlowNodeStats; 033import cascading.stats.FlowSliceStats; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.mapred.Counters; 036import org.apache.hadoop.mapred.TaskReport; 037 038/** 039 * 040 */ 041public class HadoopNodeCounterCache extends HadoopCounterCache<FlowNodeStats, Map<String, Map<String, Long>>> 042 { 043 private FlowNodeStats flowNodeStats; 044 045 protected HadoopNodeCounterCache( FlowNodeStats flowNodeStats, Configuration configuration ) 046 { 047 super( flowNodeStats, configuration ); 048 049 this.flowNodeStats = flowNodeStats; 050 } 051 052 @Override 053 protected FlowNodeStats getJobStatusClient() 054 { 055 return flowNodeStats; 056 } 057 058 @Override 059 protected boolean areCountersAvailable( FlowNodeStats runningJob ) 060 { 061 return !HadoopUtil.isLocal( (Configuration) runningJob.getFlowNode().getFlowStep().getConfig() ); 062 } 063 064 protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException 065 { 066 // will use final or cached remote stats 067 flowNodeStats.captureDetail( CascadingStats.Type.SLICE ); 068 069 Map<String, Map<String, Long>> allCounters = new HashMap<>(); 070 071 Collection<FlowSliceStats> children = flowNodeStats.getChildren(); 072 073 for( FlowSliceStats sliceStats : children ) 074 { 075 TaskReport taskReport = ( (HadoopSliceStats) sliceStats ).getTaskReport(); 076 077 Counters counters = taskReport.getCounters(); 078 079 for( Counters.Group group : counters ) 080 { 081 Map<String, Long> values = allCounters.get( group.getName() ); 082 083 if( values == null ) 084 { 085 values = new HashMap<>(); 086 allCounters.put( group.getName(), values ); 087 } 088 089 for( Counters.Counter counter : group ) 090 { 091 Long value = values.get( counter.getName() ); 092 093 if( value == null ) 094 value = 0L; 095 096 value += counter.getCounter(); 097 098 values.put( counter.getName(), value ); 099 } 100 } 101 } 102 103 return allCounters; 104 } 105 106 protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups ) 107 { 108 return groups.keySet(); 109 } 110 111 protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group ) 112 { 113 Set<String> results = new HashSet<>(); 114 115 Map<String, Long> map = counters.get( group ); 116 117 if( map != null ) 118 results.addAll( map.keySet() ); 119 120 return results; 121 } 122 123 protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter ) 124 { 125 return getCounterValue( counters, counter.getDeclaringClass().getName(), counter.name() ); 126 } 127 128 protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName ) 129 { 130 Map<String, Long> counterGroup = counters.get( groupName ); 131 132 if( counterGroup == null ) 133 return 0; 134 135 Long counterValue = counterGroup.get( counterName ); 136 137 if( counterValue == null ) 138 return 0; 139 140 return counterValue; 141 } 142 }