001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.management; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.net.URI; 026import java.net.URL; 027import java.net.URLClassLoader; 028import java.util.List; 029import java.util.Map; 030import java.util.Properties; 031 032import cascading.cascade.CascadeException; 033import cascading.management.state.ClientState; 034import cascading.property.PropertyUtil; 035import cascading.provider.CascadingService; 036import cascading.provider.ServiceLoader; 037import cascading.util.ShutdownUtil; 038import cascading.util.Util; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * Class CascadingServices is the root class for pluggable services Cascading can call out to for distributed 044 * monitoring and management systems. 045 * <p/> 046 * Be default all services will be loaded from the jar {@code cascading/management/service.properties} 047 * ({@link #DEFAULT_PROPERTIES}) resource is found in. If the 048 * property {@link #CONTAINER_ENABLED} value is {@code false}, a ClassLoader container will not be created. 049 * <p/> 050 * For this to work, all service implementation and dependencies must be archived into a single jar. 051 * <p/> 052 * If any packages in the jar should be excluded, set a comma delimited list of names via the {@link #CONTAINER_EXCLUDE} 053 * property. 054 * <p/> 055 * If the file {@code cascading-service.properties} ({@link CascadingServices#CASCADING_SERVICES}) is found in the 056 * CLASSPATH, the {@code cascading.management.service.jar} property value will be used to search for 057 * {@code cascading/management/service.properties} resource. 058 * 059 * @see CascadingService 060 */ 061public class CascadingServices 062 { 063 private static final Logger LOG = LoggerFactory.getLogger( CascadingServices.class ); 064 065 public static final String CASCADING_SERVICES = "cascading-service.properties"; 066 public static final String CASCADING_SERVICES_JAR = "cascading.management.service.jar"; 067 public static final String CASCADING_SERVICES_JAR_DISABLE = "cascading.management.service.jar.disable"; 068 069 public static final String DEFAULT_PROPERTIES = "cascading/management/service.properties"; 070 public static final String CONTAINER_ENABLED = "cascading.management.container.enabled"; 071 public static final String CONTAINER_EXCLUDE = "cascading.management.container.exclude"; 072 073 static Properties defaultProperties; 074 static URL libraryURL; 075 static String[] exclusions; 076 077 Map<Object, Object> properties; 078 079 MetricsService metricsService; 080 DocumentService documentService; 081 boolean enableContainer; 082 083 static 084 { 085 ClassLoader classLoader = CascadingServices.class.getClassLoader(); 086 087 // load all properties from cascading-service.properties 088 defaultProperties = loadProperties( new Properties(), CASCADING_SERVICES, classLoader ); 089 090 libraryURL = getLibraryURL(); 091 092 if( libraryURL != null ) 093 classLoader = new URLClassLoader( new URL[]{libraryURL} ); 094 095 // load additional, if only, properties from file in resource jar 096 defaultProperties = loadProperties( new Properties( defaultProperties ), DEFAULT_PROPERTIES, classLoader ); 097 098 // if resources jar is on primary classpath, find the jar so we can isolate it when loading services 099 if( libraryURL == null ) 100 libraryURL = parseLibraryURL( classLoader, DEFAULT_PROPERTIES ); 101 102 exclusions = Util.removeNulls( defaultProperties.getProperty( CONTAINER_EXCLUDE, "" ).split( "," ) ); 103 } 104 105 private static URL parseLibraryURL( ClassLoader classLoader, String resource ) 106 { 107 URL url = classLoader.getResource( resource ); 108 109 if( url != null ) 110 { 111 try 112 { 113 String path = url.toURI().getSchemeSpecificPart(); 114 int endIndex = path.lastIndexOf( '!' ); 115 116 if( endIndex != -1 ) 117 path = path.substring( 0, endIndex ); 118 119 if( path.endsWith( ".jar" ) ) 120 return new URL( path ); 121 } 122 catch( Exception exception ) 123 { 124 LOG.warn( "unable to parse resource library: {}", url, exception ); 125 } 126 } 127 128 return null; 129 } 130 131 private static URL getLibraryURL() 132 { 133 String property = defaultProperties.getProperty( CASCADING_SERVICES_JAR ); 134 135 if( property == null ) 136 return null; 137 138 String disableJar = defaultProperties.getProperty( CASCADING_SERVICES_JAR_DISABLE, System.getProperty( CASCADING_SERVICES_JAR_DISABLE, "false" ) ); 139 140 if( Boolean.valueOf( disableJar ) ) 141 { 142 LOG.info( "property '{}' is set, ignoring services jar: {}", CASCADING_SERVICES_JAR_DISABLE, property ); 143 return null; 144 } 145 146 try 147 { 148 URI uri = URI.create( property ); 149 150 if( !uri.isAbsolute() ) 151 uri = new URI( "file", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment() ); 152 153 return uri.toURL(); 154 } 155 catch( Exception exception ) 156 { 157 LOG.warn( "property: {}, has invalid URL value: {}", CASCADING_SERVICES_JAR, property ); 158 } 159 160 return null; 161 } 162 163 private static Properties loadProperties( Properties properties, String resource, ClassLoader classLoader ) 164 { 165 InputStream input = classLoader.getResourceAsStream( resource ); 166 167 try 168 { 169 if( input != null ) 170 { 171 URL url = parseLibraryURL( classLoader, resource ); 172 173 if( url != null ) 174 LOG.info( "loading properties: {}, from jar: {}", resource, url ); 175 else 176 LOG.info( "loading properties: {}, from CLASSPATH", resource ); 177 178 properties.load( input ); 179 } 180 } 181 catch( IOException exception ) 182 { 183 LOG.warn( "unable to load properties from {}", resource, exception ); 184 } 185 186 return properties; 187 } 188 189 private synchronized ServiceLoader getServiceUtil() 190 { 191 return ServiceLoader.getInstance( enableContainer ? libraryURL : null, exclusions ); 192 } 193 194 public CascadingServices( Map<Object, Object> properties ) 195 { 196 this.properties = PropertyUtil.createProperties( properties, defaultProperties ); 197 this.enableContainer = PropertyUtil.getProperty( properties, CONTAINER_ENABLED, defaultProperties.getProperty( CONTAINER_ENABLED, "false" ) ).equalsIgnoreCase( "true" ); 198 } 199 200 private Map<Object, Object> getProperties() 201 { 202 return properties; 203 } 204 205 public MetricsService getMetricsService() 206 { 207 if( metricsService == null ) 208 metricsService = createMetricsService(); 209 210 return metricsService; 211 } 212 213 public DocumentService getDocumentService() 214 { 215 if( documentService == null ) 216 documentService = createDocumentService(); 217 218 return documentService; 219 } 220 221 public ClientState createClientState( String id ) 222 { 223 ClientState clientState = (ClientState) getServiceUtil().loadServiceFrom( defaultProperties, getProperties(), ClientState.STATE_SERVICE_CLASS_PROPERTY ); 224 225 if( clientState != null ) 226 { 227 clientState.initialize( this, id ); 228 229 return clientState; 230 } 231 232 return ClientState.NULL; 233 } 234 235 protected MetricsService createMetricsService() 236 { 237 MetricsService service = (MetricsService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), MetricsService.METRICS_SERVICE_CLASS_PROPERTY ); 238 239 if( service != null ) 240 { 241 registerShutdownHook( service ); 242 243 return service; 244 } 245 246 return new NullMetricsService(); 247 } 248 249 protected DocumentService createDocumentService() 250 { 251 DocumentService service = (DocumentService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), DocumentService.DOCUMENT_SERVICE_CLASS_PROPERTY ); 252 253 if( service != null ) 254 { 255 registerShutdownHook( service ); 256 257 return service; 258 } 259 260 return new NullDocumentService(); 261 } 262 263 private void registerShutdownHook( final CascadingService service ) 264 { 265 if( service == null ) 266 return; 267 268 ShutdownUtil.addHook( new ShutdownUtil.Hook() 269 { 270 @Override 271 public Priority priority() 272 { 273 return Priority.SERVICE_PROVIDER; 274 } 275 276 @Override 277 public void execute() 278 { 279 try 280 { 281 service.stopService(); 282 } 283 catch( Throwable throwable ) 284 { 285 // safe to throw exception here so message is logged 286 LOG.error( "failed stopping cascading service", throwable ); 287 throw new CascadeException( "failed stopping cascading service", throwable ); 288 } 289 } 290 } ); 291 } 292 293 /** Class NullDocumentService provides a null implementation. */ 294 public static class NullDocumentService implements DocumentService 295 { 296 @Override 297 public boolean isEnabled() 298 { 299 return false; 300 } 301 302 @Override 303 public void setProperties( Map<Object, Object> properties ) 304 { 305 } 306 307 @Override 308 public void startService() 309 { 310 } 311 312 @Override 313 public void stopService() 314 { 315 } 316 317 @Override 318 public void put( String key, Object object ) 319 { 320 } 321 322 @Override 323 public void put( String type, String key, Object object ) 324 { 325 } 326 327 @Override 328 public Map get( String type, String key ) 329 { 330 return null; 331 } 332 333 @Override 334 public boolean supportsFind() 335 { 336 return false; 337 } 338 339 @Override 340 public List<Map<String, Object>> find( String type, String[] query ) 341 { 342 return null; 343 } 344 } 345 346 /** Class NullMetricsService provides a null implementation. */ 347 public static class NullMetricsService implements MetricsService 348 { 349 @Override 350 public boolean isEnabled() 351 { 352 return false; 353 } 354 355 @Override 356 public void increment( String[] context, int amount ) 357 { 358 } 359 360 @Override 361 public void set( String[] context, String value ) 362 { 363 } 364 365 @Override 366 public void set( String[] context, int value ) 367 { 368 } 369 370 @Override 371 public void set( String[] context, long value ) 372 { 373 } 374 375 @Override 376 public String getString( String[] context ) 377 { 378 return null; 379 } 380 381 @Override 382 public int getInt( String[] context ) 383 { 384 return 0; 385 } 386 387 @Override 388 public long getLong( String[] context ) 389 { 390 return 0; 391 } 392 393 @Override 394 public boolean compareSet( String[] context, String isValue, String toValue ) 395 { 396 return true; 397 } 398 399 @Override 400 public boolean compareSet( String[] context, int isValue, int toValue ) 401 { 402 return true; 403 } 404 405 @Override 406 public boolean compareSet( String[] context, long isValue, long toValue ) 407 { 408 return true; 409 } 410 411 @Override 412 public void setProperties( Map<Object, Object> properties ) 413 { 414 } 415 416 @Override 417 public void startService() 418 { 419 } 420 421 @Override 422 public void stopService() 423 { 424 } 425 } 426 }