/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.BaseCachedNodeStats;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.util.Util.formatDurationFromMillis;

/**
 * HadoopNodeStats tracks task level (slice) statistics for either the map or reduce side of a single FlowNode.
 */
public class HadoopNodeStats extends BaseCachedNodeStats<Configuration, FlowNodeStats, Map<String, Map<String, Long>>>
  {
  private Map<TaskID, String> sliceIDCache = new HashMap<TaskID, String>( 4999 ); // caching for ids

  private HadoopStepStats parentStepStats;
  private HadoopSliceStats.Kind kind;

  /**
   * Constructor HadoopNodeStats creates a new HadoopNodeStats instance.
   *
   * @param parentStepStats of type HadoopStepStats
   * @param configuration   of type Configuration
   * @param kind            of type HadoopSliceStats.Kind
   * @param flowNode        of type FlowNode
   * @param clientState     of type ClientState
   */
  protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    this.parentStepStats = parentStepStats;
    this.kind = kind;

    this.counterCache = new HadoopNodeCounterCache( this, configuration );
    }

  @Override
  public String getKind()
    {
    if( kind == null )
      return null;

    return kind.name();
    }

  private Status getParentStatus()
    {
    return parentStepStats.getStatus();
    }

  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    JobClient jobClient = parentStepStats.getJobClient();
    RunningJob runningJob = parentStepStats.getJobStatusClient();

    if( jobClient == null || runningJob == null )
      return false;

    try
      {
      TaskReport[] taskReports; // todo: use Job task reports

      if( kind == HadoopSliceStats.Kind.MAPPER )
        taskReports = jobClient.getMapTaskReports( runningJob.getID() );
      else
        taskReports = jobClient.getReduceTaskReports( runningJob.getID() );

      if( taskReports.length == 0 )
        return false;

      addTaskStats( taskReports, false );

      return true;
      }
    catch( IOException exception )
      {
      logWarn( "unable to retrieve slice stats via task reports", exception );
      }

    return false;
    }

  protected void addTaskStats( TaskReport[] taskReports, boolean skipLast )
    {
    logInfo( "retrieved task reports: {}", taskReports.length );

    long lastFetch = System.currentTimeMillis();
    boolean fetchedAreFinished = true;

    synchronized( sliceStatsMap )
      {
      int added = 0;
      int updated = 0;

      for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
        {
        TaskReport taskReport = taskReports[ i ];

        if( taskReport == null )
          {
          logWarn( "found empty task report" );
          continue;
          }

        String id = getSliceIDFor( taskReport.getTaskID() );
        HadoopSliceStats sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch );

        if( sliceStatsMap.put( id, sliceStats ) != null )
          updated++;
        else
          added++;

        if( !sliceStats.getStatus().isFinished() )
          fetchedAreFinished = false;
        }

      int total = sliceStatsMap.size();
      String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch );

      logInfo( "added {}, updated: {} slices, with duration: {}, total fetched: {}", added, updated, duration, total );
      }

    allChildrenFinished = taskReports.length != 0 && fetchedAreFinished;
    }

  protected void addAttempt( TaskCompletionEvent event )
    {
    // the event could be a housekeeping task, which we are not tracking
    String sliceID = sliceIDCache.get( event.getTaskAttemptId().getTaskID() );

    if( sliceID == null )
      return;

    FlowSliceStats stats;

    synchronized( sliceStatsMap )
      {
      stats = sliceStatsMap.get( sliceID );
      }

    if( stats == null )
      return;

    ( (HadoopSliceStats) stats ).addAttempt( event );
    }

  private String getSliceIDFor( TaskID taskID )
    {
    // using taskID instance as #toString is quite painful
    String id = sliceIDCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      sliceIDCache.put( taskID, id );
      }

    return id;
    }
  }