/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.process;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.flow.BaseFlow;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.planner.PlatformInfo;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.stats.process.ProcessFlowStats;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Version;
import riffle.process.scheduler.ProcessException;
import riffle.process.scheduler.ProcessWrapper;

/**
 * Class ProcessFlow is a {@link cascading.flow.Flow} subclass that supports custom Riffle jobs.
 * <p>
 * Use this class to allow custom Riffle jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p>
 * Currently {@link cascading.flow.FlowListener}s are supported but the
 * {@link cascading.flow.FlowListener#onThrowable(cascading.flow.Flow, Throwable)} event is not.
 */
public class ProcessFlow<Process, Config> extends BaseFlow<Config>
  {
  /** Field process */
  private final Process process;
  /** Field processWrapper */
  private final ProcessWrapper processWrapper;
  /** Configuration object, read from the wrapped process during construction */
  private Config config;

  private boolean isStarted = false; // only used for event handling

  /** flow related properties, as handed to the constructor (may be null) */
  private final Map<Object, Object> properties;

  /**
   * Constructor ProcessFlow creates a new ProcessFlow instance using an empty {@link Properties} instance.
   *
   * @param name    of type String
   * @param process of type Process
   */
  @ConstructorProperties({"name", "process"})
  public ProcessFlow( String name, Process process )
    {
    this( new Properties(), name, process );
    }

  /**
   * Constructor ProcessFlow creates a new ProcessFlow instance without a flow descriptor.
   *
   * @param properties of type Map
   * @param name       of type String
   * @param process    of type Process
   */
  @ConstructorProperties({"properties", "name", "process"})
  public ProcessFlow( Map<Object, Object> properties, String name, Process process )
    {
    this( properties, name, process, null );
    }

  /**
   * Constructor ProcessFlow creates a new ProcessFlow instance.
   * <p>
   * The given process is wrapped in a {@link ProcessWrapper}, after which the source and sink taps, the
   * configuration and the flow stats are initialized from that wrapper.
   *
   * @param properties     of type Map
   * @param name           of type String
   * @param process        of type Process
   * @param flowDescriptor of type Map
   */
  @ConstructorProperties({"properties", "name", "process", "flowDescriptor"})
  public ProcessFlow( Map<Object, Object> properties, String name, Process process, Map<String, String> flowDescriptor )
    {
    super( new PlatformInfo( "process", "Xplenty, Inc.", Version.getRelease() ), properties, null, name, flowDescriptor );
    this.process = process;
    this.processWrapper = new ProcessWrapper( this.process );
    this.properties = properties;

    setName( name );
    setTapFromProcess();
    initProcessConfig();
    initStats();
    }

  /**
   * Creates the flow stats instance. A {@link ProcessFlowStats} is used when the wrapped process reports that it
   * has counters, otherwise the stats are obtained from {@code createPrepareFlowStats()}.
   *
   * @throws FlowException wrapping any {@link ProcessException} raised by the process wrapper
   */
  private void initStats()
    {
    try
      {
      if( processWrapper.hasCounters() )
        {
        flowStats = new ProcessFlowStats( this, getFlowSession().getCascadingServices().createClientState( getID() ), processWrapper );
        flowStats.prepare();
        flowStats.markPending();
        }
      else
        {
        flowStats = createPrepareFlowStats();
        }
      }
    catch( ProcessException exception )
      {
      throw new FlowException( exception );
      }
    }

  /**
   * Method setTapFromProcess builds {@link Tap} instances for the given process incoming and outgoing dependencies.
   * <p>
   * This method may be called repeatedly to re-configure the source and sink taps.
   */
  public void setTapFromProcess()
    {
    setSources( createSources( this.processWrapper ) );
    setSinks( createSinks( this.processWrapper ) );
    setTraps( createTraps( this.processWrapper ) );
    }

  /**
   * Method getProcess returns the process of this ProcessFlow object.
   *
   * @return the process (type Process) of this ProcessFlow object.
   */
  public Process getProcess()
    {
    return process;
    }

  /** No-op in this implementation; the configuration is read from the process in the constructor instead. */
  @Override
  protected void initConfig( Map<Object, Object> properties, Config parentConfig )
    {

    }

  /**
   * Reads the configuration object from the wrapped process and stores it in {@link #config}.
   *
   * @throws RuntimeException the cause of the {@link ProcessException}, when that cause is a RuntimeException
   * @throws FlowException    in all other failure cases
   */
  @SuppressWarnings("unchecked") // the wrapper is untyped, the process is trusted to hand back a Config
  private void initProcessConfig()
    {
    try
      {
      config = (Config) processWrapper.getConfiguration();
      }
    catch( ProcessException exception )
      {
      throw toRuntimeException( "could not get configuration from process", exception );
      }
    }

  /** No-op in this implementation. */
  @Override
  protected void setConfigProperty( Config properties, Object key, Object value )
    {

    }

  /**
   * Not supported by this implementation.
   *
   * @return always null
   */
  @Override
  protected Config newConfig( Config defaultConfig )
    {
    return null;
    }

  /**
   * Method getConfig returns the configuration object obtained from the wrapped process.
   *
   * @return the configuration read during construction
   */
  @Override
  public Config getConfig()
    {
    return config;
    }

  /**
   * Not supported by this implementation.
   *
   * @return always null
   */
  @Override
  public Config getConfigCopy()
    {
    return null;
    }

  /**
   * Method getConfigAsProperties returns a copy of the properties this flow was created with.
   *
   * @return a new Map holding the constructor supplied properties, empty if none were given
   */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    Map<Object, Object> props = new HashMap<>();

    if( properties != null )
      props.putAll( this.properties );

    return props;
    }

  /**
   * Not supported by this implementation.
   *
   * @return always null
   */
  @Override
  public String getProperty( String key )
    {
    return null;
    }

  /**
   * Method getFlowProcess returns the shared {@code FlowProcess.NULL} instance.
   *
   * @return {@code FlowProcess.NULL}
   */
  @Override
  public FlowProcess<Config> getFlowProcess()
    {
    return FlowProcess.NULL;
    }

  /**
   * Method stepsAreLocal reports this flow as local.
   *
   * @return always true
   */
  @Override
  public boolean stepsAreLocal()
    {
    return true;
    }

  /**
   * Method prepare calls prepare on the wrapped process.
   *
   * @throws RuntimeException the cause of the failure, when that cause is a RuntimeException
   * @throws FlowException    in all other failure cases
   */
  @Override
  public void prepare()
    {
    try
      {
      processWrapper.prepare();
      }
    catch( Throwable throwable )
      {
      throw toRuntimeException( "could not call prepare on process", throwable );
      }
    }

  /**
   * Method start marks the flow stats as pending, fires the starting event, starts the wrapped process and marks
   * the flow stats as started.
   * <p>
   * On failure the throwable event is fired before the failure is rethrown.
   *
   * @throws RuntimeException the cause of the failure, when that cause is a RuntimeException
   * @throws FlowException    in all other failure cases
   */
  @Override
  public void start()
    {
    try
      {
      flowStats.markPending();
      fireOnStarting();
      processWrapper.start();
      flowStats.markStarted();
      isStarted = true;
      }
    catch( Throwable throwable )
      {
      fireOnThrowable( throwable );

      throw toRuntimeException( "could not call start on process", throwable );
      }
    }

  /**
   * Method internalStart invokes {@code deleteSinksIfReplace()}, {@code deleteTrapsIfReplace()} and
   * {@code deleteCheckpointsIfReplace()}.
   *
   * @throws FlowException wrapping any IOException raised by those calls
   */
  @Override
  protected void internalStart()
    {
    try
      {
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      deleteCheckpointsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to delete sinks", exception );
      }
    }

  /**
   * Method stop fires the stopping event, stops the wrapped process and marks the flow stats as stopped unless
   * they are already finished.
   * <p>
   * On failure the flow stats are marked failed and the throwable event is fired before the failure is rethrown.
   *
   * @throws RuntimeException the cause of the failure, when that cause is a RuntimeException
   * @throws FlowException    in all other failure cases
   */
  @Override
  public void stop()
    {
    try
      {
      fireOnStopping();
      processWrapper.stop();

      if( !flowStats.isFinished() )
        flowStats.markStopped();
      }
    catch( Throwable throwable )
      {
      flowStats.markFailed( throwable );
      fireOnThrowable( throwable );

      throw toRuntimeException( "could not call stop on process", throwable );
      }
    }

  /** No-op in this implementation. */
  @Override
  protected void internalClean( boolean stop )
    {

    }

  /**
   * Method complete calls complete on the wrapped process, then fires the completed event and marks the flow stats
   * as successful.
   * <p>
   * If {@link #start()} has not completed successfully beforehand, the pending/starting/started transitions are
   * performed here first so that the events are still delivered.
   * <p>
   * On failure the flow stats are marked failed and the throwable event is fired before the failure is rethrown.
   *
   * @throws RuntimeException the cause of the failure, when that cause is a RuntimeException
   * @throws FlowException    in all other failure cases
   */
  @Override
  public void complete()
    {
    try
      {
      if( !isStarted )
        {
        flowStats.markPending();
        fireOnStarting();
        isStarted = true;
        flowStats.markStarted();
        }

      flowStats.markRunning();
      processWrapper.complete();
      fireOnCompleted();
      flowStats.markSuccessful();
      }
    catch( Throwable throwable )
      {
      flowStats.markFailed( throwable );
      fireOnThrowable( throwable );

      throw toRuntimeException( "could not call complete on process", throwable );
      }
    }

  /**
   * Method cleanup calls cleanup on the wrapped process.
   *
   * @throws RuntimeException the cause of the failure, when that cause is a RuntimeException
   * @throws FlowException    in all other failure cases
   */
  @Override
  public void cleanup()
    {
    try
      {
      processWrapper.cleanup();
      }
    catch( Throwable throwable )
      {
      throw toRuntimeException( "could not call cleanup on process", throwable );
      }
    }

  /**
   * Method getMaxNumParallelSteps returns the number of steps that may run in parallel.
   *
   * @return always 1
   */
  @Override
  protected int getMaxNumParallelSteps()
    {
    return 1;
    }

  /** No-op in this implementation. */
  @Override
  protected void internalShutdown()
    {

    }

  /**
   * Translates a failure raised while calling into the wrapped process into the exception to rethrow.
   * <p>
   * When the failure carries a RuntimeException as its cause, that cause is returned untouched. Otherwise a
   * {@link FlowException} with the given message is returned. It wraps the cause of the failure, or the failure
   * itself when it has no cause, so that the original error is never silently dropped.
   *
   * @param message   description used for the FlowException
   * @param throwable the failure that was caught
   * @return the exception the caller should throw
   */
  private static RuntimeException toRuntimeException( String message, Throwable throwable )
    {
    Throwable cause = throwable.getCause();

    if( cause instanceof RuntimeException )
      return (RuntimeException) cause;

    // getCause() is null when the failure was raised directly, keep the failure itself in that case
    return new FlowException( message, cause != null ? cause : throwable );
    }

  /**
   * Builds the source taps from the incoming dependency reported by the given process wrapper.
   *
   * @param processParent the wrapper to query
   * @return taps keyed by identifier
   */
  private Map<String, Tap> createSources( ProcessWrapper processParent )
    {
    try
      {
      return makeTapMap( processParent.getDependencyIncoming() );
      }
    catch( ProcessException exception )
      {
      throw toRuntimeException( "could not get process incoming dependency", exception );
      }
    }

  /**
   * Builds the sink taps from the outgoing dependency reported by the given process wrapper.
   *
   * @param processParent the wrapper to query
   * @return taps keyed by identifier
   */
  private Map<String, Tap> createSinks( ProcessWrapper processParent )
    {
    try
      {
      return makeTapMap( processParent.getDependencyOutgoing() );
      }
    catch( ProcessException exception )
      {
      throw toRuntimeException( "could not get process outgoing dependency", exception );
      }
    }

  /**
   * Converts a dependency description into a map of taps keyed by identifier.
   * <p>
   * Elements that are {@link Tap} instances with a non-null identifier are used directly, every other element is
   * represented by a {@link ProcessTap} named after the element's {@code toString()} value.
   *
   * @param resource a Collection, an Object array, a single value, or null for no dependencies
   * @return a new mutable map, empty when resource is null
   */
  private Map<String, Tap> makeTapMap( Object resource )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    // a process without dependencies may hand back null, treat it as "nothing to wire up"
    if( resource == null )
      return taps;

    for( Object path : makeCollection( resource ) )
      {
      if( path instanceof Tap && ( (Tap) path ).getIdentifier() != null )
        {
        Tap tap = (Tap) path;

        taps.put( tap.getIdentifier(), tap );
        }
      else
        {
        String token = path.toString();

        taps.put( token, new ProcessTap( new NullScheme(), token ) );
        }
      }

    return taps;
    }

  /**
   * Normalizes a dependency description into a Collection.
   *
   * @param resource a Collection (returned as is), an Object array (viewed as a list) or a single value
   * @return the elements described by resource
   */
  private Collection<?> makeCollection( Object resource )
    {
    if( resource instanceof Collection )
      return (Collection<?>) resource;

    if( resource instanceof Object[] )
      return Arrays.asList( (Object[]) resource );

    return Arrays.asList( resource );
    }

  /**
   * Creates the trap taps for this flow.
   *
   * @param processParent the wrapper, currently unused
   * @return always a new, empty map
   */
  private Map<String, Tap> createTraps( ProcessWrapper processParent )
    {
    return new HashMap<String, Tap>();
    }

  @Override
  public String toString()
    {
    return getName() + ":" + process;
    }

  /**
   * Scheme placeholder used by {@link ProcessTap}. The configuration hooks are no-ops and both the source and
   * sink operations throw {@link UnsupportedOperationException}.
   */
  static class NullScheme extends Scheme
    {
    public void sourceConfInit( FlowProcess flowProcess, Tap tap, Object conf )
      {
      }

    public void sinkConfInit( FlowProcess flowProcess, Tap tap, Object conf )
      {
      }

    public boolean source( FlowProcess flowProcess, SourceCall sourceCall ) throws IOException
      {
      throw new UnsupportedOperationException( "sourcing is not supported in the scheme" );
      }

    @Override
    public String toString()
      {
      return getClass().getSimpleName();
      }

    public void sink( FlowProcess flowProcess, SinkCall sinkCall ) throws IOException
      {
      throw new UnsupportedOperationException( "sinking is not supported in the scheme" );
      }
    }

  /**
   * Tap placeholder identified solely by a String token. It cannot be opened for reading or writing (both return
   * null) and all resource operations are stubs returning false or 0.
   */
  static class ProcessTap<Config> extends Tap<Config, Object, Object>
    {
    /** the identifier of this tap */
    private final String token;

    ProcessTap( NullScheme scheme, String token )
      {
      super( scheme );
      this.token = token;
      }

    @Override
    public String getIdentifier()
      {
      return token;
      }

    @Override
    public String getFullIdentifier( Config conf )
      {
      return getIdentifier();
      }

    @Override
    public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Object input ) throws IOException
      {
      return null;
      }

    @Override
    public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Object output ) throws IOException
      {
      return null;
      }

    @Override
    public boolean createResource( Config conf ) throws IOException
      {
      return false;
      }

    @Override
    public boolean deleteResource( Config conf ) throws IOException
      {
      return false;
      }

    @Override
    public boolean resourceExists( Config conf ) throws IOException
      {
      return false;
      }

    @Override
    public long getModifiedTime( Config conf ) throws IOException
      {
      return 0;
      }

    @Override
    public String toString()
      {
      return token;
      }
    }
  }