001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.local;
022    
023    import java.io.File;
024    import java.io.IOException;
025    import java.net.MalformedURLException;
026    import java.net.URL;
027    import java.net.URLClassLoader;
028    import java.util.Map;
029    import java.util.Properties;
030    
031    import cascading.flow.BaseFlow;
032    import cascading.flow.FlowDef;
033    import cascading.flow.FlowException;
034    import cascading.flow.FlowProcess;
035    import cascading.flow.planner.PlatformInfo;
036    
037    /**
038     * Class LocalFlow is the local mode specific implementation of a {@link cascading.flow.Flow}.
039     * <p/>
040     * LocalFlow must be created through a {@link LocalFlowConnector} instance.
041     * <p/>
042     * If classpath paths are provided on the {@link FlowDef}, the context classloader used to internally urn the current
043     * Flow will be swapped out with an URLClassLoader pointing to each element.
044     *
045     * @see LocalFlowConnector
046     */
047    public class LocalFlow extends BaseFlow<Properties>
048      {
049      private Properties config;
050    
051      public LocalFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Properties config, FlowDef flowDef )
052        {
053        super( platformInfo, properties, config, flowDef );
054    
055        initFromProperties( properties );
056        }
057    
058      @Override
059      protected void initConfig( Map<Object, Object> properties, Properties parentConfig )
060        {
061        this.config = createConfig( properties, parentConfig );
062        }
063    
064      @Override
065      protected void setConfigProperty( Properties properties, Object key, Object value )
066        {
067        properties.setProperty( key.toString(), value.toString() );
068        }
069    
070      @Override
071      protected Properties newConfig( Properties defaultConfig )
072        {
073        return defaultConfig == null ? new Properties() : new Properties( defaultConfig );
074        }
075    
076      @Override
077      public Properties getConfig()
078        {
079        return config;
080        }
081    
082      @Override
083      public Properties getConfigCopy()
084        {
085        return new Properties( config );
086        }
087    
088      @Override
089      public Map<Object, Object> getConfigAsProperties()
090        {
091        return config;
092        }
093    
094      @Override
095      public String getProperty( String key )
096        {
097        return config.getProperty( key );
098        }
099    
100      @Override
101      public FlowProcess<Properties> getFlowProcess()
102        {
103        return new LocalFlowProcess( getFlowSession(), config );
104        }
105    
106      @Override
107      protected void internalStart()
108        {
109        try
110          {
111          deleteSinksIfReplace();
112          deleteTrapsIfReplace();
113          }
114        catch( IOException exception )
115          {
116          throw new FlowException( "unable to delete sinks", exception );
117          }
118        }
119    
120      @Override
121      protected Thread createFlowThread( String threadName )
122        {
123        Thread flowThread = super.createFlowThread( threadName );
124    
125        flowThread.setContextClassLoader( createClassPathClassloader( flowThread.getContextClassLoader() ) );
126    
127        return flowThread;
128        }
129    
130      private ClassLoader createClassPathClassloader( ClassLoader classLoader )
131        {
132        if( getClassPath() == null || getClassPath().isEmpty() )
133          return classLoader;
134    
135        URL[] urls = new URL[ getClassPath().size() ];
136    
137        for( int i = 0; i < getClassPath().size(); i++ )
138          {
139          String path = getClassPath().get( i );
140          File file = new File( path ).getAbsoluteFile();
141    
142          if( !file.exists() )
143            throw new FlowException( "path does not exist: " + file );
144    
145          try
146            {
147            urls[ i ] = file.toURI().toURL();
148            }
149          catch( MalformedURLException exception )
150            {
151            throw new FlowException( "bad path: " + file, exception );
152            }
153          }
154    
155        return new URLClassLoader( urls, classLoader );
156        }
157    
158      @Override
159      protected void internalClean( boolean stop )
160        {
161        }
162    
163      @Override
164      public boolean stepsAreLocal()
165        {
166        return false;
167        }
168    
169      @Override
170      protected int getMaxNumParallelSteps()
171        {
172        return 0;
173        }
174    
175      @Override
176      protected void internalShutdown()
177        {
178        }
179      }