001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.stats; 023 024import java.util.Collection; 025import java.util.Map; 026 027import cascading.flow.FlowStep; 028import cascading.management.state.ClientState; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * 034 */ 035public abstract class BaseCachedStepStats<Configuration, JobStatusClient, Counters> extends FlowStepStats 036 { 037 private static final Logger LOG = LoggerFactory.getLogger( BaseCachedStepStats.class ); 038 039 protected CounterCache<Configuration, JobStatusClient, Counters> counterCache; 040 041 public BaseCachedStepStats( FlowStep flowStep, ClientState clientState ) 042 { 043 super( flowStep, clientState ); 044 } 045 046 /** 047 * Method getRunningJob returns the Hadoop {@link org.apache.hadoop.mapred.RunningJob} managing this Hadoop job. 048 * 049 * @return the runningJob (type RunningJob) of this HadoopStepStats object. 050 */ 051 public abstract JobStatusClient getJobStatusClient(); 052 053 @Override 054 public long getLastSuccessfulCounterFetchTime() 055 { 056 if( counterCache != null ) 057 return counterCache.getLastSuccessfulFetch(); 058 059 return -1; 060 } 061 062 /** 063 * Method getCounterGroups returns all of the Hadoop counter groups. 064 * 065 * @return the counterGroups (type Collection) of this HadoopStepStats object. 066 */ 067 @Override 068 public Collection<String> getCounterGroups() 069 { 070 return counterCache.getCounterGroups(); 071 } 072 073 /** 074 * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern. 075 * 076 * @param regex of String 077 * @return Collection 078 */ 079 @Override 080 public Collection<String> getCounterGroupsMatching( String regex ) 081 { 082 return counterCache.getCounterGroupsMatching( regex ); 083 } 084 085 /** 086 * Method getCountersFor returns the Hadoop counters for the given group. 087 * 088 * @param group of String 089 * @return Collection 090 */ 091 @Override 092 public Collection<String> getCountersFor( String group ) 093 { 094 return counterCache.getCountersFor( group ); 095 } 096 097 /** 098 * Method getCounterValue returns the Hadoop counter value for the given counter enum. 099 * 100 * @param counter of Enum 101 * @return long 102 */ 103 @Override 104 public long getCounterValue( Enum counter ) 105 { 106 return counterCache.getCounterValue( counter ); 107 } 108 109 /** 110 * Method getCounterValue returns the Hadoop counter value for the given group and counter name. 111 * 112 * @param group of String 113 * @param counter of String 114 * @return long 115 */ 116 @Override 117 public long getCounterValue( String group, String counter ) 118 { 119 return counterCache.getCounterValue( group, counter ); 120 } 121 122 protected synchronized Counters cachedCounters( boolean force ) 123 { 124 return counterCache.cachedCounters( force ); 125 } 126 127 /** Synchronized to prevent state changes mid record, #stop may be called out of band */ 128 @Override 129 public synchronized void recordChildStats() 130 { 131 try 132 { 133 cachedCounters( true ); 134 } 135 catch( Exception exception ) 136 { 137 // do nothing 138 } 139 140 if( !clientState.isEnabled() ) 141 return; 142 143 captureDetail( Type.ATTEMPT ); 144 145 try 146 { 147 for( Map.Entry<String, FlowNodeStats> entry : getFlowNodeStatsMap().entrySet() ) 148 { 149 entry.getValue().recordStats(); 150 entry.getValue().recordChildStats(); 151 } 152 } 153 catch( Exception exception ) 154 { 155 LOG.error( "unable to record node stats", exception ); 156 } 157 } 158 }