/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez;

import java.io.IOException;
import java.util.Map;

import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.property.PropertyUtil;
import cascading.tap.hadoop.io.HttpFileSystem;
import cascading.util.ShutdownUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.TezConfiguration;
import riffle.process.ProcessConfiguration;

import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;

/**
 * Class Hadoop2TezFlow is the Apache Tez specific implementation of a {@link cascading.flow.Flow}.
050 * <p/> 051 * HadoopFlow must be created through a {@link cascading.flow.tez.Hadoop2TezFlowConnector} instance. 052 * <p/> 053 * If classpath paths are provided on the {@link cascading.flow.FlowDef}, the Hadoop distributed cache mechanism will be used 054 * to augment the remote classpath. 055 * <p/> 056 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note 057 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side. 058 * 059 * @see cascading.flow.tez.Hadoop2TezFlowConnector 060 */ 061public class Hadoop2TezFlow extends BaseFlow<TezConfiguration> 062 { 063 /** Field hdfsShutdown */ 064 private static Thread hdfsShutdown = null; 065 /** Field shutdownHook */ 066 private static ShutdownUtil.Hook shutdownHook; 067 /** Field jobConf */ 068 private transient TezConfiguration flowConf; 069 /** Field preserveTemporaryFiles */ 070 private boolean preserveTemporaryFiles = false; 071 072 private String flowStagingPath; 073 074 protected Hadoop2TezFlow() 075 { 076 } 077 078 /** 079 * Returns property preserveTemporaryFiles. 
080 * 081 * @param properties of type Map 082 * @return a boolean 083 */ 084 static boolean getPreserveTemporaryFiles( Map<Object, Object> properties ) 085 { 086 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) ); 087 } 088 089 static int getMaxConcurrentSteps( TezConfiguration jobConf ) 090 { 091 return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 ); 092 } 093 094 public Hadoop2TezFlow( PlatformInfo platformInfo, Map<Object, Object> properties, TezConfiguration flowConf, FlowDef flowDef ) 095 { 096 super( platformInfo, properties, flowConf, flowDef ); 097 098 initFromProperties( properties ); 099 } 100 101 @Override 102 protected void initFromProperties( Map<Object, Object> properties ) 103 { 104 super.initFromProperties( properties ); 105 106 preserveTemporaryFiles = getPreserveTemporaryFiles( properties ); 107 } 108 109 protected void initConfig( Map<Object, Object> properties, TezConfiguration parentConfig ) 110 { 111 if( properties != null ) 112 parentConfig = createConfig( properties, parentConfig ); 113 114 if( parentConfig == null ) // this is ok, getJobConf will pass a default parent in 115 return; 116 117 flowConf = new TezConfiguration( parentConfig ); // prevent local values from being shared 118 flowConf.set( "fs.http.impl", HttpFileSystem.class.getName() ); 119 flowConf.set( "fs.https.impl", HttpFileSystem.class.getName() ); 120 121 UserGroupInformation.setConfiguration( flowConf ); 122 123 flowStagingPath = createStagingRoot(); 124 } 125 126 public String getFlowStagingPath() 127 { 128 if( flowStagingPath == null ) 129 flowStagingPath = createStagingRoot(); 130 131 return flowStagingPath; 132 } 133 134 private String createStagingRoot() 135 { 136 return ".staging" + Path.SEPARATOR + getID(); 137 } 138 139 @Override 140 protected void setConfigProperty( TezConfiguration config, Object key, Object value ) 141 { 142 // don't let these objects pass, even though toString is called below. 
143 if( value instanceof Class || value instanceof Configuration || value == null ) 144 return; 145 146 config.set( key.toString(), value.toString() ); 147 } 148 149 @Override 150 protected TezConfiguration newConfig( TezConfiguration defaultConfig ) 151 { 152 return defaultConfig == null ? new TezConfiguration() : new TezConfiguration( defaultConfig ); 153 } 154 155 @ProcessConfiguration 156 @Override 157 public TezConfiguration getConfig() 158 { 159 if( flowConf == null ) 160 initConfig( null, new TezConfiguration() ); 161 162 return flowConf; 163 } 164 165 @Override 166 public TezConfiguration getConfigCopy() 167 { 168 return new TezConfiguration( getConfig() ); 169 } 170 171 @Override 172 public Map<Object, Object> getConfigAsProperties() 173 { 174 return HadoopUtil.createProperties( getConfig() ); 175 } 176 177 /** 178 * Method getProperty returns the value associated with the given key from the underlying properties system. 179 * 180 * @param key of type String 181 * @return String 182 */ 183 public String getProperty( String key ) 184 { 185 return getConfig().get( key ); 186 } 187 188 @Override 189 public FlowProcess<TezConfiguration> getFlowProcess() 190 { 191 return new Hadoop2TezFlowProcess( getFlowSession(), null, getConfig() ); 192 } 193 194 /** 195 * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes. 196 * 197 * @return the preserveTemporaryFiles (type boolean) of this Flow object. 
198 */ 199 public boolean isPreserveTemporaryFiles() 200 { 201 return preserveTemporaryFiles; 202 } 203 204 @Override 205 protected void internalStart() 206 { 207 try 208 { 209 copyArtifactsToRemote(); 210 deleteSinksIfReplace(); 211 deleteTrapsIfReplace(); 212 deleteCheckpointsIfReplace(); 213 } 214 catch( IOException exception ) 215 { 216 throw new FlowException( "unable to delete sinks", exception ); 217 } 218 219 registerHadoopShutdownHook( this ); 220 } 221 222 private void copyArtifactsToRemote() 223 { 224 for( FlowStep<TezConfiguration> flowStep : getFlowSteps() ) 225 ( (Hadoop2TezFlowStep) flowStep ).syncArtifacts(); 226 } 227 228 @Override 229 public boolean stepsAreLocal() 230 { 231 return HadoopUtil.isLocal( getConfig() ); 232 } 233 234 private void cleanTemporaryFiles( boolean stop ) 235 { 236 if( stop ) // unstable to call fs operations during shutdown 237 return; 238 239 // use step config so cascading.flow.step.path property is properly used 240 for( FlowStep<TezConfiguration> step : getFlowSteps() ) 241 ( (BaseFlowStep<TezConfiguration>) step ).clean(); 242 } 243 244 private static synchronized void registerHadoopShutdownHook( Flow flow ) 245 { 246 if( !flow.isStopJobsOnExit() ) 247 return; 248 249 // guaranteed singleton here 250 if( shutdownHook != null ) 251 return; 252 253 getHdfsShutdownHook(); 254 255 shutdownHook = new ShutdownUtil.Hook() 256 { 257 @Override 258 public Priority priority() 259 { 260 return Priority.LAST; // very last thing to happen 261 } 262 263 @Override 264 public void execute() 265 { 266 callHdfsShutdownHook(); 267 } 268 }; 269 270 ShutdownUtil.addHook( shutdownHook ); 271 } 272 273 private synchronized static void callHdfsShutdownHook() 274 { 275 if( hdfsShutdown != null ) 276 hdfsShutdown.start(); 277 } 278 279 private synchronized static void getHdfsShutdownHook() 280 { 281 if( hdfsShutdown == null ) 282 hdfsShutdown = HadoopUtil.getHDFSShutdownHook(); 283 } 284 285 protected void internalClean( boolean stop ) 286 { 287 
if( !isPreserveTemporaryFiles() ) 288 cleanTemporaryFiles( stop ); 289 } 290 291 protected void internalShutdown() 292 { 293 } 294 295 protected int getMaxNumParallelSteps() 296 { 297 return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() ); 298 } 299 300 @Override 301 protected long getTotalSliceCPUMilliSeconds() 302 { 303 long counterValue = flowStats.getCounterValue( TaskCounter.CPU_MILLISECONDS ); 304 305 if( counterValue == 0 ) 306 return -1; 307 308 return counterValue; 309 } 310 }