001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow;
023
024import java.util.Map;
025import java.util.Properties;
026
027import cascading.property.Props;
028
029/**
030 * Class FlowProps is a fluent helper class for setting {@link Flow} specific properties through
031 * a {@link FlowConnector}.
032 *
033 * @see cascading.property.AppProps
034 * @see cascading.cascade.CascadeProps
035 * @see FlowConnectorProps
036 */
037public class FlowProps extends Props
038  {
039  public static final String DEFAULT_ELEMENT_COMPARATOR = "cascading.flow.tuple.element.comparator";
040  public static final String PRESERVE_TEMPORARY_FILES = "cascading.flow.preservetemporaryfiles";
041  public static final String JOB_POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
042  public static final String MAX_CONCURRENT_STEPS = "cascading.flow.maxconcurrentsteps";
043  public static final String STOP_JOBS_ON_EXIT = "cascading.flow.stopjobsonexit"; // create a stop flows on exit for AppConfig
044
045  String defaultTupleElementComparator = null;
046  boolean preserveTemporaryFiles = false;
047  int jobPollingInterval = 5000;
048  int maxConcurrentSteps = 0;
049  boolean stopJobsOnExit = true;
050
051  /**
052   * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
053   * {@link cascading.tuple.Comparison} interface.
054   * <p>
055   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
056   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
057   * will be called.
058   * <p>
059   * In local mode, only the default constructor will be called for the comparator.
060   *
061   * @param properties
062   * @param className
063   */
064  public static void setDefaultTupleElementComparator( Map<Object, Object> properties, String className )
065    {
066    if( className != null )
067      properties.put( DEFAULT_ELEMENT_COMPARATOR, className );
068    }
069
070  /**
071   * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
072   * for debugging. Defaults to {@code false}.
073   *
074   * @param properties             of type Map
075   * @param preserveTemporaryFiles of type boolean
076   */
077  public static void setPreserveTemporaryFiles( Map<Object, Object> properties, boolean preserveTemporaryFiles )
078    {
079    properties.put( PRESERVE_TEMPORARY_FILES, Boolean.toString( preserveTemporaryFiles ) );
080    }
081
082  /**
083   * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
084   * The default value is 5000 msec (5 seconds).
085   *
086   * @param properties of type Map
087   * @param interval   of type long
088   */
089  public static void setJobPollingInterval( Map<Object, Object> properties, long interval )
090    {
091    properties.put( JOB_POLLING_INTERVAL, Long.toString( interval ) );
092    }
093
094  /**
095   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
096   * <p>
097   * By default a Flow will attempt to run all give steps at the same time. But there are occasions
098   * where limiting the number of steps helps manages resources.
099   *
100   * @param properties         of type Map
101   * @param numConcurrentSteps of type int
102   */
103  public static void setMaxConcurrentSteps( Map<Object, Object> properties, int numConcurrentSteps )
104    {
105    properties.put( MAX_CONCURRENT_STEPS, Integer.toString( numConcurrentSteps ) );
106    }
107
108  /**
109   * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
110   * underlying computing system supports it. Defaults to {@code true}.
111   *
112   * @param properties     of type Map
113   * @param stopJobsOnExit of type boolean
114   */
115  public static void setStopJobsOnExit( Map<Object, Object> properties, boolean stopJobsOnExit )
116    {
117    properties.put( STOP_JOBS_ON_EXIT, Boolean.toString( stopJobsOnExit ) );
118    }
119
120  /**
121   * Creates a new FlowProps instance.
122   *
123   * @return FlowProps instance
124   */
125  public static FlowProps flowProps()
126    {
127    return new FlowProps();
128    }
129
130  public FlowProps()
131    {
132    }
133
134  public String getDefaultTupleElementComparator()
135    {
136    return defaultTupleElementComparator;
137    }
138
139  /**
140   * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
141   * {@link cascading.tuple.Comparison} interface.
142   * <p>
143   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
144   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
145   * will be called.
146   * <p>
147   * In local mode, only the default constructor will be called for the comparator.
148   *
149   * @param defaultTupleElementComparator
150   */
151  public FlowProps setDefaultTupleElementComparator( String defaultTupleElementComparator )
152    {
153    this.defaultTupleElementComparator = defaultTupleElementComparator;
154
155    return this;
156    }
157
158  public boolean isPreserveTemporaryFiles()
159    {
160    return preserveTemporaryFiles;
161    }
162
163  /**
164   * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
165   * for debugging. Defaults to {@code false}.
166   *
167   * @param preserveTemporaryFiles of type boolean
168   */
169  public FlowProps setPreserveTemporaryFiles( boolean preserveTemporaryFiles )
170    {
171    this.preserveTemporaryFiles = preserveTemporaryFiles;
172
173    return this;
174    }
175
176  public int getJobPollingInterval()
177    {
178    return jobPollingInterval;
179    }
180
181  /**
182   * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
183   * The default value is 5000 msec (5 seconds).
184   *
185   * @param jobPollingInterval of type long
186   */
187  public FlowProps setJobPollingInterval( int jobPollingInterval )
188    {
189    this.jobPollingInterval = jobPollingInterval;
190
191    return this;
192    }
193
194  public int getMaxConcurrentSteps()
195    {
196    return maxConcurrentSteps;
197    }
198
199  /**
200   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
201   * <p>
202   * By default a Flow will attempt to run all give steps at the same time. But there are occasions
203   * where limiting the number of steps helps manages resources.
204   *
205   * @param maxConcurrentSteps of type int
206   */
207  public FlowProps setMaxConcurrentSteps( int maxConcurrentSteps )
208    {
209    this.maxConcurrentSteps = maxConcurrentSteps;
210
211    return this;
212    }
213
214  public boolean isStopJobsOnExit()
215    {
216    return stopJobsOnExit;
217    }
218
219  /**
220   * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
221   * underlying computing system supports it. Defaults to {@code true}.
222   *
223   * @param stopJobsOnExit of type boolean
224   */
225  public FlowProps setStopJobsOnExit( boolean stopJobsOnExit )
226    {
227    this.stopJobsOnExit = stopJobsOnExit;
228
229    return this;
230    }
231
232  @Override
233  protected void addPropertiesTo( Properties properties )
234    {
235    setDefaultTupleElementComparator( properties, defaultTupleElementComparator );
236    setPreserveTemporaryFiles( properties, preserveTemporaryFiles );
237    setJobPollingInterval( properties, jobPollingInterval );
238    setMaxConcurrentSteps( properties, maxConcurrentSteps );
239    setStopJobsOnExit( properties, stopJobsOnExit );
240    }
241  }