001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.tez.util; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Set; 028import javax.annotation.Nullable; 029 030import cascading.CascadingException; 031import org.apache.hadoop.yarn.api.records.ApplicationId; 032import org.apache.tez.client.FrameworkClient; 033import org.apache.tez.common.ATSConstants; 034import org.apache.tez.dag.api.TezConfiguration; 035import org.apache.tez.dag.api.TezException; 036import org.apache.tez.dag.api.client.DAGClient; 037import org.apache.tez.dag.api.client.DAGClientTimelineImpl; 038import org.apache.tez.dag.api.client.DAGStatus; 039import org.apache.tez.dag.api.client.StatusGetOpts; 040import org.apache.tez.dag.api.client.VertexStatus; 041import org.codehaus.jettison.json.JSONArray; 042import org.codehaus.jettison.json.JSONException; 043import org.codehaus.jettison.json.JSONObject; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import static org.apache.tez.common.ATSConstants.*; 048import static org.apache.tez.dag.history.logging.EntityTypes.TEZ_TASK_ID; 049 050/** 051 * 052 */ 053public class TezTimelineClient extends DAGClientTimelineImpl implements TimelineClient 054 { 055 private static final Logger LOG = LoggerFactory.getLogger( TezTimelineClient.class ); 056 057 private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo"; 058 059 private final String dagId; 060 private final FrameworkClient frameworkClient; 061 private final DAGClient dagClient; 062 063 public TezTimelineClient( ApplicationId appId, String dagId, TezConfiguration conf, FrameworkClient frameworkClient, DAGClient dagClient ) throws TezException 064 { 065 super( appId, dagId, conf, frameworkClient, 5000 ); 066 this.dagId = dagId; 067 this.frameworkClient = frameworkClient; 068 this.dagClient = dagClient; 069 } 070 071 public DAGClient getDAGClient() 072 { 073 return dagClient; 074 } 075 076 public FrameworkClient getFrameworkClient() 077 { 078 return frameworkClient; 079 } 080 081 @Override 082 public DAGStatus getDAGStatus( @Nullable Set<StatusGetOpts> statusOptions ) throws IOException, TezException 083 { 084 return dagClient.getDAGStatus( statusOptions ); 085 } 086 087 @Override 088 public VertexStatus getVertexStatus( String vertexName, Set<StatusGetOpts> statusOptions ) throws IOException, TezException 089 { 090 return dagClient.getVertexStatus( vertexName, statusOptions ); 091 } 092 093 @Override 094 public String getVertexID( String vertexName ) throws IOException, TezException 095 { 096 // the filter 'vertexName' is in the 'otherinfo' field, so it must be requested, otherwise timeline server throws 097 // an NPE. to be safe, we include both fields in the result 098 String format = "%s/%s?primaryFilter=%s:%s&secondaryFilter=vertexName:%s&fields=%s"; 099 String url = String.format( format, baseUri, TEZ_VERTEX_ID, TEZ_DAG_ID, dagId, vertexName, FILTER_BY_FIELDS ); 100 101 JSONObject jsonRoot = getJsonRootEntity( url ); 102 JSONArray entitiesNode = jsonRoot.optJSONArray( ENTITIES ); 103 104 if( entitiesNode == null || entitiesNode.length() != 1 ) 105 throw new CascadingException( "failed to get vertex status from timeline server" ); 106 107 try 108 { 109 return getJsonObject( entitiesNode, 0 ).getString( ENTITY ); 110 } 111 catch( JSONException exception ) 112 { 113 throw new CascadingException( "unable to get vertex node", exception ); 114 } 115 } 116 117 @Override 118 public Iterator<TaskStatus> getVertexChildren( String vertexID, int limit, String startTaskID ) throws IOException, TezException 119 { 120 if( vertexID == null ) 121 throw new IllegalArgumentException( "vertexID is required" ); 122 123 String format = "%s/%s?primaryFilter=%s:%s&fields=%s&limit=%s"; 124 String url = String.format( format, baseUri, TEZ_TASK_ID, TEZ_VERTEX_ID, vertexID, FILTER_BY_FIELDS, limit ); 125 126 if( startTaskID != null ) 127 url = String.format( "%s&fromId=%s", url, startTaskID ); 128 129 JSONObject jsonRoot = getJsonRootEntity( url ); 130 final JSONArray entitiesNode = jsonRoot.optJSONArray( ATSConstants.ENTITIES ); 131 132 if( entitiesNode == null ) 133 throw new CascadingException( "failed to get vertex task statuses from timeline server" ); 134 135 LOG.debug( "vertex: {}, retrieved {} tasks", vertexID, entitiesNode.length() ); 136 137 return new Iterator<TaskStatus>() 138 { 139 int index = 0; 140 141 @Override 142 public boolean hasNext() 143 { 144 return entitiesNode.length() != index; 145 } 146 147 @Override 148 public TaskStatus next() 149 { 150 return parseTaskStatus( getJsonObject( entitiesNode, index++ ) ); 151 } 152 153 @Override 154 public void remove() 155 { 156 157 } 158 }; 159 } 160 161 @Override 162 public TaskStatus getVertexChild( String taskID ) throws TezException 163 { 164 String format = "%s/%s/%s?fields=%s"; 165 String url = String.format( format, baseUri, TEZ_TASK_ID, taskID, FILTER_BY_FIELDS ); 166 167 JSONObject jsonRoot = getJsonRootEntity( url ); 168 169 if( jsonRoot == null ) 170 throw new CascadingException( "failed to get vertex task status from timeline server, for id: " + taskID ); 171 172 return parseTaskStatus( jsonRoot ); 173 } 174 175 private TaskStatus parseTaskStatus( JSONObject jsonRoot ) 176 { 177 try 178 { 179 String taskID = jsonRoot.optString( ATSConstants.ENTITY ); 180 JSONObject otherInfoNode = jsonRoot.getJSONObject( ATSConstants.OTHER_INFO ); 181 String status = otherInfoNode.optString( ATSConstants.STATUS ); 182 long scheduledTime = otherInfoNode.optLong( ATSConstants.SCHEDULED_TIME, -1 ); 183 long startTime = otherInfoNode.optLong( ATSConstants.START_TIME, -1 ); // actual attempt launch time 184 long endTime = otherInfoNode.optLong( ATSConstants.FINISH_TIME, -1 ); // endTime 185 String successfulAttemptID = otherInfoNode.optString( ATSConstants.SUCCESSFUL_ATTEMPT_ID ); 186 String diagnostics = otherInfoNode.optString( ATSConstants.DIAGNOSTICS ); 187 188 if( status.equals( "" ) ) 189 return new TaskStatus( taskID ); 190 191 JSONObject countersNode = otherInfoNode.optJSONObject( ATSConstants.COUNTERS ); 192 Map<String, Map<String, Long>> counters = parseDagCounters( countersNode ); 193 194 return new TaskStatus( taskID, status, scheduledTime, startTime, endTime, successfulAttemptID, counters, diagnostics ); 195 } 196 catch( JSONException exception ) 197 { 198 throw new CascadingException( exception ); 199 } 200 } 201 202 private Map<String, Map<String, Long>> parseDagCounters( JSONObject countersNode ) throws JSONException 203 { 204 if( countersNode == null ) 205 return null; 206 207 JSONArray counterGroupNodes = countersNode.optJSONArray( ATSConstants.COUNTER_GROUPS ); 208 209 if( counterGroupNodes == null ) 210 return null; 211 212 Map<String, Map<String, Long>> counters = new HashMap<>(); 213 int numCounterGroups = counterGroupNodes.length(); 214 215 for( int i = 0; i < numCounterGroups; i++ ) 216 parseCounterGroup( counters, counterGroupNodes.optJSONObject( i ) ); 217 218 return counters; 219 } 220 221 private void parseCounterGroup( Map<String, Map<String, Long>> counters, JSONObject counterGroupNode ) throws JSONException 222 { 223 if( counterGroupNode == null ) 224 return; 225 226 final String groupName = counterGroupNode.optString( ATSConstants.COUNTER_GROUP_NAME ); 227// final String groupDisplayName = counterGroupNode.optString( ATSConstants.COUNTER_GROUP_DISPLAY_NAME ); 228 final JSONArray counterNodes = counterGroupNode.optJSONArray( ATSConstants.COUNTERS ); 229 final int numCounters = counterNodes.length(); 230 231 Map<String, Long> values = new HashMap<>(); 232 233 counters.put( groupName, values ); 234 235 for( int i = 0; i < numCounters; i++ ) 236 { 237 JSONObject counterNode = counterNodes.getJSONObject( i ); 238 String counterName = counterNode.getString( ATSConstants.COUNTER_NAME ); 239// String counterDisplayName = counterNode.getString( ATSConstants.COUNTER_DISPLAY_NAME ); 240 long counterValue = counterNode.getLong( ATSConstants.COUNTER_VALUE ); 241 242 values.put( counterName, counterValue ); 243 } 244 } 245 246 // remove is unsupported in jettison on hadoop 24 247 protected JSONObject getRemoveJsonObject( JSONArray entitiesNode, int index, boolean doRemove ) 248 { 249 try 250 { 251 JSONObject jsonObject = entitiesNode.getJSONObject( index ); 252 253 if( doRemove ) 254 entitiesNode.remove( jsonObject ); 255 256 return jsonObject; 257 } 258 catch( JSONException exception ) 259 { 260 throw new CascadingException( exception ); 261 } 262 } 263 264 protected JSONObject getJsonObject( JSONArray entitiesNode, int index ) 265 { 266 try 267 { 268 return entitiesNode.getJSONObject( index ); 269 } 270 catch( JSONException exception ) 271 { 272 throw new CascadingException( exception ); 273 } 274 } 275 }