/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import cascading.flow.planner.PlatformInfo;
import cascading.management.UnitOfWork;
import cascading.stats.FlowStats;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;

/**
 * A Flow is a logical unit of work declared by an assembly of {@link cascading.pipe.Pipe} instances connected to source
 * and sink {@link Tap} instances.
 * <p/>
 * A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
 * <p/>
 * In most cases a Flow sub-class instance should not be instantiated directly; see the sub-classes of the
 * {@link FlowConnector} class for supported platforms.
 * <p/>
 * Note that {@link cascading.pipe.Pipe} assemblies can be reused in multiple Flow instances. They maintain
 * no state regarding the Flow execution. Consequently, {@link cascading.pipe.Pipe} assemblies can be given
 * parameters through their calling Flow so they can be built in a generic fashion.
 * <p/>
 * When a Flow is created, an optimized internal representation is created that is then executed
 * on the underlying execution platform. This is typically done by creating one or more {@link FlowStep} instances.
 * <p/>
 * Flows are submitted in order of dependency when used with a {@link cascading.cascade.Cascade}. If two or more flows
 * do not share the same dependencies and all can be scheduled simultaneously, the {@link #getSubmitPriority()} value
 * determines the order in which all flows will be submitted for execution. The default submit priority is 5.
 * <p/>
 * Use the {@link FlowListener} to receive any events on the life-cycle of the Flow as it executes. Any
 * {@link Tap} instances owned by the Flow also implementing FlowListener will automatically be added to the
 * set of listeners.
 *
 * @param <Config> the type of the internal configuration object returned by {@link #getConfig()}
 * @see FlowListener
 * @see cascading.flow.FlowConnector
 */
public interface Flow<Config> extends UnitOfWork<FlowStats>
  {
  /** Property key under which the ID of a Flow is stored. */
  String CASCADING_FLOW_ID = "cascading.flow.id";

  /**
   * Method getName returns the name of this Flow object.
   *
   * @return the name (type String) of this Flow object.
   */
  @Override
  String getName();

  /**
   * Method prepare is used by a {@link cascading.cascade.Cascade} to notify the given Flow it should initialize or
   * clear any resources necessary for {@link #start()} to be called successfully.
   * <p/>
   * Specifically, the {@link BaseFlow} implementation calls {@link BaseFlow#deleteSinksIfNotUpdate()} and
   * {@link BaseFlow#deleteTrapsIfNotUpdate()}.
   */
  @Override
  void prepare();

  /**
   * Method start begins the execution of this Flow instance. It will return immediately. Use the method
   * {@link #complete()} to block until this Flow completes.
   */
  @Override
  void start();

  /** Method stop stops all running jobs, killing any currently executing. */
  @Override
  void stop();

  /**
   * Method complete starts the current Flow instance if it has not been previously started, then blocks until
   * completion.
   */
  @Override
  void complete();

  @Override
  void cleanup();

  /**
   * Method getConfig returns the internal configuration object.
   * <p/>
   * Any changes to this object will not be reflected in child steps. See {@link cascading.flow.FlowConnector} for
   * setting default properties visible to children. Or see {@link cascading.flow.FlowStepStrategy} for setting
   * properties on individual steps before they are executed.
   *
   * @return the default configuration of this Flow
   */
  Config getConfig();

  /**
   * Method getConfigCopy returns a copy of the internal configuration object. This object can be safely
   * modified.
   *
   * @return a copy of the default configuration of this Flow
   */
  Config getConfigCopy();

  /**
   * Method getConfigAsProperties converts the internal configuration object into a {@link java.util.Map} of
   * key value pairs.
   *
   * @return a Map of key/value pairs
   */
  Map<Object, Object> getConfigAsProperties();

  /**
   * Method getProperty returns the configuration value stored under the given key.
   *
   * @param key the configuration property name
   * @return the property value (type String)
   */
  String getProperty( String key );

  /**
   * Method getID returns the ID of this Flow object.
   * <p/>
   * The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
   * instances created with identical parameters will not return the same ID.
   *
   * @return the ID (type String) of this Flow object.
   */
  @Override
  String getID();

  /**
   * Returns an immutable map of properties giving more details about the Flow object.
   * <p/>
   * See {@link cascading.flow.FlowDef#addDescription(String, String)} to set values on a given Flow.
   * <p/>
   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
   * For known description types, see {@link cascading.flow.FlowDescriptors}.
   *
   * @return {@code Map<String, String>} of description keys to values
   */
  Map<String, String> getFlowDescriptor();

  @Override
  String getTags();

  /**
   * Method getSubmitPriority returns the submitPriority of this Flow object.
   * <p/>
   * 10 is lowest, 1 is the highest, 5 is the default.
   *
   * @return the submitPriority (type int) of this Flow object.
   */
  int getSubmitPriority();

  /**
   * Method setSubmitPriority sets the submitPriority of this Flow object.
   * <p/>
   * 10 is lowest, 1 is the highest, 5 is the default.
   *
   * @param submitPriority the submitPriority of this Flow object.
   */
  void setSubmitPriority( int submitPriority );

  /**
   * Method getFlowProcess returns the {@link FlowProcess} associated with this Flow.
   *
   * @return the flowProcess (type {@code FlowProcess<Config>}) of this Flow object.
   */
  FlowProcess<Config> getFlowProcess();

  /**
   * Method getFlowStats returns the flowStats of this Flow object.
   *
   * @return the flowStats (type FlowStats) of this Flow object.
   */
  FlowStats getFlowStats();

  /**
   * Method hasListeners returns true if {@link FlowListener} instances have been registered.
   *
   * @return boolean
   */
  boolean hasListeners();

  /**
   * Method addListener registers the given flowListener with this instance.
   *
   * @param flowListener of type FlowListener
   */
  void addListener( FlowListener flowListener );

  /**
   * Method removeListener removes the given flowListener from this instance.
   *
   * @param flowListener of type FlowListener
   * @return true if the listener was removed
   */
  boolean removeListener( FlowListener flowListener );

  /**
   * Method hasStepListeners returns true if {@link FlowStepListener} instances have been registered
   * with any of the {@link FlowStep}s belonging to this instance.
   *
   * @return boolean
   */
  boolean hasStepListeners();

  /**
   * Method addStepListener registers the given flowStepListener with this instance.
   *
   * @param flowStepListener of type FlowStepListener
   */
  void addStepListener( FlowStepListener flowStepListener );

  /**
   * Method removeStepListener removes the given flowStepListener from this instance.
   *
   * @param flowStepListener of type FlowStepListener
   * @return true if the listener was removed from all the {@link FlowStep}s belonging to this instance
   */
  boolean removeStepListener( FlowStepListener flowStepListener );

  /**
   * Method getSources returns the sources of this Flow object.
   *
   * @return the sources (type Map) of this Flow object.
   */
  Map<String, Tap> getSources();

  /**
   * Method getSourceNames returns the names of the source {@link Tap}s of this Flow object.
   *
   * @return the sourceNames (type {@code List<String>}) of this Flow object.
   */
  List<String> getSourceNames();

  /**
   * Method getSource returns the source {@link Tap} registered under the given name.
   *
   * @param name of type String
   * @return the source (type Tap) with the given name
   */
  Tap getSource( String name );

  /**
   * Method getSourcesCollection returns a {@link Collection} of source {@link Tap}s for this Flow object.
   *
   * @return the sourcesCollection (type {@code Collection<Tap>}) of this Flow object.
   */
  Collection<Tap> getSourcesCollection();

  /**
   * Method getSinks returns the sinks of this Flow object.
   *
   * @return the sinks (type Map) of this Flow object.
   */
  Map<String, Tap> getSinks();

  /**
   * Method getSinkNames returns the names of the sink {@link Tap}s of this Flow object.
   *
   * @return the sinkNames (type {@code List<String>}) of this Flow object.
   */
  List<String> getSinkNames();

  /**
   * Method getSink returns the sink {@link Tap} registered under the given name.
   *
   * @param name of type String
   * @return the sink (type Tap) with the given name
   */
  Tap getSink( String name );

  /**
   * Method getSinksCollection returns a {@link Collection} of sink {@link Tap}s for this Flow object.
   *
   * @return the sinksCollection (type {@code Collection<Tap>}) of this Flow object.
   */
  Collection<Tap> getSinksCollection();

  /**
   * Method getSink returns the first sink of this Flow object.
   *
   * @return the sink (type Tap) of this Flow object.
   */
  Tap getSink();

  /**
   * Method getTraps returns the traps of this Flow object.
   *
   * @return the traps (type {@code Map<String, Tap>}) of this Flow object.
   */
  Map<String, Tap> getTraps();

  /**
   * Method getTrapNames returns the names of the trap {@link Tap}s of this Flow object.
   *
   * @return the trapNames (type {@code List<String>}) of this Flow object.
   */
  List<String> getTrapNames();

  /**
   * Method getTrapsCollection returns a {@link Collection} of trap {@link Tap}s for this Flow object.
   *
   * @return the trapsCollection (type {@code Collection<Tap>}) of this Flow object.
   */
  Collection<Tap> getTrapsCollection();

  /**
   * Method getCheckpoints returns the checkpoint taps of this Flow object.
   *
   * @return the checkpoints (type {@code Map<String, Tap>}) of this Flow object.
   */
  Map<String, Tap> getCheckpoints();

  /**
   * Method getCheckpointNames returns the names of the checkpoint {@link Tap}s of this Flow object.
   *
   * @return the checkpointNames (type {@code List<String>}) of this Flow object.
   */
  List<String> getCheckpointNames();

  /**
   * Method getCheckpointsCollection returns a {@link Collection} of checkpoint {@link Tap}s for this Flow object.
   *
   * @return the checkpointsCollection (type {@code Collection<Tap>}) of this Flow object.
   */
  Collection<Tap> getCheckpointsCollection();

  /**
   * Method getFlowSkipStrategy returns the current {@link cascading.flow.FlowSkipStrategy} used by this Flow.
   *
   * @return FlowSkipStrategy
   */
  FlowSkipStrategy getFlowSkipStrategy();

  /**
   * Method setFlowSkipStrategy sets a new {@link cascading.flow.FlowSkipStrategy}, the current strategy is returned.
   * <p/>
   * FlowSkipStrategy instances define when a Flow instance should be skipped. The default strategy is
   * {@link FlowSkipIfSinkNotStale}. An alternative strategy would be {@link cascading.flow.FlowSkipIfSinkExists}.
   * <p/>
   * A FlowSkipStrategy will not be consulted when executing a Flow directly through {@link #start()} or
   * {@link #complete()}. It is only consulted when the Flow is executed through a
   * {@link cascading.cascade.Cascade} instance.
   *
   * @param flowSkipStrategy of type FlowSkipStrategy
   * @return the previous FlowSkipStrategy
   */
  FlowSkipStrategy setFlowSkipStrategy( FlowSkipStrategy flowSkipStrategy );

  /**
   * Method isSkipFlow returns true if the parent {@link cascading.cascade.Cascade} should skip this Flow instance.
   * True is returned if the current {@link cascading.flow.FlowSkipStrategy} returns true.
   *
   * @return the skipFlow (type boolean) of this Flow object.
   * @throws IOException if an I/O error occurs while evaluating the skip strategy
   */
  boolean isSkipFlow() throws IOException;

  /**
   * Method areSinksStale returns true if any of the sinks referenced are out of date in relation to the sources,
   * or if any sink method {@link cascading.tap.Tap#isReplace()} returns true.
   *
   * @return boolean
   * @throws java.io.IOException if an I/O error occurs while inspecting the sources or sinks
   */
  boolean areSinksStale() throws IOException;

  /**
   * Method areSourcesNewer returns true if any source is newer than the given sinkModified date value.
   *
   * @param sinkModified of type long
   * @return boolean
   * @throws java.io.IOException if an I/O error occurs while inspecting the sources
   */
  boolean areSourcesNewer( long sinkModified ) throws IOException;

  /**
   * Method getSinkModified returns the youngest modified date of any sink {@link cascading.tap.Tap} managed by this
   * Flow instance.
   * <p/>
   * If zero (0) is returned, at least one of the sink resources does not exist. If minus one (-1) is returned,
   * at least one of the sinks is marked for delete ({@link cascading.tap.Tap#isReplace()} returns true).
   *
   * @return the sinkModified (type long) of this Flow object.
   * @throws java.io.IOException if an I/O error occurs while inspecting the sinks
   */
  long getSinkModified() throws IOException;

  /**
   * Returns the current {@link FlowStepStrategy} instance.
   *
   * @return FlowStepStrategy
   */
  FlowStepStrategy getFlowStepStrategy();

  /**
   * Sets a default {@link FlowStepStrategy} instance.
   * <p/>
   * Use a FlowStepStrategy to change {@link cascading.flow.FlowStep} configuration properties
   * before the properties are submitted to the underlying platform for the step
   * unit of work.
   *
   * @param flowStepStrategy The FlowStepStrategy to use.
   */
  void setFlowStepStrategy( FlowStepStrategy flowStepStrategy );

  /**
   * Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
   *
   * @return the steps (type {@code List<FlowStep<Config>>}) of this Flow object.
   */
  List<FlowStep<Config>> getFlowSteps();

  /**
   * Method openSource opens the first source Tap.
   *
   * @return TupleEntryIterator
   * @throws IOException if the source resource cannot be opened
   */
  TupleEntryIterator openSource() throws IOException;

  /**
   * Method openSource opens the named source Tap.
   *
   * @param name of type String
   * @return TupleEntryIterator
   * @throws IOException if the source resource cannot be opened
   */
  TupleEntryIterator openSource( String name ) throws IOException;

  /**
   * Method openSink opens the first sink Tap.
   *
   * @return TupleEntryIterator
   * @throws IOException if the sink resource cannot be opened
   */
  TupleEntryIterator openSink() throws IOException;

  /**
   * Method openSink opens the named sink Tap.
   *
   * @param name of type String
   * @return TupleEntryIterator
   * @throws IOException if the sink resource cannot be opened
   */
  TupleEntryIterator openSink( String name ) throws IOException;

  /**
   * Method openTrap opens the first trap Tap.
   *
   * @return TupleEntryIterator
   * @throws IOException if the trap resource cannot be opened
   */
  TupleEntryIterator openTrap() throws IOException;

  /**
   * Method openTrap opens the named trap Tap.
   *
   * @param name of type String
   * @return TupleEntryIterator
   * @throws IOException if the trap resource cannot be opened
   */
  TupleEntryIterator openTrap( String name ) throws IOException;

  /**
   * Method resourceExists returns true if the resource represented by the given Tap instance exists.
   *
   * @param tap of type Tap
   * @return boolean
   * @throws IOException if an I/O error occurs while checking the resource
   */
  boolean resourceExists( Tap tap ) throws IOException;

  /**
   * Method openTapForRead returns a {@link cascading.tuple.TupleEntryIterator} for the given Tap instance.
   * <p/>
   * Note the returned iterator will return the same instance of {@link cascading.tuple.TupleEntry} on every call,
   * thus a copy must be made of either the TupleEntry or the underlying {@code Tuple} instance if they are to be
   * stored in a Collection.
   *
   * @param tap of type Tap
   * @return TupleEntryIterator
   * @throws IOException when there is an error opening the resource
   */
  TupleEntryIterator openTapForRead( Tap tap ) throws IOException;

  /**
   * Method openTapForWrite returns a {@link TupleEntryCollector} for the given Tap instance.
   *
   * @param tap of type Tap
   * @return TupleEntryCollector
   * @throws IOException when there is an error opening the resource
   */
  TupleEntryCollector openTapForWrite( Tap tap ) throws IOException;

  /**
   * Method writeDOT writes this Flow instance to the given filename as a DOT file for import into a graphics package.
   *
   * @param filename of type String
   */
  void writeDOT( String filename );

  /**
   * Method writeStepsDOT writes this Flow step graph to the given filename as a DOT file for import into a graphics
   * package.
   *
   * @param filename of type String
   */
  void writeStepsDOT( String filename );

  /**
   * Method getCascadeID returns the ID of the {@link cascading.cascade.Cascade} this Flow belongs to.
   * NOTE(review): behavior when the Flow is not part of a Cascade is defined by implementations.
   *
   * @return the cascadeID (type String) of this Flow object.
   */
  String getCascadeID();

  /**
   * Method getRunID returns the run ID of this Flow object.
   *
   * @return the runID (type String) of this Flow object.
   */
  String getRunID();

  /**
   * Method getPlatformInfo returns the {@link PlatformInfo} describing the platform this Flow was planned for.
   *
   * @return the platformInfo (type PlatformInfo) of this Flow object.
   */
  PlatformInfo getPlatformInfo();

  /**
   * Method stepsAreLocal returns true if all steps are executed in-process as a single map and reduce task.
   *
   * @return boolean
   */
  boolean stepsAreLocal();

  /**
   * Method isStopJobsOnExit returns the stopJobsOnExit of this Flow object. Defaults to {@code true}.
   *
   * @return the stopJobsOnExit (type boolean) of this Flow object.
   */
  boolean isStopJobsOnExit();
  }