001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.management;
022    
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.net.URI;
026    import java.net.URL;
027    import java.net.URLClassLoader;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Properties;
031    
032    import cascading.cascade.CascadeException;
033    import cascading.management.state.ClientState;
034    import cascading.property.PropertyUtil;
035    import cascading.provider.CascadingService;
036    import cascading.provider.ServiceLoader;
037    import cascading.util.ShutdownUtil;
038    import cascading.util.Util;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    
042    /**
043     * Class CascadingServices is the root class for pluggable services Cascading can call out to for distributed
044     * monitoring and management systems.
045     * <p/>
 * By default all services will be loaded from the jar in which the
 * {@code cascading/management/service.properties} ({@link #DEFAULT_PROPERTIES}) resource is found. If the
 * property {@link #CONTAINER_ENABLED} value is {@code false}, a ClassLoader container will not be created.
049     * <p/>
050     * For this to work, all service implementation and dependencies must be archived into a single jar.
051     * <p/>
052     * If any packages in the jar should be excluded, set a comma delimited list of names via the {@link #CONTAINER_EXCLUDE}
053     * property.
054     * <p/>
055     * If the file {@code cascading-service.properties} ({@link CascadingServices#CASCADING_SERVICES}) is found in the
056     * CLASSPATH, the {@code cascading.management.service.jar} property value will be used to search for
057     * {@code cascading/management/service.properties} resource.
058     *
059     * @see CascadingService
060     */
061    public class CascadingServices
062      {
  private static final Logger LOG = LoggerFactory.getLogger( CascadingServices.class );

  /** Name of the classpath resource read first by the static initializer to seed the default properties. */
  public static final String CASCADING_SERVICES = "cascading-service.properties";
  /** Property naming the jar to load services from; read by getLibraryURL(), a value without a scheme is given the {@code file} scheme. */
  public static final String CASCADING_SERVICES_JAR = "cascading.management.service.jar";

  /** Name of the resource read second by the static initializer, from the configured jar if one was given. */
  public static final String DEFAULT_PROPERTIES = "cascading/management/service.properties";
  /** Property toggling the ClassLoader container; the constructor enables it only for the value {@code true} (any case) and falls back to {@code false}. */
  public static final String CONTAINER_ENABLED = "cascading.management.container.enabled";
  /** Property holding a comma delimited list that the static initializer splits into {@link #exclusions}. */
  public static final String CONTAINER_EXCLUDE = "cascading.management.container.exclude";

  // class-wide state, assigned by the static initializer
  static Properties defaultProperties;
  // jar the services are loaded from, may remain null if no jar could be identified
  static URL libraryURL;
  // values parsed from the CONTAINER_EXCLUDE property
  static String[] exclusions;

  // per-instance properties, built in the constructor from the given properties and defaultProperties
  Map<Object, Object> properties;

  // created on first request by getMetricsService() / getDocumentService()
  MetricsService metricsService;
  DocumentService documentService;
  // when true, libraryURL is handed to the ServiceLoader, otherwise null is passed instead
  boolean enableContainer;
081    
082      static
083        {
084        ClassLoader classLoader = CascadingServices.class.getClassLoader();
085    
086        // load all properties from cascading-services.properties
087        defaultProperties = loadProperties( new Properties(), CASCADING_SERVICES, classLoader );
088    
089        libraryURL = getLibraryURL();
090    
091        if( libraryURL != null )
092          classLoader = new URLClassLoader( new URL[]{libraryURL} );
093    
094        // load additional, if only, properties from file in resource jar
095        defaultProperties = loadProperties( new Properties( defaultProperties ), DEFAULT_PROPERTIES, classLoader );
096    
097        // if resources jar is on primary classpath, find the jar so we can isolate it when loading services
098        if( libraryURL == null )
099          libraryURL = parseLibraryURL( classLoader, DEFAULT_PROPERTIES );
100    
101        exclusions = Util.removeNulls( defaultProperties.getProperty( CONTAINER_EXCLUDE, "" ).split( "," ) );
102        }
103    
104      private static URL parseLibraryURL( ClassLoader classLoader, String resource )
105        {
106        URL url = classLoader.getResource( resource );
107    
108        if( url != null )
109          {
110          try
111            {
112            String path = url.toURI().getSchemeSpecificPart();
113            int endIndex = path.lastIndexOf( '!' );
114    
115            if( endIndex != -1 )
116              path = path.substring( 0, endIndex );
117    
118            if( path.endsWith( ".jar" ) )
119              return new URL( path );
120            }
121          catch( Exception exception )
122            {
123            LOG.warn( "unable to parse resource library: {}", url, exception );
124            }
125          }
126    
127        return null;
128        }
129    
130      private static URL getLibraryURL()
131        {
132        String property = defaultProperties.getProperty( CASCADING_SERVICES_JAR );
133    
134        if( property == null )
135          return null;
136    
137        try
138          {
139          URI uri = URI.create( property );
140    
141          if( !uri.isAbsolute() )
142            uri = new URI( "file", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment() );
143    
144          return uri.toURL();
145          }
146        catch( Exception exception )
147          {
148          LOG.warn( "property: {}, has invalid URL value: {}", CASCADING_SERVICES_JAR, property );
149          }
150    
151        return null;
152        }
153    
154      private static Properties loadProperties( Properties properties, String resource, ClassLoader classLoader )
155        {
156        InputStream input = classLoader.getResourceAsStream( resource );
157    
158        try
159          {
160          if( input != null )
161            {
162            URL url = parseLibraryURL( classLoader, resource );
163    
164            if( url != null )
165              LOG.info( "loading properties: {}, from jar: {}", resource, url );
166            else
167              LOG.info( "loading properties: {}", resource );
168    
169            properties.load( input );
170            }
171          }
172        catch( IOException exception )
173          {
174          LOG.warn( "unable to load properties from {}", resource, exception );
175          }
176    
177        return properties;
178        }
179    
180      private synchronized ServiceLoader getServiceUtil()
181        {
182        return ServiceLoader.getInstance( enableContainer ? libraryURL : null, exclusions );
183        }
184    
185      public CascadingServices( Map<Object, Object> properties )
186        {
187        this.properties = PropertyUtil.createProperties( properties, defaultProperties );
188        this.enableContainer = PropertyUtil.getProperty( properties, CONTAINER_ENABLED, defaultProperties.getProperty( CONTAINER_ENABLED, "false" ) ).equalsIgnoreCase( "true" );
189        }
190    
191      private Map<Object, Object> getProperties()
192        {
193        return properties;
194        }
195    
196      public MetricsService getMetricsService()
197        {
198        if( metricsService == null )
199          metricsService = createMetricsService();
200    
201        return metricsService;
202        }
203    
204      public DocumentService getDocumentService()
205        {
206        if( documentService == null )
207          documentService = createDocumentService();
208    
209        return documentService;
210        }
211    
212      public ClientState createClientState( String id )
213        {
214        ClientState clientState = (ClientState) getServiceUtil().loadServiceFrom( defaultProperties, getProperties(), ClientState.STATE_SERVICE_CLASS_PROPERTY );
215    
216        if( clientState != null )
217          {
218          clientState.initialize( this, id );
219    
220          return clientState;
221          }
222    
223        return ClientState.NULL;
224        }
225    
226      protected MetricsService createMetricsService()
227        {
228        MetricsService service = (MetricsService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), MetricsService.METRICS_SERVICE_CLASS_PROPERTY );
229    
230        if( service != null )
231          {
232          registerShutdownHook( service );
233    
234          return service;
235          }
236    
237        return new NullMetricsService();
238        }
239    
240      protected DocumentService createDocumentService()
241        {
242        DocumentService service = (DocumentService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), DocumentService.DOCUMENT_SERVICE_CLASS_PROPERTY );
243    
244        if( service != null )
245          {
246          registerShutdownHook( service );
247    
248          return service;
249          }
250    
251        return new NullDocumentService();
252        }
253    
254      private void registerShutdownHook( final CascadingService service )
255        {
256        if( service == null )
257          return;
258    
259        ShutdownUtil.addHook( new ShutdownUtil.Hook()
260        {
261        @Override
262        public Priority priority()
263          {
264          return Priority.SERVICE_PROVIDER;
265          }
266    
267        @Override
268        public void execute()
269          {
270          try
271            {
272            service.stopService();
273            }
274          catch( Throwable throwable )
275            {
276            // safe to throw exception here so message is logged
277            LOG.error( "failed stopping cascading service", throwable );
278            throw new CascadeException( "failed stopping cascading service", throwable );
279            }
280          }
281        } );
282        }
283    
284      /** Class NullDocumentService provides a null implementation. */
285      public static class NullDocumentService implements DocumentService
286        {
287        @Override
288        public boolean isEnabled()
289          {
290          return false;
291          }
292    
293        @Override
294        public void setProperties( Map<Object, Object> properties )
295          {
296          }
297    
298        @Override
299        public void startService()
300          {
301          }
302    
303        @Override
304        public void stopService()
305          {
306          }
307    
308        @Override
309        public void put( String key, Object object )
310          {
311          }
312    
313        @Override
314        public void put( String type, String key, Object object )
315          {
316          }
317    
318        @Override
319        public Map get( String type, String key )
320          {
321          return null;
322          }
323    
324        @Override
325        public boolean supportsFind()
326          {
327          return false;
328          }
329    
330        @Override
331        public List<Map<String, Object>> find( String type, String[] query )
332          {
333          return null;
334          }
335        }
336    
337      /** Class NullMetricsService provides a null implementation. */
338      public static class NullMetricsService implements MetricsService
339        {
340        @Override
341        public boolean isEnabled()
342          {
343          return false;
344          }
345    
346        @Override
347        public void increment( String[] context, int amount )
348          {
349          }
350    
351        @Override
352        public void set( String[] context, String value )
353          {
354          }
355    
356        @Override
357        public void set( String[] context, int value )
358          {
359          }
360    
361        @Override
362        public void set( String[] context, long value )
363          {
364          }
365    
366        @Override
367        public String getString( String[] context )
368          {
369          return null;
370          }
371    
372        @Override
373        public int getInt( String[] context )
374          {
375          return 0;
376          }
377    
378        @Override
379        public long getLong( String[] context )
380          {
381          return 0;
382          }
383    
384        @Override
385        public boolean compareSet( String[] context, String isValue, String toValue )
386          {
387          return true;
388          }
389    
390        @Override
391        public boolean compareSet( String[] context, int isValue, int toValue )
392          {
393          return true;
394          }
395    
396        @Override
397        public boolean compareSet( String[] context, long isValue, long toValue )
398          {
399          return true;
400          }
401    
402        @Override
403        public void setProperties( Map<Object, Object> properties )
404          {
405          }
406    
407        @Override
408        public void startService()
409          {
410          }
411    
412        @Override
413        public void stopService()
414          {
415          }
416        }
417      }