/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Map;

import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.property.PropertyUtil;
import cascading.tap.hadoop.io.HttpFileSystem;
import cascading.util.ShutdownUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;

/**
 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link Flow}.
 * <p/>
 * HadoopFlow must be created through a {@link HadoopFlowConnector} instance.
 * <p/>
 * If classpath paths are provided on the {@link FlowDef}, the Hadoop distributed cache mechanism will be used
 * to augment the remote classpath.
 * <p/>
 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
 * that all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster-side.
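 * <p/>
 * For illustration only, a minimal sketch of creating a HadoopFlow through the connector, where {@code pipe},
 * {@code sourceTap}, {@code sinkTap}, and {@code properties} are assumed to be defined by the application:
 * <pre>{@code
 * FlowDef flowDef = FlowDef.flowDef()
 *   .setName( "example" )
 *   .addSource( pipe, sourceTap )
 *   .addTailSink( pipe, sinkTap )
 *   .addToClassPath( "lib/custom-functions.jar" ); // relative paths are synced to HDFS and added to the remote classpath
 *
 * Flow flow = new HadoopFlowConnector( properties ).connect( flowDef ); // returns a HadoopFlow
 * flow.complete();
 * }</pre>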
 *
 * @see HadoopFlowConnector
 */
public class HadoopFlow extends BaseFlow<JobConf>
  {
  /** Field hdfsShutdown - Hadoop's own HDFS shutdown hook, captured so it can be invoked last */
  private static Thread hdfsShutdown = null;
  /** Field shutdownHook - process wide shutdown hook, registered at most once */
  private static ShutdownUtil.Hook shutdownHook;
  /** Field jobConf - the Hadoop configuration backing this Flow instance */
  private transient JobConf jobConf;
  /** Field preserveTemporaryFiles - when true, intermediate files are kept after the Flow completes */
  private boolean preserveTemporaryFiles = false;
  /** Field syncPaths - classpath elements that must be synced to the cluster before the Flow starts */
  private transient Map<Path, Path> syncPaths;

  protected HadoopFlow()
    {
    }

  /**
   * Returns the value of the preserveTemporaryFiles property from the given properties Map.
   *
   * @param properties of type Map
   * @return true if temporary files should be preserved
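   * <p/>
   * A hedged sketch of setting this property on the map handed to the connector; the {@code properties}
   * instance and connector shown stand in for application code:
   * <pre>{@code
   * Map<Object, Object> properties = new Properties();
   * properties.put( FlowProps.PRESERVE_TEMPORARY_FILES, "true" ); // keep intermediate files for debugging
   * FlowConnector connector = new HadoopFlowConnector( properties );
   * }</pre>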
   */
  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
    }

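  /**
   * Returns the maximum number of concurrently submitted steps, read from the given JobConf via the
   * {@link cascading.flow.FlowProps#MAX_CONCURRENT_STEPS} property, defaulting to 0 when unset.
   */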
  static int getMaxConcurrentSteps( JobConf jobConf )
    {
    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
    }

  protected HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name )
    {
    super( platformInfo, properties, jobConf, name );
    initFromProperties( properties );
    }

  public HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, FlowDef flowDef )
    {
    super( platformInfo, properties, jobConf, flowDef );

    initFromProperties( properties );
    }

  @Override
  protected void initFromProperties( Map<Object, Object> properties )
    {
    super.initFromProperties( properties );
    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
    }

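  /**
   * Initializes the flow local JobConf by copying the parent configuration, registering {@link HttpFileSystem}
   * for the http and https schemes, and recording any classpath elements that must be synced to the cluster
   * before the flow starts.
   */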
  protected void initConfig( Map<Object, Object> properties, JobConf parentConfig )
    {
    if( properties != null )
      parentConfig = createConfig( properties, parentConfig );

    if( parentConfig == null ) // this is ok, getConfig() will pass a default parent in
      return;

    jobConf = HadoopUtil.copyJobConf( parentConfig ); // prevent local values from being shared
    jobConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
    jobConf.set( "fs.https.impl", HttpFileSystem.class.getName() );

    syncPaths = HadoopUtil.addToClassPath( jobConf, getClassPath() );
    }

  @Override
  protected void setConfigProperty( JobConf config, Object key, Object value )
    {
    // don't pass Class or JobConf values through; their toString() output is not a meaningful config value
    if( value instanceof Class || value instanceof JobConf )
      return;

    config.set( key.toString(), value.toString() );
    }

  @Override
  protected JobConf newConfig( JobConf defaultConfig )
    {
    return defaultConfig == null ? new JobConf() : HadoopUtil.copyJobConf( defaultConfig );
    }

  @Override
  public JobConf getConfig()
    {
    if( jobConf == null )
      initConfig( null, new JobConf() );

    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( getConfig() );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method getProperty returns the value associated with the given key from the underlying properties system.
   *
   * @param key of type String
   * @return String
   */
  public String getProperty( String key )
    {
    return getConfig().get( key );
    }

  @Override
  public FlowProcess<JobConf> getFlowProcess()
    {
    return new HadoopFlowProcess( getFlowSession(), getConfig() );
    }

  /**
   * Method isPreserveTemporaryFiles returns true if temporary files will be kept, rather than cleaned up, when this Flow completes.
   *
   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
   */
  public boolean isPreserveTemporaryFiles()
    {
    return preserveTemporaryFiles;
    }

  @Override
  protected void internalStart()
    {
    try
      {
      copyToDistributedCache();
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      deleteCheckpointsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to delete sinks, traps, or checkpoints", exception );
      }

    registerHadoopShutdownHook( this );
    }

  private void copyToDistributedCache()
    {
    HadoopUtil.syncPaths( jobConf, syncPaths );
    }

  @Override
  public boolean stepsAreLocal()
    {
    return HadoopUtil.isLocal( getConfig() );
    }

  private void cleanTemporaryFiles( boolean stop )
    {
    if( stop ) // calling fs operations during shutdown is unreliable, so skip cleanup
      return;

    // use step config so cascading.flow.step.path property is properly used
    for( FlowStep<JobConf> step : getFlowSteps() )
      ( (BaseFlowStep<JobConf>) step ).clean();
    }

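  /**
   * Registers a JVM shutdown hook, at most once per process, when the given Flow is configured to stop
   * jobs on exit. Hadoop's own HDFS shutdown hook is captured up front and re-invoked last, after all
   * other registered hooks have run.
   */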
  private static synchronized void registerHadoopShutdownHook( Flow flow )
    {
    if( !flow.isStopJobsOnExit() )
      return;

    // guaranteed singleton here
    if( shutdownHook != null )
      return;

    getHdfsShutdownHook();

    shutdownHook = new ShutdownUtil.Hook()
    {
    @Override
    public Priority priority()
      {
      return Priority.LAST; // very last thing to happen
      }

    @Override
    public void execute()
      {
      callHdfsShutdownHook();
      }
    };

    ShutdownUtil.addHook( shutdownHook );
    }

  private static synchronized void callHdfsShutdownHook()
    {
    if( hdfsShutdown != null )
      hdfsShutdown.start();
    }

  private static synchronized void getHdfsShutdownHook()
    {
    // lazily capture Hadoop's HDFS shutdown hook so it can be invoked explicitly, and last, during shutdown
    if( hdfsShutdown == null )
      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
    }

  @Override
  protected void internalClean( boolean stop )
    {
    if( !isPreserveTemporaryFiles() )
      cleanTemporaryFiles( stop );
    }

  @Override
  protected void internalShutdown()
    {
    }

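  /**
   * Returns the number of steps allowed to run in parallel, one when running against Hadoop local mode,
   * otherwise the configured maximum number of concurrent steps.
   */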
  @Override
  protected int getMaxNumParallelSteps()
    {
    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
    }
  }