001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.management; 022 023 import java.io.IOException; 024 import java.io.InputStream; 025 import java.net.URI; 026 import java.net.URL; 027 import java.net.URLClassLoader; 028 import java.util.List; 029 import java.util.Map; 030 import java.util.Properties; 031 032 import cascading.cascade.CascadeException; 033 import cascading.management.state.ClientState; 034 import cascading.property.PropertyUtil; 035 import cascading.provider.CascadingService; 036 import cascading.provider.ServiceLoader; 037 import cascading.util.ShutdownUtil; 038 import cascading.util.Util; 039 import org.slf4j.Logger; 040 import org.slf4j.LoggerFactory; 041 042 /** 043 * Class CascadingServices is the root class for pluggable services Cascading can call out to for distributed 044 * monitoring and management systems. 045 * <p/> 046 * Be default all services will be loaded from the jar {@code cascading/management/service.properties} 047 * ({@link #DEFAULT_PROPERTIES}) resource is found in. If the 048 * property {@link #CONTAINER_ENABLED} value is {@code false}, a ClassLoader container will not be created. 049 * <p/> 050 * For this to work, all service implementation and dependencies must be archived into a single jar. 051 * <p/> 052 * If any packages in the jar should be excluded, set a comma delimited list of names via the {@link #CONTAINER_EXCLUDE} 053 * property. 054 * <p/> 055 * If the file {@code cascading-service.properties} ({@link CascadingServices#CASCADING_SERVICES}) is found in the 056 * CLASSPATH, the {@code cascading.management.service.jar} property value will be used to search for 057 * {@code cascading/management/service.properties} resource. 058 * 059 * @see CascadingService 060 */ 061 public class CascadingServices 062 { 063 private static final Logger LOG = LoggerFactory.getLogger( CascadingServices.class ); 064 065 public static final String CASCADING_SERVICES = "cascading-service.properties"; 066 public static final String CASCADING_SERVICES_JAR = "cascading.management.service.jar"; 067 068 public static final String DEFAULT_PROPERTIES = "cascading/management/service.properties"; 069 public static final String CONTAINER_ENABLED = "cascading.management.container.enabled"; 070 public static final String CONTAINER_EXCLUDE = "cascading.management.container.exclude"; 071 072 static Properties defaultProperties; 073 static URL libraryURL; 074 static String[] exclusions; 075 076 Map<Object, Object> properties; 077 078 MetricsService metricsService; 079 DocumentService documentService; 080 boolean enableContainer; 081 082 static 083 { 084 ClassLoader classLoader = CascadingServices.class.getClassLoader(); 085 086 // load all properties from cascading-services.properties 087 defaultProperties = loadProperties( new Properties(), CASCADING_SERVICES, classLoader ); 088 089 libraryURL = getLibraryURL(); 090 091 if( libraryURL != null ) 092 classLoader = new URLClassLoader( new URL[]{libraryURL} ); 093 094 // load additional, if only, properties from file in resource jar 095 defaultProperties = loadProperties( new Properties( defaultProperties ), DEFAULT_PROPERTIES, classLoader ); 096 097 // if resources jar is on primary classpath, find the jar so we can isolate it when loading services 098 if( libraryURL == null ) 099 libraryURL = parseLibraryURL( classLoader, DEFAULT_PROPERTIES ); 100 101 exclusions = Util.removeNulls( defaultProperties.getProperty( CONTAINER_EXCLUDE, "" ).split( "," ) ); 102 } 103 104 private static URL parseLibraryURL( ClassLoader classLoader, String resource ) 105 { 106 URL url = classLoader.getResource( resource ); 107 108 if( url != null ) 109 { 110 try 111 { 112 String path = url.toURI().getSchemeSpecificPart(); 113 int endIndex = path.lastIndexOf( '!' ); 114 115 if( endIndex != -1 ) 116 path = path.substring( 0, endIndex ); 117 118 if( path.endsWith( ".jar" ) ) 119 return new URL( path ); 120 } 121 catch( Exception exception ) 122 { 123 LOG.warn( "unable to parse resource library: {}", url, exception ); 124 } 125 } 126 127 return null; 128 } 129 130 private static URL getLibraryURL() 131 { 132 String property = defaultProperties.getProperty( CASCADING_SERVICES_JAR ); 133 134 if( property == null ) 135 return null; 136 137 try 138 { 139 URI uri = URI.create( property ); 140 141 if( !uri.isAbsolute() ) 142 uri = new URI( "file", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment() ); 143 144 return uri.toURL(); 145 } 146 catch( Exception exception ) 147 { 148 LOG.warn( "property: {}, has invalid URL value: {}", CASCADING_SERVICES_JAR, property ); 149 } 150 151 return null; 152 } 153 154 private static Properties loadProperties( Properties properties, String resource, ClassLoader classLoader ) 155 { 156 InputStream input = classLoader.getResourceAsStream( resource ); 157 158 try 159 { 160 if( input != null ) 161 { 162 URL url = parseLibraryURL( classLoader, resource ); 163 164 if( url != null ) 165 LOG.info( "loading properties: {}, from jar: {}", resource, url ); 166 else 167 LOG.info( "loading properties: {}", resource ); 168 169 properties.load( input ); 170 } 171 } 172 catch( IOException exception ) 173 { 174 LOG.warn( "unable to load properties from {}", resource, exception ); 175 } 176 177 return properties; 178 } 179 180 private synchronized ServiceLoader getServiceUtil() 181 { 182 return ServiceLoader.getInstance( enableContainer ? libraryURL : null, exclusions ); 183 } 184 185 public CascadingServices( Map<Object, Object> properties ) 186 { 187 this.properties = PropertyUtil.createProperties( properties, defaultProperties ); 188 this.enableContainer = PropertyUtil.getProperty( properties, CONTAINER_ENABLED, defaultProperties.getProperty( CONTAINER_ENABLED, "false" ) ).equalsIgnoreCase( "true" ); 189 } 190 191 private Map<Object, Object> getProperties() 192 { 193 return properties; 194 } 195 196 public MetricsService getMetricsService() 197 { 198 if( metricsService == null ) 199 metricsService = createMetricsService(); 200 201 return metricsService; 202 } 203 204 public DocumentService getDocumentService() 205 { 206 if( documentService == null ) 207 documentService = createDocumentService(); 208 209 return documentService; 210 } 211 212 public ClientState createClientState( String id ) 213 { 214 ClientState clientState = (ClientState) getServiceUtil().loadServiceFrom( defaultProperties, getProperties(), ClientState.STATE_SERVICE_CLASS_PROPERTY ); 215 216 if( clientState != null ) 217 { 218 clientState.initialize( this, id ); 219 220 return clientState; 221 } 222 223 return ClientState.NULL; 224 } 225 226 protected MetricsService createMetricsService() 227 { 228 MetricsService service = (MetricsService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), MetricsService.METRICS_SERVICE_CLASS_PROPERTY ); 229 230 if( service != null ) 231 { 232 registerShutdownHook( service ); 233 234 return service; 235 } 236 237 return new NullMetricsService(); 238 } 239 240 protected DocumentService createDocumentService() 241 { 242 DocumentService service = (DocumentService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), DocumentService.DOCUMENT_SERVICE_CLASS_PROPERTY ); 243 244 if( service != null ) 245 { 246 registerShutdownHook( service ); 247 248 return service; 249 } 250 251 return new NullDocumentService(); 252 } 253 254 private void registerShutdownHook( final CascadingService service ) 255 { 256 if( service == null ) 257 return; 258 259 ShutdownUtil.addHook( new ShutdownUtil.Hook() 260 { 261 @Override 262 public Priority priority() 263 { 264 return Priority.SERVICE_PROVIDER; 265 } 266 267 @Override 268 public void execute() 269 { 270 try 271 { 272 service.stopService(); 273 } 274 catch( Throwable throwable ) 275 { 276 // safe to throw exception here so message is logged 277 LOG.error( "failed stopping cascading service", throwable ); 278 throw new CascadeException( "failed stopping cascading service", throwable ); 279 } 280 } 281 } ); 282 } 283 284 /** Class NullDocumentService provides a null implementation. */ 285 public static class NullDocumentService implements DocumentService 286 { 287 @Override 288 public boolean isEnabled() 289 { 290 return false; 291 } 292 293 @Override 294 public void setProperties( Map<Object, Object> properties ) 295 { 296 } 297 298 @Override 299 public void startService() 300 { 301 } 302 303 @Override 304 public void stopService() 305 { 306 } 307 308 @Override 309 public void put( String key, Object object ) 310 { 311 } 312 313 @Override 314 public void put( String type, String key, Object object ) 315 { 316 } 317 318 @Override 319 public Map get( String type, String key ) 320 { 321 return null; 322 } 323 324 @Override 325 public boolean supportsFind() 326 { 327 return false; 328 } 329 330 @Override 331 public List<Map<String, Object>> find( String type, String[] query ) 332 { 333 return null; 334 } 335 } 336 337 /** Class NullMetricsService provides a null implementation. */ 338 public static class NullMetricsService implements MetricsService 339 { 340 @Override 341 public boolean isEnabled() 342 { 343 return false; 344 } 345 346 @Override 347 public void increment( String[] context, int amount ) 348 { 349 } 350 351 @Override 352 public void set( String[] context, String value ) 353 { 354 } 355 356 @Override 357 public void set( String[] context, int value ) 358 { 359 } 360 361 @Override 362 public void set( String[] context, long value ) 363 { 364 } 365 366 @Override 367 public String getString( String[] context ) 368 { 369 return null; 370 } 371 372 @Override 373 public int getInt( String[] context ) 374 { 375 return 0; 376 } 377 378 @Override 379 public long getLong( String[] context ) 380 { 381 return 0; 382 } 383 384 @Override 385 public boolean compareSet( String[] context, String isValue, String toValue ) 386 { 387 return true; 388 } 389 390 @Override 391 public boolean compareSet( String[] context, int isValue, int toValue ) 392 { 393 return true; 394 } 395 396 @Override 397 public boolean compareSet( String[] context, long isValue, long toValue ) 398 { 399 return true; 400 } 401 402 @Override 403 public void setProperties( Map<Object, Object> properties ) 404 { 405 } 406 407 @Override 408 public void startService() 409 { 410 } 411 412 @Override 413 public void stopService() 414 { 415 } 416 } 417 }