001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.local; 022 023 import java.io.File; 024 import java.io.IOException; 025 import java.net.MalformedURLException; 026 import java.net.URL; 027 import java.net.URLClassLoader; 028 import java.util.Map; 029 import java.util.Properties; 030 031 import cascading.flow.BaseFlow; 032 import cascading.flow.FlowDef; 033 import cascading.flow.FlowException; 034 import cascading.flow.FlowProcess; 035 import cascading.flow.planner.PlatformInfo; 036 037 /** 038 * Class LocalFlow is the local mode specific implementation of a {@link cascading.flow.Flow}. 039 * <p/> 040 * LocalFlow must be created through a {@link LocalFlowConnector} instance. 041 * <p/> 042 * If classpath paths are provided on the {@link FlowDef}, the context classloader used to internally urn the current 043 * Flow will be swapped out with an URLClassLoader pointing to each element. 044 * 045 * @see LocalFlowConnector 046 */ 047 public class LocalFlow extends BaseFlow<Properties> 048 { 049 private Properties config; 050 051 public LocalFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Properties config, FlowDef flowDef ) 052 { 053 super( platformInfo, properties, config, flowDef ); 054 055 initFromProperties( properties ); 056 } 057 058 @Override 059 protected void initConfig( Map<Object, Object> properties, Properties parentConfig ) 060 { 061 this.config = createConfig( properties, parentConfig ); 062 } 063 064 @Override 065 protected void setConfigProperty( Properties properties, Object key, Object value ) 066 { 067 properties.setProperty( key.toString(), value.toString() ); 068 } 069 070 @Override 071 protected Properties newConfig( Properties defaultConfig ) 072 { 073 return defaultConfig == null ? new Properties() : new Properties( defaultConfig ); 074 } 075 076 @Override 077 public Properties getConfig() 078 { 079 return config; 080 } 081 082 @Override 083 public Properties getConfigCopy() 084 { 085 return new Properties( config ); 086 } 087 088 @Override 089 public Map<Object, Object> getConfigAsProperties() 090 { 091 return config; 092 } 093 094 @Override 095 public String getProperty( String key ) 096 { 097 return config.getProperty( key ); 098 } 099 100 @Override 101 public FlowProcess<Properties> getFlowProcess() 102 { 103 return new LocalFlowProcess( getFlowSession(), config ); 104 } 105 106 @Override 107 protected void internalStart() 108 { 109 try 110 { 111 deleteSinksIfReplace(); 112 deleteTrapsIfReplace(); 113 } 114 catch( IOException exception ) 115 { 116 throw new FlowException( "unable to delete sinks", exception ); 117 } 118 } 119 120 @Override 121 protected Thread createFlowThread( String threadName ) 122 { 123 Thread flowThread = super.createFlowThread( threadName ); 124 125 flowThread.setContextClassLoader( createClassPathClassloader( flowThread.getContextClassLoader() ) ); 126 127 return flowThread; 128 } 129 130 private ClassLoader createClassPathClassloader( ClassLoader classLoader ) 131 { 132 if( getClassPath() == null || getClassPath().isEmpty() ) 133 return classLoader; 134 135 URL[] urls = new URL[ getClassPath().size() ]; 136 137 for( int i = 0; i < getClassPath().size(); i++ ) 138 { 139 String path = getClassPath().get( i ); 140 File file = new File( path ).getAbsoluteFile(); 141 142 if( !file.exists() ) 143 throw new FlowException( "path does not exist: " + file ); 144 145 try 146 { 147 urls[ i ] = file.toURI().toURL(); 148 } 149 catch( MalformedURLException exception ) 150 { 151 throw new FlowException( "bad path: " + file, exception ); 152 } 153 } 154 155 return new URLClassLoader( urls, classLoader ); 156 } 157 158 @Override 159 protected void internalClean( boolean stop ) 160 { 161 } 162 163 @Override 164 public boolean stepsAreLocal() 165 { 166 return false; 167 } 168 169 @Override 170 protected int getMaxNumParallelSteps() 171 { 172 return 0; 173 } 174 175 @Override 176 protected void internalShutdown() 177 { 178 } 179 }