001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.util.Map;
024    import java.util.Properties;
025    
026    import cascading.property.Props;
027    
028    /**
029     * Class FlowProps is a fluent helper class for setting {@link Flow} specific properties through
030     * a {@link FlowConnector}.
031     *
032     * @see cascading.property.AppProps
033     * @see cascading.cascade.CascadeProps
034     * @see FlowConnectorProps
035     */
036    public class FlowProps extends Props
037      {
038      public static final String DEFAULT_ELEMENT_COMPARATOR = "cascading.flow.tuple.element.comparator";
039      public static final String PRESERVE_TEMPORARY_FILES = "cascading.flow.preservetemporaryfiles";
040      public static final String JOB_POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
041      public static final String MAX_CONCURRENT_STEPS = "cascading.flow.maxconcurrentsteps";
042      public static final String STOP_JOBS_ON_EXIT = "cascading.flow.stopjobsonexit"; // create a stop flows on exit for AppConfig
043    
044      String defaultTupleElementComparator = null;
045      boolean preserveTemporaryFiles = false;
046      int jobPollingInterval = 5000;
047      int maxConcurrentSteps = 0;
048      boolean stopJobsOnExit = true;
049    
050      /**
051       * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
052       * {@link cascading.tuple.Comparison} interface.
053       * <p/>
054       * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
055       * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
056       * will be called.
057       * <p/>
058       * In local mode, only the default constructor will be called for the comparator.
059       *
060       * @param properties
061       * @param className
062       */
063      public static void setDefaultTupleElementComparator( Map<Object, Object> properties, String className )
064        {
065        if( className != null )
066          properties.put( DEFAULT_ELEMENT_COMPARATOR, className );
067        }
068    
069      /**
070       * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
071       * for debugging. Defaults to {@code false}.
072       *
073       * @param properties             of type Map
074       * @param preserveTemporaryFiles of type boolean
075       */
076      public static void setPreserveTemporaryFiles( Map<Object, Object> properties, boolean preserveTemporaryFiles )
077        {
078        properties.put( PRESERVE_TEMPORARY_FILES, Boolean.toString( preserveTemporaryFiles ) );
079        }
080    
081      /**
082       * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
083       * The default value is 5000 msec (5 seconds).
084       *
085       * @param properties of type Map
086       * @param interval   of type long
087       */
088      public static void setJobPollingInterval( Map<Object, Object> properties, long interval )
089        {
090        properties.put( JOB_POLLING_INTERVAL, Long.toString( interval ) );
091        }
092    
093      /**
094       * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
095       * <p/>
096       * By default a Flow will attempt to run all give steps at the same time. But there are occasions
097       * where limiting the number of steps helps manages resources.
098       *
099       * @param properties         of type Map<Object, Object>
100       * @param numConcurrentSteps of type int
101       */
102      public static void setMaxConcurrentSteps( Map<Object, Object> properties, int numConcurrentSteps )
103        {
104        properties.put( MAX_CONCURRENT_STEPS, Integer.toString( numConcurrentSteps ) );
105        }
106    
107      /**
108       * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
109       * underlying computing system supports it. Defaults to {@code true}.
110       *
111       * @param properties     of type Map
112       * @param stopJobsOnExit of type boolean
113       */
114      public static void setStopJobsOnExit( Map<Object, Object> properties, boolean stopJobsOnExit )
115        {
116        properties.put( STOP_JOBS_ON_EXIT, Boolean.toString( stopJobsOnExit ) );
117        }
118    
119      /**
120       * Creates a new FlowProps instance.
121       *
122       * @return FlowProps instance
123       */
124      public static FlowProps flowProps()
125        {
126        return new FlowProps();
127        }
128    
129      public FlowProps()
130        {
131        }
132    
133      public String getDefaultTupleElementComparator()
134        {
135        return defaultTupleElementComparator;
136        }
137    
138      /**
139       * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
140       * {@link cascading.tuple.Comparison} interface.
141       * <p/>
142       * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
143       * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
144       * will be called.
145       * <p/>
146       * In local mode, only the default constructor will be called for the comparator.
147       *
148       * @param defaultTupleElementComparator
149       */
150      public FlowProps setDefaultTupleElementComparator( String defaultTupleElementComparator )
151        {
152        this.defaultTupleElementComparator = defaultTupleElementComparator;
153    
154        return this;
155        }
156    
157      public boolean isPreserveTemporaryFiles()
158        {
159        return preserveTemporaryFiles;
160        }
161    
162      /**
163       * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
164       * for debugging. Defaults to {@code false}.
165       *
166       * @param preserveTemporaryFiles of type boolean
167       */
168      public FlowProps setPreserveTemporaryFiles( boolean preserveTemporaryFiles )
169        {
170        this.preserveTemporaryFiles = preserveTemporaryFiles;
171    
172        return this;
173        }
174    
175      public int getJobPollingInterval()
176        {
177        return jobPollingInterval;
178        }
179    
180      /**
181       * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
182       * The default value is 5000 msec (5 seconds).
183       *
184       * @param jobPollingInterval of type long
185       */
186      public FlowProps setJobPollingInterval( int jobPollingInterval )
187        {
188        this.jobPollingInterval = jobPollingInterval;
189    
190        return this;
191        }
192    
193      public int getMaxConcurrentSteps()
194        {
195        return maxConcurrentSteps;
196        }
197    
198      /**
199       * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
200       * <p/>
201       * By default a Flow will attempt to run all give steps at the same time. But there are occasions
202       * where limiting the number of steps helps manages resources.
203       *
204       * @param maxConcurrentSteps of type int
205       */
206      public FlowProps setMaxConcurrentSteps( int maxConcurrentSteps )
207        {
208        this.maxConcurrentSteps = maxConcurrentSteps;
209    
210        return this;
211        }
212    
213      public boolean isStopJobsOnExit()
214        {
215        return stopJobsOnExit;
216        }
217    
218      /**
219       * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
220       * underlying computing system supports it. Defaults to {@code true}.
221       *
222       * @param stopJobsOnExit of type boolean
223       */
224      public FlowProps setStopJobsOnExit( boolean stopJobsOnExit )
225        {
226        this.stopJobsOnExit = stopJobsOnExit;
227    
228        return this;
229        }
230    
231      @Override
232      protected void addPropertiesTo( Properties properties )
233        {
234        setDefaultTupleElementComparator( properties, defaultTupleElementComparator );
235        setPreserveTemporaryFiles( properties, preserveTemporaryFiles );
236        setJobPollingInterval( properties, jobPollingInterval );
237        setMaxConcurrentSteps( properties, maxConcurrentSteps );
238        setStopJobsOnExit( properties, stopJobsOnExit );
239        }
240      }