/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.io.IOException;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowStepStats;
import cascading.stats.hadoop.HadoopStepStats;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL;
import static cascading.stats.CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION;
import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL;

/**
 * Class HadoopFlowStepJob manages a single Hadoop MapReduce job on behalf of a Cascading
 * flow step, submitting it through the {@link JobClient} API and polling the resulting
 * {@link RunningJob} for progress, status, and failures.
 */
public class HadoopFlowStepJob extends FlowStepJob<JobConf>
  {
  /** static field to capture errors in hadoop local mode */
  private static Throwable localError;
  /** Field jobClient */
  private JobClient jobClient;
  /** Field runningJob */
  private RunningJob runningJob;

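  /** Returns the interval, in milliseconds, at which stats are stored; defaults to one minute. */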
  private static long getStoreInterval( JobConf jobConf )
    {
    return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 );
    }

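  /** Returns the maximum duration, in milliseconds, to block while capturing completed child details; defaults to one minute. */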
  private static long getChildDetailsBlockingDuration( JobConf jobConf )
    {
    return jobConf.getLong( STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION, 60 * 1000 );
    }

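  /** Returns the job polling interval, in milliseconds; defaults to five seconds. */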
  public static long getJobPollingInterval( JobConf jobConf )
    {
    return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 );
    }

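  /**
   * Constructor HadoopFlowStepJob creates a new HadoopFlowStepJob instance.
   *
   * @param clientState of type ClientState
   * @param flowStep    of type BaseFlowStep<JobConf>
   * @param currentConf of type JobConf
   */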
  public HadoopFlowStepJob( ClientState clientState, BaseFlowStep<JobConf> flowStep, JobConf currentConf )
    {
    super( clientState, currentConf, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ), getChildDetailsBlockingDuration( currentConf ) );

    if( flowStep.isDebugEnabled() )
      flowStep.logDebug( "using polling interval: " + pollingInterval );
    }

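  /** Returns {@link HadoopStepStats} bound to this job's {@link JobClient} and {@link RunningJob}. */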
  @Override
  protected FlowStepStats createStepStats( ClientState clientState )
    {
    return new HadoopStepStats( flowStep, clientState )
      {
      @Override
      public JobClient getJobClient()
        {
        return jobClient;
        }

      @Override
      public RunningJob getJobStatusClient()
        {
        return runningJob;
        }
      };
    }

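  /** Kills the underlying Hadoop job if it has been submitted and is not yet complete. */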
  @Override
  protected void internalBlockOnStop() throws IOException
    {
    if( runningJob != null && !runningJob.isComplete() )
      runningJob.killJob();
    }

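  /** Submits the job through a new {@link JobClient}, then logs the job id and any tracking url. */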
  @Override
  protected void internalNonBlockingStart() throws IOException
    {
    jobClient = new JobClient( jobConfiguration );
    runningJob = jobClient.submitJob( jobConfiguration );

    flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() );

    if( runningJob.getTrackingURL() != null )
      flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() );
    }

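  /**
   * Translates the Hadoop job state and map/reduce progress into this node's stats.
   * The node with ordinal 0 is treated as the mapper, any other ordinal as the reducer;
   * a finished mapper node is also marked successful once any reduce progress appears.
   */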
  @Override
  protected void updateNodeStatus( FlowNodeStats flowNodeStats )
    {
    try
      {
      if( runningJob == null || flowNodeStats.isFinished() )
        return;

      boolean isMapper = flowNodeStats.getOrdinal() == 0;
      int jobState = runningJob.getJobState();

      if( JobStatus.FAILED == jobState )
        {
        flowNodeStats.markFailed();
        return;
        }

      if( JobStatus.KILLED == jobState )
        {
        flowNodeStats.markStopped();
        return;
        }

      float progress;

      if( isMapper )
        progress = runningJob.mapProgress();
      else
        progress = runningJob.reduceProgress();

      if( progress == 0.0F ) // not yet running, the job has only been started
        return;

      if( progress != 1.0F )
        {
        flowNodeStats.markRunning();
        return;
        }

      flowNodeStats.markRunning();

      if( isMapper && runningJob.reduceProgress() > 0.0F )
        {
        flowNodeStats.markSuccessful();
        return;
        }

      if( JobStatus.SUCCEEDED == jobState )
        flowNodeStats.markSuccessful();
      }
    catch( IOException exception )
      {
      flowStep.logError( "failed setting node status", exception );
      }
    }

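  /**
   * Wraps {@link FlowStepJob#isSuccessful()} to translate the NullPointerException the Hadoop
   * client may throw when the job has been purged from the completed-job history.
   */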
  @Override
  public boolean isSuccessful()
    {
    try
      {
      return super.isSuccessful();
      }
    catch( NullPointerException exception )
      {
      throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the 'mapred.jobtracker.completeuserjobs.maximum' property", exception );
      }
    }

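  /** Returns true only when the job was submitted and Hadoop reports it successful. */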
  @Override
  protected boolean internalNonBlockingIsSuccessful() throws IOException
    {
    return runningJob != null && runningJob.isSuccessful();
    }

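  /** Returns false when the step is executing in Hadoop local mode. */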
  @Override
  protected boolean isRemoteExecution()
    {
    return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() );
    }

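  /** Returns any failure captured from a local mode task via {@link #reportLocalError(Throwable)}. */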
  @Override
  protected Throwable getThrowable()
    {
    return localError;
    }

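  /** Returns the Hadoop job id of the submitted job. */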
  @Override
  protected String internalJobId()
    {
    return runningJob.getJobID();
    }

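  /** Returns true once Hadoop reports the job complete, whether successful, failed, or killed. */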
  @Override
  protected boolean internalNonBlockingIsComplete() throws IOException
    {
    return runningJob.isComplete();
    }

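  /** Logs job state, failure info, and task completion events to aid diagnosing a failed step. */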
  @Override
  protected void dumpDebugInfo()
    {
    try
      {
      if( runningJob == null )
        return;

      int jobState = runningJob.getJobState(); // may throw an NPE internally

      flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( jobState ) );
      flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() );

      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 );
      flowStep.logWarn( "task completion events identify failed tasks" );
      flowStep.logWarn( "task completion events count: " + events.length );

      for( TaskCompletionEvent event : events )
        flowStep.logWarn( "event = " + event );
      }
    catch( Throwable throwable )
      {
      flowStep.logError( "failed reading task completion events", throwable );
      }
    }

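  /** Returns true once the job reports any map progress, indicating tasks have started running. */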
  @Override
  protected boolean internalIsStartedRunning()
    {
    if( runningJob == null )
      return false;

    try
      {
      return runningJob.mapProgress() > 0;
      }
    catch( IOException exception )
      {
      flowStep.logWarn( "unable to test for map progress", exception );
      return false;
      }
    }

  /**
   * Internal method to report errors that occur in Hadoop local mode. Hadoop's local
   * JobRunner does not give access to TaskReports, but we want to capture the exception
   * rather than just print it to stderr. FlowMapper and FlowReducer use this method.
   *
   * @param throwable the throwable to be reported.
   */
  public static void reportLocalError( Throwable throwable )
    {
    localError = throwable;
    }
  }