/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.io.IOException;

import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.management.state.ClientState;
import cascading.stats.FlowStepStats;
import cascading.stats.hadoop.HadoopStepStats;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL;
import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL;

/**
 * HadoopFlowStepJob submits a single Hadoop MapReduce job (via
 * {@link JobClient#submitJob(JobConf)}) and answers the lifecycle queries the
 * {@link FlowStepJob} polling loop makes against it: started, complete,
 * successful, stop.
 * <p/>
 * Polling and stats-store intervals are read from the supplied {@link JobConf}
 * ({@code JOB_POLLING_INTERVAL}, default 5000 ms; {@code STATS_STORE_INTERVAL},
 * default 60000 ms).
 */
public class HadoopFlowStepJob extends FlowStepJob<JobConf>
  {
  /**
   * Static field to capture errors in hadoop local mode. Volatile: it is
   * written by {@link #reportLocalError(Throwable)} from the local-mode task
   * code (FlowMapper/FlowReducer) and read by {@link #getThrowable()} on the
   * thread polling this job, so visibility across threads must be guaranteed.
   */
  private static volatile Throwable localError;
  /** Field currentConf - the job configuration this step was planned with */
  private final JobConf currentConf;
  /** Field jobClient - created on start, used to submit and to serve stats */
  private JobClient jobClient;
  /** Field runningJob - null until {@link #internalNonBlockingStart()} runs */
  private RunningJob runningJob;

  /**
   * Returns the interval, in milliseconds, at which step stats are persisted.
   *
   * @param jobConf the configuration to read from
   * @return value of {@code STATS_STORE_INTERVAL}, defaulting to 60000 ms
   */
  private static long getStoreInterval( JobConf jobConf )
    {
    return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 );
    }

  /**
   * Returns the interval, in milliseconds, at which the running job is polled.
   *
   * @param jobConf the configuration to read from
   * @return value of {@code JOB_POLLING_INTERVAL}, defaulting to 5000 ms
   */
  public static long getJobPollingInterval( JobConf jobConf )
    {
    return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 );
    }

  /**
   * Constructor HadoopFlowStepJob.
   *
   * @param clientState the client state for reporting
   * @param flowStep    the step this job executes
   * @param currentConf the fully configured job configuration to submit
   */
  public HadoopFlowStepJob( ClientState clientState, BaseFlowStep flowStep, JobConf currentConf )
    {
    super( clientState, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ) );
    this.currentConf = currentConf;

    if( flowStep.isDebugEnabled() )
      flowStep.logDebug( "using polling interval: " + pollingInterval );
    }

  @Override
  public JobConf getConfig()
    {
    return currentConf;
    }

  @Override
  protected FlowStepStats createStepStats( ClientState clientState )
    {
    // anonymous subclass closes over this job so stats always see the
    // current jobClient/runningJob, which are assigned after submission
    return new HadoopStepStats( flowStep, clientState )
      {
      @Override
      public JobClient getJobClient()
        {
        return jobClient;
        }

      @Override
      public RunningJob getRunningJob()
        {
        return runningJob;
        }
      };
    }

  /**
   * Kills the underlying hadoop job, if it was submitted and is still running.
   *
   * @throws IOException if the kill request fails
   */
  @Override
  protected void internalBlockOnStop() throws IOException
    {
    if( runningJob != null && !runningJob.isComplete() )
      runningJob.killJob();
    }

  /**
   * Submits the job to the cluster (or local runner) without blocking.
   *
   * @throws IOException if submission fails
   */
  @Override
  protected void internalNonBlockingStart() throws IOException
    {
    jobClient = new JobClient( currentConf );
    runningJob = jobClient.submitJob( currentConf );

    flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() );

    if( runningJob.getTrackingURL() != null )
      flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() );
    }

  /** Returns true only if the job was submitted and finished successfully. */
  @Override
  protected boolean internalNonBlockingIsSuccessful() throws IOException
    {
    return runningJob != null && runningJob.isSuccessful();
    }

  @Override
  protected boolean isRemoteExecution()
    {
    return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() );
    }

  @Override
  protected Throwable getThrowable()
    {
    return localError;
    }

  /**
   * Returns the hadoop job id, or null if the job has not been submitted yet.
   * Guarded like the other internal* accessors so callers asking for the id
   * before {@link #internalNonBlockingStart()} do not trigger an NPE.
   */
  @Override
  protected String internalJobId()
    {
    if( runningJob == null )
      return null;

    return runningJob.getJobID();
    }

  // only called by the polling loop after a successful submission,
  // so runningJob is expected to be non-null here
  @Override
  protected boolean internalNonBlockingIsComplete() throws IOException
    {
    return runningJob.isComplete();
    }

  /** Logs job state, failure info, and task completion events for diagnosis. */
  @Override
  protected void dumpDebugInfo()
    {
    try
      {
      if( runningJob == null )
        return;

      flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( runningJob.getJobState() ) );
      flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() );

      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 );
      flowStep.logWarn( "task completion events identify failed tasks" );
      flowStep.logWarn( "task completion events count: " + events.length );

      for( TaskCompletionEvent event : events )
        flowStep.logWarn( "event = " + event );
      }
    catch( IOException exception )
      {
      flowStep.logError( "failed reading task completion events", exception );
      }
    }

  /**
   * Reports whether the job has begun doing work, using map progress > 0 as
   * the signal. A failed progress probe is logged and treated as not started.
   */
  @Override
  protected boolean internalIsStarted()
    {
    if( runningJob == null )
      return false;

    try
      {
      return runningJob.mapProgress() > 0;
      }
    catch( IOException exception )
      {
      flowStep.logWarn( "unable to test for map progress", exception );
      return false;
      }
    }

  /**
   * Internal method to report errors that happen on hadoop local mode. Hadoops local
   * JobRunner does not give access to TaskReports, but we want to be able to capture
   * the exception and not just print it to stderr. FlowMapper and FlowReducer use this method.
   *
   * @param throwable the throwable to be reported.
   */
  public static void reportLocalError( Throwable throwable )
    {
    localError = throwable;
    }
  }