/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.util.Map;
import java.util.Properties;

import cascading.property.Props;

/**
 * Class FlowProps is a fluent helper class for setting {@link Flow} specific properties through
 * a {@link FlowConnector}.
 * <p>
 * Values may either be written directly into a properties {@code Map} through the static setters, or
 * collected on an instance through the chainable instance setters and later copied into a
 * {@link Properties} object by {@link #addPropertiesTo(Properties)}.
 *
 * @see cascading.property.AppProps
 * @see cascading.cascade.CascadeProps
 * @see FlowConnectorProps
 */
public class FlowProps extends Props
  {
  /** Property key holding the class name of the default tuple element comparator. */
  public static final String DEFAULT_ELEMENT_COMPARATOR = "cascading.flow.tuple.element.comparator";
  /** Property key holding the preserve temporary files flag. */
  public static final String PRESERVE_TEMPORARY_FILES = "cascading.flow.preservetemporaryfiles";
  /** Property key holding the job polling interval. */
  public static final String JOB_POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
  /** Property key holding the maximum number of concurrent steps. */
  public static final String MAX_CONCURRENT_STEPS = "cascading.flow.maxconcurrentsteps";
  /** Property key holding the stop jobs on exit flag. */
  public static final String STOP_JOBS_ON_EXIT = "cascading.flow.stopjobsonexit"; // create a stop flows on exit for AppConfig

  // instance state, initialized to the defaults written by addPropertiesTo when no setter is called
  String defaultTupleElementComparator = null; // null means the comparator property is not written at all
  boolean preserveTemporaryFiles = false;
  int jobPollingInterval = 5000; // milliseconds
  int maxConcurrentSteps = 0;
  boolean stopJobsOnExit = true;

  /**
   * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
   * {@link cascading.tuple.Comparison} interface.
   * <p>
   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
   * will be called.
   * <p>
   * In local mode, only the default constructor will be called for the comparator.
   *
   * @param properties of type {@code Map<Object, Object>}, the properties to update
   * @param className  of type String, the Comparator class name; if {@code null} the properties are left unchanged
   */
  public static void setDefaultTupleElementComparator( Map<Object, Object> properties, String className )
    {
    if( className != null )
      properties.put( DEFAULT_ELEMENT_COMPARATOR, className );
    }

  /**
   * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
   * for debugging. Defaults to {@code false}.
   *
   * @param properties             of type {@code Map<Object, Object>}, the properties to update
   * @param preserveTemporaryFiles of type boolean
   */
  public static void setPreserveTemporaryFiles( Map<Object, Object> properties, boolean preserveTemporaryFiles )
    {
    properties.put( PRESERVE_TEMPORARY_FILES, Boolean.toString( preserveTemporaryFiles ) );
    }

  /**
   * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
   * The default value is 5000 msec (5 seconds).
   *
   * @param properties of type {@code Map<Object, Object>}, the properties to update
   * @param interval   of type long
   */
  public static void setJobPollingInterval( Map<Object, Object> properties, long interval )
    {
    properties.put( JOB_POLLING_INTERVAL, Long.toString( interval ) );
    }

  /**
   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
   * <p>
   * By default a Flow will attempt to run all given steps at the same time. But there are occasions
   * where limiting the number of steps helps manage resources.
   *
   * @param properties         of type {@code Map<Object, Object>}, the properties to update
   * @param numConcurrentSteps of type int
   */
  public static void setMaxConcurrentSteps( Map<Object, Object> properties, int numConcurrentSteps )
    {
    properties.put( MAX_CONCURRENT_STEPS, Integer.toString( numConcurrentSteps ) );
    }

  /**
   * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
   * underlying computing system supports it. Defaults to {@code true}.
   *
   * @param properties     of type {@code Map<Object, Object>}, the properties to update
   * @param stopJobsOnExit of type boolean
   */
  public static void setStopJobsOnExit( Map<Object, Object> properties, boolean stopJobsOnExit )
    {
    properties.put( STOP_JOBS_ON_EXIT, Boolean.toString( stopJobsOnExit ) );
    }

  /**
   * Creates a new FlowProps instance.
   *
   * @return FlowProps instance
   */
  public static FlowProps flowProps()
    {
    return new FlowProps();
    }

  /**
   * Constructor FlowProps creates a new FlowProps instance holding the default values.
   */
  public FlowProps()
    {
    }

  /**
   * Method getDefaultTupleElementComparator returns the default comparator class name set on this instance.
   *
   * @return the class name, or {@code null} if none has been set
   */
  public String getDefaultTupleElementComparator()
    {
    return defaultTupleElementComparator;
    }

  /**
   * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
   * {@link cascading.tuple.Comparison} interface.
   * <p>
   * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
   * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
   * will be called.
   * <p>
   * In local mode, only the default constructor will be called for the comparator.
   *
   * @param defaultTupleElementComparator of type String, the Comparator class name
   * @return this FlowProps instance
   */
  public FlowProps setDefaultTupleElementComparator( String defaultTupleElementComparator )
    {
    this.defaultTupleElementComparator = defaultTupleElementComparator;

    return this;
    }

  /**
   * Method isPreserveTemporaryFiles returns the preserveTemporaryFiles value of this instance.
   *
   * @return the current preserveTemporaryFiles value
   */
  public boolean isPreserveTemporaryFiles()
    {
    return preserveTemporaryFiles;
    }

  /**
   * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
   * for debugging. Defaults to {@code false}.
   *
   * @param preserveTemporaryFiles of type boolean
   * @return this FlowProps instance
   */
  public FlowProps setPreserveTemporaryFiles( boolean preserveTemporaryFiles )
    {
    this.preserveTemporaryFiles = preserveTemporaryFiles;

    return this;
    }

  /**
   * Method getJobPollingInterval returns the jobPollingInterval value of this instance.
   *
   * @return the current polling interval
   */
  public int getJobPollingInterval()
    {
    return jobPollingInterval;
    }

  /**
   * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
   * The default value is 5000 msec (5 seconds).
   *
   * @param jobPollingInterval of type int
   * @return this FlowProps instance
   */
  public FlowProps setJobPollingInterval( int jobPollingInterval )
    {
    this.jobPollingInterval = jobPollingInterval;

    return this;
    }

  /**
   * Method getMaxConcurrentSteps returns the maxConcurrentSteps value of this instance.
   *
   * @return the current maximum number of concurrent steps
   */
  public int getMaxConcurrentSteps()
    {
    return maxConcurrentSteps;
    }

  /**
   * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
   * <p>
   * By default a Flow will attempt to run all given steps at the same time. But there are occasions
   * where limiting the number of steps helps manage resources.
   *
   * @param maxConcurrentSteps of type int
   * @return this FlowProps instance
   */
  public FlowProps setMaxConcurrentSteps( int maxConcurrentSteps )
    {
    this.maxConcurrentSteps = maxConcurrentSteps;

    return this;
    }

  /**
   * Method isStopJobsOnExit returns the stopJobsOnExit value of this instance.
   *
   * @return the current stopJobsOnExit value
   */
  public boolean isStopJobsOnExit()
    {
    return stopJobsOnExit;
    }

  /**
   * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
   * underlying computing system supports it. Defaults to {@code true}.
   *
   * @param stopJobsOnExit of type boolean
   * @return this FlowProps instance
   */
  public FlowProps setStopJobsOnExit( boolean stopJobsOnExit )
    {
    this.stopJobsOnExit = stopJobsOnExit;

    return this;
    }

  /**
   * Method addPropertiesTo copies the values held by this instance into the given properties object by
   * delegating to the static setters of this class.
   * <p>
   * The default tuple element comparator is only written when it is not {@code null}; all other values
   * are always written.
   *
   * @param properties of type Properties, the properties to update
   */
  @Override
  protected void addPropertiesTo( Properties properties )
    {
    setDefaultTupleElementComparator( properties, defaultTupleElementComparator );
    setPreserveTemporaryFiles( properties, preserveTemporaryFiles );
    setJobPollingInterval( properties, jobPollingInterval ); // int widens to the long expected by the static setter
    setMaxConcurrentSteps( properties, maxConcurrentSteps );
    setStopJobsOnExit( properties, stopJobsOnExit );
    }
  }