/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import cascading.flow.BaseFlow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.planner.PlatformInfo;
import riffle.process.ProcessConfiguration;

/**
 * Class LocalFlow is the local mode specific implementation of a {@link cascading.flow.Flow}.
 * <p>
 * LocalFlow must be created through a {@link LocalFlowConnector} instance.
 * <p>
 * If classpath paths are provided on the {@link FlowDef}, the context classloader used to internally run the current
 * Flow will be swapped out with a URLClassLoader pointing to each element.
 *
 * @see LocalFlowConnector
 */
public class LocalFlow extends BaseFlow<Properties>
  {
  /** Configuration of this flow; assigned in {@link #initConfig(Map, Properties)}. */
  private Properties config;
  /** Flow process wrapping {@link #config}; assigned in {@link #initConfig(Map, Properties)}. */
  private FlowProcess<Properties> flowProcess;

  /**
   * Creates a new LocalFlow by delegating to the parent constructor and then calling
   * {@code initFromProperties} with the given properties.
   *
   * @param platformInfo passed through to the parent constructor
   * @param properties   passed through to the parent constructor and to {@code initFromProperties}
   * @param config       passed through to the parent constructor
   * @param flowDef      passed through to the parent constructor
   */
  public LocalFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Properties config, FlowDef flowDef )
    {
    super( platformInfo, properties, config, flowDef );

    initFromProperties( properties );
    }

  /**
   * Builds this flow's configuration via {@code createConfig} and creates the
   * {@link LocalFlowProcess} bound to the current flow session and that configuration.
   *
   * @param properties   the properties handed to {@code createConfig}
   * @param parentConfig the parent configuration handed to {@code createConfig}
   */
  @Override
  protected void initConfig( Map<Object, Object> properties, Properties parentConfig )
    {
    this.config = createConfig( properties, parentConfig );
    this.flowProcess = new LocalFlowProcess( getFlowSession(), config );
    }

  /**
   * Stores the given key and value on the given properties instance using their
   * {@code toString()} representations.
   *
   * @param properties the instance to update
   * @param key        the property key, must not be null
   * @param value      the property value, must not be null
   */
  @Override
  protected void setConfigProperty( Properties properties, Object key, Object value )
    {
    properties.setProperty( key.toString(), value.toString() );
    }

  /**
   * Creates a new Properties instance. When a default config is given, it becomes the
   * defaults of the new instance rather than being copied into it.
   *
   * @param defaultConfig the defaults to chain to, may be null
   * @return a new, empty Properties instance, optionally backed by the given defaults
   */
  @Override
  protected Properties newConfig( Properties defaultConfig )
    {
    return defaultConfig == null ? new Properties() : new Properties( defaultConfig );
    }

  /**
   * Returns the configuration instance held by this flow (not a copy).
   *
   * @return the current configuration
   */
  @ProcessConfiguration
  @Override
  public Properties getConfig()
    {
    return config;
    }

  /**
   * Returns a new Properties instance that uses the current configuration as its defaults.
   * <p>
   * The result is empty itself; lookups via {@code getProperty} fall through to the current configuration.
   *
   * @return a new Properties instance backed by the current configuration
   */
  @Override
  public Properties getConfigCopy()
    {
    return new Properties( config );
    }

  /**
   * Returns the configuration instance held by this flow, typed as a Map (not a copy).
   *
   * @return the current configuration
   */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return config;
    }

  /**
   * Looks up a property on the current configuration.
   *
   * @param key the property name
   * @return the value found via {@link Properties#getProperty(String)}, or null if there is none
   */
  @Override
  public String getProperty( String key )
    {
    return config.getProperty( key );
    }

  /**
   * Returns the flow process created in {@link #initConfig(Map, Properties)}.
   *
   * @return the flow process of this flow
   */
  @Override
  public FlowProcess<Properties> getFlowProcess()
    {
    return flowProcess;
    }

  /**
   * Deletes sinks and then traps via {@code deleteSinksIfReplace} and {@code deleteTrapsIfReplace}.
   *
   * @throws FlowException wrapping the IOException if either deletion fails; the message names
   *                       whether sinks or traps were being deleted
   */
  @Override
  protected void internalStart()
    {
    // tracks the stage in progress so a failure is attributed to the right target
    String target = "sinks";

    try
      {
      deleteSinksIfReplace();

      target = "traps";
      deleteTrapsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to delete " + target, exception );
      }
    }

  /**
   * Creates the flow thread through the parent implementation and replaces its context
   * classloader with the one returned by {@link #createClassPathClassloader(ClassLoader)}.
   *
   * @param threadName passed through to the parent implementation
   * @return the thread created by the parent, with its context classloader set
   */
  @Override
  protected Thread createFlowThread( String threadName )
    {
    Thread flowThread = super.createFlowThread( threadName );
    ClassLoader parentLoader = flowThread.getContextClassLoader();

    flowThread.setContextClassLoader( createClassPathClassloader( parentLoader ) );

    return flowThread;
    }

  /**
   * Returns a classloader that exposes every element of {@code getClassPath()}, delegating
   * to the given classloader as its parent.
   * <p>
   * If no classpath elements are set, the given classloader is returned unchanged.
   *
   * @param classLoader the parent classloader
   * @return the given classloader, or a new URLClassLoader over the classpath elements
   * @throws FlowException if an element does not exist on disk or cannot be converted to a URL
   */
  private ClassLoader createClassPathClassloader( ClassLoader classLoader )
    {
    // read once instead of re-fetching on every loop iteration
    List<String> classPath = getClassPath();

    if( classPath == null || classPath.isEmpty() )
      return classLoader;

    URL[] urls = new URL[ classPath.size() ];

    for( int i = 0; i < urls.length; i++ )
      {
      File file = new File( classPath.get( i ) ).getAbsoluteFile();

      if( !file.exists() )
        throw new FlowException( "path does not exist: " + file );

      try
        {
        urls[ i ] = file.toURI().toURL();
        }
      catch( MalformedURLException exception )
        {
        throw new FlowException( "bad path: " + file, exception );
        }
      }

    // NOTE(review): the returned URLClassLoader is never closed by this class - confirm that is intended
    return new URLClassLoader( urls, classLoader );
    }

  /**
   * Does nothing.
   *
   * @param stop ignored
   */
  @Override
  protected void internalClean( boolean stop )
    {
    }

  /**
   * Always returns false.
   *
   * @return false
   */
  @Override
  public boolean stepsAreLocal()
    {
    return false;
    }

  /**
   * Always returns 0.
   * <p>
   * NOTE(review): presumably the parent class treats 0 as "no limit" - confirm against BaseFlow.
   *
   * @return 0
   */
  @Override
  protected int getMaxNumParallelSteps()
    {
    return 0;
    }

  /**
   * Does nothing.
   */
  @Override
  protected void internalShutdown()
    {
    }
  }