001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.management;
023
024import java.io.IOException;
025import java.io.InputStream;
026import java.net.URI;
027import java.net.URL;
028import java.net.URLClassLoader;
029import java.util.List;
030import java.util.Map;
031import java.util.Properties;
032
033import cascading.cascade.CascadeException;
034import cascading.management.state.ClientState;
035import cascading.property.PropertyUtil;
036import cascading.provider.CascadingService;
037import cascading.provider.ServiceLoader;
038import cascading.util.ShutdownUtil;
039import cascading.util.Util;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Class CascadingServices is the root class for pluggable services Cascading can call out to for distributed
045 * monitoring and management systems.
046 * <p>
047 * Be default all services will be loaded from the jar {@code cascading/management/service.properties}
048 * ({@link #DEFAULT_PROPERTIES}) resource is found in. If the
049 * property {@link #CONTAINER_ENABLED} value is {@code false}, a ClassLoader container will not be created.
050 * <p>
051 * For this to work, all service implementation and dependencies must be archived into a single jar.
052 * <p>
053 * If any packages in the jar should be excluded, set a comma delimited list of names via the {@link #CONTAINER_EXCLUDE}
054 * property.
055 * <p>
056 * If the file {@code cascading-service.properties} ({@link CascadingServices#CASCADING_SERVICES}) is found in the
057 * CLASSPATH, the {@code cascading.management.service.jar} property value will be used to search for
058 * {@code cascading/management/service.properties} resource.
059 *
060 * @see CascadingService
061 */
062public class CascadingServices
063  {
064  private static final Logger LOG = LoggerFactory.getLogger( CascadingServices.class );
065
066  public static final String CASCADING_SERVICES = "cascading-service.properties";
067  public static final String CASCADING_SERVICES_JAR = "cascading.management.service.jar";
068  public static final String CASCADING_SERVICES_JAR_DISABLE = "cascading.management.service.jar.disable";
069
070  public static final String DEFAULT_PROPERTIES = "cascading/management/service.properties";
071  public static final String CONTAINER_ENABLED = "cascading.management.container.enabled";
072  public static final String CONTAINER_EXCLUDE = "cascading.management.container.exclude";
073
074  static Properties defaultProperties;
075  static URL libraryURL;
076  static String[] exclusions;
077
078  Map<Object, Object> properties;
079
080  MetricsService metricsService;
081  DocumentService documentService;
082  boolean enableContainer;
083
084  static
085    {
086    ClassLoader classLoader = CascadingServices.class.getClassLoader();
087
088    // load all properties from cascading-service.properties
089    defaultProperties = loadProperties( new Properties(), CASCADING_SERVICES, classLoader );
090
091    libraryURL = getLibraryURL();
092
093    if( libraryURL != null )
094      classLoader = new URLClassLoader( new URL[]{libraryURL} );
095
096    // load additional, if only, properties from file in resource jar
097    defaultProperties = loadProperties( new Properties( defaultProperties ), DEFAULT_PROPERTIES, classLoader );
098
099    // if resources jar is on primary classpath, find the jar so we can isolate it when loading services
100    if( libraryURL == null )
101      libraryURL = parseLibraryURL( classLoader, DEFAULT_PROPERTIES );
102
103    exclusions = Util.removeNulls( defaultProperties.getProperty( CONTAINER_EXCLUDE, "" ).split( "," ) );
104    }
105
106  private static URL parseLibraryURL( ClassLoader classLoader, String resource )
107    {
108    URL url = classLoader.getResource( resource );
109
110    if( url != null )
111      {
112      try
113        {
114        String path = url.toURI().getSchemeSpecificPart();
115        int endIndex = path.lastIndexOf( '!' );
116
117        if( endIndex != -1 )
118          path = path.substring( 0, endIndex );
119
120        if( path.endsWith( ".jar" ) )
121          return new URL( path );
122        }
123      catch( Exception exception )
124        {
125        LOG.warn( "unable to parse resource library: {}", url, exception );
126        }
127      }
128
129    return null;
130    }
131
132  private static URL getLibraryURL()
133    {
134    String property = defaultProperties.getProperty( CASCADING_SERVICES_JAR );
135
136    if( property == null )
137      return null;
138
139    String disableJar = defaultProperties.getProperty( CASCADING_SERVICES_JAR_DISABLE, System.getProperty( CASCADING_SERVICES_JAR_DISABLE, "false" ) );
140
141    if( Boolean.valueOf( disableJar ) )
142      {
143      LOG.info( "property '{}' is set, ignoring services jar: {}", CASCADING_SERVICES_JAR_DISABLE, property );
144      return null;
145      }
146
147    try
148      {
149      URI uri = URI.create( property );
150
151      if( !uri.isAbsolute() )
152        uri = new URI( "file", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment() );
153
154      return uri.toURL();
155      }
156    catch( Exception exception )
157      {
158      LOG.warn( "property: {}, has invalid URL value: {}", CASCADING_SERVICES_JAR, property );
159      }
160
161    return null;
162    }
163
164  private static Properties loadProperties( Properties properties, String resource, ClassLoader classLoader )
165    {
166    InputStream input = classLoader.getResourceAsStream( resource );
167
168    try
169      {
170      if( input != null )
171        {
172        URL url = parseLibraryURL( classLoader, resource );
173
174        if( url != null )
175          LOG.info( "loading properties: {}, from jar: {}", resource, url );
176        else
177          LOG.info( "loading properties: {}, from CLASSPATH", resource );
178
179        properties.load( input );
180        }
181      }
182    catch( IOException exception )
183      {
184      LOG.warn( "unable to load properties from {}", resource, exception );
185      }
186
187    return properties;
188    }
189
190  private synchronized ServiceLoader getServiceUtil()
191    {
192    return ServiceLoader.getInstance( enableContainer ? libraryURL : null, exclusions );
193    }
194
195  public CascadingServices( Map<Object, Object> properties )
196    {
197    this.properties = PropertyUtil.createProperties( properties, defaultProperties );
198    this.enableContainer = PropertyUtil.getProperty( properties, CONTAINER_ENABLED, defaultProperties.getProperty( CONTAINER_ENABLED, "false" ) ).equalsIgnoreCase( "true" );
199    }
200
201  private Map<Object, Object> getProperties()
202    {
203    return properties;
204    }
205
206  public MetricsService getMetricsService()
207    {
208    if( metricsService == null )
209      metricsService = createMetricsService();
210
211    return metricsService;
212    }
213
214  public DocumentService getDocumentService()
215    {
216    if( documentService == null )
217      documentService = createDocumentService();
218
219    return documentService;
220    }
221
222  public ClientState createClientState( String id )
223    {
224    ClientState clientState = (ClientState) getServiceUtil().loadServiceFrom( defaultProperties, getProperties(), ClientState.STATE_SERVICE_CLASS_PROPERTY );
225
226    if( clientState != null )
227      {
228      clientState.initialize( this, id );
229
230      return clientState;
231      }
232
233    return ClientState.NULL;
234    }
235
236  protected MetricsService createMetricsService()
237    {
238    MetricsService service = (MetricsService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), MetricsService.METRICS_SERVICE_CLASS_PROPERTY );
239
240    if( service != null )
241      {
242      registerShutdownHook( service );
243
244      return service;
245      }
246
247    return new NullMetricsService();
248    }
249
250  protected DocumentService createDocumentService()
251    {
252    DocumentService service = (DocumentService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), DocumentService.DOCUMENT_SERVICE_CLASS_PROPERTY );
253
254    if( service != null )
255      {
256      registerShutdownHook( service );
257
258      return service;
259      }
260
261    return new NullDocumentService();
262    }
263
264  private void registerShutdownHook( final CascadingService service )
265    {
266    if( service == null )
267      return;
268
269    ShutdownUtil.addHook( new ShutdownUtil.Hook()
270      {
271      @Override
272      public Priority priority()
273        {
274        return Priority.SERVICE_PROVIDER;
275        }
276
277      @Override
278      public void execute()
279        {
280        try
281          {
282          service.stopService();
283          }
284        catch( Throwable throwable )
285          {
286          // safe to throw exception here so message is logged
287          LOG.error( "failed stopping cascading service", throwable );
288          throw new CascadeException( "failed stopping cascading service", throwable );
289          }
290        }
291      } );
292    }
293
294  /** Class NullDocumentService provides a null implementation. */
295  public static class NullDocumentService implements DocumentService
296    {
297    @Override
298    public boolean isEnabled()
299      {
300      return false;
301      }
302
303    @Override
304    public void setProperties( Map<Object, Object> properties )
305      {
306      }
307
308    @Override
309    public void startService()
310      {
311      }
312
313    @Override
314    public void stopService()
315      {
316      }
317
318    @Override
319    public void put( String key, Object object )
320      {
321      }
322
323    @Override
324    public void put( String type, String key, Object object )
325      {
326      }
327
328    @Override
329    public Map get( String type, String key )
330      {
331      return null;
332      }
333
334    @Override
335    public boolean supportsFind()
336      {
337      return false;
338      }
339
340    @Override
341    public List<Map<String, Object>> find( String type, String[] query )
342      {
343      return null;
344      }
345    }
346
347  /** Class NullMetricsService provides a null implementation. */
348  public static class NullMetricsService implements MetricsService
349    {
350    @Override
351    public boolean isEnabled()
352      {
353      return false;
354      }
355
356    @Override
357    public void increment( String[] context, int amount )
358      {
359      }
360
361    @Override
362    public void set( String[] context, String value )
363      {
364      }
365
366    @Override
367    public void set( String[] context, int value )
368      {
369      }
370
371    @Override
372    public void set( String[] context, long value )
373      {
374      }
375
376    @Override
377    public String getString( String[] context )
378      {
379      return null;
380      }
381
382    @Override
383    public int getInt( String[] context )
384      {
385      return 0;
386      }
387
388    @Override
389    public long getLong( String[] context )
390      {
391      return 0;
392      }
393
394    @Override
395    public boolean compareSet( String[] context, String isValue, String toValue )
396      {
397      return true;
398      }
399
400    @Override
401    public boolean compareSet( String[] context, int isValue, int toValue )
402      {
403      return true;
404      }
405
406    @Override
407    public boolean compareSet( String[] context, long isValue, long toValue )
408      {
409      return true;
410      }
411
412    @Override
413    public void setProperties( Map<Object, Object> properties )
414      {
415      }
416
417    @Override
418    public void startService()
419      {
420      }
421
422    @Override
423    public void stopService()
424      {
425      }
426    }
427  }