/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.util.Map;
import java.util.Properties;

import cascading.property.Props;

/**
 * Class FlowProps is a fluent helper for collecting {@link Flow} specific properties that are handed to
 * a {@link FlowConnector}.
 * <p>
 * Every property can be written in one of two ways: directly into an arbitrary property {@link Map} via the
 * static {@code set...( Map, value )} methods, or by configuring an instance through the chained instance
 * setters, in which case {@link #addPropertiesTo(Properties)} copies the accumulated values into a
 * {@link Properties} object.
 *
 * @see cascading.property.AppProps
 * @see cascading.cascade.CascadeProps
 * @see FlowConnectorProps
 */
public class FlowProps extends Props
  {
  /** Key under which the default tuple element comparator class name is stored. */
  public static final String DEFAULT_ELEMENT_COMPARATOR = "cascading.flow.tuple.element.comparator";
  /** Key under which the preserve-temporary-files flag is stored. */
  public static final String PRESERVE_TEMPORARY_FILES = "cascading.flow.preservetemporaryfiles";
  /** Key under which the job polling interval is stored. */
  public static final String JOB_POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
  /** Key under which the maximum number of concurrent steps is stored. */
  public static final String MAX_CONCURRENT_STEPS = "cascading.flow.maxconcurrentsteps";
  /** Key under which the stop-jobs-on-exit flag is stored. */
  public static final String STOP_JOBS_ON_EXIT = "cascading.flow.stopjobsonexit"; // create a stop flows on exit for AppConfig

  // instance state backing the fluent API, initialized to the documented defaults
  String defaultTupleElementComparator = null;
  boolean preserveTemporaryFiles = false;
  int jobPollingInterval = 5000;
  int maxConcurrentSteps = 0;
  boolean stopJobsOnExit = true;

  /**
   * Registers the class name of a fallback {@link java.util.Comparator}, used when no Comparator can be found
   * for a class via the {@link cascading.tuple.Comparison} interface.
   * <p>
   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
   * will be called.
   * <p>
   * In local mode, only the default constructor will be called for the comparator.
   * <p>
   * A {@code null} class name is ignored and leaves the given properties untouched.
   *
   * @param properties the property Map to write into
   * @param className  the comparator class name, may be {@code null}
   */
  public static void setDefaultTupleElementComparator( Map<Object, Object> properties, String className )
    {
    if( className == null )
      return; // nothing to register

    properties.put( DEFAULT_ELEMENT_COMPARATOR, className );
    }

  /**
   * Property preserveTemporaryFiles makes the Flow instance keep any temporary intermediate data sets, which
   * is useful when debugging. Defaults to {@code false}.
   *
   * @param properties             of type Map
   * @param preserveTemporaryFiles of type boolean
   */
  public static void setPreserveTemporaryFiles( Map<Object, Object> properties, boolean preserveTemporaryFiles )
    {
    String value = String.valueOf( preserveTemporaryFiles );

    properties.put( PRESERVE_TEMPORARY_FILES, value );
    }

  /**
   * Property jobPollingInterval is the time to wait between polls of the remote server for the status of a job.
   * The default value is 5000 msec (5 seconds).
   *
   * @param properties of type Map
   * @param interval   of type long
   */
  public static void setJobPollingInterval( Map<Object, Object> properties, long interval )
    {
    String value = String.valueOf( interval );

    properties.put( JOB_POLLING_INTERVAL, value );
    }

  /**
   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
   * <p>
   * By default a Flow will attempt to run all given steps at the same time. But there are occasions
   * where limiting the number of steps helps manage resources.
   *
   * @param properties         of type Map
   * @param numConcurrentSteps of type int
   */
  public static void setMaxConcurrentSteps( Map<Object, Object> properties, int numConcurrentSteps )
    {
    String value = String.valueOf( numConcurrentSteps );

    properties.put( MAX_CONCURRENT_STEPS, value );
    }

  /**
   * Property stopJobsOnExit tells the Flow to add a JVM shutdown hook that will kill all running processes if
   * the underlying computing system supports it. Defaults to {@code true}.
   *
   * @param properties     of type Map
   * @param stopJobsOnExit of type boolean
   */
  public static void setStopJobsOnExit( Map<Object, Object> properties, boolean stopJobsOnExit )
    {
    String value = String.valueOf( stopJobsOnExit );

    properties.put( STOP_JOBS_ON_EXIT, value );
    }

  /**
   * Factory method returning a fresh FlowProps instance, as a starting point for chained setter calls.
   *
   * @return FlowProps instance
   */
  public static FlowProps flowProps()
    {
    return new FlowProps();
    }

  /** Creates a FlowProps instance with every property at its default value. */
  public FlowProps()
    {
    }

  /**
   * Returns the configured default comparator class name.
   *
   * @return the class name, or {@code null} if none was set
   */
  public String getDefaultTupleElementComparator()
    {
    return this.defaultTupleElementComparator;
    }

  /**
   * Sets the class name of a fallback {@link java.util.Comparator}, used when no Comparator can be found
   * for a class via the {@link cascading.tuple.Comparison} interface.
   * <p>
   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
   * will be called.
   * <p>
   * In local mode, only the default constructor will be called for the comparator.
   *
   * @param defaultTupleElementComparator the comparator class name
   * @return this instance
   */
  public FlowProps setDefaultTupleElementComparator( String defaultTupleElementComparator )
    {
    this.defaultTupleElementComparator = defaultTupleElementComparator;

    return this;
    }

  /**
   * Returns whether temporary intermediate data sets are to be kept.
   *
   * @return the preserveTemporaryFiles flag
   */
  public boolean isPreserveTemporaryFiles()
    {
    return this.preserveTemporaryFiles;
    }

  /**
   * Property preserveTemporaryFiles makes the Flow instance keep any temporary intermediate data sets, which
   * is useful when debugging. Defaults to {@code false}.
   *
   * @param preserveTemporaryFiles of type boolean
   * @return this instance
   */
  public FlowProps setPreserveTemporaryFiles( boolean preserveTemporaryFiles )
    {
    this.preserveTemporaryFiles = preserveTemporaryFiles;

    return this;
    }

  /**
   * Returns the configured job polling interval.
   *
   * @return the interval in milliseconds
   */
  public int getJobPollingInterval()
    {
    return this.jobPollingInterval;
    }

  /**
   * Property jobPollingInterval is the time to wait between polls of the remote server for the status of a job.
   * The default value is 5000 msec (5 seconds).
   *
   * @param jobPollingInterval of type int
   * @return this instance
   */
  public FlowProps setJobPollingInterval( int jobPollingInterval )
    {
    this.jobPollingInterval = jobPollingInterval;

    return this;
    }

  /**
   * Returns the configured maximum number of concurrently running steps.
   *
   * @return the maxConcurrentSteps value
   */
  public int getMaxConcurrentSteps()
    {
    return this.maxConcurrentSteps;
    }

  /**
   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
   * <p>
   * By default a Flow will attempt to run all given steps at the same time. But there are occasions
   * where limiting the number of steps helps manage resources.
   *
   * @param maxConcurrentSteps of type int
   * @return this instance
   */
  public FlowProps setMaxConcurrentSteps( int maxConcurrentSteps )
    {
    this.maxConcurrentSteps = maxConcurrentSteps;

    return this;
    }

  /**
   * Returns whether running jobs are to be stopped when the JVM exits.
   *
   * @return the stopJobsOnExit flag
   */
  public boolean isStopJobsOnExit()
    {
    return this.stopJobsOnExit;
    }

  /**
   * Property stopJobsOnExit tells the Flow to add a JVM shutdown hook that will kill all running processes if
   * the underlying computing system supports it. Defaults to {@code true}.
   *
   * @param stopJobsOnExit of type boolean
   * @return this instance
   */
  public FlowProps setStopJobsOnExit( boolean stopJobsOnExit )
    {
    this.stopJobsOnExit = stopJobsOnExit;

    return this;
    }

  /**
   * Copies every value held by this instance into the given properties by delegating to the matching
   * static setter. The comparator entry is only written when a class name has been set.
   *
   * @param properties the Properties object to populate
   */
  @Override
  protected void addPropertiesTo( Properties properties )
    {
    setDefaultTupleElementComparator( properties, this.defaultTupleElementComparator );
    setPreserveTemporaryFiles( properties, this.preserveTemporaryFiles );
    setJobPollingInterval( properties, this.jobPollingInterval ); // int widens to the long parameter
    setMaxConcurrentSteps( properties, this.maxConcurrentSteps );
    setStopJobsOnExit( properties, this.stopJobsOnExit );
    }
  }