/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;

/**
 * Class BaseCachedNodeStats is a base {@link FlowNodeStats} implementation that answers all counter
 * queries by delegating to a {@link CounterCache} and keeps its child {@link FlowSliceStats} in an
 * insertion ordered map keyed by id.
 * <p>
 * The type parameters are handed through to the {@link CounterCache} unchanged; this class does not
 * interpret them. NOTE(review): presumably they are the platform specific configuration, job status and
 * counters types -- confirm against the concrete sub-classes.
 * <p>
 * Access to {@link #sliceStatsMap} within this class is guarded by the map's own monitor.
 */
public abstract class BaseCachedNodeStats<Config, JobStatus, Counters> extends FlowNodeStats
  {
  /** Child slice stats by id, in insertion order. Not thread-safe, synchronize on the map itself when accessing it. */
  protected final Map<String, FlowSliceStats> sliceStatsMap = new LinkedHashMap<>();
  /**
   * The cache all counter accessors delegate to. It is never assigned in this class.
   * NOTE(review): presumably sub-classes assign it during construction -- only
   * {@link #getLastSuccessfulCounterFetchTime()} tolerates a null value, every other accessor dereferences it directly.
   */
  protected CounterCache<Config, JobStatus, Counters> counterCache;

  /**
   * Read by {@link #captureDetail(Type)}, never written in this class.
   * NOTE(review): presumably maintained by {@link #captureChildDetailInternal()} implementations -- confirm.
   */
  protected boolean allChildrenFinished;

  /**
   * Constructor BaseCachedNodeStats creates a new BaseCachedNodeStats instance.
   *
   * @param flowNode    of FlowNode, passed to the super class
   * @param clientState of ClientState, passed to the super class
   */
  protected BaseCachedNodeStats( FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    }

  /**
   * Method getLastSuccessfulCounterFetchTime returns the last successful fetch time reported by the counter cache.
   *
   * @return the value reported by the counter cache, or -1 if no counter cache has been assigned
   */
  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    if( counterCache != null )
      return counterCache.getLastSuccessfulFetch();

    return -1;
    }

  /**
   * Method isAllChildrenFinished returns the current value of the {@link #allChildrenFinished} flag.
   *
   * @return boolean
   */
  public boolean isAllChildrenFinished()
    {
    return allChildrenFinished;
    }

  /**
   * Method getCounterGroups returns all of the counter groups known to the counter cache.
   *
   * @return the counterGroups (type Collection) of this object.
   */
  @Override
  public Collection<String> getCounterGroups()
    {
    return counterCache.getCounterGroups();
    }

  /**
   * Method getCounterGroupsMatching returns all the counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return Collection
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    return counterCache.getCounterGroupsMatching( regex );
    }

  /**
   * Method getCountersFor returns the counters for the given group.
   *
   * @param group of String
   * @return Collection
   */
  @Override
  public Collection<String> getCountersFor( String group )
    {
    return counterCache.getCountersFor( group );
    }

  /**
   * Method getCounterValue returns the counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    return counterCache.getCounterValue( counter );
    }

  /**
   * Method getCounterValue returns the counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    return counterCache.getCounterValue( group, counter );
    }

  /**
   * Method cachedCounters returns the counters held by the counter cache.
   *
   * @param force passed through to the counter cache unchanged
   * @return the Counters instance returned by the counter cache
   */
  protected synchronized Counters cachedCounters( boolean force )
    {
    return counterCache.cachedCounters( force );
    }

  /**
   * Method getChildren returns an unmodifiable view of the child slice stats.
   * <p>
   * The returned collection is a view over the underlying map values, not a copy. The lock taken here
   * only covers the creation of the view, not any later iteration by the caller.
   *
   * @return Collection
   */
  @Override
  public Collection<FlowSliceStats> getChildren()
    {
    synchronized( sliceStatsMap )
      {
      return Collections.unmodifiableCollection( sliceStatsMap.values() );
      }
    }

  /**
   * Method getChildWith returns the child slice stats stored under the given id.
   *
   * @param id of String
   * @return the matching FlowSliceStats, or null if the map holds no entry for the id
   */
  @Override
  public FlowSliceStats getChildWith( String id )
    {
    // guard the lookup with the same monitor getChildren() uses, LinkedHashMap is not thread-safe
    synchronized( sliceStatsMap )
      {
      return sliceStatsMap.get( id );
      }
    }

  /**
   * Method captureDetail refreshes the child detail via {@link #captureChildDetailInternal()} if the given
   * depth applies to this type and the current detail is stale.
   * <p>
   * Returns immediately if this instance is finished and the final detail has already been captured.
   *
   * @param depth of Type
   */
  @Override
  public final void captureDetail( Type depth )
    {
    // sampled before taking the lock, the same value later decides whether the captured detail is final
    boolean finished = isFinished();

    if( finished && hasCapturedFinalDetail() )
      return;

    synchronized( this )
      {
      if( !getType().isChild( depth ) || !isDetailStale() )
        return;

      boolean success = captureChildDetailInternal();

      markDetailCaptured(); // always mark to prevent double calls

      if( success )
        logDebug( "captured remote node statistic details" );

      // only final if finished before the capture started, the capture succeeded, and no child is still running
      hasCapturedFinalDetail = finished && success && allChildrenFinished;

      if( allChildrenFinished )
        logInfo( "all {} children are in finished state, have captured final details: {}", sliceStatsMap.size(), hasCapturedFinalDetail() );
      }
    }

  /**
   * Returns true if was able to capture/refresh the internal child stats cache.
   *
   * @return true if successful
   */
  protected abstract boolean captureChildDetailInternal();

  /**
   * Method recordChildStats refreshes the cached counters, captures attempt level detail and records every
   * child slice stats instance with the client state.
   * <p>
   * Nothing is captured or recorded if the client state is not enabled.
   * <p>
   * Synchronized to prevent state changes mid record, #stop may be called out of band
   */
  @Override
  public synchronized void recordChildStats()
    {
    try
      {
      cachedCounters( true );
      }
    catch( Exception exception )
      {
      // best effort, a failed counter refresh must not prevent the child stats below from being recorded
      logDebug( "unable to refresh cached counters: " + exception );
      }

    if( !clientState.isEnabled() )
      return;

    captureDetail( Type.ATTEMPT );

    // snapshot under the map monitor so a concurrent insert cannot break the iteration,
    // then record outside the lock so clientState is never called while holding it
    FlowSliceStats[] children;

    synchronized( sliceStatsMap )
      {
      children = sliceStatsMap.values().toArray( new FlowSliceStats[ 0 ] );
      }

    // FlowSliceStats are not full blown Stats types, but implementation specific
    // so we can't call recordStats/recordChildStats
    try
      {
      // must use the local ID as the stored id, not task id
      for( FlowSliceStats child : children )
        clientState.record( child.getID(), child );
      }
    catch( Exception exception )
      {
      logError( "unable to record node stats", exception );
      }
    }
  }