001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez; 023 024import java.io.IOException; 025import java.util.Map; 026 027import cascading.flow.BaseFlow; 028import cascading.flow.Flow; 029import cascading.flow.FlowDef; 030import cascading.flow.FlowException; 031import cascading.flow.FlowProcess; 032import cascading.flow.FlowStep; 033import cascading.flow.hadoop.util.HadoopUtil; 034import cascading.flow.planner.BaseFlowStep; 035import cascading.flow.planner.PlatformInfo; 036import cascading.property.PropertyUtil; 037import cascading.tap.hadoop.io.HttpFileSystem; 038import cascading.util.ShutdownUtil; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.security.UserGroupInformation; 042import org.apache.tez.common.counters.TaskCounter; 043import org.apache.tez.dag.api.TezConfiguration; 044import riffle.process.ProcessConfiguration; 045 046import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS; 047import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES; 048 049/** 050 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link cascading.flow.Flow}. 051 * <p> 052 * HadoopFlow must be created through a {@link cascading.flow.tez.Hadoop2TezFlowConnector} instance. 053 * <p> 054 * If classpath paths are provided on the {@link cascading.flow.FlowDef}, the Hadoop distributed cache mechanism will be used 055 * to augment the remote classpath. 056 * <p> 057 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note 058 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side. 059 * 060 * @see cascading.flow.tez.Hadoop2TezFlowConnector 061 */ 062public class Hadoop2TezFlow extends BaseFlow<TezConfiguration> 063 { 064 /** Field hdfsShutdown */ 065 private static Thread hdfsShutdown = null; 066 /** Field shutdownHook */ 067 private static ShutdownUtil.Hook shutdownHook; 068 /** Field jobConf */ 069 private transient TezConfiguration flowConf; 070 /** Field preserveTemporaryFiles */ 071 private boolean preserveTemporaryFiles = false; 072 073 private String flowStagingPath; 074 075 protected Hadoop2TezFlow() 076 { 077 } 078 079 /** 080 * Returns property preserveTemporaryFiles. 081 * 082 * @param properties of type Map 083 * @return a boolean 084 */ 085 static boolean getPreserveTemporaryFiles( Map<Object, Object> properties ) 086 { 087 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) ); 088 } 089 090 static int getMaxConcurrentSteps( TezConfiguration jobConf ) 091 { 092 return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 ); 093 } 094 095 public Hadoop2TezFlow( PlatformInfo platformInfo, Map<Object, Object> properties, TezConfiguration flowConf, FlowDef flowDef ) 096 { 097 super( platformInfo, properties, flowConf, flowDef ); 098 099 initFromProperties( properties ); 100 } 101 102 @Override 103 protected void initFromProperties( Map<Object, Object> properties ) 104 { 105 super.initFromProperties( properties ); 106 107 preserveTemporaryFiles = getPreserveTemporaryFiles( properties ); 108 } 109 110 protected void initConfig( Map<Object, Object> properties, TezConfiguration parentConfig ) 111 { 112 if( properties != null ) 113 parentConfig = createConfig( properties, parentConfig ); 114 115 if( parentConfig == null ) // this is ok, getJobConf will pass a default parent in 116 return; 117 118 flowConf = new TezConfiguration( parentConfig ); // prevent local values from being shared 119 flowConf.set( "fs.http.impl", HttpFileSystem.class.getName() ); 120 flowConf.set( "fs.https.impl", HttpFileSystem.class.getName() ); 121 122 UserGroupInformation.setConfiguration( flowConf ); 123 124 flowStagingPath = createStagingRoot(); 125 } 126 127 public String getFlowStagingPath() 128 { 129 if( flowStagingPath == null ) 130 flowStagingPath = createStagingRoot(); 131 132 return flowStagingPath; 133 } 134 135 private String createStagingRoot() 136 { 137 return ".staging" + Path.SEPARATOR + getID(); 138 } 139 140 @Override 141 protected void setConfigProperty( TezConfiguration config, Object key, Object value ) 142 { 143 // don't let these objects pass, even though toString is called below. 144 if( value instanceof Class || value instanceof Configuration || value == null ) 145 return; 146 147 config.set( key.toString(), value.toString() ); 148 } 149 150 @Override 151 protected TezConfiguration newConfig( TezConfiguration defaultConfig ) 152 { 153 return defaultConfig == null ? new TezConfiguration() : new TezConfiguration( defaultConfig ); 154 } 155 156 @ProcessConfiguration 157 @Override 158 public TezConfiguration getConfig() 159 { 160 if( flowConf == null ) 161 initConfig( null, new TezConfiguration() ); 162 163 return flowConf; 164 } 165 166 @Override 167 public TezConfiguration getConfigCopy() 168 { 169 return new TezConfiguration( getConfig() ); 170 } 171 172 @Override 173 public Map<Object, Object> getConfigAsProperties() 174 { 175 return HadoopUtil.createProperties( getConfig() ); 176 } 177 178 /** 179 * Method getProperty returns the value associated with the given key from the underlying properties system. 180 * 181 * @param key of type String 182 * @return String 183 */ 184 public String getProperty( String key ) 185 { 186 return getConfig().get( key ); 187 } 188 189 @Override 190 public FlowProcess<TezConfiguration> getFlowProcess() 191 { 192 return new Hadoop2TezFlowProcess( getFlowSession(), null, getConfig() ); 193 } 194 195 /** 196 * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes. 197 * 198 * @return the preserveTemporaryFiles (type boolean) of this Flow object. 199 */ 200 public boolean isPreserveTemporaryFiles() 201 { 202 return preserveTemporaryFiles; 203 } 204 205 @Override 206 protected void internalStart() 207 { 208 try 209 { 210 copyArtifactsToRemote(); 211 deleteSinksIfReplace(); 212 deleteTrapsIfReplace(); 213 deleteCheckpointsIfReplace(); 214 } 215 catch( IOException exception ) 216 { 217 throw new FlowException( "unable to delete sinks", exception ); 218 } 219 220 registerHadoopShutdownHook( this ); 221 } 222 223 private void copyArtifactsToRemote() 224 { 225 for( FlowStep<TezConfiguration> flowStep : getFlowSteps() ) 226 ( (Hadoop2TezFlowStep) flowStep ).syncArtifacts(); 227 } 228 229 @Override 230 public boolean stepsAreLocal() 231 { 232 return HadoopUtil.isLocal( getConfig() ); 233 } 234 235 private void cleanTemporaryFiles( boolean stop ) 236 { 237 if( stop ) // unstable to call fs operations during shutdown 238 return; 239 240 // use step config so cascading.flow.step.path property is properly used 241 for( FlowStep<TezConfiguration> step : getFlowSteps() ) 242 ( (BaseFlowStep<TezConfiguration>) step ).clean(); 243 } 244 245 private static synchronized void registerHadoopShutdownHook( Flow flow ) 246 { 247 if( !flow.isStopJobsOnExit() ) 248 return; 249 250 // guaranteed singleton here 251 if( shutdownHook != null ) 252 return; 253 254 getHdfsShutdownHook(); 255 256 shutdownHook = new ShutdownUtil.Hook() 257 { 258 @Override 259 public Priority priority() 260 { 261 return Priority.LAST; // very last thing to happen 262 } 263 264 @Override 265 public void execute() 266 { 267 callHdfsShutdownHook(); 268 } 269 }; 270 271 ShutdownUtil.addHook( shutdownHook ); 272 } 273 274 private synchronized static void callHdfsShutdownHook() 275 { 276 if( hdfsShutdown != null ) 277 hdfsShutdown.start(); 278 } 279 280 private synchronized static void getHdfsShutdownHook() 281 { 282 if( hdfsShutdown == null ) 283 hdfsShutdown = HadoopUtil.getHDFSShutdownHook(); 284 } 285 286 protected void internalClean( boolean stop ) 287 { 288 if( !isPreserveTemporaryFiles() ) 289 cleanTemporaryFiles( stop ); 290 } 291 292 protected void internalShutdown() 293 { 294 } 295 296 protected int getMaxNumParallelSteps() 297 { 298 return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() ); 299 } 300 301 @Override 302 protected long getTotalSliceCPUMilliSeconds() 303 { 304 long counterValue = flowStats.getCounterValue( TaskCounter.CPU_MILLISECONDS ); 305 306 if( counterValue == 0 ) 307 return -1; 308 309 return counterValue; 310 } 311 }