/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.management;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import cascading.cascade.CascadeException;
import cascading.management.state.ClientState;
import cascading.property.PropertyUtil;
import cascading.provider.CascadingService;
import cascading.provider.ServiceLoader;
import cascading.util.ShutdownUtil;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class CascadingServices is the root class for pluggable services Cascading can call out to for distributed
 * monitoring and management systems.
 * <p>
 * By default all services will be loaded from the jar in which the {@code cascading/management/service.properties}
 * ({@link #DEFAULT_PROPERTIES}) resource is found. If the
 * property {@link #CONTAINER_ENABLED} value is {@code false}, a ClassLoader container will not be created.
 * <p>
 * For this to work, all service implementation and dependencies must be archived into a single jar.
 * <p>
 * If any packages in the jar should be excluded, set a comma delimited list of names via the {@link #CONTAINER_EXCLUDE}
 * property.
 * <p>
 * If the file {@code cascading-service.properties} ({@link CascadingServices#CASCADING_SERVICES}) is found in the
 * CLASSPATH, the {@code cascading.management.service.jar} property value will be used to search for
 * {@code cascading/management/service.properties} resource.
 *
 * @see CascadingService
 */
public class CascadingServices
  {
  private static final Logger LOG = LoggerFactory.getLogger( CascadingServices.class );

  /** Name of the CLASSPATH resource holding the top level service properties. */
  public static final String CASCADING_SERVICES = "cascading-service.properties";
  /** Property naming the location (URL or path) of the services jar. */
  public static final String CASCADING_SERVICES_JAR = "cascading.management.service.jar";
  /** Property that, when {@code true}, causes {@link #CASCADING_SERVICES_JAR} to be ignored. */
  public static final String CASCADING_SERVICES_JAR_DISABLE = "cascading.management.service.jar.disable";

  /** Name of the resource holding the default service properties. */
  public static final String DEFAULT_PROPERTIES = "cascading/management/service.properties";
  /** Property enabling the ClassLoader container, treated as {@code false} when unset. */
  public static final String CONTAINER_ENABLED = "cascading.management.container.enabled";
  /** Property holding a comma delimited list of names to exclude from the container. */
  public static final String CONTAINER_EXCLUDE = "cascading.management.container.exclude";

  // populated once by the static initializer below
  static Properties defaultProperties;
  static URL libraryURL;
  static String[] exclusions;

  Map<Object, Object> properties;

  MetricsService metricsService;
  DocumentService documentService;
  boolean enableContainer;

  static
    {
    ClassLoader classLoader = CascadingServices.class.getClassLoader();

    // load all properties from cascading-service.properties
    defaultProperties = loadProperties( new Properties(), CASCADING_SERVICES, classLoader );

    libraryURL = getLibraryURL();

    // when a services jar is explicitly configured, read the default properties from that jar only
    if( libraryURL != null )
      classLoader = new URLClassLoader( new URL[]{libraryURL} );

    // load additional, if only, properties from file in resource jar
    // values loaded above remain visible as Properties defaults
    defaultProperties = loadProperties( new Properties( defaultProperties ), DEFAULT_PROPERTIES, classLoader );

    // if resources jar is on primary classpath, find the jar so we can isolate it when loading services
    if( libraryURL == null )
      libraryURL = parseLibraryURL( classLoader, DEFAULT_PROPERTIES );

    exclusions = Util.removeNulls( defaultProperties.getProperty( CONTAINER_EXCLUDE, "" ).split( "," ) );
    }

  /**
   * Locates the given resource via the given ClassLoader and, if the resource lives inside a jar,
   * returns the URL of that jar.
   *
   * @param classLoader the ClassLoader used to look up the resource
   * @param resource    the resource name to look up
   * @return the URL of the enclosing jar, or {@code null} if the resource is not found, is not inside
   * a path ending in {@code .jar}, or the location could not be parsed
   */
  private static URL parseLibraryURL( ClassLoader classLoader, String resource )
    {
    URL url = classLoader.getResource( resource );

    if( url == null )
      return null;

    try
      {
      // for a "jar:file:/lib.jar!/entry" URL the scheme specific part is "file:/lib.jar!/entry"
      String path = url.toURI().getSchemeSpecificPart();
      int endIndex = path.lastIndexOf( '!' );

      // strip the entry name, leaving only the location of the archive
      if( endIndex != -1 )
        path = path.substring( 0, endIndex );

      if( path.endsWith( ".jar" ) )
        return new URL( path );
      }
    catch( Exception exception )
      {
      LOG.warn( "unable to parse resource library: {}", url, exception );
      }

    return null;
    }

  /**
   * Resolves the services jar URL from the {@link #CASCADING_SERVICES_JAR} property.
   * <p>
   * The jar is ignored if {@link #CASCADING_SERVICES_JAR_DISABLE} is {@code true} in either the loaded
   * properties or, failing that, the System properties.
   *
   * @return the configured jar URL, or {@code null} if unset, disabled, or invalid
   */
  private static URL getLibraryURL()
    {
    String property = defaultProperties.getProperty( CASCADING_SERVICES_JAR );

    if( property == null )
      return null;

    String disableJar = defaultProperties.getProperty( CASCADING_SERVICES_JAR_DISABLE, System.getProperty( CASCADING_SERVICES_JAR_DISABLE, "false" ) );

    if( Boolean.parseBoolean( disableJar ) )
      {
      LOG.info( "property '{}' is set, ignoring services jar: {}", CASCADING_SERVICES_JAR_DISABLE, property );
      return null;
      }

    try
      {
      URI uri = URI.create( property );

      // values without a scheme are treated as files
      // note the URI constructor rejects a relative path when a scheme is given, such values end up in the catch below
      if( !uri.isAbsolute() )
        uri = new URI( "file", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment() );

      return uri.toURL();
      }
    catch( Exception exception )
      {
      // pass the exception along so the reason the value was rejected is not lost
      LOG.warn( "property: {}, has invalid URL value: {}", CASCADING_SERVICES_JAR, property, exception );
      }

    return null;
    }

  /**
   * Loads the named resource, if found by the given ClassLoader, into the given Properties instance.
   * <p>
   * A missing resource is not an error, the given instance is returned unchanged. An IOException raised
   * while reading is logged and otherwise ignored. The underlying stream is always closed.
   *
   * @param properties  the instance to load values into
   * @param resource    the resource name to load
   * @param classLoader the ClassLoader used to look up the resource
   * @return the given properties instance
   */
  private static Properties loadProperties( Properties properties, String resource, ClassLoader classLoader )
    {
    InputStream input = classLoader.getResourceAsStream( resource );

    if( input == null )
      return properties;

    try
      {
      URL url = parseLibraryURL( classLoader, resource );

      if( url != null )
        LOG.info( "loading properties: {}, from jar: {}", resource, url );
      else
        LOG.info( "loading properties: {}, from CLASSPATH", resource );

      properties.load( input );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to load properties from {}", resource, exception );
      }
    finally
      {
      // Properties#load leaves the stream open, release it here so the handle is not leaked
      try
        {
        input.close();
        }
      catch( IOException exception )
        {
        LOG.debug( "unable to close properties stream for {}", resource, exception );
        }
      }

    return properties;
    }

  /**
   * Returns the ServiceLoader to use, handing it the library URL only when the container is enabled.
   *
   * @return the ServiceLoader instance
   */
  private synchronized ServiceLoader getServiceUtil()
    {
    return ServiceLoader.getInstance( enableContainer ? libraryURL : null, exclusions );
    }

  /**
   * Constructor CascadingServices creates a new instance layering the given properties over the defaults
   * loaded when this class was initialized.
   * <p>
   * The container is enabled only if {@link #CONTAINER_ENABLED} resolves to {@code true} (ignoring case).
   *
   * @param properties the properties to use when loading and configuring services
   */
  public CascadingServices( Map<Object, Object> properties )
    {
    this.properties = PropertyUtil.createProperties( properties, defaultProperties );
    this.enableContainer = PropertyUtil.getProperty( properties, CONTAINER_ENABLED, defaultProperties.getProperty( CONTAINER_ENABLED, "false" ) ).equalsIgnoreCase( "true" );
    }

  private Map<Object, Object> getProperties()
    {
    return properties;
    }

  /**
   * Returns the MetricsService, creating it on first call via {@link #createMetricsService()}.
   *
   * @return the MetricsService, never {@code null}
   */
  public MetricsService getMetricsService()
    {
    if( metricsService == null )
      metricsService = createMetricsService();

    return metricsService;
    }

  /**
   * Returns the DocumentService, creating it on first call via {@link #createDocumentService()}.
   *
   * @return the DocumentService, never {@code null}
   */
  public DocumentService getDocumentService()
    {
    if( documentService == null )
      documentService = createDocumentService();

    return documentService;
    }

  /**
   * Loads and initializes a new ClientState for the given id.
   *
   * @param id the id passed to {@code ClientState#initialize}
   * @return the initialized ClientState, or {@link ClientState#NULL} if no implementation was loaded
   */
  public ClientState createClientState( String id )
    {
    ClientState clientState = (ClientState) getServiceUtil().loadServiceFrom( defaultProperties, getProperties(), ClientState.STATE_SERVICE_CLASS_PROPERTY );

    if( clientState == null )
      return ClientState.NULL;

    clientState.initialize( this, id );

    return clientState;
    }

  /**
   * Loads the singleton MetricsService and registers a shutdown hook to stop it.
   *
   * @return the loaded service, or a {@link NullMetricsService} if no implementation was loaded
   */
  protected MetricsService createMetricsService()
    {
    MetricsService service = (MetricsService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), MetricsService.METRICS_SERVICE_CLASS_PROPERTY );

    if( service == null )
      return new NullMetricsService();

    registerShutdownHook( service );

    return service;
    }

  /**
   * Loads the singleton DocumentService and registers a shutdown hook to stop it.
   *
   * @return the loaded service, or a {@link NullDocumentService} if no implementation was loaded
   */
  protected DocumentService createDocumentService()
    {
    DocumentService service = (DocumentService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), DocumentService.DOCUMENT_SERVICE_CLASS_PROPERTY );

    if( service == null )
      return new NullDocumentService();

    registerShutdownHook( service );

    return service;
    }

  /**
   * Registers a shutdown hook, at {@code SERVICE_PROVIDER} priority, that stops the given service.
   *
   * @param service the service to stop on shutdown, ignored if {@code null}
   */
  private void registerShutdownHook( final CascadingService service )
    {
    if( service == null )
      return;

    ShutdownUtil.addHook( new ShutdownUtil.Hook()
      {
      @Override
      public Priority priority()
        {
        return Priority.SERVICE_PROVIDER;
        }

      @Override
      public void execute()
        {
        try
          {
          service.stopService();
          }
        catch( Throwable throwable )
          {
          // safe to throw exception here so message is logged
          LOG.error( "failed stopping cascading service", throwable );
          throw new CascadeException( "failed stopping cascading service", throwable );
          }
        }
      } );
    }

  /** Class NullDocumentService provides a null implementation. */
  public static class NullDocumentService implements DocumentService
    {
    @Override
    public boolean isEnabled()
      {
      return false;
      }

    @Override
    public void setProperties( Map<Object, Object> properties )
      {
      }

    @Override
    public void startService()
      {
      }

    @Override
    public void stopService()
      {
      }

    @Override
    public void put( String key, Object object )
      {
      }

    @Override
    public void put( String type, String key, Object object )
      {
      }

    @Override
    public Map get( String type, String key )
      {
      return null;
      }

    @Override
    public boolean supportsFind()
      {
      return false;
      }

    @Override
    public List<Map<String, Object>> find( String type, String[] query )
      {
      return null;
      }
    }

  /** Class NullMetricsService provides a null implementation. */
  public static class NullMetricsService implements MetricsService
    {
    @Override
    public boolean isEnabled()
      {
      return false;
      }

    @Override
    public void increment( String[] context, int amount )
      {
      }

    @Override
    public void set( String[] context, String value )
      {
      }

    @Override
    public void set( String[] context, int value )
      {
      }

    @Override
    public void set( String[] context, long value )
      {
      }

    @Override
    public String getString( String[] context )
      {
      return null;
      }

    @Override
    public int getInt( String[] context )
      {
      return 0;
      }

    @Override
    public long getLong( String[] context )
      {
      return 0;
      }

    @Override
    public boolean compareSet( String[] context, String isValue, String toValue )
      {
      return true;
      }

    @Override
    public boolean compareSet( String[] context, int isValue, int toValue )
      {
      return true;
      }

    @Override
    public boolean compareSet( String[] context, long isValue, long toValue )
      {
      return true;
      }

    @Override
    public void setProperties( Map<Object, Object> properties )
      {
      }

    @Override
    public void startService()
      {
      }

    @Override
    public void stopService()
      {
      }
    }
  }