001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez;
023
024import java.io.IOException;
025import java.util.Map;
026
027import cascading.flow.BaseFlow;
028import cascading.flow.Flow;
029import cascading.flow.FlowDef;
030import cascading.flow.FlowException;
031import cascading.flow.FlowProcess;
032import cascading.flow.FlowStep;
033import cascading.flow.hadoop.util.HadoopUtil;
034import cascading.flow.planner.BaseFlowStep;
035import cascading.flow.planner.PlatformInfo;
036import cascading.property.PropertyUtil;
037import cascading.tap.hadoop.io.HttpFileSystem;
038import cascading.util.ShutdownUtil;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.security.UserGroupInformation;
042import org.apache.tez.common.counters.TaskCounter;
043import org.apache.tez.dag.api.TezConfiguration;
044import riffle.process.ProcessConfiguration;
045
046import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
047import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;
048
049/**
050 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link cascading.flow.Flow}.
051 * <p>
052 * HadoopFlow must be created through a {@link cascading.flow.tez.Hadoop2TezFlowConnector} instance.
053 * <p>
054 * If classpath paths are provided on the {@link cascading.flow.FlowDef}, the Hadoop distributed cache mechanism will be used
055 * to augment the remote classpath.
056 * <p>
057 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
058 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side.
059 *
060 * @see cascading.flow.tez.Hadoop2TezFlowConnector
061 */
062public class Hadoop2TezFlow extends BaseFlow<TezConfiguration>
063  {
064  /** Field hdfsShutdown */
065  private static Thread hdfsShutdown = null;
066  /** Field shutdownHook */
067  private static ShutdownUtil.Hook shutdownHook;
068  /** Field jobConf */
069  private transient TezConfiguration flowConf;
070  /** Field preserveTemporaryFiles */
071  private boolean preserveTemporaryFiles = false;
072
073  private String flowStagingPath;
074
075  protected Hadoop2TezFlow()
076    {
077    }
078
079  /**
080   * Returns property preserveTemporaryFiles.
081   *
082   * @param properties of type Map
083   * @return a boolean
084   */
085  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
086    {
087    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
088    }
089
090  static int getMaxConcurrentSteps( TezConfiguration jobConf )
091    {
092    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
093    }
094
095  public Hadoop2TezFlow( PlatformInfo platformInfo, Map<Object, Object> properties, TezConfiguration flowConf, FlowDef flowDef )
096    {
097    super( platformInfo, properties, flowConf, flowDef );
098
099    initFromProperties( properties );
100    }
101
102  @Override
103  protected void initFromProperties( Map<Object, Object> properties )
104    {
105    super.initFromProperties( properties );
106
107    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
108    }
109
110  protected void initConfig( Map<Object, Object> properties, TezConfiguration parentConfig )
111    {
112    if( properties != null )
113      parentConfig = createConfig( properties, parentConfig );
114
115    if( parentConfig == null ) // this is ok, getJobConf will pass a default parent in
116      return;
117
118    flowConf = new TezConfiguration( parentConfig ); // prevent local values from being shared
119    flowConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
120    flowConf.set( "fs.https.impl", HttpFileSystem.class.getName() );
121
122    UserGroupInformation.setConfiguration( flowConf );
123
124    flowStagingPath = createStagingRoot();
125    }
126
127  public String getFlowStagingPath()
128    {
129    if( flowStagingPath == null )
130      flowStagingPath = createStagingRoot();
131
132    return flowStagingPath;
133    }
134
135  private String createStagingRoot()
136    {
137    return ".staging" + Path.SEPARATOR + getID();
138    }
139
140  @Override
141  protected void setConfigProperty( TezConfiguration config, Object key, Object value )
142    {
143    // don't let these objects pass, even though toString is called below.
144    if( value instanceof Class || value instanceof Configuration || value == null )
145      return;
146
147    config.set( key.toString(), value.toString() );
148    }
149
150  @Override
151  protected TezConfiguration newConfig( TezConfiguration defaultConfig )
152    {
153    return defaultConfig == null ? new TezConfiguration() : new TezConfiguration( defaultConfig );
154    }
155
156  @ProcessConfiguration
157  @Override
158  public TezConfiguration getConfig()
159    {
160    if( flowConf == null )
161      initConfig( null, new TezConfiguration() );
162
163    return flowConf;
164    }
165
166  @Override
167  public TezConfiguration getConfigCopy()
168    {
169    return new TezConfiguration( getConfig() );
170    }
171
172  @Override
173  public Map<Object, Object> getConfigAsProperties()
174    {
175    return HadoopUtil.createProperties( getConfig() );
176    }
177
178  /**
179   * Method getProperty returns the value associated with the given key from the underlying properties system.
180   *
181   * @param key of type String
182   * @return String
183   */
184  public String getProperty( String key )
185    {
186    return getConfig().get( key );
187    }
188
189  @Override
190  public FlowProcess<TezConfiguration> getFlowProcess()
191    {
192    return new Hadoop2TezFlowProcess( getFlowSession(), null, getConfig() );
193    }
194
195  /**
196   * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes.
197   *
198   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
199   */
200  public boolean isPreserveTemporaryFiles()
201    {
202    return preserveTemporaryFiles;
203    }
204
205  @Override
206  protected void internalStart()
207    {
208    try
209      {
210      copyArtifactsToRemote();
211      deleteSinksIfReplace();
212      deleteTrapsIfReplace();
213      deleteCheckpointsIfReplace();
214      }
215    catch( IOException exception )
216      {
217      throw new FlowException( "unable to delete sinks", exception );
218      }
219
220    registerHadoopShutdownHook( this );
221    }
222
223  private void copyArtifactsToRemote()
224    {
225    for( FlowStep<TezConfiguration> flowStep : getFlowSteps() )
226      ( (Hadoop2TezFlowStep) flowStep ).syncArtifacts();
227    }
228
229  @Override
230  public boolean stepsAreLocal()
231    {
232    return HadoopUtil.isLocal( getConfig() );
233    }
234
235  private void cleanTemporaryFiles( boolean stop )
236    {
237    if( stop ) // unstable to call fs operations during shutdown
238      return;
239
240    // use step config so cascading.flow.step.path property is properly used
241    for( FlowStep<TezConfiguration> step : getFlowSteps() )
242      ( (BaseFlowStep<TezConfiguration>) step ).clean();
243    }
244
245  private static synchronized void registerHadoopShutdownHook( Flow flow )
246    {
247    if( !flow.isStopJobsOnExit() )
248      return;
249
250    // guaranteed singleton here
251    if( shutdownHook != null )
252      return;
253
254    getHdfsShutdownHook();
255
256    shutdownHook = new ShutdownUtil.Hook()
257      {
258      @Override
259      public Priority priority()
260        {
261        return Priority.LAST; // very last thing to happen
262        }
263
264      @Override
265      public void execute()
266        {
267        callHdfsShutdownHook();
268        }
269      };
270
271    ShutdownUtil.addHook( shutdownHook );
272    }
273
274  private synchronized static void callHdfsShutdownHook()
275    {
276    if( hdfsShutdown != null )
277      hdfsShutdown.start();
278    }
279
280  private synchronized static void getHdfsShutdownHook()
281    {
282    if( hdfsShutdown == null )
283      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
284    }
285
286  protected void internalClean( boolean stop )
287    {
288    if( !isPreserveTemporaryFiles() )
289      cleanTemporaryFiles( stop );
290    }
291
292  protected void internalShutdown()
293    {
294    }
295
296  protected int getMaxNumParallelSteps()
297    {
298    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
299    }
300
301  @Override
302  protected long getTotalSliceCPUMilliSeconds()
303    {
304    long counterValue = flowStats.getCounterValue( TaskCounter.CPU_MILLISECONDS );
305
306    if( counterValue == 0 )
307      return -1;
308
309    return counterValue;
310    }
311  }