/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.io.IOException;

import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.management.state.ClientState;
import cascading.stats.FlowStepStats;
import cascading.stats.hadoop.HadoopStepStats;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL;
import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL;

/**
 * Class HadoopFlowStepJob is the {@link FlowStepJob} implementation that submits a single
 * Hadoop MapReduce job and monitors it through the {@link JobClient} API until completion.
 */
public class HadoopFlowStepJob extends FlowStepJob<JobConf>
  {
  /** static field to capture errors in hadoop local mode, volatile so the monitoring thread sees task-side writes */
  private static volatile Throwable localError;
  /** the JobConf this step's job is configured and submitted with */
  private final JobConf currentConf;
  /** the JobClient used to submit the job, null until {@link #internalNonBlockingStart()} runs */
  private JobClient jobClient;
  /** the handle to the submitted job, null until {@link #internalNonBlockingStart()} runs */
  private RunningJob runningJob;

  private static long getStoreInterval( JobConf jobConf )
    {
    return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 );
    }

  public static long getJobPollingInterval( JobConf jobConf )
    {
    return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 );
    }
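
  /*
   * Both intervals are plain JobConf properties, so they can be tuned per flow before
   * planning. A minimal sketch, assuming the defaults above (60s stats store, 5s polling)
   * should be overridden:
   *
   *   JobConf conf = new JobConf();
   *   conf.setLong( STATS_STORE_INTERVAL, 2 * 60 * 1000 ); // persist step stats every two minutes
   *   conf.setLong( JOB_POLLING_INTERVAL, 10 * 1000 ); // poll the job every ten seconds
   */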

  public HadoopFlowStepJob( ClientState clientState, BaseFlowStep flowStep, JobConf currentConf )
    {
    super( clientState, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ) );
    this.currentConf = currentConf;

    if( flowStep.isDebugEnabled() )
      flowStep.logDebug( "using polling interval: " + pollingInterval );
    }
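
  /*
   * A minimal sketch of how the Hadoop planner wires this up, assuming a planned
   * HadoopFlowStep named flowStep, its initialized JobConf named jobConf, and a
   * clientState supplied by the management framework:
   *
   *   FlowStepJob<JobConf> stepJob = new HadoopFlowStepJob( clientState, flowStep, jobConf );
   */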

  @Override
  public JobConf getConfig()
    {
    return currentConf;
    }

  @Override
  protected FlowStepStats createStepStats( ClientState clientState )
    {
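    // the anonymous stats instance reads jobClient and runningJob lazily, so it is
    // safe to create now even though both fields stay null until the job is submitted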
    return new HadoopStepStats( flowStep, clientState )
    {
    @Override
    public JobClient getJobClient()
      {
      return jobClient;
      }

    @Override
    public RunningJob getRunningJob()
      {
      return runningJob;
      }
    };
    }

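  /** kills the underlying Hadoop job if it is still running when the flow is stopped */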
  @Override
  protected void internalBlockOnStop() throws IOException
    {
    if( runningJob != null && !runningJob.isComplete() )
      runningJob.killJob();
    }

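  /** submits the job and returns immediately; the polling loop in {@link FlowStepJob} drives it to completion */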
  @Override
  protected void internalNonBlockingStart() throws IOException
    {
    jobClient = new JobClient( currentConf );
    runningJob = jobClient.submitJob( currentConf );

    flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() );

    if( runningJob.getTrackingURL() != null )
      flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() );
    }

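  /** true only if the job has been submitted and Hadoop reports it completed successfully */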
  @Override
  protected boolean internalNonBlockingIsSuccessful() throws IOException
    {
    return runningJob != null && runningJob.isSuccessful();
    }

  @Override
  protected boolean isRemoteExecution()
    {
    return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() );
    }

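  /** returns the error captured via {@link #reportLocalError(Throwable)}, only populated in hadoop local mode */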
  @Override
  protected Throwable getThrowable()
    {
    return localError;
    }

  @Override
  protected String internalJobId()
    {
    return runningJob.getJobID();
    }

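  /** true once the job has completed, whether it succeeded, failed, or was killed */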
  @Override
  protected boolean internalNonBlockingIsComplete() throws IOException
    {
    return runningJob.isComplete();
    }

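  /** logs job state, failure info, and task completion events to help diagnose a failed step */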
  @Override
  protected void dumpDebugInfo()
    {
    try
      {
      if( runningJob == null )
        return;

      flowStep.logWarn( "hadoop job " + runningJob.getID() + " state is " + JobStatus.getJobRunState( runningJob.getJobState() ) );
      flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() );

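      // fetch the first batch of task completion events, starting at event index zero;
      // for very large jobs this batch may not include every event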
      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 );
      flowStep.logWarn( "task completion events identify failed tasks" );
      flowStep.logWarn( "task completion events count: " + events.length );

      for( TaskCompletionEvent event : events )
        flowStep.logWarn( "event = " + event );
      }
    catch( IOException exception )
      {
      flowStep.logError( "failed reading task completion events", exception );
      }
    }

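  /** the job is considered started once any map task has reported nonzero progress */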
  @Override
  protected boolean internalIsStarted()
    {
    if( runningJob == null )
      return false;

    try
      {
      return runningJob.mapProgress() > 0;
      }
    catch( IOException exception )
      {
      flowStep.logWarn( "unable to test for map progress", exception );
      return false;
      }
    }

  /**
   * Internal method to report errors that happen in hadoop local mode. Hadoop's local
   * JobRunner does not give access to TaskReports, but we want to capture the exception
   * rather than just print it to stderr. FlowMapper and FlowReducer call this method.
   *
   * @param throwable the throwable to be reported
   */
  public static void reportLocalError( Throwable throwable )
    {
    localError = throwable;
    }
  }