/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.stream.duct.DuctException;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;

/**
 * FlowProcess implementations provide a call-back interface into the current computing system. Each
 * {@link cascading.operation.Operation} is given a reference to a particular implementation, allowing it
 * to get configuration properties, send a "keep alive" ping, or to set a counter value.
 * <p/>
 * Depending on the underlying system, FlowProcess instances are not continuous across all operations in a {@link Flow}.
 * Thus, a call to {@link #increment(Enum, long)} may start incrementing from zero if the operation making the call
 * belongs to a subsequent 'job' or 'step' from any previous operations calling increment.
 * <p/>
 * A FlowProcess is roughly a child of {@link FlowSession}. FlowSession is roughly one to one with a particular {@link Flow}.
 * And every FlowSession will have one or more FlowProcesses.
 *
 * @see FlowSession
 */
public abstract class FlowProcess<Config>
  {
  /** Field NULL is a noop implementation of FlowProcess. */
  public static FlowProcess NULL = new NullFlowProcess();

  /**
   * Class NullFlowProcess is a noop FlowProcess: it holds no properties or configuration, ignores counter and
   * status updates, and opens taps by delegating directly to the given {@link Tap}.
   */
  public static class NullFlowProcess extends FlowProcess<Object>
    {
    protected NullFlowProcess()
      {
      }

    @Override
    public FlowProcess copyWith( Object object )
      {
      return new NullFlowProcess();
      }

    @Override
    public Object getProperty( String key )
      {
      return null;
      }

    @Override
    public Collection<String> getPropertyKeys()
      {
      // typed factory instead of the raw Collections.EMPTY_SET constant, avoids an unchecked conversion
      return Collections.<String>emptySet();
      }

    @Override
    public Object newInstance( String className )
      {
      return null;
      }

    @Override
    public void keepAlive()
      {
      }

    @Override
    public void increment( Enum counter, long amount )
      {
      }

    @Override
    public void increment( String group, String counter, long amount )
      {
      }

    @Override
    public long getCounterValue( Enum counter )
      {
      return 0;
      }

    @Override
    public long getCounterValue( String group, String counter )
      {
      return 0;
      }

    @Override
    public void setStatus( String status )
      {
      }

    @Override
    public boolean isCounterStatusInitialized()
      {
      return true;
      }

    @Override
    public int getNumProcessSlices()
      {
      return 1;
      }

    @Override
    public int getCurrentSliceNum()
      {
      return 0;
      }

    @Override
    public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
      {
      return tap.openForRead( this );
      }

    @Override
    public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
      {
      return tap.openForWrite( this );
      }

    @Override
    public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
      {
      return trap.openForWrite( this );
      }

    @Override
    public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
      {
      return null;
      }

    @Override
    public Object getConfig()
      {
      return null;
      }

    @Override
    public Object getConfigCopy()
      {
      return null;
      }

    @Override
    public Object copyConfig( Object config )
      {
      // no copy is made, the given instance is handed back
      return config;
      }

    @Override
    public Map<String, String> diffConfigIntoMap( Object defaultConfig, Object updatedConfig )
      {
      return null;
      }

    @Override
    public Object mergeMapIntoConfig( Object defaultConfig, Map<String, String> map )
      {
      return null;
      }
    }

  private FlowSession currentSession = FlowSession.NULL;
  /** Lazily created by {@link #getTrapCollectors()}; the same map instance is shared with copies of this object. */
  private Map<Tap, TupleEntryCollector> trapCollectors;

  protected FlowProcess()
    {
    }

  protected FlowProcess( FlowSession currentSession )
    {
    setCurrentSession( currentSession );
    }

  /**
   * Copy constructor.
   * <p/>
   * Shares the underlying trap collector collection across copies to avoid a static collection.
   * <p/>
   * Note the session of the given instance is re-bound to this new instance, see
   * {@link #setCurrentSession(FlowSession)}.
   *
   * @param flowProcess the instance to take the current session and trap collectors from
   */
  protected FlowProcess( FlowProcess<Config> flowProcess )
    {
    setCurrentSession( flowProcess.getCurrentSession() );

    // lazy initialize trap collectors collection and share across copies
    this.trapCollectors = flowProcess.getTrapCollectors();
    }

  /**
   * Method copyWith returns a FlowProcess for the given configuration.
   * <p/>
   * NOTE(review): whether implementations are expected to share state with this instance (via the copy
   * constructor) is not enforced here -- confirm against the concrete implementations.
   *
   * @param config of type Config
   * @return a FlowProcess
   */
  public abstract FlowProcess<Config> copyWith( Config config );

  /**
   * Method getID returns the value of the {@link FlowStep#CASCADING_FLOW_STEP_ID} property.
   *
   * @return of type String, null if the property is not set
   */
  public String getID()
    {
    return getStringProperty( FlowStep.CASCADING_FLOW_STEP_ID );
    }

  /**
   * Method getCurrentSession returns the currentSession of this FlowProcess object.
   *
   * @return the currentSession (type FlowSession) of this FlowProcess object.
   */
  public FlowSession getCurrentSession()
    {
    return currentSession;
    }

  /**
   * Method setCurrentSession sets the currentSession of this FlowProcess object, and registers this
   * FlowProcess as the current process of the given session.
   *
   * @param currentSession the currentSession of this FlowProcess object, must not be null.
   */
  public void setCurrentSession( FlowSession currentSession )
    {
    this.currentSession = currentSession;

    currentSession.setCurrentProcess( this );
    }

  /**
   * Method getNumProcessSlices returns the number of parallel slices or tasks allocated
   * for this process execution.
   * <p/>
   * For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job.
   *
   * @return an int
   */
  public abstract int getNumProcessSlices();

  /**
   * Method getCurrentSliceNum returns an integer representing which slice instance is currently running.
   * <p/>
   * {@code 0} (zero) is the first slice instance.
   *
   * @return an int
   */
  public abstract int getCurrentSliceNum();

  /**
   * Method getProperty should be used to return configuration parameters from the underlying system.
   * <p/>
   * In the case of Hadoop, the current Configuration will be queried.
   *
   * @param key of type String
   * @return an Object
   */
  public abstract Object getProperty( String key );

  /**
   * Method getStringProperty should be used to return configuration parameters from the underlying system.
   * <p/>
   * In the case of Hadoop, the current Configuration will be queried.
   *
   * @param key of type String,
   * @return null if property is not set
   */
  public String getStringProperty( String key )
    {
    Object value = getProperty( key );

    if( value == null )
      return null;

    return value.toString();
    }

  /**
   * Method getStringProperty should be used to return configuration parameters from the underlying system.
   * <p/>
   * In the case of Hadoop, the current Configuration will be queried.
   *
   * @param key          of type String,
   * @param defaultValue of type String,
   * @return {@code defaultValue} if property is not set
   */
  public String getStringProperty( String key, String defaultValue )
    {
    Object value = getProperty( key );

    if( value == null )
      return defaultValue;

    return value.toString();
    }

  /**
   * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
   * <p/>
   * In the case of Hadoop, the current Configuration will be queried.
   *
   * @param key of type String,
   * @return null if property is not set or is an empty String
   * @throws NumberFormatException if the property value is not a parsable integer
   */
  public Integer getIntegerProperty( String key )
    {
    String value = getStringProperty( key );

    if( value == null || value.isEmpty() )
      return null;

    return Integer.valueOf( value );
    }

  /**
   * Method getIntegerProperty should be used to return configuration parameters from the underlying system.
   * <p/>
   * In the case of Hadoop, the current Configuration will be queried.
   *
   * @param key          of type String,
   * @param defaultValue of type int,
   * @return {@code defaultValue} if property is not set or is an empty String
   * @throws NumberFormatException if the property value is not a parsable integer
   */
  public int getIntegerProperty( String key, int defaultValue )
    {
    String value = getStringProperty( key );

    if( value == null || value.isEmpty() )
      return defaultValue;

    // primitive result, parse directly rather than box and unbox
    return Integer.parseInt( value );
    }

  /**
   * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
   * <p/>
   * In the case of Hadoop, the current Configuration will be queried.
   *
   * @param key of type String
   * @return null if property is not set or is an empty String
   */
  public Boolean getBooleanProperty( String key )
    {
    String value = getStringProperty( key );

    if( value == null || value.isEmpty() )
      return null;

    return Boolean.valueOf( value );
    }

  /**
   * Method getBooleanProperty should be used to return configuration parameters from the underlying system.
   * <p/>
   * In the case of Hadoop, the current Configuration will be queried.
   *
   * @param key          of type String
   * @param defaultValue of type boolean
   * @return {@code defaultValue} if property is not set or is an empty String
   */
  public boolean getBooleanProperty( String key, boolean defaultValue )
    {
    String value = getStringProperty( key );

    if( value == null || value.isEmpty() )
      return defaultValue;

    // primitive result, parse directly rather than box and unbox
    return Boolean.parseBoolean( value );
    }

  /**
   * Method getPropertyKeys returns an immutable collection of all available property key values.
   *
   * @return a {@code Collection<String>}
   */
  public abstract Collection<String> getPropertyKeys();

  /**
   * Method newInstance creates a new object instance from the given className argument delegating to any
   * platform specific instantiation and configuration routines.
   *
   * @param className of type String
   * @return an instance of className
   */
  public abstract Object newInstance( String className );

  /**
   * Method keepAlive notifies the system that the current process is still alive. Use this method if a particular
   * {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling
   * ping every few seconds to every minute or so would be best.
   * <p/>
   * This method will fail silently if the underlying mechanism to notify keepAlive status are not initialized.
   */
  public abstract void keepAlive();

  /**
   * Method increment is used to increment a custom counter. Counters must be of type Enum. The amount
   * to increment must be an integer value.
   * <p/>
   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
   *
   * @param counter of type Enum
   * @param amount  of type long
   */
  public abstract void increment( Enum counter, long amount );

  /**
   * Method increment is used to increment a custom counter. The amount to increment must be an integer value.
   * <p/>
   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
   *
   * @param group   of type String
   * @param counter of type String
   * @param amount  of type long
   */
  public abstract void increment( String group, String counter, long amount );

  /**
   * Method getCounterValue is used to retrieve a counter value.
   * <p/>
   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
   *
   * @param counter of type Enum
   * @return of type long
   */
  public abstract long getCounterValue( Enum counter );

  /**
   * Method getCounterValue is used to retrieve a counter value.
   * <p/>
   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
   *
   * @param group   of type String
   * @param counter of type String
   * @return of type long
   */
  public abstract long getCounterValue( String group, String counter );

  /**
   * Method setStatus is used to set the status of the current operation.
   * <p/>
   * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}.
   *
   * @param status of type String
   */
  public abstract void setStatus( String status );

  /**
   * Method isCounterStatusInitialized returns true if it is safe to increment a counter or set a status.
   *
   * @return boolean
   */
  public abstract boolean isCounterStatusInitialized();

  /**
   * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
   * <p/>
   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
   * stored in a Collection.
   *
   * @param tap of type Tap
   * @return TupleEntryIterator
   * @throws java.io.IOException when there is a failure opening the resource
   */
  public abstract TupleEntryIterator openTapForRead( Tap tap ) throws IOException;

  /**
   * Method openTapForWrite returns a {@link TupleEntryCollector} for the given Tap instance.
   *
   * @param tap of type Tap
   * @return TupleEntryCollector
   * @throws java.io.IOException when there is a failure opening the resource
   */
  public abstract TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;

  /**
   * Method openTrapForWrite returns a {@link TupleEntryCollector} for the given Tap instance.
   *
   * @param trap of type Tap
   * @return TupleEntryCollector
   * @throws java.io.IOException when there is a failure opening the resource
   */
  public abstract TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException;

  /**
   * Method openSystemIntermediateForWrite returns a {@link TupleEntryCollector}.
   * <p/>
   * NOTE(review): presumably the collector writes to a platform managed intermediate resource -- the contract
   * is defined by the implementations, confirm there. {@link NullFlowProcess} returns null.
   *
   * @return TupleEntryCollector
   * @throws java.io.IOException when there is a failure opening the resource
   */
  public abstract TupleEntryCollector openSystemIntermediateForWrite() throws IOException;

  /**
   * Method getConfig returns the actual instance of the underlying configuration instance.
   * <p/>
   * This instance should not be modified or cached, see {@link #getConfigCopy()} for a modifiable instance.
   *
   * @return an instance of the configuration
   */
  public abstract Config getConfig();

  /**
   * Method getConfigCopy returns a modifiable instance of the configuration, see {@link #getConfig()}.
   *
   * @return an instance of the configuration
   */
  public abstract Config getConfigCopy();

  /**
   * Method copyConfig returns a configuration instance for the given one.
   * <p/>
   * NOTE(review): presumably a copy of the argument, but {@link NullFlowProcess} returns the argument itself --
   * confirm the expected contract against the platform implementations.
   *
   * @param config of type C
   * @return of type C
   */
  public abstract <C> C copyConfig( C config );

  /**
   * Method diffConfigIntoMap returns a Map of String keys and values derived from the two given configurations.
   * <p/>
   * NOTE(review): presumably the entries of updatedConfig that differ from defaultConfig -- confirm against
   * the platform implementations.
   *
   * @param defaultConfig of type C
   * @param updatedConfig of type C
   * @return a {@code Map<String, String>}
   */
  public abstract <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig );

  /**
   * Method mergeMapIntoConfig returns a configuration derived from the given configuration and map.
   * <p/>
   * NOTE(review): presumably the map entries are applied over defaultConfig -- confirm against the platform
   * implementations.
   *
   * @param defaultConfig of type Config
   * @param map           of type {@code Map<String, String>}
   * @return of type Config
   */
  public abstract Config mergeMapIntoConfig( Config defaultConfig, Map<String, String> map );

  /**
   * Method getTrapCollectorFor will return a new {@link TupleEntryCollector} if one hasn't previously
   * been created for the given trap Tap.
   * <p/>
   * The lookup and the registration of a newly opened collector are two separate map operations, so callers
   * racing on the same trap may each open a collector.
   *
   * @param trap of type Tap
   * @return TupleEntryCollector
   * @throws DuctException wrapping any IOException thrown while opening the trap
   */
  public TupleEntryCollector getTrapCollectorFor( Tap trap )
    {
    Map<Tap, TupleEntryCollector> trapCollectors = getTrapCollectors();

    TupleEntryCollector trapCollector = trapCollectors.get( trap );

    if( trapCollector == null )
      {
      try
        {
        trapCollector = openTrapForWrite( trap );
        trapCollectors.put( trap, trapCollector );
        }
      catch( IOException exception )
        {
        throw new DuctException( exception );
        }
      }

    return trapCollector;
    }

  /**
   * Method getTrapCollectors returns the map of trap collectors, creating it on first use.
   *
   * @return a synchronized {@code Map<Tap, TupleEntryCollector>}
   */
  protected synchronized Map<Tap, TupleEntryCollector> getTrapCollectors()
    {
    if( trapCollectors == null )
      trapCollectors = Collections.synchronizedMap( new HashMap<Tap, TupleEntryCollector>() );

    return trapCollectors;
    }

  /**
   * Method closeTrapCollectors closes every registered trap collector and empties the trap collector map.
   * <p/>
   * Closing is best effort, a collector that fails to close does not prevent the remaining collectors
   * from being closed.
   */
  public synchronized void closeTrapCollectors()
    {
    if( trapCollectors == null )
      return;

    Collection<TupleEntryCollector> pending;

    // the map is shared with copies of this instance, so the lock on 'this' does not guard it. views of a
    // synchronized map must be traversed while holding the map itself, so snapshot and clear in one step.
    synchronized( trapCollectors )
      {
      pending = new ArrayList<TupleEntryCollector>( trapCollectors.values() );
      trapCollectors.clear();
      }

    // close outside of the map lock so a slow close does not block other users of the shared map
    for( TupleEntryCollector trapCollector : pending )
      {
      try
        {
        trapCollector.close();
        }
      catch( Exception ignored )
        {
        // intentionally ignored, keep closing the remaining collectors
        }
      }
    }
  }