/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Map;

import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.property.PropertyUtil;
import cascading.tap.hadoop.io.HttpFileSystem;
import cascading.util.ShutdownUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import riffle.process.ProcessConfiguration;

import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;

/**
 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link Flow}.
 * <p/>
 * HadoopFlow must be created through a {@link cascading.flow.FlowConnector} sub-class instance.
 * <p/>
 * If classpath paths are provided on the {@link FlowDef}, the Hadoop distributed cache mechanism will be used
 * to augment the remote classpath.
 * <p/>
 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side.
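 * <p/>
 * A minimal sketch of typical construction, assuming {@code sourceTap}, {@code sinkTap}, a {@code pipe}
 * assembly, and a {@code properties} map are defined elsewhere (the jar name is illustrative only):
 * <pre>{@code
 * FlowDef flowDef = FlowDef.flowDef()
 *   .setName( "example" )
 *   .addSource( pipe, sourceTap )
 *   .addTailSink( pipe, sinkTap )
 *   .addToClassPath( "udfs.jar" ); // relative path, uploaded to HDFS and distributed-cached
 *
 * Flow flow = new HadoopFlowConnector( properties ).connect( flowDef );
 *
 * flow.complete(); // blocks until the flow finishes
 * }</pre>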
 *
 * @see cascading.flow.FlowConnector
 */
public class HadoopFlow extends BaseFlow<JobConf>
  {
  /** Field hdfsShutdown */
  private static Thread hdfsShutdown = null;
  /** Field shutdownHook */
  private static ShutdownUtil.Hook shutdownHook;
  /** Field jobConf */
  private transient JobConf jobConf;
  /** Field preserveTemporaryFiles */
  private boolean preserveTemporaryFiles = false;
  /** Field syncPaths */
  private transient Map<Path, Path> syncPaths;

  protected HadoopFlow()
    {
    }

  /**
   * Returns property preserveTemporaryFiles.
   *
   * @param properties of type Map
   * @return a boolean
   */
  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
    }

  static int getMaxConcurrentSteps( JobConf jobConf )
    {
    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
    }

  protected HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name, Map<String, String> flowDescriptor )
    {
    super( platformInfo, properties, jobConf, name, flowDescriptor );

    initFromProperties( properties );
    }

  public HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, FlowDef flowDef )
    {
    super( platformInfo, properties, jobConf, flowDef );

    initFromProperties( properties );
    }

  @Override
  protected void initFromProperties( Map<Object, Object> properties )
    {
    super.initFromProperties( properties );
    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
    }
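
  /**
   * Prepares this flow's working {@link JobConf}: copies the parent config so locally set values are
   * not shared, registers {@link HttpFileSystem} for http/https paths, and records which classpath
   * elements must be synced to HDFS for the distributed cache.
   */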
  protected void initConfig( Map<Object, Object> properties, JobConf parentConfig )
    {
    if( properties != null )
      parentConfig = createConfig( properties, parentConfig );

    if( parentConfig == null ) // this is ok, getConfig will pass a default parent in
      return;

    jobConf = HadoopUtil.copyJobConf( parentConfig ); // prevent local values from being shared
    jobConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
    jobConf.set( "fs.https.impl", HttpFileSystem.class.getName() );

    syncPaths = HadoopMRUtil.addToClassPath( jobConf, getClassPath() );
    }

  @Override
  protected void setConfigProperty( JobConf config, Object key, Object value )
    {
    // don't let these objects pass, even though toString is called below.
    if( value instanceof Class || value instanceof JobConf || value == null )
      return;

    config.set( key.toString(), value.toString() );
    }

  @Override
  protected JobConf newConfig( JobConf defaultConfig )
    {
    return defaultConfig == null ? new JobConf() : HadoopUtil.copyJobConf( defaultConfig );
    }

  @ProcessConfiguration
  @Override
  public JobConf getConfig()
    {
    if( jobConf == null )
      initConfig( null, new JobConf() );

    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( getConfig() );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method getProperty returns the value associated with the given key from the underlying properties system.
   *
   * @param key of type String
   * @return String
   */
  public String getProperty( String key )
    {
    return getConfig().get( key );
    }

  @Override
  public FlowProcess<JobConf> getFlowProcess()
    {
    return new HadoopFlowProcess( getFlowSession(), getConfig() );
    }

  /**
   * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes.
   *
   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
   */
  public boolean isPreserveTemporaryFiles()
    {
    return preserveTemporaryFiles;
    }

  @Override
  protected void internalStart()
    {
    try
      {
      copyToDistributedCache();
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      deleteCheckpointsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to prepare flow", exception );
      }

    registerHadoopShutdownHook();
    }

  protected void registerHadoopShutdownHook()
    {
    registerHadoopShutdownHook( this );
    }

  protected void copyToDistributedCache()
    {
    HadoopUtil.syncPaths( jobConf, syncPaths, true );
    }

  @Override
  public boolean stepsAreLocal()
    {
    return HadoopUtil.isLocal( getConfig() );
    }

  private void cleanTemporaryFiles( boolean stop )
    {
    if( stop ) // unstable to call fs operations during shutdown
      return;

    // use step config so cascading.flow.step.path property is properly used
    for( FlowStep<JobConf> step : getFlowSteps() )
      ( (BaseFlowStep<JobConf>) step ).clean();
    }

  private static synchronized void registerHadoopShutdownHook( Flow flow )
    {
    if( !flow.isStopJobsOnExit() )
      return;

    // guaranteed singleton here
    if( shutdownHook != null )
      return;

    getHdfsShutdownHook();

    shutdownHook = new ShutdownUtil.Hook()
      {
      @Override
      public Priority priority()
        {
        return Priority.LAST; // very last thing to happen
        }

      @Override
      public void execute()
        {
        callHdfsShutdownHook();
        }
      };

    ShutdownUtil.addHook( shutdownHook );
    }

  private static synchronized void callHdfsShutdownHook()
    {
    if( hdfsShutdown != null )
      hdfsShutdown.start();
    }

  private static synchronized void getHdfsShutdownHook()
    {
    if( hdfsShutdown == null )
      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
    }

  protected void internalClean( boolean stop )
    {
    if( !isPreserveTemporaryFiles() )
      cleanTemporaryFiles( stop );
    }

  protected void internalShutdown()
    {
    }

  protected int getMaxNumParallelSteps()
    {
    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
    }

  @Override
  protected long getTotalSliceCPUMilliSeconds()
    {
    // this is a hadoop2 MR specific counter/value
    long counterValue = flowStats.getCounterValue( "org.apache.hadoop.mapreduce.TaskCounter", "CPU_MILLISECONDS" );

    if( counterValue == 0 )
      return -1;

    return counterValue;
    }
  }