001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.util.Map;
024import java.util.Properties;
025
026import cascading.property.Props;
027
028/**
029 * Class FlowProps is a fluent helper class for setting {@link Flow} specific properties through
030 * a {@link FlowConnector}.
031 *
032 * @see cascading.property.AppProps
033 * @see cascading.cascade.CascadeProps
034 * @see FlowConnectorProps
035 */
036public class FlowProps extends Props
037  {
038  public static final String DEFAULT_ELEMENT_COMPARATOR = "cascading.flow.tuple.element.comparator";
039  public static final String PRESERVE_TEMPORARY_FILES = "cascading.flow.preservetemporaryfiles";
040  public static final String JOB_POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
041  public static final String MAX_CONCURRENT_STEPS = "cascading.flow.maxconcurrentsteps";
042  public static final String STOP_JOBS_ON_EXIT = "cascading.flow.stopjobsonexit"; // create a stop flows on exit for AppConfig
043
044  String defaultTupleElementComparator = null;
045  boolean preserveTemporaryFiles = false;
046  int jobPollingInterval = 5000;
047  int maxConcurrentSteps = 0;
048  boolean stopJobsOnExit = true;
049
050  /**
051   * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
052   * {@link cascading.tuple.Comparison} interface.
053   * <p/>
054   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
055   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
056   * will be called.
057   * <p/>
058   * In local mode, only the default constructor will be called for the comparator.
059   *
060   * @param properties
061   * @param className
062   */
063  public static void setDefaultTupleElementComparator( Map<Object, Object> properties, String className )
064    {
065    if( className != null )
066      properties.put( DEFAULT_ELEMENT_COMPARATOR, className );
067    }
068
069  /**
070   * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
071   * for debugging. Defaults to {@code false}.
072   *
073   * @param properties             of type Map
074   * @param preserveTemporaryFiles of type boolean
075   */
076  public static void setPreserveTemporaryFiles( Map<Object, Object> properties, boolean preserveTemporaryFiles )
077    {
078    properties.put( PRESERVE_TEMPORARY_FILES, Boolean.toString( preserveTemporaryFiles ) );
079    }
080
081  /**
082   * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
083   * The default value is 5000 msec (5 seconds).
084   *
085   * @param properties of type Map
086   * @param interval   of type long
087   */
088  public static void setJobPollingInterval( Map<Object, Object> properties, long interval )
089    {
090    properties.put( JOB_POLLING_INTERVAL, Long.toString( interval ) );
091    }
092
093  /**
094   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
095   * <p/>
096   * By default a Flow will attempt to run all give steps at the same time. But there are occasions
097   * where limiting the number of steps helps manages resources.
098   *
099   * @param properties         of type Map<Object, Object>
100   * @param numConcurrentSteps of type int
101   */
102  public static void setMaxConcurrentSteps( Map<Object, Object> properties, int numConcurrentSteps )
103    {
104    properties.put( MAX_CONCURRENT_STEPS, Integer.toString( numConcurrentSteps ) );
105    }
106
107  /**
108   * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
109   * underlying computing system supports it. Defaults to {@code true}.
110   *
111   * @param properties     of type Map
112   * @param stopJobsOnExit of type boolean
113   */
114  public static void setStopJobsOnExit( Map<Object, Object> properties, boolean stopJobsOnExit )
115    {
116    properties.put( STOP_JOBS_ON_EXIT, Boolean.toString( stopJobsOnExit ) );
117    }
118
119  /**
120   * Creates a new FlowProps instance.
121   *
122   * @return FlowProps instance
123   */
124  public static FlowProps flowProps()
125    {
126    return new FlowProps();
127    }
128
129  public FlowProps()
130    {
131    }
132
133  public String getDefaultTupleElementComparator()
134    {
135    return defaultTupleElementComparator;
136    }
137
138  /**
139   * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
140   * {@link cascading.tuple.Comparison} interface.
141   * <p/>
142   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
143   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
144   * will be called.
145   * <p/>
146   * In local mode, only the default constructor will be called for the comparator.
147   *
148   * @param defaultTupleElementComparator
149   */
150  public FlowProps setDefaultTupleElementComparator( String defaultTupleElementComparator )
151    {
152    this.defaultTupleElementComparator = defaultTupleElementComparator;
153
154    return this;
155    }
156
157  public boolean isPreserveTemporaryFiles()
158    {
159    return preserveTemporaryFiles;
160    }
161
162  /**
163   * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
164   * for debugging. Defaults to {@code false}.
165   *
166   * @param preserveTemporaryFiles of type boolean
167   */
168  public FlowProps setPreserveTemporaryFiles( boolean preserveTemporaryFiles )
169    {
170    this.preserveTemporaryFiles = preserveTemporaryFiles;
171
172    return this;
173    }
174
175  public int getJobPollingInterval()
176    {
177    return jobPollingInterval;
178    }
179
180  /**
181   * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
182   * The default value is 5000 msec (5 seconds).
183   *
184   * @param jobPollingInterval of type long
185   */
186  public FlowProps setJobPollingInterval( int jobPollingInterval )
187    {
188    this.jobPollingInterval = jobPollingInterval;
189
190    return this;
191    }
192
193  public int getMaxConcurrentSteps()
194    {
195    return maxConcurrentSteps;
196    }
197
198  /**
199   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
200   * <p/>
201   * By default a Flow will attempt to run all give steps at the same time. But there are occasions
202   * where limiting the number of steps helps manages resources.
203   *
204   * @param maxConcurrentSteps of type int
205   */
206  public FlowProps setMaxConcurrentSteps( int maxConcurrentSteps )
207    {
208    this.maxConcurrentSteps = maxConcurrentSteps;
209
210    return this;
211    }
212
213  public boolean isStopJobsOnExit()
214    {
215    return stopJobsOnExit;
216    }
217
218  /**
219   * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
220   * underlying computing system supports it. Defaults to {@code true}.
221   *
222   * @param stopJobsOnExit of type boolean
223   */
224  public FlowProps setStopJobsOnExit( boolean stopJobsOnExit )
225    {
226    this.stopJobsOnExit = stopJobsOnExit;
227
228    return this;
229    }
230
231  @Override
232  protected void addPropertiesTo( Properties properties )
233    {
234    setDefaultTupleElementComparator( properties, defaultTupleElementComparator );
235    setPreserveTemporaryFiles( properties, preserveTemporaryFiles );
236    setJobPollingInterval( properties, jobPollingInterval );
237    setMaxConcurrentSteps( properties, maxConcurrentSteps );
238    setStopJobsOnExit( properties, stopJobsOnExit );
239    }
240  }