001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.List;
026    import java.util.concurrent.Callable;
027    import java.util.concurrent.CountDownLatch;
028    
029    import cascading.flow.Flow;
030    import cascading.flow.FlowException;
031    import cascading.flow.FlowStep;
032    import cascading.flow.FlowStepStrategy;
033    import cascading.management.state.ClientState;
034    import cascading.stats.FlowStats;
035    import cascading.stats.FlowStepStats;
036    import cascading.util.Util;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     *
042     */
043    public abstract class FlowStepJob<Config> implements Callable<Throwable>
044      {
045      private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class );
046    
047      /** Field stepName */
048      protected final String stepName;
049      /** Field pollingInterval */
050      protected long pollingInterval = 1000;
051      /** Field recordStatsInterval */
052      protected long statsStoreInterval = 60 * 1000;
053      /** Field predecessors */
054      protected List<FlowStepJob<Config>> predecessors;
055      /** Field latch */
056      private final CountDownLatch latch = new CountDownLatch( 1 );
057      /** Field stop */
058      private boolean stop = false;
059      /** Field flowStep */
060      protected final BaseFlowStep<Config> flowStep;
061      /** Field stepStats */
062      protected FlowStepStats flowStepStats;
063      /** Field throwable */
064      protected Throwable throwable;
065    
066      public FlowStepJob( ClientState clientState, BaseFlowStep flowStep, long pollingInterval, long statsStoreInterval )
067        {
068        this.flowStep = flowStep;
069        this.stepName = flowStep.getName();
070        this.pollingInterval = pollingInterval;
071        this.statsStoreInterval = statsStoreInterval;
072        this.flowStepStats = createStepStats( clientState );
073    
074        this.flowStepStats.prepare();
075        this.flowStepStats.markPending();
076        }
077    
078      public abstract Config getConfig();
079    
080      protected abstract FlowStepStats createStepStats( ClientState clientState );
081    
082      public synchronized void stop()
083        {
084        if( flowStep.isInfoEnabled() )
085          flowStep.logInfo( "stopping: " + stepName );
086    
087        stop = true;
088    
089        // allow pending -> stopped transition
090        // never want a hanging pending state
091        if( !flowStepStats.isFinished() )
092          flowStepStats.markStopped();
093    
094        try
095          {
096          internalBlockOnStop();
097          }
098        catch( IOException exception )
099          {
100          flowStep.logWarn( "unable to kill job: " + stepName, exception );
101          }
102        finally
103          {
104          // call rollback after the job has been stopped, only if it was stopped
105          if( flowStepStats.isStopped() )
106            {
107            flowStep.rollbackSinks();
108            flowStep.fireOnStopping();
109            }
110    
111          flowStepStats.cleanup();
112          }
113        }
114    
115      protected abstract void internalBlockOnStop() throws IOException;
116    
117      public void setPredecessors( List<FlowStepJob<Config>> predecessors )
118        {
119        this.predecessors = predecessors;
120        }
121    
122      public Throwable call()
123        {
124        start();
125    
126        return throwable;
127        }
128    
129      protected void start()
130        {
131        try
132          {
133          if( isSkipFlowStep() )
134            {
135            markSkipped();
136    
137            if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() )
138              flowStep.logInfo( "skipping step: " + stepName );
139    
140            return;
141            }
142    
143          synchronized( this ) // backport from 3.0
144            {
145            if( stop )
146              {
147              if( flowStep.isInfoEnabled() )
148                flowStep.logInfo( "stop called before start: " + stepName );
149              return;
150              }
151    
152            if( !markStarted() )
153              return;
154            }
155    
156          blockOnPredecessors();
157    
158          prepareResources(); // throws Throwable to skip next steps
159    
160          applyFlowStepConfStrategy(); // prepare resources beforehand
161    
162          blockOnJob();
163          }
164        catch( Throwable throwable )
165          {
166          dumpDebugInfo();
167    
168          this.throwable = throwable;
169    
170          if( !flowStepStats.isFinished() )
171            {
172            flowStepStats.markFailed( this.throwable );
173            flowStep.fireOnThrowable( this.throwable );
174            }
175          }
176        finally
177          {
178          latch.countDown();
179          flowStepStats.cleanup();
180          }
181        }
182    
183      private void prepareResources() throws Throwable
184        {
185        if( stop ) // true if a predecessor failed
186          return;
187    
188        Throwable throwable = flowStep.prepareResources();
189    
190        if( throwable != null )
191          throw throwable;
192        }
193    
194      private synchronized boolean markStarted()
195        {
196        if( flowStepStats.isFinished() ) // if stopped, return
197          return false;
198    
199        flowStepStats.markStarted();
200    
201        return true;
202        }
203    
204      private void applyFlowStepConfStrategy()
205        {
206        FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy();
207    
208        if( flowStepStrategy == null )
209          return;
210    
211        List<FlowStep> predecessorSteps = new ArrayList<FlowStep>();
212    
213        for( FlowStepJob predecessor : predecessors )
214          predecessorSteps.add( predecessor.flowStep );
215    
216        flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep );
217        }
218    
219      protected boolean isSkipFlowStep() throws IOException
220        {
221        // if runID is not set, never skip a step
222        if( flowStep.getFlow().getRunID() == null )
223          return false;
224    
225        return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() );
226        }
227    
228      protected void blockOnJob() throws IOException
229        {
230        if( stop ) // true if a predecessor failed
231          return;
232    
233        if( flowStep.isInfoEnabled() )
234          flowStep.logInfo( "starting step: " + stepName );
235    
236        internalNonBlockingStart();
237    
238        markSubmitted();
239        flowStep.fireOnStarting();
240    
241        blockTillCompleteOrStopped();
242    
243        if( !stop && !internalNonBlockingIsSuccessful() )
244          {
245          if( !flowStepStats.isFinished() )
246            {
247            flowStep.rollbackSinks();
248            flowStepStats.markFailed( getThrowable() );
249            flowStep.fireOnThrowable( getThrowable() );
250            }
251    
252          dumpDebugInfo();
253    
254          // if available, rethrow the unrecoverable error
255          if( getThrowable() instanceof OutOfMemoryError )
256            throw ( (OutOfMemoryError) getThrowable() );
257    
258          if( !isRemoteExecution() )
259            this.throwable = new FlowException( "local step failed", getThrowable() );
260          else
261            this.throwable = new FlowException( "step failed: " + stepName + ", with job id: " + internalJobId() + ", please see cluster logs for failure messages" );
262          }
263        else
264          {
265          if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() )
266            {
267            this.throwable = flowStep.commitSinks();
268    
269            if( this.throwable != null )
270              {
271              flowStepStats.markFailed( this.throwable );
272              flowStep.fireOnThrowable( this.throwable );
273              }
274            else
275              {
276              flowStepStats.markSuccessful();
277              flowStep.fireOnCompleted();
278              }
279            }
280          }
281    
282        flowStepStats.recordChildStats();
283        }
284    
285      protected abstract boolean isRemoteExecution();
286    
287      protected abstract String internalJobId();
288    
289      protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;
290    
291      protected abstract Throwable getThrowable();
292    
293      protected abstract void internalNonBlockingStart() throws IOException;
294    
295      protected void blockTillCompleteOrStopped() throws IOException
296        {
297        int iterations = (int) Math.floor( statsStoreInterval / pollingInterval );
298        int count = 0;
299    
300        while( true )
301          {
302          if( flowStepStats.isSubmitted() && isStarted() )
303            {
304            markRunning();
305            flowStep.fireOnRunning();
306            }
307    
308          if( stop || internalNonBlockingIsComplete() )
309            break;
310    
311          sleepForPollingInterval();
312    
313          if( iterations == count++ )
314            {
315            count = 0;
316            flowStepStats.recordStats();
317            flowStepStats.recordChildStats();
318            }
319          }
320        }
321    
322      private synchronized void markSubmitted()
323        {
324        if( flowStepStats.isStarted() )
325          flowStepStats.markSubmitted();
326    
327        Flow flow = flowStep.getFlow();
328    
329        if( flow == null )
330          {
331          LOG.warn( "no parent flow set" );
332          return;
333          }
334    
335        FlowStats flowStats = flow.getFlowStats();
336    
337        synchronized( flowStats )
338          {
339          if( flowStats.isStarted() )
340            flowStats.markSubmitted();
341          }
342        }
343    
344      private synchronized void markRunning()
345        {
346        flowStepStats.markRunning();
347    
348        markFlowRunning();
349        }
350    
351      private synchronized void markSkipped()
352        {
353        if( flowStepStats.isFinished() )
354          return;
355    
356        flowStepStats.markSkipped();
357    
358        markFlowRunning();
359        }
360    
361      private synchronized void markFlowRunning()
362        {
363        Flow flow = flowStep.getFlow();
364    
365        if( flow == null )
366          {
367          LOG.warn( "no parent flow set" );
368          return;
369          }
370    
371        FlowStats flowStats = flow.getFlowStats();
372    
373        synchronized( flowStats )
374          {
375          if( flowStats.isStarted() || flowStats.isSubmitted() )
376            flowStats.markRunning();
377          }
378        }
379    
380      protected abstract boolean internalNonBlockingIsComplete() throws IOException;
381    
382      protected void sleepForPollingInterval()
383        {
384        Util.safeSleep( pollingInterval );
385        }
386    
387      protected void blockOnPredecessors()
388        {
389        for( FlowStepJob predecessor : predecessors )
390          {
391          if( !predecessor.isSuccessful() )
392            {
393            flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName );
394    
395            stop();
396            }
397          }
398        }
399    
400      protected abstract void dumpDebugInfo();
401    
402      /**
403       * Method isSuccessful returns true if this step completed successfully or was skipped.
404       *
405       * @return the successful (type boolean) of this FlowStepJob object.
406       */
407      public boolean isSuccessful()
408        {
409        try
410          {
411          latch.await(); // freed after step completes in #start()
412    
413          return flowStepStats.isSuccessful() || flowStepStats.isSkipped();
414          }
415        catch( InterruptedException exception )
416          {
417          flowStep.logWarn( "latch interrupted", exception );
418    
419          return false;
420          }
421        catch( NullPointerException exception )
422          {
423          throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception );
424          }
425        }
426    
427      /**
428       * Method wasStarted returns true if this job was started
429       *
430       * @return boolean
431       */
432      public boolean isStarted()
433        {
434        return internalIsStarted();
435        }
436    
437      protected abstract boolean internalIsStarted();
438    
439      /**
440       * Method getStepStats returns the stepStats of this FlowStepJob object.
441       *
442       * @return the stepStats (type StepStats) of this FlowStepJob object.
443       */
444      public FlowStepStats getStepStats()
445        {
446        return flowStepStats;
447        }
448      }