/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.tez;

import java.io.IOException;
import java.util.Iterator;

import cascading.flow.FlowNode;
import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import cascading.stats.BaseCachedStepStats;
import cascading.stats.CascadingStats;
import cascading.stats.FlowNodeStats;
import cascading.stats.tez.util.TezStatsUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

/**
 * Class TezStepStats is the step level statistics base class for steps whose status is read through
 * a Tez {@link DAGClient}.
 * <p>
 * On construction it installs a {@link TezCounterCache} that reads the DAG level {@link TezCounters}, and
 * registers one {@link TezNodeStats} instance for every {@link FlowNode} in the step's node graph.
 */
public abstract class TezStepStats extends BaseCachedStepStats<Configuration, DAGClient, TezCounters>
  {
  /**
   * Constructor TezStepStats creates a new TezStepStats instance.
   *
   * @param flowStep    the step supplying the configuration and the node graph to create node stats for
   * @param clientState the client state handed to the parent class and to every created node stats instance
   */
  protected TezStepStats( FlowStep flowStep, ClientState clientState )
    {
    super( flowStep, clientState );

    Configuration configuration = (Configuration) flowStep.getConfig();

    this.counterCache = new TezCounterCache<DAGClient>( this, configuration )
      {
      /** Delegates to the enclosing step stats so the client is looked up each time it is requested. */
      @Override
      protected DAGClient getJobStatusClient()
        {
        return TezStepStats.this.getJobStatusClient();
        }

      /**
       * Returns the DAG level counters reported through the given client.
       *
       * @param statusClient the client used to look up the DAG status
       * @return the counters, or {@code null} if no status is available, the status carries no counters,
       * or the counter set is empty
       * @throws IOException declared for the status lookup
       */
      protected TezCounters getCounters( DAGClient statusClient ) throws IOException
        {
        DAGStatus status = TezStatsUtil.getDagStatusWithCounters( statusClient );

        if( status == null )
          return null;

        TezCounters dagCounters = status.getDAGCounters();

        // an empty counter set is reported the same way as a missing one
        if( dagCounters == null || dagCounters.countCounters() == 0 )
          return null;

        return dagCounters;
        }
      };

    // node stats are added in the topological order supplied by the node graph
    for( Iterator<FlowNode> nodes = flowStep.getFlowNodeGraph().getOrderedTopologicalIterator(); nodes.hasNext(); )
      addNodeStats( new TezNodeStats( this, nodes.next(), clientState, configuration ) );
    }

  /**
   * Method captureDetail asks every registered node stats instance to capture its detail at the given depth,
   * then marks the detail of this step as captured.
   * <p>
   * Nothing happens if this type is not a child of the requested depth, if the detail is not stale, or if
   * no DAG client is available.
   *
   * @param depth the level of detail being requested
   */
  @Override
  public void captureDetail( CascadingStats.Type depth )
    {
    if( !getType().isChild( depth ) )
      return;

    if( !isDetailStale() )
      return;

    // without a client there is nothing to query
    if( getJobStatusClient() == null )
      return;

    synchronized( this )
      {
      // staleness is tested again once the monitor is held, presumably so a refresh completed by
      // another caller in the meantime is not repeated
      if( isDetailStale() )
        {
        for( FlowNodeStats nodeStats : getFlowNodeStatsMap().values() )
          nodeStats.captureDetail( depth );

        markDetailCaptured();
        }
      }
    }
  }