001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.planner; 022 023import java.io.IOException; 024 025import cascading.flow.FlowException; 026import cascading.flow.hadoop.HadoopFlowStep; 027import cascading.flow.planner.BaseFlowStep; 028import cascading.flow.planner.FlowStepJob; 029import cascading.management.state.ClientState; 030import cascading.stats.FlowNodeStats; 031import cascading.stats.FlowStepStats; 032import cascading.stats.hadoop.HadoopStepStats; 033import org.apache.hadoop.mapred.JobClient; 034import org.apache.hadoop.mapred.JobConf; 035import org.apache.hadoop.mapred.JobStatus; 036import org.apache.hadoop.mapred.RunningJob; 037import org.apache.hadoop.mapred.TaskCompletionEvent; 038 039import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL; 040import static cascading.stats.CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION; 041import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL; 042 043/** 044 * 045 */ 046public class HadoopFlowStepJob extends FlowStepJob<JobConf> 047 { 048 /** static field to capture errors in hadoop local mode */ 049 private static Throwable localError; 050 /** Field jobClient */ 051 private JobClient jobClient; 052 /** Field runningJob */ 053 private RunningJob runningJob; 054 055 private static long getStoreInterval( JobConf jobConf ) 056 { 057 return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 ); 058 } 059 060 private static long getChildDetailsBlockingDuration( JobConf jobConf ) 061 { 062 return jobConf.getLong( STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION, 60 * 1000 ); 063 } 064 065 public static long getJobPollingInterval( JobConf jobConf ) 066 { 067 return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 ); 068 } 069 070 public HadoopFlowStepJob( ClientState clientState, BaseFlowStep<JobConf> flowStep, JobConf currentConf ) 071 { 072 super( clientState, currentConf, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ), getChildDetailsBlockingDuration( currentConf ) ); 073 074 if( flowStep.isDebugEnabled() ) 075 flowStep.logDebug( "using polling interval: " + pollingInterval ); 076 } 077 078 @Override 079 protected FlowStepStats createStepStats( ClientState clientState ) 080 { 081 return new HadoopStepStats( flowStep, clientState ) 082 { 083 @Override 084 public JobClient getJobClient() 085 { 086 return jobClient; 087 } 088 089 @Override 090 public RunningJob getJobStatusClient() 091 { 092 return runningJob; 093 } 094 }; 095 } 096 097 protected void internalBlockOnStop() throws IOException 098 { 099 if( runningJob != null && !runningJob.isComplete() ) 100 runningJob.killJob(); 101 } 102 103 protected void internalNonBlockingStart() throws IOException 104 { 105 jobClient = new JobClient( jobConfiguration ); 106 runningJob = jobClient.submitJob( jobConfiguration ); 107 108 flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() ); 109 110 if( runningJob.getTrackingURL() != null ) 111 flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() ); 112 } 113 114 @Override 115 protected void updateNodeStatus( FlowNodeStats flowNodeStats ) 116 { 117 try 118 { 119 if( runningJob == null || flowNodeStats.isFinished() ) 120 return; 121 122 boolean isMapper = flowNodeStats.getOrdinal() == 0; 123 int jobState = runningJob.getJobState(); 124 125 if( JobStatus.FAILED == jobState ) 126 { 127 flowNodeStats.markFailed(); 128 return; 129 } 130 131 if( JobStatus.KILLED == jobState ) 132 { 133 flowNodeStats.markStopped(); 134 return; 135 } 136 137 float progress; 138 139 if( isMapper ) 140 progress = runningJob.mapProgress(); 141 else 142 progress = runningJob.reduceProgress(); 143 144 if( progress == 0.0F ) // not yet running, is only started 145 return; 146 147 if( progress != 1.0F ) 148 { 149 flowNodeStats.markRunning(); 150 return; 151 } 152 153 flowNodeStats.markRunning(); 154 155 if( isMapper && runningJob.reduceProgress() > 0.0F ) 156 { 157 flowNodeStats.markSuccessful(); 158 return; 159 } 160 161 if( JobStatus.SUCCEEDED == jobState ) 162 flowNodeStats.markSuccessful(); 163 } 164 catch( IOException exception ) 165 { 166 flowStep.logError( "failed setting node status", throwable ); 167 } 168 } 169 170 @Override 171 public boolean isSuccessful() 172 { 173 try 174 { 175 return super.isSuccessful(); 176 } 177 catch( NullPointerException exception ) 178 { 179 throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception ); 180 } 181 } 182 183 protected boolean internalNonBlockingIsSuccessful() throws IOException 184 { 185 return runningJob != null && runningJob.isSuccessful(); 186 } 187 188 @Override 189 protected boolean isRemoteExecution() 190 { 191 return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() ); 192 } 193 194 @Override 195 protected Throwable getThrowable() 196 { 197 return localError; 198 } 199 200 protected String internalJobId() 201 { 202 return runningJob.getJobID(); 203 } 204 205 protected boolean internalNonBlockingIsComplete() throws IOException 206 { 207 return runningJob.isComplete(); 208 } 209 210 protected void dumpDebugInfo() 211 { 212 try 213 { 214 if( runningJob == null ) 215 return; 216 217 int jobState = runningJob.getJobState(); // may throw an NPE internally 218 219 flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( jobState ) ); 220 flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() ); 221 222 TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 ); 223 flowStep.logWarn( "task completion events identify failed tasks" ); 224 flowStep.logWarn( "task completion events count: " + events.length ); 225 226 for( TaskCompletionEvent event : events ) 227 flowStep.logWarn( "event = " + event ); 228 } 229 catch( Throwable throwable ) 230 { 231 flowStep.logError( "failed reading task completion events", throwable ); 232 } 233 } 234 235 protected boolean internalIsStartedRunning() 236 { 237 if( runningJob == null ) 238 return false; 239 240 try 241 { 242 return runningJob.mapProgress() > 0; 243 } 244 catch( IOException exception ) 245 { 246 flowStep.logWarn( "unable to test for map progress", exception ); 247 return false; 248 } 249 } 250 251 /** 252 * Internal method to report errors that happen on hadoop local mode. Hadoops local 253 * JobRunner does not give access to TaskReports, but we want to be able to capture 254 * the exception and not just print it to stderr. FlowMapper and FlowReducer use this method. 255 * 256 * @param throwable the throwable to be reported. 257 */ 258 public static void reportLocalError( Throwable throwable ) 259 { 260 localError = throwable; 261 } 262 }