/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowStep;
import cascading.flow.FlowStepStrategy;
import cascading.management.state.ClientState;
import cascading.stats.FlowStats;
import cascading.stats.FlowStepStats;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class FlowStepJob drives a single {@link BaseFlowStep} through its lifecycle.
 * <p/>
 * When invoked via {@link #call()}, the job either skips the step, or waits on its predecessor jobs,
 * prepares resources, applies any {@link FlowStepStrategy}, starts the underlying job and polls it until it
 * completes or {@link #stop()} is requested. The outcome is recorded on the {@link FlowStepStats} instance
 * and any failure is returned from {@link #call()}.
 * <p/>
 * Sub-classes supply the platform specific behavior through the abstract {@code internal*} methods.
 * <p/>
 * {@link #stop()} is synchronized and the stop flag is volatile so that a stop requested from one thread is
 * observed by the thread polling in {@link #blockTillCompleteOrStopped()}.
 */
public abstract class FlowStepJob<Config> implements Callable<Throwable>
  {
  private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class );

  /** Field stepName */
  protected final String stepName;
  /** Field pollingInterval, passed to {@link Util#safeSleep(long)} between completion checks */
  protected long pollingInterval = 1000;
  /** Field statsStoreInterval, divided by pollingInterval to decide how often stats are recorded while polling */
  protected long statsStoreInterval = 60 * 1000;
  /** Field predecessors */
  protected List<FlowStepJob<Config>> predecessors;
  /** Field latch, released once {@link #start()} has finished, successfully or not */
  private final CountDownLatch latch = new CountDownLatch( 1 );
  /** Field stop, volatile as it is set in {@link #stop()} and read without a lock by the polling loop */
  private volatile boolean stop = false;
  /** Field flowStep */
  protected final BaseFlowStep<Config> flowStep;
  /** Field flowStepStats */
  protected FlowStepStats flowStepStats;
  /** Field throwable, the failure (if any) returned from {@link #call()} */
  protected Throwable throwable;

  /**
   * Constructor FlowStepJob creates a new FlowStepJob instance and marks its stats as pending.
   *
   * @param clientState        passed to {@link #createStepStats(ClientState)}
   * @param flowStep           the step this job executes
   * @param pollingInterval    value slept between completion checks
   * @param statsStoreInterval value used with pollingInterval to pace stats recording
   */
  public FlowStepJob( ClientState clientState, BaseFlowStep flowStep, long pollingInterval, long statsStoreInterval )
    {
    this.flowStep = flowStep;
    this.stepName = flowStep.getName();
    this.pollingInterval = pollingInterval;
    this.statsStoreInterval = statsStoreInterval;
    this.flowStepStats = createStepStats( clientState );

    this.flowStepStats.prepare();
    this.flowStepStats.markPending();
    }

  public abstract Config getConfig();

  protected abstract FlowStepStats createStepStats( ClientState clientState );

  /**
   * Method stop requests this job to stop, marks unfinished stats as stopped and delegates
   * to {@link #internalBlockOnStop()}.
   * <p/>
   * Sinks are rolled back and the stopping listeners fired only if the stats ended in the stopped state.
   */
  public synchronized void stop()
    {
    if( flowStep.isInfoEnabled() )
      flowStep.logInfo( "stopping: " + stepName );

    stop = true;

    // allow pending -> stopped transition
    // never want a hanging pending state
    if( !flowStepStats.isFinished() )
      flowStepStats.markStopped();

    try
      {
      internalBlockOnStop();
      }
    catch( IOException exception )
      {
      flowStep.logWarn( "unable to kill job: " + stepName, exception );
      }
    finally
      {
      // call rollback after the job has been stopped, only if it was stopped
      if( flowStepStats.isStopped() )
        {
        flowStep.rollbackSinks();
        flowStep.fireOnStopping();
        }

      flowStepStats.cleanup();
      }
    }

  protected abstract void internalBlockOnStop() throws IOException;

  public void setPredecessors( List<FlowStepJob<Config>> predecessors )
    {
    this.predecessors = predecessors;
    }

  /**
   * Method call runs this job to completion.
   *
   * @return the failure captured while running, or null if none was captured
   */
  @Override
  public Throwable call()
    {
    start();

    return throwable;
    }

  /**
   * Method start executes the step lifecycle, capturing any Throwable in {@link #throwable}.
   * <p/>
   * The latch is always released on exit so callers blocked in {@link #isSuccessful()} are freed.
   */
  protected void start()
    {
    try
      {
      if( isSkipFlowStep() )
        {
        markSkipped();

        if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() )
          flowStep.logInfo( "skipping step: " + stepName );

        return;
        }

      synchronized( this ) // backport from 3.0
        {
        if( stop )
          {
          if( flowStep.isInfoEnabled() )
            flowStep.logInfo( "stop called before start: " + stepName );

          return;
          }

        if( !markStarted() )
          return;
        }

      blockOnPredecessors();

      prepareResources(); // throws Throwable to skip next steps

      applyFlowStepConfStrategy(); // prepare resources beforehand

      blockOnJob();
      }
    catch( Throwable failure )
      {
      dumpDebugInfo();

      this.throwable = failure;

      if( !flowStepStats.isFinished() )
        {
        flowStepStats.markFailed( this.throwable );
        flowStep.fireOnThrowable( this.throwable );
        }
      }
    finally
      {
      latch.countDown();
      flowStepStats.cleanup();
      }
    }

  private void prepareResources() throws Throwable
    {
    if( stop ) // true if a predecessor failed
      return;

    Throwable failure = flowStep.prepareResources();

    if( failure != null )
      throw failure;
    }

  private synchronized boolean markStarted()
    {
    if( flowStepStats.isFinished() ) // if stopped, return
      return false;

    flowStepStats.markStarted();

    return true;
    }

  private void applyFlowStepConfStrategy()
    {
    FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy();

    if( flowStepStrategy == null )
      return;

    List<FlowStep> predecessorSteps = new ArrayList<FlowStep>();

    for( FlowStepJob<Config> predecessor : predecessors )
      predecessorSteps.add( predecessor.flowStep );

    flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep );
    }

  /**
   * Method isSkipFlowStep returns true if the step may be skipped.
   * <p/>
   * A step is never skipped when the parent flow has no runID. Otherwise it is skipped when all
   * sources exist and none is newer than the sink modification time.
   *
   * @return true if this step should be skipped
   * @throws IOException declared for the source and sink checks performed on the flow step
   */
  protected boolean isSkipFlowStep() throws IOException
    {
    // if runID is not set, never skip a step
    if( flowStep.getFlow().getRunID() == null )
      return false;

    return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() );
    }

  /**
   * Method blockOnJob starts the underlying job, blocks until it completes or is stopped, then
   * commits or rolls back the sinks and records the final state.
   *
   * @throws IOException declared by the internal start, completion and success checks
   */
  protected void blockOnJob() throws IOException
    {
    if( stop ) // true if a predecessor failed
      return;

    if( flowStep.isInfoEnabled() )
      flowStep.logInfo( "starting step: " + stepName );

    internalNonBlockingStart();

    markSubmitted();
    flowStep.fireOnStarting();

    blockTillCompleteOrStopped();

    if( !stop && !internalNonBlockingIsSuccessful() )
      {
      if( !flowStepStats.isFinished() )
        {
        flowStep.rollbackSinks();
        flowStepStats.markFailed( getThrowable() );
        flowStep.fireOnThrowable( getThrowable() );
        }

      dumpDebugInfo();

      // if available, rethrow the unrecoverable error
      if( getThrowable() instanceof OutOfMemoryError )
        throw ( (OutOfMemoryError) getThrowable() );

      if( !isRemoteExecution() )
        this.throwable = new FlowException( "local step failed", getThrowable() );
      else
        this.throwable = new FlowException( "step failed: " + stepName + ", with job id: " + internalJobId() + ", please see cluster logs for failure messages" );
      }
    else
      {
      if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() )
        {
        this.throwable = flowStep.commitSinks();

        if( this.throwable != null )
          {
          flowStepStats.markFailed( this.throwable );
          flowStep.fireOnThrowable( this.throwable );
          }
        else
          {
          flowStepStats.markSuccessful();
          flowStep.fireOnCompleted();
          }
        }
      }

    flowStepStats.recordChildStats();
    }

  protected abstract boolean isRemoteExecution();

  protected abstract String internalJobId();

  protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;

  protected abstract Throwable getThrowable();

  protected abstract void internalNonBlockingStart() throws IOException;

  /**
   * Method blockTillCompleteOrStopped polls the underlying job until it reports complete or
   * {@link #stop()} has been requested, periodically recording stats.
   *
   * @throws IOException declared by {@link #internalNonBlockingIsComplete()}
   */
  protected void blockTillCompleteOrStopped() throws IOException
    {
    // clamp the divisor so a non-positive polling interval cannot raise an ArithmeticException;
    // for positive intervals the result is identical to dividing by pollingInterval directly
    long divisor = Math.max( 1, pollingInterval );
    int iterations = (int) Math.floor( statsStoreInterval / divisor );
    int count = 0;

    while( true )
      {
      if( flowStepStats.isSubmitted() && isStarted() )
        {
        markRunning();
        flowStep.fireOnRunning();
        }

      if( stop || internalNonBlockingIsComplete() )
        break;

      sleepForPollingInterval();

      if( iterations == count++ )
        {
        count = 0;
        flowStepStats.recordStats();
        flowStepStats.recordChildStats();
        }
      }
    }

  private synchronized void markSubmitted()
    {
    if( flowStepStats.isStarted() )
      flowStepStats.markSubmitted();

    Flow flow = flowStep.getFlow();

    if( flow == null )
      {
      LOG.warn( "no parent flow set" );
      return;
      }

    FlowStats flowStats = flow.getFlowStats();

    synchronized( flowStats )
      {
      if( flowStats.isStarted() )
        flowStats.markSubmitted();
      }
    }

  private synchronized void markRunning()
    {
    flowStepStats.markRunning();

    markFlowRunning();
    }

  private synchronized void markSkipped()
    {
    if( flowStepStats.isFinished() )
      return;

    flowStepStats.markSkipped();

    markFlowRunning();
    }

  private synchronized void markFlowRunning()
    {
    Flow flow = flowStep.getFlow();

    if( flow == null )
      {
      LOG.warn( "no parent flow set" );
      return;
      }

    FlowStats flowStats = flow.getFlowStats();

    synchronized( flowStats )
      {
      if( flowStats.isStarted() || flowStats.isSubmitted() )
        flowStats.markRunning();
      }
    }

  protected abstract boolean internalNonBlockingIsComplete() throws IOException;

  protected void sleepForPollingInterval()
    {
    Util.safeSleep( pollingInterval );
    }

  /**
   * Method blockOnPredecessors waits for every predecessor to finish and calls {@link #stop()}
   * for each predecessor that did not succeed.
   */
  protected void blockOnPredecessors()
    {
    for( FlowStepJob<Config> predecessor : predecessors )
      {
      if( !predecessor.isSuccessful() )
        {
        flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName );

        stop();
        }
      }
    }

  protected abstract void dumpDebugInfo();

  /**
   * Method isSuccessful returns true if this step completed successfully or was skipped.
   * <p/>
   * This call blocks until {@link #start()} has finished. If the waiting thread is interrupted, false is
   * returned and the thread's interrupt status is restored so callers can observe the interruption.
   *
   * @return the successful (type boolean) of this FlowStepJob object.
   */
  public boolean isSuccessful()
    {
    try
      {
      latch.await(); // freed after step completes in #start()

      return flowStepStats.isSuccessful() || flowStepStats.isSkipped();
      }
    catch( InterruptedException exception )
      {
      flowStep.logWarn( "latch interrupted", exception );

      // await() cleared the flag when it threw; restore it rather than swallowing the interrupt
      Thread.currentThread().interrupt();

      return false;
      }
    catch( NullPointerException exception )
      {
      throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception );
      }
    }

  /**
   * Method isStarted returns true if this job was started, as reported by {@link #internalIsStarted()}.
   *
   * @return boolean
   */
  public boolean isStarted()
    {
    return internalIsStarted();
    }

  protected abstract boolean internalIsStarted();

  /**
   * Method getStepStats returns the stepStats of this FlowStepJob object.
   *
   * @return the stepStats (type FlowStepStats) of this FlowStepJob object.
   */
  public FlowStepStats getStepStats()
    {
    return flowStepStats;
    }
  }