001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.atomic.AtomicBoolean;
030
031import cascading.flow.Flow;
032import cascading.flow.FlowException;
033import cascading.flow.FlowStep;
034import cascading.flow.FlowStepStrategy;
035import cascading.management.state.ClientState;
036import cascading.stats.CascadingStats;
037import cascading.stats.FlowNodeStats;
038import cascading.stats.FlowStats;
039import cascading.stats.FlowStepStats;
040import cascading.util.Util;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import static cascading.util.Util.formatDurationFromMillis;
045
046/**
047 *
048 */
049public abstract class FlowStepJob<Config> implements Callable<Throwable>
050  {
051  // most logs messages should be delegated to the FlowStep.log* methods
052  // non job related issues can use this logger
053  private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class );
054
055  /** Field stepName */
056  protected final String stepName;
057  /** Field jobConfiguration */
058  protected final Config jobConfiguration;
059  /** Field pollingInterval */
060  protected long pollingInterval = 1000;
061  /** Field recordStatsInterval */
062  protected long statsStoreInterval = 60 * 1000;
063  /** Field waitTillCompletedChildStatsDuration */
064  protected long blockForCompletedChildDetailDuration = 60 * 1000;
065  /** Field predecessors */
066  protected List<FlowStepJob<Config>> predecessors;
067  /** Field latch */
068  private final CountDownLatch latch = new CountDownLatch( 1 );
069  /** Field started */
070  private AtomicBoolean callableStarted = new AtomicBoolean( false );
071  /** Field stop */
072  private volatile boolean stop = false;
073  /** Field flowStep */
074  protected final BaseFlowStep<Config> flowStep;
075  /** Field stepStats */
076  protected FlowStepStats flowStepStats;
077  /** Field throwable */
078  protected Throwable throwable;
079
080  public FlowStepJob( ClientState clientState, Config jobConfiguration, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval, long blockForCompletedChildDetailDuration )
081    {
082    this.jobConfiguration = jobConfiguration;
083    this.stepName = flowStep.getName();
084    this.pollingInterval = pollingInterval;
085    this.statsStoreInterval = statsStoreInterval;
086    this.blockForCompletedChildDetailDuration = blockForCompletedChildDetailDuration;
087    this.flowStep = flowStep;
088    this.flowStepStats = createStepStats( clientState );
089
090    this.flowStepStats.prepare();
091    this.flowStepStats.markPending();
092
093    for( FlowNodeStats flowNodeStats : this.flowStepStats.getFlowNodeStats() )
094      {
095      flowNodeStats.prepare();
096      flowNodeStats.markPending();
097      }
098    }
099
100  public Config getConfig()
101    {
102    return jobConfiguration;
103    }
104
105  protected abstract FlowStepStats createStepStats( ClientState clientState );
106
107  public synchronized void stop()
108    {
109    if( flowStep.isInfoEnabled() )
110      flowStep.logInfo( "stopping: " + stepName );
111
112    stop = true;
113
114    // allow pending -> stopped transition
115    // never want a hanging pending state
116    if( !flowStepStats.isFinished() )
117      flowStepStats.markStopped();
118
119    try
120      {
121      internalBlockOnStop();
122      }
123    catch( IOException exception )
124      {
125      flowStep.logWarn( "unable to kill job: " + stepName, exception );
126      }
127    finally
128      {
129      // call rollback after the job has been stopped, only if it was stopped
130      if( flowStepStats.isStopped() )
131        {
132        flowStep.rollbackSinks();
133        flowStep.fireOnStopping();
134        }
135
136      flowStepStats.cleanup();
137      }
138    }
139
140  protected abstract void internalBlockOnStop() throws IOException;
141
142  public void setPredecessors( List<FlowStepJob<Config>> predecessors )
143    {
144    this.predecessors = predecessors;
145    }
146
147  public Throwable call()
148    {
149    start();
150
151    return throwable;
152    }
153
154  public boolean isCallableStarted()
155    {
156    return callableStarted.get();
157    }
158
159  protected void start()
160    {
161    if( callableStarted.getAndSet( true ) )
162      return;
163
164    try
165      {
166      if( isSkipFlowStep() )
167        {
168        markSkipped();
169
170        if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() )
171          flowStep.logInfo( "skipping step: " + stepName );
172
173        return;
174        }
175
176      synchronized( this ) // added in 3.0, jdk1.7 may have a aggravated
177        {
178        if( stop )
179          {
180          if( flowStep.isInfoEnabled() )
181            flowStep.logInfo( "stop called before start: " + stepName );
182
183          return;
184          }
185
186        markStarted();
187        }
188
189      blockOnPredecessors();
190
191      prepareResources(); // throws Throwable to skip next steps
192
193      applyFlowStepConfStrategy(); // prepare resources beforehand
194
195      blockOnJob();
196      }
197    catch( Throwable throwable )
198      {
199      this.throwable = throwable; // store first, in case throwable leaks out of dumpDebugInfo
200
201      dumpDebugInfo();
202
203      // in case isSkipFlowStep fails before the previous markStarted, we don't fail advancing to the failed state
204      if( flowStepStats.isPending() )
205        markStarted();
206
207      if( !flowStepStats.isFinished() )
208        {
209        flowStepStats.markFailed( this.throwable );
210        flowStep.fireOnThrowable( this.throwable );
211        }
212      }
213    finally
214      {
215      latch.countDown();
216
217      // lets free the latch and capture any failure info if exiting via a throwable
218      // remain in this thread to keep client side alive until shutdown
219      finalizeNodeSliceCapture();
220
221      flowStepStats.cleanup();
222      }
223
224    internalCleanup();
225    }
226
227  private void prepareResources() throws Throwable
228    {
229    if( stop ) // true if a predecessor failed
230      return;
231
232    Throwable throwable = flowStep.prepareResources();
233
234    if( throwable != null )
235      throw throwable;
236    }
237
238  private synchronized boolean markStarted()
239    {
240    if( flowStepStats.isFinished() ) // if stopped, return
241      return false;
242
243    flowStepStats.markStarted();
244
245    return true;
246    }
247
248  private void applyFlowStepConfStrategy()
249    {
250    FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy();
251
252    if( flowStepStrategy == null )
253      return;
254
255    List<FlowStep> predecessorSteps = new ArrayList<FlowStep>();
256
257    for( FlowStepJob predecessor : predecessors )
258      predecessorSteps.add( predecessor.flowStep );
259
260    flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep );
261    }
262
263  protected boolean isSkipFlowStep() throws IOException
264    {
265    // if runID is not set, never skip a step
266    if( flowStep.getFlow().getRunID() == null )
267      return false;
268
269    return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() );
270    }
271
272  protected void blockOnJob() throws IOException
273    {
274    if( stop ) // true if a predecessor failed
275      return;
276
277    if( flowStep.isInfoEnabled() )
278      flowStep.logInfo( "starting step: " + stepName );
279
280    internalNonBlockingStart();
281
282    markSubmitted();
283    flowStep.fireOnStarting();
284
285    blockTillCompleteOrStopped();
286
287    if( !stop && !internalNonBlockingIsSuccessful() )
288      {
289      if( !flowStepStats.isFinished() )
290        {
291        flowStep.rollbackSinks();
292        flowStepStats.markFailed( getThrowable() );
293        updateNodesStatus();
294        flowStep.fireOnThrowable( getThrowable() );
295        }
296
297      // if available, rethrow the unrecoverable error
298      if( getThrowable() instanceof OutOfMemoryError )
299        throw (OutOfMemoryError) getThrowable();
300
301      dumpDebugInfo();
302
303      if( !isRemoteExecution() )
304        this.throwable = new FlowException( "local step failed: " + stepName, getThrowable() );
305      else
306        this.throwable = new FlowException( "step failed: " + stepName + ", step id: " + getStepStats().getID() + ", job id: " + internalJobId() + ", please see cluster logs for failure messages" );
307      }
308    else
309      {
310      if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() )
311        {
312        this.throwable = flowStep.commitSinks();
313
314        if( this.throwable != null )
315          {
316          flowStepStats.markFailed( this.throwable );
317          updateNodesStatus();
318          flowStep.fireOnThrowable( this.throwable );
319          }
320        else
321          {
322          flowStepStats.markSuccessful();
323          updateNodesStatus();
324          flowStep.fireOnCompleted();
325          }
326        }
327      }
328    }
329
330  protected void finalizeNodeSliceCapture()
331    {
332    long startOfFinalPolling = System.currentTimeMillis();
333    long lastLog = 0;
334    long retries = 0;
335
336    boolean allNodesFinished;
337
338    while( true )
339      {
340      allNodesFinished = updateNodesStatus();
341
342      flowStepStats.recordChildStats();
343
344      if( allNodesFinished && flowStepStats.hasCapturedFinalDetail() )
345        break;
346
347      if( ( System.currentTimeMillis() - startOfFinalPolling ) >= blockForCompletedChildDetailDuration )
348        break;
349
350      if( System.currentTimeMillis() - lastLog > 1000 )
351        {
352        if( !allNodesFinished )
353          flowStep.logInfo( "did not capture all completed node details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries );
354        else
355          flowStep.logInfo( "did not capture all completed slice details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries );
356
357        lastLog = System.currentTimeMillis();
358        }
359
360      retries++;
361
362      sleepForPollingInterval();
363      }
364
365    if( !allNodesFinished )
366      flowStep.logWarn( "unable to capture all completed node details or determine final state within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION );
367
368    if( !flowStepStats.hasCapturedFinalDetail() )
369      flowStep.logWarn( "unable to capture all completed slice details within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION );
370    }
371
372  protected abstract boolean isRemoteExecution();
373
374  protected abstract String internalJobId();
375
376  protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;
377
378  protected abstract Throwable getThrowable();
379
380  protected abstract void internalNonBlockingStart() throws IOException;
381
382  protected void blockTillCompleteOrStopped() throws IOException
383    {
384    int iterations = (int) Math.floor( statsStoreInterval / pollingInterval );
385    int count = 0;
386
387    while( true )
388      {
389      // test stop last, internalIsStartedRunning may block causing a race condition
390      if( flowStepStats.isSubmitted() && internalIsStartedRunning() && !stop )
391        {
392        markRunning();
393        flowStep.fireOnRunning();
394        }
395
396      if( flowStepStats.isRunning() )
397        updateNodesStatus(); // records node stats on node status change, not slices
398
399      if( stop || internalNonBlockingIsComplete() )
400        break;
401
402      if( iterations == count++ )
403        {
404        count = 0;
405        flowStepStats.recordStats();
406        flowStepStats.recordChildStats(); // records node and slice stats
407        }
408
409      sleepForPollingInterval();
410      }
411    }
412
413  private synchronized void markSubmitted()
414    {
415    if( flowStepStats.isStarted() )
416      {
417      flowStepStats.markSubmitted();
418
419      Collection<FlowNodeStats> children = flowStepStats.getChildren();
420
421      for( FlowNodeStats flowNodeStats : children )
422        flowNodeStats.markStarted();
423      }
424
425    Flow flow = flowStep.getFlow();
426
427    if( flow == null )
428      {
429      LOG.warn( "no parent flow set" );
430      return;
431      }
432
433    FlowStats flowStats = flow.getFlowStats();
434
435    synchronized( flowStats )
436      {
437      if( flowStats.isStarted() )
438        flowStats.markSubmitted();
439      }
440    }
441
442  private synchronized void markSkipped()
443    {
444    if( flowStepStats.isFinished() )
445      return;
446
447    try
448      {
449      flowStepStats.markSkipped();
450      flowStep.fireOnCompleted();
451      }
452    finally
453      {
454      markFlowRunning(); // move to running before marking failed
455      }
456    }
457
458  private synchronized void markRunning()
459    {
460    flowStepStats.markRunning();
461
462    markFlowRunning();
463    }
464
465  private synchronized void markFlowRunning()
466    {
467    Flow flow = flowStep.getFlow();
468
469    if( flow == null )
470      {
471      LOG.warn( "no parent flow set" );
472      return;
473      }
474
475    FlowStats flowStats = flow.getFlowStats();
476
477    synchronized( flowStats )
478      {
479      if( flowStats.isStarted() || flowStats.isSubmitted() )
480        flowStats.markRunning();
481      }
482    }
483
484  private boolean updateNodesStatus()
485    {
486    boolean allFinished = true;
487
488    Collection<FlowNodeStats> children = flowStepStats.getFlowNodeStats();
489
490    for( FlowNodeStats child : children )
491      {
492      // child#markStarted is called above
493      if( child.isFinished() || child.isPending() )
494        continue;
495
496      updateNodeStatus( child );
497
498      allFinished &= child.isFinished();
499      }
500
501    return allFinished;
502    }
503
504  protected abstract void updateNodeStatus( FlowNodeStats flowNodeStats );
505
506  protected abstract boolean internalNonBlockingIsComplete() throws IOException;
507
508  protected void sleepForPollingInterval()
509    {
510    Util.safeSleep( pollingInterval );
511    }
512
513  protected void blockOnPredecessors()
514    {
515    for( FlowStepJob predecessor : predecessors )
516      {
517      if( !predecessor.isSuccessful() )
518        {
519        flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName );
520
521        stop();
522        }
523      }
524    }
525
526  protected abstract void dumpDebugInfo();
527
528  /**
529   * Method isSuccessful returns true if this step completed successfully or was skipped.
530   *
531   * @return the successful (type boolean) of this FlowStepJob object.
532   */
533  public boolean isSuccessful()
534    {
535    try
536      {
537      latch.await(); // freed after step completes in #start()
538
539      return flowStepStats.isSuccessful() || flowStepStats.isSkipped();
540      }
541    catch( InterruptedException exception )
542      {
543      flowStep.logWarn( "latch interrupted", exception );
544
545      return false;
546      }
547    }
548
549  /**
550   * Method isStarted returns true if this underlying job has started running
551   *
552   * @return boolean
553   */
554  public boolean isStarted()
555    {
556    return internalIsStartedRunning();
557    }
558
559  protected abstract boolean internalIsStartedRunning();
560
561  protected void internalCleanup()
562    {
563    // optional, safe to override
564    }
565
566  /**
567   * Method getStepStats returns the stepStats of this FlowStepJob object.
568   *
569   * @return the stepStats (type StepStats) of this FlowStepJob object.
570   */
571  public FlowStepStats getStepStats()
572    {
573    return flowStepStats;
574    }
575  }