/*
 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import cascading.flow.planner.PlannerInfo;
import cascading.flow.planner.PlatformInfo;
import cascading.management.UnitOfWork;
import cascading.stats.FlowStats;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntryStream;
import cascading.tuple.TupleStream;

/**
 * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source
 * and sink {@link Tap} instances.
 * <p>
 * A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
 * <p>
 * A Flow sub-class instance may not be instantiated directly in most cases, see sub-classes of {@link FlowConnector} class
 * for supported platforms.
 * <p>
 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain
 * no state regarding the Flow execution. Subsequently, {@link cascading.pipe.Pipe} assemblies can be given
 * parameters through its calling Flow so they can be built in a generic fashion.
 * <p>
 * When a Flow is created, an optimized internal representation is created that is then executed
 * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances.
 * <p>
 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more steps do not share the
 * same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value determines
 * the order in which all steps will be submitted for execution. The default submit priority is 5.
 * <p>
 * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any
 * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the
 * set of listeners.
 *
 * @param <Config> the platform specific configuration type returned by {@link #getConfig()}
 * @see FlowListener
 * @see cascading.flow.FlowConnector
 */
public interface Flow<Config> extends UnitOfWork<FlowStats>
  {
  /** Constant holding the {@code cascading.flow.id} property name. */
  String CASCADING_FLOW_ID = "cascading.flow.id";

  /**
   * Method getName returns the name of this Flow object.
   *
   * @return the name (type String) of this Flow object.
   */
  @Override
  String getName();

  /**
   * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or clear any resources
   * necessary for {@link #start()} to be called successfully.
   * <p>
   * Specifically, this implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} and {@link BaseFlow#deleteTrapsIfNotUpdate()}.
   */
  @Override
  void prepare();

  /**
   * Method start begins the execution of this Flow instance. It will return immediately. Use the method {@link #complete()}
   * to block until this Flow completes.
   */
  @Override
  void start();

  /** Method stop stops all running jobs, killing any currently executing. */
  @Override
  void stop();

  /** Method complete starts the current Flow instance if it has not been previously started, then blocks until completion. */
  @Override
  void complete();

  /**
   * Method cleanup is inherited from {@link UnitOfWork}.
   * <p>
   * NOTE(review): the exact resources released are implementation specific and not visible from this interface.
   */
  @Override
  void cleanup();

  /**
   * Returns any meta-data about the planner that created this Flow instance.
   *
   * @return an instance of PlannerInfo
   */
  PlannerInfo getPlannerInfo();

  /**
   * Returns any meta-data about the underlying platform this Flow instance will run against.
   *
   * @return an instance of PlatformInfo
   */
  PlatformInfo getPlatformInfo();

  /**
   * Method getConfig returns the internal configuration object.
   * <p>
   * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for setting
   * default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting properties on
   * individual steps before they are executed.
   *
   * @return the default configuration of this Flow
   */
  Config getConfig();

  /**
   * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely
   * modified.
   *
   * @return a copy of the default configuration of this Flow
   */
  Config getConfigCopy();

  /**
   * Method getConfigAsProperties converts the internal configuration object into a {@link java.util.Map} of
   * key value pairs.
   *
   * @return a Map of key/value pairs
   */
  Map<Object, Object> getConfigAsProperties();

  /**
   * Returns the String property associated with the given key from the current Configuration instance.
   *
   * @param key of type String
   * @return the String value
   */
  String getProperty( String key );

  /**
   * Method getID returns the ID of this Flow object.
   * <p>
   * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
   * instances created with identical parameters will not return the same ID.
   *
   * @return the ID (type String) of this Flow object.
   */
  @Override
  String getID();

  /**
   * Returns an immutable map of properties giving more details about the Flow object.
   * <p>
   * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow.
   * <p>
   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
   * For known description types, see {@link FlowDescriptors}.
   *
   * @return Map
   */
  Map<String, String> getFlowDescriptor();

  /**
   * Method getTags returns the tags of this Flow object, as declared by {@link UnitOfWork}.
   *
   * @return the tags (type String) of this Flow object.
   */
  @Override
  String getTags();

  /**
   * Method getSubmitPriority returns the submitPriority of this Flow object.
   * <p>
   * 10 is lowest, 1 is the highest, 5 is the default.
   *
   * @return the submitPriority (type int) of this Flow object.
   */
  int getSubmitPriority();

  /**
   * Method setSubmitPriority sets the submitPriority of this Flow object.
   * <p>
   * 10 is lowest, 1 is the highest, 5 is the default.
   *
   * @param submitPriority the submitPriority of this Flow object.
   */
  void setSubmitPriority( int submitPriority );

  /**
   * Method getFlowProcess returns the {@link FlowProcess} of this Flow object.
   * <p>
   * The returned instance is the one handed to {@link TupleEntryStream} and {@link TupleStream} by the
   * stream oriented default methods declared on this interface.
   *
   * @return the flowProcess (type FlowProcess) of this Flow object.
   */
  FlowProcess<Config> getFlowProcess();

  /**
   * Method getFlowStats returns the flowStats of this Flow object.
   *
   * @return the flowStats (type FlowStats) of this Flow object.
   */
  FlowStats getFlowStats();

  /**
   * Method hasListeners returns true if {@link FlowListener} instances have been registered.
   *
   * @return boolean
   */
  boolean hasListeners();

  /**
   * Method addListener registers the given flowListener with this instance.
   *
   * @param flowListener of type FlowListener
   */
  void addListener( FlowListener flowListener );

  /**
   * Method removeListener removes the given flowListener from this instance.
   *
   * @param flowListener of type FlowListener
   * @return true if the listener was removed
   */
  boolean removeListener( FlowListener flowListener );

  /**
   * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered
   * with any of the {@link FlowStep}s belonging to this instance.
   *
   * @return boolean
   */
  boolean hasStepListeners();

  /**
   * Method addStepListener registers the given flowStepListener with this instance.
   *
   * @param flowStepListener of type FlowStepListener
   */
  void addStepListener( FlowStepListener flowStepListener );

  /**
   * Method removeStepListener removes the given flowStepListener from this instance.
   *
   * @param flowStepListener of type FlowStepListener
   * @return true if the listener was removed from all the {@link FlowStep} belonging to this instance
   */
  boolean removeStepListener( FlowStepListener flowStepListener );

  /**
   * Method getSources returns the sources of this Flow object.
   *
   * @return the sources (type Map) of this Flow object.
   */
  Map<String, Tap> getSources();

  /**
   * Method getSourceNames returns the names of the sources of this Flow object.
   *
   * @return the source names (type List) of this Flow object.
   */
  List<String> getSourceNames();

  /**
   * Method getSource returns the source {@link Tap} registered under the given name.
   *
   * @param name of type String
   * @return the source (type Tap) for the given name.
   */
  Tap getSource( String name );

  /**
   * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object.
   *
   * @return the sourcesCollection (type Collection) of this Flow object.
   */
  Collection<Tap> getSourcesCollection();

  /**
   * Method getSinks returns the sinks of this Flow object.
   *
   * @return the sinks (type Map) of this Flow object.
   */
  Map<String, Tap> getSinks();

  /**
   * Method getSinkNames returns the names of the sinks of this Flow object.
   *
   * @return the sink names (type List) of this Flow object.
   */
  List<String> getSinkNames();

  /**
   * Method getSink returns the sink {@link Tap} registered under the given name.
   *
   * @param name of type String
   * @return the sink (type Tap) for the given name.
   */
  Tap getSink( String name );

  /**
   * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object.
   *
   * @return the sinkCollection (type Collection) of this Flow object.
   */
  Collection<Tap> getSinksCollection();

  /**
   * Method getSink returns the first sink of this Flow object.
   *
   * @return the sink (type Tap) of this Flow object.
   */
  Tap getSink();

  /**
   * Method getTraps returns the traps of this Flow object.
   *
   * @return the traps (type Map) of this Flow object.
   */
  Map<String, Tap> getTraps();

  /**
   * Method getTrapNames returns the names of the traps of this Flow object.
   *
   * @return the trap names (type List) of this Flow object.
   */
  List<String> getTrapNames();

  /**
   * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object.
   *
   * @return the trapsCollection (type Collection) of this Flow object.
   */
  Collection<Tap> getTrapsCollection();

  /**
   * Method getCheckpoints returns the checkpoint taps of this Flow object.
   *
   * @return the checkpoints (type Map) of this Flow object.
   */
  Map<String, Tap> getCheckpoints();

  /**
   * Method getCheckpointNames returns the names of the checkpoint taps of this Flow object.
   *
   * @return the checkpoint names (type List) of this Flow object.
   */
  List<String> getCheckpointNames();

  /**
   * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object.
   *
   * @return the checkpointsCollection (type Collection) of this Flow object.
   */
  Collection<Tap> getCheckpointsCollection();

  /**
   * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
   *
   * @return FlowSkipStrategy
   */
  FlowSkipStrategy getFlowSkipStrategy();

  /**
   * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned.
   * <p>
   * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is {@link FlowSkipIfSinkNotStale}.
   * An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
   * <p>
   * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or {@link #complete()}. Only
   * when the Flow is executed through a {@link cascading.cascade.Cascade} instance.
   *
   * @param flowSkipStrategy of type FlowSkipStrategy
   * @return FlowSkipStrategy
   */
  FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy );

  /**
   * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance. True is returned
   * if the current {@link cascading.flow.FlowSkipStrategy} returns true.
   *
   * @return the skipFlow (type boolean) of this Flow object.
   * @throws IOException if an I/O error occurs
   */
  boolean isSkipFlow() throws IOException;

  /**
   * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources. Or
   * if any sink method {@link cascading.tap.Tap#isReplace()} returns true.
   *
   * @return boolean
   * @throws java.io.IOException if an I/O error occurs
   */
  boolean areSinksStale() throws IOException;

  /**
   * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
   *
   * @param sinkModified of type long
   * @return boolean
   * @throws java.io.IOException if an I/O error occurs
   */
  boolean areSourcesNewer( long sinkModified ) throws IOException;

  /**
   * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this Flow instance.
   * <p>
   * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
   * at least one of the sinks are marked for delete ({@link cascading.tap.Tap#isReplace()} returns true).
   *
   * @return the sinkModified (type long) of this Flow object.
   * @throws java.io.IOException if an I/O error occurs
   */
  long getSinkModified() throws IOException;

  /**
   * Returns the current {@link FlowStepStrategy} instance.
   *
   * @return FlowStepStrategy
   */
  FlowStepStrategy getFlowStepStrategy();

  /**
   * Sets a default {@link FlowStepStrategy} instance.
   * <p>
   * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties
   * before the properties are submitted to the underlying platform for the step
   * unit of work.
   *
   * @param flowStepStrategy The FlowStepStrategy to use.
   */
  void setFlowStepStrategy( FlowStepStrategy flowStepStrategy );

  /**
   * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
   *
   * @return the steps (type List) of this Flow object.
   */
  List<FlowStep<Config>> getFlowSteps();

  /**
   * Method openSource opens the first source Tap.
   *
   * @return TupleEntryIterator
   * @throws IOException if an I/O error occurs
   */
  TupleEntryIterator openSource() throws IOException;

  /**
   * Method openSource opens the named source Tap.
   *
   * @param name of type String
   * @return TupleEntryIterator
   * @throws IOException if an I/O error occurs
   */
  TupleEntryIterator openSource( String name ) throws IOException;

  /**
   * Method openSink opens the first sink Tap.
   *
   * @return TupleEntryIterator
   * @throws IOException if an I/O error occurs
   */
  TupleEntryIterator openSink() throws IOException;

  /**
   * Method openSink opens the named sink Tap.
   *
   * @param name of type String
   * @return TupleEntryIterator
   * @throws IOException if an I/O error occurs
   */
  TupleEntryIterator openSink( String name ) throws IOException;

  /**
   * Method openTrap opens the first trap Tap.
   *
   * @return TupleEntryIterator
   * @throws IOException if an I/O error occurs
   */
  TupleEntryIterator openTrap() throws IOException;

  /**
   * Method openTrap opens the named trap Tap.
   *
   * @param name of type String
   * @return TupleEntryIterator
   * @throws IOException if an I/O error occurs
   */
  TupleEntryIterator openTrap( String name ) throws IOException;

  /**
   * Method resourceExists returns true if the resource represented by the given Tap instance exists.
   *
   * @param tap of type Tap
   * @return boolean
   * @throws IOException if an I/O error occurs
   */
  boolean resourceExists( Tap tap ) throws IOException;

  /**
   * Method openTapForRead returns a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
   * <p>
   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
   * stored in a Collection.
   *
   * @param tap of type Tap
   * @return TupleEntryIterator
   * @throws IOException when there is an error opening the resource
   */
  TupleEntryIterator openTapForRead( Tap tap ) throws IOException;

  /**
   * Method openTapForWrite returns a {@link TupleEntryCollector} for the given Tap instance.
   *
   * @param tap of type Tap
   * @return TupleEntryCollector
   * @throws IOException when there is an error opening the resource
   */
  TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;

  /**
   * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
   *
   * @param filename of type String
   */
  void writeDOT( String filename );

  /**
   * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics package.
   *
   * @param filename of type String
   */
  void writeStepsDOT( String filename );

  /**
   * Returns the parent Cascade ID that owns this Flow instance.
   *
   * @return of type String
   */
  String getCascadeID();

  /**
   * Returns the run ID given when this Flow instance was defined in the FlowDef.
   *
   * @return of type String
   */
  String getRunID();

  /**
   * Method stepsAreLocal returns true if all jobs are executed in-process as a single map and reduce task.
   *
   * @return boolean
   */
  boolean stepsAreLocal();

  /**
   * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}.
   *
   * @return the stopJobsOnExit (type boolean) of this Flow object.
   */
  boolean isStopJobsOnExit();

  /**
   * Method getSourceEntryStream returns a {@link Stream} of {@link TupleEntry} instances read from the named source Tap.
   * <p>
   * Delegates to {@link TupleEntryStream} using {@link #getSource(String)} and {@link #getFlowProcess()}. Use
   * {@link #getSourceEntryStreamCopy(String)} if the streamed instances are to be stored.
   *
   * @param name of type String
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSourceEntryStream( String name )
    {
    return TupleEntryStream.entryStream( getSource( name ), getFlowProcess() );
    }

  /**
   * Method getSourceEntryStream returns a {@link Stream} of {@link TupleEntry} instances read from the named source Tap,
   * passing the given selector through to {@link TupleEntryStream}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSourceEntryStream( String name, Fields selector )
    {
    return TupleEntryStream.entryStream( getSource( name ), getFlowProcess(), selector );
    }

  /**
   * Method getSourceEntryStreamCopy is the copying variant of {@link #getSourceEntryStream(String)}.
   *
   * @param name of type String
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSourceEntryStreamCopy( String name )
    {
    return TupleEntryStream.entryStreamCopy( getSource( name ), getFlowProcess() );
    }

  /**
   * Method getSourceEntryStreamCopy is the copying variant of {@link #getSourceEntryStream(String, Fields)}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSourceEntryStreamCopy( String name, Fields selector )
    {
    return TupleEntryStream.entryStreamCopy( getSource( name ), getFlowProcess(), selector );
    }

  /**
   * Method getSourceTupleStream returns a {@link Stream} of {@link Tuple} instances read from the named source Tap.
   * <p>
   * Delegates to {@link TupleStream} using {@link #getSource(String)} and {@link #getFlowProcess()}. Use
   * {@link #getSourceTupleStreamCopy(String)} if the streamed instances are to be stored.
   *
   * @param name of type String
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSourceTupleStream( String name )
    {
    return TupleStream.tupleStream( getSource( name ), getFlowProcess() );
    }

  /**
   * Method getSourceTupleStream returns a {@link Stream} of {@link Tuple} instances read from the named source Tap,
   * passing the given selector through to {@link TupleStream}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSourceTupleStream( String name, Fields selector )
    {
    return TupleStream.tupleStream( getSource( name ), getFlowProcess(), selector );
    }

  /**
   * Method getSourceTupleStreamCopy is the copying variant of {@link #getSourceTupleStream(String)}.
   *
   * @param name of type String
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSourceTupleStreamCopy( String name )
    {
    // must use the copy factory, otherwise this method is indistinguishable from getSourceTupleStream
    return TupleStream.tupleStreamCopy( getSource( name ), getFlowProcess() );
    }

  /**
   * Method getSourceTupleStreamCopy is the copying variant of {@link #getSourceTupleStream(String, Fields)}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSourceTupleStreamCopy( String name, Fields selector )
    {
    // must use the copy factory, otherwise this method is indistinguishable from getSourceTupleStream
    return TupleStream.tupleStreamCopy( getSource( name ), getFlowProcess(), selector );
    }

  /**
   * Method getSinkEntryStream returns a {@link Stream} of {@link TupleEntry} instances read from the first sink Tap,
   * see {@link #getSink()}.
   *
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStream()
    {
    return TupleEntryStream.entryStream( getSink(), getFlowProcess() );
    }

  /**
   * Method getSinkEntryStream returns a {@link Stream} of {@link TupleEntry} instances read from the first sink Tap,
   * passing the given selector through to {@link TupleEntryStream}.
   *
   * @param selector of type Fields
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStream( Fields selector )
    {
    return TupleEntryStream.entryStream( getSink(), getFlowProcess(), selector );
    }

  /**
   * Method getSinkEntryStreamCopy is the copying variant of {@link #getSinkEntryStream()}.
   *
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStreamCopy()
    {
    return TupleEntryStream.entryStreamCopy( getSink(), getFlowProcess() );
    }

  /**
   * Method getSinkEntryStreamCopy is the copying variant of {@link #getSinkEntryStream(Fields)}.
   *
   * @param selector of type Fields
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStreamCopy( Fields selector )
    {
    return TupleEntryStream.entryStreamCopy( getSink(), getFlowProcess(), selector );
    }

  /**
   * Method getSinkTupleStream returns a {@link Stream} of {@link Tuple} instances read from the first sink Tap,
   * see {@link #getSink()}.
   *
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStream()
    {
    return TupleStream.tupleStream( getSink(), getFlowProcess() );
    }

  /**
   * Method getSinkTupleStream returns a {@link Stream} of {@link Tuple} instances read from the first sink Tap,
   * passing the given selector through to {@link TupleStream}.
   *
   * @param selector of type Fields
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStream( Fields selector )
    {
    return TupleStream.tupleStream( getSink(), getFlowProcess(), selector );
    }

  /**
   * Method getSinkTupleStreamCopy is the copying variant of {@link #getSinkTupleStream()}.
   *
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStreamCopy()
    {
    // must use the copy factory, otherwise this method is indistinguishable from getSinkTupleStream
    return TupleStream.tupleStreamCopy( getSink(), getFlowProcess() );
    }

  /**
   * Method getSinkTupleStreamCopy is the copying variant of {@link #getSinkTupleStream(Fields)}.
   *
   * @param selector of type Fields
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStreamCopy( Fields selector )
    {
    // must use the copy factory, otherwise this method is indistinguishable from getSinkTupleStream
    return TupleStream.tupleStreamCopy( getSink(), getFlowProcess(), selector );
    }

  /**
   * Method getSinkEntryStream returns a {@link Stream} of {@link TupleEntry} instances read from the named sink Tap.
   *
   * @param name of type String
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStream( String name )
    {
    return TupleEntryStream.entryStream( getSink( name ), getFlowProcess() );
    }

  /**
   * Method getSinkEntryStream returns a {@link Stream} of {@link TupleEntry} instances read from the named sink Tap,
   * passing the given selector through to {@link TupleEntryStream}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStream( String name, Fields selector )
    {
    return TupleEntryStream.entryStream( getSink( name ), getFlowProcess(), selector );
    }

  /**
   * Method getSinkEntryStreamCopy is the copying variant of {@link #getSinkEntryStream(String)}.
   *
   * @param name of type String
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStreamCopy( String name )
    {
    return TupleEntryStream.entryStreamCopy( getSink( name ), getFlowProcess() );
    }

  /**
   * Method getSinkEntryStreamCopy is the copying variant of {@link #getSinkEntryStream(String, Fields)}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of TupleEntry
   */
  default Stream<TupleEntry> getSinkEntryStreamCopy( String name, Fields selector )
    {
    return TupleEntryStream.entryStreamCopy( getSink( name ), getFlowProcess(), selector );
    }

  /**
   * Method getSinkTupleStream returns a {@link Stream} of {@link Tuple} instances read from the named sink Tap.
   *
   * @param name of type String
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStream( String name )
    {
    return TupleStream.tupleStream( getSink( name ), getFlowProcess() );
    }

  /**
   * Method getSinkTupleStream returns a {@link Stream} of {@link Tuple} instances read from the named sink Tap,
   * passing the given selector through to {@link TupleStream}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStream( String name, Fields selector )
    {
    return TupleStream.tupleStream( getSink( name ), getFlowProcess(), selector );
    }

  /**
   * Method getSinkTupleStreamCopy is the copying variant of {@link #getSinkTupleStream(String)}.
   *
   * @param name of type String
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStreamCopy( String name )
    {
    // must use the copy factory, otherwise this method is indistinguishable from getSinkTupleStream
    return TupleStream.tupleStreamCopy( getSink( name ), getFlowProcess() );
    }

  /**
   * Method getSinkTupleStreamCopy is the copying variant of {@link #getSinkTupleStream(String, Fields)}.
   *
   * @param name     of type String
   * @param selector of type Fields
   * @return Stream of Tuple
   */
  default Stream<Tuple> getSinkTupleStreamCopy( String name, Fields selector )
    {
    // must use the copy factory, otherwise this method is indistinguishable from getSinkTupleStream
    return TupleStream.tupleStreamCopy( getSink( name ), getFlowProcess(), selector );
    }
  }