001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.io.IOException; 024 import java.util.Collection; 025 import java.util.Collections; 026 import java.util.Map; 027 028 import cascading.tap.Tap; 029 import cascading.tuple.TupleEntryCollector; 030 import cascading.tuple.TupleEntryIterator; 031 032 /** 033 * FlowProcess implementations provide a call-back interface into the current computing system. Each 034 * {@link cascading.operation.Operation} is given a reference to a particular implementation, allowing it 035 * to get configuration properties, send a "keep alive" ping, or to set a counter value. 036 * <p/> 037 * Depending on the underlying system, FlowProcess instances are not continuous across all operations in a {@link Flow}. 038 * Thus, a call to {@link #increment(Enum, long)} may start incrementing from zero if the operation making the call 039 * belongs to a subsequent 'job' or 'step' from any previous operations calling increment. 040 * <p/> 041 * A FlowProcess is roughly a child of {@link FlowSession}. FlowSession is roughly one to one with a particular {@link Flow}. 042 * And every FlowSession will have one or more FlowProcesses. 043 * 044 * @see FlowSession 045 */ 046 public abstract class FlowProcess<Config> 047 { 048 /** Field NULL is a noop implementation of FlowSession. */ 049 public static FlowProcess NULL = new NullFlowProcess(); 050 051 public static class NullFlowProcess extends FlowProcess<Object> 052 { 053 protected NullFlowProcess() 054 { 055 } 056 057 @Override 058 public FlowProcess copyWith( Object object ) 059 { 060 return new NullFlowProcess(); 061 } 062 063 public Object getProperty( String key ) 064 { 065 return null; 066 } 067 068 @Override 069 public Collection<String> getPropertyKeys() 070 { 071 return Collections.EMPTY_SET; 072 } 073 074 @Override 075 public Object newInstance( String className ) 076 { 077 return null; 078 } 079 080 public void keepAlive() 081 { 082 } 083 084 public void increment( Enum counter, long amount ) 085 { 086 } 087 088 public void increment( String group, String counter, long amount ) 089 { 090 } 091 092 public void setStatus( String status ) 093 { 094 } 095 096 @Override 097 public boolean isCounterStatusInitialized() 098 { 099 return true; 100 } 101 102 @Override 103 public int getNumProcessSlices() 104 { 105 return 1; 106 } 107 108 @Override 109 public int getCurrentSliceNum() 110 { 111 return 0; 112 } 113 114 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 115 { 116 return tap.openForRead( this ); 117 } 118 119 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 120 { 121 return tap.openForWrite( this ); 122 } 123 124 @Override 125 public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException 126 { 127 return trap.openForWrite( this ); 128 } 129 130 @Override 131 public TupleEntryCollector openSystemIntermediateForWrite() throws IOException 132 { 133 return null; 134 } 135 136 @Override 137 public Object getConfigCopy() 138 { 139 return null; 140 } 141 142 @Override 143 public Object copyConfig( Object config ) 144 { 145 return config; 146 } 147 148 @Override 149 public Map<String, String> diffConfigIntoMap( Object defaultConfig, Object updatedConfig ) 150 { 151 return null; 152 } 153 154 @Override 155 public Object mergeMapIntoConfig( Object defaultConfig, Map<String, String> map ) 156 { 157 return null; 158 } 159 } 160 161 /** Field currentSession */ 162 private FlowSession currentSession = FlowSession.NULL; 163 164 protected FlowProcess() 165 { 166 } 167 168 protected FlowProcess( FlowSession currentSession ) 169 { 170 setCurrentSession( currentSession ); 171 } 172 173 public abstract FlowProcess copyWith( Config config ); 174 175 /** 176 * Method getID() returns the current 177 * 178 * @return of type String 179 */ 180 public String getID() 181 { 182 return getStringProperty( FlowStep.CASCADING_FLOW_STEP_ID ); 183 } 184 185 /** 186 * Method getCurrentSession returns the currentSession of this FlowProcess object. 187 * 188 * @return the currentSession (type FlowSession) of this FlowProcess object. 189 */ 190 public FlowSession getCurrentSession() 191 { 192 return currentSession; 193 } 194 195 /** 196 * Method setCurrentSession sets the currentSession of this FlowProcess object. 197 * 198 * @param currentSession the currentSession of this FlowProcess object. 199 */ 200 public void setCurrentSession( FlowSession currentSession ) 201 { 202 this.currentSession = currentSession; 203 204 currentSession.setCurrentProcess( this ); 205 } 206 207 /** 208 * Method getNumProcessSlices returns the number of parallel slices or tasks allocated 209 * for this process execution. 210 * <p/> 211 * For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job. 212 * 213 * @return an int 214 */ 215 public abstract int getNumProcessSlices(); 216 217 /** 218 * Method getCurrentSliceNum returns an integer representing which slice instance currently running. 219 * <p/> 220 * {@code 0} (zero) is the first slice instance. 221 * 222 * @return an int 223 */ 224 public abstract int getCurrentSliceNum(); 225 226 /** 227 * Method getProperty should be used to return configuration parameters from the underlying system. 228 * <p/> 229 * In the case of Hadoop, the current Configuration will be queried. 230 * 231 * @param key of type String 232 * @return an Object 233 */ 234 public abstract Object getProperty( String key ); 235 236 /** 237 * Method getStringProperty should be used to return configuration parameters from the underlying system. 238 * <p/> 239 * In the case of Hadoop, the current Configuration will be queried. 240 * 241 * @param key of type String, 242 * @return null if property is not set 243 */ 244 public String getStringProperty( String key ) 245 { 246 Object value = getProperty( key ); 247 248 if( value == null ) 249 return null; 250 251 return value.toString(); 252 } 253 254 /** 255 * Method getStringProperty should be used to return configuration parameters from the underlying system. 256 * <p/> 257 * In the case of Hadoop, the current Configuration will be queried. 258 * 259 * @param key of type String, 260 * @param defaultValue of type String, 261 * @return {@code defaultValue} if property is not set 262 */ 263 public String getStringProperty( String key, String defaultValue ) 264 { 265 Object value = getProperty( key ); 266 267 if( value == null ) 268 return defaultValue; 269 270 return value.toString(); 271 } 272 273 /** 274 * Method getIntegerProperty should be used to return configuration parameters from the underlying system. 275 * <p/> 276 * In the case of Hadoop, the current Configuration will be queried. 277 * 278 * @param key of type String, 279 * @return null if property is not set 280 */ 281 public Integer getIntegerProperty( String key ) 282 { 283 String value = getStringProperty( key ); 284 285 if( value == null || value.isEmpty() ) 286 return null; 287 288 return Integer.valueOf( value ); 289 } 290 291 /** 292 * Method getIntegerProperty should be used to return configuration parameters from the underlying system. 293 * <p/> 294 * In the case of Hadoop, the current Configuration will be queried. 295 * 296 * @param key of type String, 297 * @param defaultValue of type int, 298 * @return {@code defaultValue} if property is not set 299 */ 300 public int getIntegerProperty( String key, int defaultValue ) 301 { 302 String value = getStringProperty( key ); 303 304 if( value == null || value.isEmpty() ) 305 return defaultValue; 306 307 return Integer.valueOf( value ); 308 } 309 310 /** 311 * Method getBooleanProperty should be used to return configuration parameters from the underlying system. 312 * <p/> 313 * In the case of Hadoop, the current Configuration will be queried. 314 * 315 * @param key of type Boolean, null if property is not set 316 * @return an Object 317 */ 318 public Boolean getBooleanProperty( String key ) 319 { 320 String value = getStringProperty( key ); 321 322 if( value == null || value.isEmpty() ) 323 return null; 324 325 return Boolean.valueOf( value ); 326 } 327 328 /** 329 * Method getBooleanProperty should be used to return configuration parameters from the underlying system. 330 * <p/> 331 * In the case of Hadoop, the current Configuration will be queried. 332 * 333 * @param key of type String 334 * @param defaultValue of type boolean 335 * @return {@code defaultValue} if property is not set 336 */ 337 public boolean getBooleanProperty( String key, boolean defaultValue ) 338 { 339 String value = getStringProperty( key ); 340 341 if( value == null || value.isEmpty() ) 342 return defaultValue; 343 344 return Boolean.valueOf( value ); 345 } 346 347 /** 348 * Method getPropertyKeys returns an immutable collection of all available property key values. 349 * 350 * @return a Collection<String> 351 */ 352 public abstract Collection<String> getPropertyKeys(); 353 354 /** 355 * Method newInstance creates a new object instance from the given className argument delegating to any 356 * platform specific instantiation and configuration routines. 357 * 358 * @param className 359 * @return an instance of className 360 */ 361 public abstract Object newInstance( String className ); 362 363 /** 364 * Method keepAlive notifies the system that the current process is still alive. Use this method if a particular 365 * {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling 366 * ping every few seconds to every minute or so would be best. 367 * <p/> 368 * This method will fail silently if the underlying mechanism to notify keepAlive status are not initialized. 369 */ 370 public abstract void keepAlive(); 371 372 /** 373 * Method increment is used to increment a custom counter. Counters must be of type Enum. The amount 374 * to increment must be a integer value. 375 * <p/> 376 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 377 * 378 * @param counter of type Enum 379 * @param amount of type int 380 */ 381 public abstract void increment( Enum counter, long amount ); 382 383 /** 384 * Method increment is used to increment a custom counter. The amount to increment must be a integer value. 385 * <p/> 386 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 387 * 388 * @param group of type String 389 * @param counter of type String 390 * @param amount of type int 391 */ 392 public abstract void increment( String group, String counter, long amount ); 393 394 /** 395 * Method setStatus is used to set the status of the current operation. 396 * <p/> 397 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 398 * 399 * @param status of type String 400 */ 401 public abstract void setStatus( String status ); 402 403 /** 404 * Method isCounterStatusInitialized returns true if it is safe to increment a counter or set a status. 405 * 406 * @return boolean 407 */ 408 public abstract boolean isCounterStatusInitialized(); 409 410 /** 411 * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance. 412 * <p/> 413 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 414 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 415 * stored in a Collection. 416 * 417 * @param tap of type Tap 418 * @return TupleIterator 419 * @throws java.io.IOException when there is a failure opening the resource 420 */ 421 public abstract TupleEntryIterator openTapForRead( Tap tap ) throws IOException; 422 423 /** 424 * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance. 425 * 426 * @param tap of type Tap 427 * @return TupleCollector 428 * @throws java.io.IOException when there is a failure opening the resource 429 */ 430 public abstract TupleEntryCollector openTapForWrite( Tap tap ) throws IOException; 431 432 /** 433 * Method openTrapForWrite returns a (@link TupleCollector} for the given Tap instance. 434 * 435 * @param trap of type Tap 436 * @return TupleCollector 437 * @throws java.io.IOException when there is a failure opening the resource 438 */ 439 public abstract TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException; 440 441 public abstract TupleEntryCollector openSystemIntermediateForWrite() throws IOException; 442 443 public abstract Config getConfigCopy(); 444 445 public abstract Config copyConfig( Config jobConf ); 446 447 public abstract Map<String, String> diffConfigIntoMap( Config defaultConfig, Config updatedConfig ); 448 449 public abstract Config mergeMapIntoConfig( Config defaultConfig, Map<String, String> map ); 450 }