001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.local;
022
023import java.io.File;
024import java.io.IOException;
025import java.net.MalformedURLException;
026import java.net.URL;
027import java.net.URLClassLoader;
028import java.util.Map;
029import java.util.Properties;
030
031import cascading.flow.BaseFlow;
032import cascading.flow.FlowDef;
033import cascading.flow.FlowException;
034import cascading.flow.FlowProcess;
035import cascading.flow.planner.PlatformInfo;
036import riffle.process.ProcessConfiguration;
037
038/**
039 * Class LocalFlow is the local mode specific implementation of a {@link cascading.flow.Flow}.
040 * <p/>
041 * LocalFlow must be created through a {@link LocalFlowConnector} instance.
042 * <p/>
043 * If classpath paths are provided on the {@link FlowDef}, the context classloader used to internally urn the current
044 * Flow will be swapped out with an URLClassLoader pointing to each element.
045 *
046 * @see LocalFlowConnector
047 */
048public class LocalFlow extends BaseFlow<Properties>
049  {
050  private Properties config;
051  private FlowProcess<Properties> flowProcess;
052
053  public LocalFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Properties config, FlowDef flowDef )
054    {
055    super( platformInfo, properties, config, flowDef );
056
057    initFromProperties( properties );
058    }
059
060  @Override
061  protected void initConfig( Map<Object, Object> properties, Properties parentConfig )
062    {
063    this.config = createConfig( properties, parentConfig );
064    this.flowProcess = new LocalFlowProcess( getFlowSession(), config );
065    }
066
067  @Override
068  protected void setConfigProperty( Properties properties, Object key, Object value )
069    {
070    properties.setProperty( key.toString(), value.toString() );
071    }
072
073  @Override
074  protected Properties newConfig( Properties defaultConfig )
075    {
076    return defaultConfig == null ? new Properties() : new Properties( defaultConfig );
077    }
078
079  @ProcessConfiguration
080  @Override
081  public Properties getConfig()
082    {
083    return config;
084    }
085
086  @Override
087  public Properties getConfigCopy()
088    {
089    return new Properties( config );
090    }
091
092  @Override
093  public Map<Object, Object> getConfigAsProperties()
094    {
095    return config;
096    }
097
098  @Override
099  public String getProperty( String key )
100    {
101    return config.getProperty( key );
102    }
103
104  @Override
105  public FlowProcess<Properties> getFlowProcess()
106    {
107    return flowProcess;
108    }
109
110  @Override
111  protected void internalStart()
112    {
113    try
114      {
115      deleteSinksIfReplace();
116      deleteTrapsIfReplace();
117      }
118    catch( IOException exception )
119      {
120      throw new FlowException( "unable to delete sinks", exception );
121      }
122    }
123
124  @Override
125  protected Thread createFlowThread( String threadName )
126    {
127    Thread flowThread = super.createFlowThread( threadName );
128
129    flowThread.setContextClassLoader( createClassPathClassloader( flowThread.getContextClassLoader() ) );
130
131    return flowThread;
132    }
133
134  private ClassLoader createClassPathClassloader( ClassLoader classLoader )
135    {
136    if( getClassPath() == null || getClassPath().isEmpty() )
137      return classLoader;
138
139    URL[] urls = new URL[ getClassPath().size() ];
140
141    for( int i = 0; i < getClassPath().size(); i++ )
142      {
143      String path = getClassPath().get( i );
144      File file = new File( path ).getAbsoluteFile();
145
146      if( !file.exists() )
147        throw new FlowException( "path does not exist: " + file );
148
149      try
150        {
151        urls[ i ] = file.toURI().toURL();
152        }
153      catch( MalformedURLException exception )
154        {
155        throw new FlowException( "bad path: " + file, exception );
156        }
157      }
158
159    return new URLClassLoader( urls, classLoader );
160    }
161
162  @Override
163  protected void internalClean( boolean stop )
164    {
165    }
166
167  @Override
168  public boolean stepsAreLocal()
169    {
170    return false;
171    }
172
173  @Override
174  protected int getMaxNumParallelSteps()
175    {
176    return 0;
177    }
178
179  @Override
180  protected void internalShutdown()
181    {
182    }
183  }