001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow; 023 024import java.io.IOException; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.Map; 029 030import cascading.flow.stream.duct.DuctException; 031import cascading.tap.Tap; 032import cascading.tap.type.FileType; 033import cascading.tuple.TupleEntryCollector; 034import cascading.tuple.TupleEntryIterator; 035 036/** 037 * FlowProcess implementations provide a call-back interface into the current computing system. Each 038 * {@link cascading.operation.Operation} is given a reference to a particular implementation, allowing it 039 * to get configuration properties, send a "keep alive" ping, or to set a counter value. 040 * <p> 041 * Depending on the underlying system, FlowProcess instances are not continuous across all operations in a {@link Flow}. 042 * Thus, a call to {@link #increment(Enum, long)} may start incrementing from zero if the operation making the call 043 * belongs to a subsequent 'job' or 'step' from any previous operations calling increment. 044 * <p> 045 * A FlowProcess is roughly a child of {@link FlowSession}. FlowSession is roughly one to one with a particular {@link Flow}. 046 * And every FlowSession will have one or more FlowProcesses. 047 * 048 * @see FlowSession 049 */ 050public abstract class FlowProcess<Config> 051 { 052 /** Field NULL is a noop implementation of FlowProcess. */ 053 @SuppressWarnings("StaticInitializerReferencesSubClass") 054 public static FlowProcess NULL = new NullFlowProcess(); 055 056 /** 057 * A noop implementation of a FlowProcess. 058 * 059 * @return A noop implementation of a FlowProcess. 060 */ 061 @SuppressWarnings("unchecked") 062 public static <C> FlowProcess<C> nullFlowProcess() 063 { 064 return (FlowProcess<C>) NULL; 065 } 066 067 public static class NullFlowProcess extends FlowProcess<Object> 068 { 069 protected NullFlowProcess() 070 { 071 } 072 073 @Override 074 public FlowProcess copyWith( Object object ) 075 { 076 return new NullFlowProcess(); 077 } 078 079 public Object getProperty( String key ) 080 { 081 return null; 082 } 083 084 @Override 085 public Collection<String> getPropertyKeys() 086 { 087 return Collections.EMPTY_SET; 088 } 089 090 @Override 091 public Object newInstance( String className ) 092 { 093 return null; 094 } 095 096 public void keepAlive() 097 { 098 } 099 100 public void increment( Enum counter, long amount ) 101 { 102 } 103 104 public void increment( String group, String counter, long amount ) 105 { 106 } 107 108 @Override 109 public long getCounterValue( Enum counter ) 110 { 111 return 0; 112 } 113 114 @Override 115 public long getCounterValue( String group, String counter ) 116 { 117 return 0; 118 } 119 120 public void setStatus( String status ) 121 { 122 } 123 124 @Override 125 public boolean isCounterStatusInitialized() 126 { 127 return true; 128 } 129 130 @Override 131 public int getNumProcessSlices() 132 { 133 return 1; 134 } 135 136 @Override 137 public int getCurrentSliceNum() 138 { 139 return 0; 140 } 141 142 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 143 { 144 return tap.openForRead( this ); 145 } 146 147 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 148 { 149 return tap.openForWrite( this ); 150 } 151 152 @Override 153 public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException 154 { 155 return trap.openForWrite( this ); 156 } 157 158 @Override 159 public TupleEntryCollector openSystemIntermediateForWrite() throws IOException 160 { 161 return null; 162 } 163 164 @Override 165 public Object getConfig() 166 { 167 return null; 168 } 169 170 @Override 171 public Object getConfigCopy() 172 { 173 return null; 174 } 175 176 @Override 177 public Object copyConfig( Object config ) 178 { 179 return config; 180 } 181 182 @Override 183 public Map<String, String> diffConfigIntoMap( Object defaultConfig, Object updatedConfig ) 184 { 185 return null; 186 } 187 188 @Override 189 public Object mergeMapIntoConfig( Object defaultConfig, Map<String, String> map ) 190 { 191 return null; 192 } 193 } 194 195 public class FlowProcessContext 196 { 197 String sourcePath; 198 199 public String getSourcePath() 200 { 201 return FlowProcess.this.getStringProperty( FileType.CASCADING_SOURCE_PATH, sourcePath ); 202 } 203 204 public void setSourcePath( String sourcePath ) 205 { 206 this.sourcePath = sourcePath; 207 } 208 } 209 210 private FlowSession currentSession = FlowSession.NULL; 211 private Map<Tap, TupleEntryCollector> trapCollectors; 212 private FlowProcessContext flowProcessContext = new FlowProcessContext(); 213 214 protected FlowProcess() 215 { 216 } 217 218 protected FlowProcess( FlowSession currentSession ) 219 { 220 setCurrentSession( currentSession ); 221 } 222 223 /** 224 * Copy constructor. 225 * <p> 226 * Shares the underlying trap collector collection across copies to avoid a static collection. 227 * 228 * @param flowProcess 229 */ 230 protected FlowProcess( FlowProcess<Config> flowProcess ) 231 { 232 setCurrentSession( flowProcess.getCurrentSession() ); 233 234 // lazy initialize trap collectors collection and share across copies 235 this.trapCollectors = flowProcess.getTrapCollectors(); 236 this.flowProcessContext = flowProcess.getFlowProcessContext(); 237 } 238 239 public FlowProcessContext getFlowProcessContext() 240 { 241 return flowProcessContext; 242 } 243 244 public abstract FlowProcess<Config> copyWith( Config config ); 245 246 /** 247 * Method getID() returns the current 248 * 249 * @return of type String 250 */ 251 public String getID() 252 { 253 return getStringProperty( FlowStep.CASCADING_FLOW_STEP_ID ); 254 } 255 256 /** 257 * Method getCurrentSession returns the currentSession of this FlowProcess object. 258 * 259 * @return the currentSession (type FlowSession) of this FlowProcess object. 260 */ 261 public FlowSession getCurrentSession() 262 { 263 return currentSession; 264 } 265 266 /** 267 * Method setCurrentSession sets the currentSession of this FlowProcess object. 268 * 269 * @param currentSession the currentSession of this FlowProcess object. 270 */ 271 public void setCurrentSession( FlowSession currentSession ) 272 { 273 this.currentSession = currentSession; 274 275 currentSession.setCurrentProcess( this ); 276 } 277 278 /** 279 * Method getNumProcessSlices returns the number of parallel slices or tasks allocated 280 * for this process execution. 281 * <p> 282 * For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job. 283 * 284 * @return an int 285 */ 286 public abstract int getNumProcessSlices(); 287 288 /** 289 * Method getCurrentSliceNum returns an integer representing which slice instance currently running. 290 * <p> 291 * {@code 0} (zero) is the first slice instance. 292 * 293 * @return an int 294 */ 295 public abstract int getCurrentSliceNum(); 296 297 /** 298 * Method getProperty should be used to return configuration parameters from the underlying system. 299 * <p> 300 * In the case of Hadoop, the current Configuration will be queried. 301 * 302 * @param key of type String 303 * @return an Object 304 */ 305 public abstract Object getProperty( String key ); 306 307 /** 308 * Method getStringProperty should be used to return configuration parameters from the underlying system. 309 * <p> 310 * In the case of Hadoop, the current Configuration will be queried. 311 * 312 * @param key of type String, 313 * @return null if property is not set 314 */ 315 public String getStringProperty( String key ) 316 { 317 Object value = getProperty( key ); 318 319 if( value == null ) 320 return null; 321 322 return value.toString(); 323 } 324 325 /** 326 * Method getStringProperty should be used to return configuration parameters from the underlying system. 327 * <p> 328 * In the case of Hadoop, the current Configuration will be queried. 329 * 330 * @param key of type String, 331 * @param defaultValue of type String, 332 * @return {@code defaultValue} if property is not set 333 */ 334 public String getStringProperty( String key, String defaultValue ) 335 { 336 Object value = getProperty( key ); 337 338 if( value == null ) 339 return defaultValue; 340 341 return value.toString(); 342 } 343 344 /** 345 * Method getIntegerProperty should be used to return configuration parameters from the underlying system. 346 * <p> 347 * In the case of Hadoop, the current Configuration will be queried. 348 * 349 * @param key of type String, 350 * @return null if property is not set 351 */ 352 public Integer getIntegerProperty( String key ) 353 { 354 String value = getStringProperty( key ); 355 356 if( value == null || value.isEmpty() ) 357 return null; 358 359 return Integer.valueOf( value ); 360 } 361 362 /** 363 * Method getIntegerProperty should be used to return configuration parameters from the underlying system. 364 * <p> 365 * In the case of Hadoop, the current Configuration will be queried. 366 * 367 * @param key of type String, 368 * @param defaultValue of type int, 369 * @return {@code defaultValue} if property is not set 370 */ 371 public int getIntegerProperty( String key, int defaultValue ) 372 { 373 String value = getStringProperty( key ); 374 375 if( value == null || value.isEmpty() ) 376 return defaultValue; 377 378 return Integer.valueOf( value ); 379 } 380 381 /** 382 * Method getBooleanProperty should be used to return configuration parameters from the underlying system. 383 * <p> 384 * In the case of Hadoop, the current Configuration will be queried. 385 * 386 * @param key of type Boolean, null if property is not set 387 * @return an Object 388 */ 389 public Boolean getBooleanProperty( String key ) 390 { 391 String value = getStringProperty( key ); 392 393 if( value == null || value.isEmpty() ) 394 return null; 395 396 return Boolean.valueOf( value ); 397 } 398 399 /** 400 * Method getBooleanProperty should be used to return configuration parameters from the underlying system. 401 * <p> 402 * In the case of Hadoop, the current Configuration will be queried. 403 * 404 * @param key of type String 405 * @param defaultValue of type boolean 406 * @return {@code defaultValue} if property is not set 407 */ 408 public boolean getBooleanProperty( String key, boolean defaultValue ) 409 { 410 String value = getStringProperty( key ); 411 412 if( value == null || value.isEmpty() ) 413 return defaultValue; 414 415 return Boolean.valueOf( value ); 416 } 417 418 /** 419 * Method getPropertyKeys returns an immutable collection of all available property key values. 420 * 421 * @return a Collection 422 */ 423 public abstract Collection<String> getPropertyKeys(); 424 425 /** 426 * Method newInstance creates a new object instance from the given className argument delegating to any 427 * platform specific instantiation and configuration routines. 428 * 429 * @param className 430 * @return an instance of className 431 */ 432 public abstract Object newInstance( String className ); 433 434 /** 435 * Method keepAlive notifies the system that the current process is still alive. Use this method if a particular 436 * {@link cascading.operation.Operation} takes some moments to complete. Each system is different, so calling 437 * ping every few seconds to every minute or so would be best. 438 * <p> 439 * This method will fail silently if the underlying mechanism to notify keepAlive status are not initialized. 440 */ 441 public abstract void keepAlive(); 442 443 /** 444 * Method increment is used to increment a custom counter. Counters must be of type Enum. The amount 445 * to increment must be a integer value. 446 * <p> 447 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 448 * 449 * @param counter of type Enum 450 * @param amount of type int 451 */ 452 public abstract void increment( Enum counter, long amount ); 453 454 /** 455 * Method increment is used to increment a custom counter. The amount to increment must be a integer value. 456 * <p> 457 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 458 * 459 * @param group of type String 460 * @param counter of type String 461 * @param amount of type int 462 */ 463 public abstract void increment( String group, String counter, long amount ); 464 465 /** 466 * Method getCounterValue is used to retrieve a counter value. 467 * <p> 468 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 469 * 470 * @param counter of type Enum 471 */ 472 public abstract long getCounterValue( Enum counter ); 473 474 /** 475 * Method getCounterValue is used to retrieve a counter value. 476 * <p> 477 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 478 * 479 * @param group of type String 480 * @param counter of type String 481 */ 482 public abstract long getCounterValue( String group, String counter ); 483 484 /** 485 * Method setStatus is used to set the status of the current operation. 486 * <p> 487 * This method will fail if the underlying counter infrastructure is unavailable. See {@link #isCounterStatusInitialized()}. 488 * 489 * @param status of type String 490 */ 491 public abstract void setStatus( String status ); 492 493 /** 494 * Method isCounterStatusInitialized returns true if it is safe to increment a counter or set a status. 495 * 496 * @return boolean 497 */ 498 public abstract boolean isCounterStatusInitialized(); 499 500 /** 501 * Method openTapForRead return a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance. 502 * <p> 503 * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call, 504 * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be 505 * stored in a Collection. 506 * 507 * @param tap of type Tap 508 * @return TupleIterator 509 * @throws java.io.IOException when there is a failure opening the resource 510 */ 511 public abstract TupleEntryIterator openTapForRead( Tap tap ) throws IOException; 512 513 /** 514 * Method openTapForWrite returns a (@link TupleCollector} for the given Tap instance. 515 * 516 * @param tap of type Tap 517 * @return TupleCollector 518 * @throws java.io.IOException when there is a failure opening the resource 519 */ 520 public abstract TupleEntryCollector openTapForWrite( Tap tap ) throws IOException; 521 522 /** 523 * Method openTrapForWrite returns a (@link TupleCollector} for the given Tap instance. 524 * 525 * @param trap of type Tap 526 * @return TupleCollector 527 * @throws java.io.IOException when there is a failure opening the resource 528 */ 529 public abstract TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException; 530 531 public abstract TupleEntryCollector openSystemIntermediateForWrite() throws IOException; 532 533 /** 534 * Method getConfig returns the actual instance of the underlying configuration instance. 535 * <p> 536 * This instance should not be modified or cached, see {@link #getConfigCopy()} for a modifiable instance. 537 * 538 * @return an instance of the configuration 539 */ 540 public abstract Config getConfig(); 541 542 public abstract Config getConfigCopy(); 543 544 public abstract <C> C copyConfig( C config ); 545 546 public abstract <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig ); 547 548 public abstract Config mergeMapIntoConfig( Config defaultConfig, Map<String, String> map ); 549 550 /** 551 * Method getTrapCollectorFor will return a new {@link TupleEntryCollector} if one hasn't previously 552 * been created for the given trap Tap. 553 * 554 * @param trap 555 * @return TupleEntryCollector 556 */ 557 public TupleEntryCollector getTrapCollectorFor( Tap trap ) 558 { 559 Map<Tap, TupleEntryCollector> trapCollectors = getTrapCollectors(); 560 561 TupleEntryCollector trapCollector = trapCollectors.get( trap ); 562 563 if( trapCollector == null ) 564 { 565 try 566 { 567 trapCollector = openTrapForWrite( trap ); 568 trapCollectors.put( trap, trapCollector ); 569 } 570 catch( IOException exception ) 571 { 572 throw new DuctException( exception ); 573 } 574 } 575 576 return trapCollector; 577 } 578 579 protected synchronized Map<Tap, TupleEntryCollector> getTrapCollectors() 580 { 581 if( trapCollectors == null ) 582 trapCollectors = Collections.synchronizedMap( new HashMap<Tap, TupleEntryCollector>() ); 583 584 return trapCollectors; 585 } 586 587 public synchronized void closeTrapCollectors() 588 { 589 if( trapCollectors == null ) 590 return; 591 592 for( TupleEntryCollector trapCollector : trapCollectors.values() ) 593 { 594 try 595 { 596 trapCollector.close(); 597 } 598 catch( Exception exception ) 599 { 600 // do nothing 601 } 602 } 603 604 trapCollectors.clear(); 605 } 606 }