/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats;

import java.util.Collection;
import java.util.Map;

import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class BaseCachedStepStats is a base {@link FlowStepStats} implementation that answers all counter
 * queries by delegating to a {@link CounterCache}.
 * <p>
 * This class never assigns {@link #counterCache} itself. Every counter accessor other than
 * {@link #getLastSuccessfulCounterFetchTime()} dereferences the field without a null check, so it must be
 * assigned (presumably by the sub-class constructor) before those accessors are called.
 *
 * @param <Configuration>   the configuration type handed to the {@link CounterCache}
 * @param <JobStatusClient> the type returned by {@link #getJobStatusClient()}
 * @param <Counters>        the counters type returned by {@link #cachedCounters(boolean)}
 */
public abstract class BaseCachedStepStats<Configuration, JobStatusClient, Counters> extends FlowStepStats
  {
  private static final Logger LOG = LoggerFactory.getLogger( BaseCachedStepStats.class );

  /** The cache all counter lookups are delegated to, not assigned by this class. */
  protected CounterCache<Configuration, JobStatusClient, Counters> counterCache;

  /**
   * Constructor BaseCachedStepStats creates a new instance, passing both arguments to the super-class.
   *
   * @param flowStep    of FlowStep
   * @param clientState of ClientState
   */
  public BaseCachedStepStats( FlowStep flowStep, ClientState clientState )
    {
    super( flowStep, clientState );
    }

  /**
   * Method getJobStatusClient returns the job status client for this step.
   * <p>
   * The concrete type is determined by the {@code JobStatusClient} type parameter, for example
   * a Hadoop {@code org.apache.hadoop.mapred.RunningJob} on a Hadoop based platform.
   *
   * @return the jobStatusClient (type JobStatusClient) of this BaseCachedStepStats object.
   */
  public abstract JobStatusClient getJobStatusClient();

  /**
   * Method getLastSuccessfulCounterFetchTime returns the last successful fetch value reported
   * by the counter cache.
   *
   * @return the value of {@code counterCache.getLastSuccessfulFetch()}, or -1 if no counter cache has been assigned
   */
  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    if( counterCache == null )
      return -1;

    return counterCache.getLastSuccessfulFetch();
    }

  /**
   * Method getCounterGroups returns all of the counter groups known to the counter cache.
   *
   * @return the counterGroups (type {@code Collection<String>}) of this BaseCachedStepStats object.
   */
  @Override
  public Collection<String> getCounterGroups()
    {
    return counterCache.getCounterGroups();
    }

  /**
   * Method getCounterGroupsMatching returns all the counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return {@code Collection<String>}
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    return counterCache.getCounterGroupsMatching( regex );
    }

  /**
   * Method getCountersFor returns the counters for the given group.
   *
   * @param group of String
   * @return {@code Collection<String>}
   */
  @Override
  public Collection<String> getCountersFor( String group )
    {
    return counterCache.getCountersFor( group );
    }

  /**
   * Method getCounterValue returns the counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    // the raw Enum type is kept so the signature continues to match the inherited method
    return counterCache.getCounterValue( counter );
    }

  /**
   * Method getCounterValue returns the counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    return counterCache.getCounterValue( group, counter );
    }

  /**
   * Method cachedCounters returns the counters held by the counter cache.
   *
   * @param force passed through unchanged to {@link CounterCache#cachedCounters(boolean)}
   * @return the Counters instance returned by the counter cache
   */
  protected synchronized Counters cachedCounters( boolean force )
    {
    return counterCache.cachedCounters( force );
    }

  /**
   * Method recordChildStats makes a forced, best-effort request for the cached counters, then, if the
   * client state is enabled, captures attempt level detail and records the stats and child stats of
   * every flow node.
   * <p>
   * A failure to retrieve the counters never aborts the method. A failure while recording node
   * stats is logged as an error and is not propagated to the caller.
   * <p>
   * Synchronized to prevent state changes mid record, #stop may be called out of band
   */
  @Override
  public synchronized void recordChildStats()
    {
    try
      {
      cachedCounters( true );
      }
    catch( Exception exception )
      {
      // best effort only, but leave a trace rather than silently discarding the failure
      LOG.debug( "unable to refresh counters before recording child stats", exception );
      }

    if( !clientState.isEnabled() )
      return;

    captureDetail( Type.ATTEMPT );

    try
      {
      for( Map.Entry<String, FlowNodeStats> entry : getFlowNodeStatsMap().entrySet() )
        {
        FlowNodeStats nodeStats = entry.getValue();

        nodeStats.recordStats();
        nodeStats.recordChildStats();
        }
      }
    catch( Exception exception )
      {
      LOG.error( "unable to record node stats", exception );
      }
    }
  }