/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import cascading.operation.AssertionLevel;
import cascading.operation.DebugLevel;
import cascading.pipe.Checkpoint;
import cascading.pipe.Pipe;
import cascading.property.UnitOfWorkDef;
import cascading.tap.Tap;
import cascading.util.Util;

/**
 * Class FlowDef is a fluent interface for defining a {@link Flow}.
 * <p/>
 * This allows for ad-hoc building of Flow data and meta-data like tags.
 * <p/>
 * Instead of calling one of the {@link FlowConnector} connect methods, {@link FlowConnector#connect(FlowDef)}
 * can be called.
 * <p/>
 * Every {@code add} and {@code set} method returns this instance so calls can be chained, for example:
 * <pre>
 * FlowDef flowDef = FlowDef.flowDef()
 *   .addSource( "lhs", lhsTap )
 *   .addTailSink( tail, sinkTap );
 * </pre>
 * <p/>
 * Getters without the {@code Copy} suffix return the internal collections themselves, not snapshots; changes
 * made through them are visible to this FlowDef.
 */
public class FlowDef extends UnitOfWorkDef<FlowDef>
{
  /** Source taps keyed by name. */
  protected Map<String, Tap> sources = new HashMap<String, Tap>();
  /** Sink taps keyed by name. */
  protected Map<String, Tap> sinks = new HashMap<String, Tap>();
  /** Trap taps keyed by name. */
  protected Map<String, Tap> traps = new HashMap<String, Tap>();
  /** Checkpoint taps keyed by name. */
  protected Map<String, Tap> checkpoints = new HashMap<String, Tap>();

  /** Artifact paths added via {@link #addToClassPath(String)}. */
  protected List<String> classPath = new ArrayList<String>();
  /** Tail pipes of the pipe assemblies held by this FlowDef. */
  protected List<Pipe> tails = new ArrayList<Pipe>();
  /** Registered AssemblyPlanner instances, in registration order. */
  protected List<AssemblyPlanner> assemblyPlanners = new ArrayList<AssemblyPlanner>();

  /** Flow descriptions; backed by a LinkedHashMap so keys keep their insertion order. */
  protected HashMap<String, String> flowDescriptor = new LinkedHashMap<String, String>();

  protected AssertionLevel assertionLevel;
  protected DebugLevel debugLevel;

  protected String runID;

  /**
   * Creates a new instance of a FlowDef.
   *
   * @return a FlowDef
   */
  public static FlowDef flowDef()
  {
    return new FlowDef();
  }

  /** Constructor FlowDef creates a new FlowDef instance. */
  public FlowDef()
  {
  }

  /**
   * Method getAssemblyPlanners returns the current registered AssemblyPlanners.
   *
   * @return a List of AssemblyPlanner instances
   */
  public List<AssemblyPlanner> getAssemblyPlanners()
  {
    return assemblyPlanners;
  }

  /**
   * Method addAssemblyPlanner adds new AssemblyPlanner instances to be evaluated.
   * <p/>
   * The descriptions returned by {@link AssemblyPlanner#getFlowDescriptor()} are merged into this FlowDef's
   * flowDescriptor via {@link #addDescriptions(Map)}; a planner returning {@code null} contributes none.
   *
   * @param assemblyPlanner of type AssemblyPlanner
   * @return a FlowDef
   * @throws NullPointerException if assemblyPlanner is null; in that case nothing is registered
   */
  public FlowDef addAssemblyPlanner( AssemblyPlanner assemblyPlanner )
  {
    // read the descriptor before registering, so a null (or failing) planner never ends up in the list
    Map<String, String> descriptor = assemblyPlanner.getFlowDescriptor();

    assemblyPlanners.add( assemblyPlanner );
    addDescriptions( descriptor );

    return this;
  }

  /**
   * Method getSources returns the sources of this FlowDef object.
   *
   * @return the sources (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSources()
  {
    return sources;
  }

  /**
   * Method getSourcesCopy returns a copy of the sources Map.
   *
   * @return the sourcesCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSourcesCopy()
  {
    return new HashMap<String, Tap>( sources );
  }

  /**
   * Method getFlowDescriptor returns the flowDescriptor of this FlowDef.
   *
   * @return the flowDescriptor of this FlowDef object.
   */
  public HashMap<String, String> getFlowDescriptor()
  {
    return flowDescriptor;
  }

  /**
   * Method addSource adds a new named source {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name   of String
   * @param source of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a source with the given name was already added
   */
  public FlowDef addSource( String name, Tap source )
  {
    if( sources.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate source: " + name );

    sources.put( name, source );
    return this;
  }

  /**
   * Method addSource adds a new source {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
   * <p/>
   * If the given pipe is not a head pipe, it will be resolved. If more than one is found, an
   * {@link IllegalArgumentException} will be thrown.
   *
   * @param pipe   of Pipe
   * @param source of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if pipe is null, if it does not resolve to exactly one head, or if a source
   *                                  with the head's name was already added
   */
  public FlowDef addSource( Pipe pipe, Tap source )
  {
    if( pipe == null )
      throw new IllegalArgumentException( "pipe may not be null" );

    Pipe[] heads = pipe.getHeads();

    if( heads.length != 1 )
      throw new IllegalArgumentException( "pipe has too many heads, found: " + Arrays.toString( Pipe.names( heads ) ) );

    addSource( heads[ 0 ].getName(), source );
    return this;
  }

  /**
   * Method addSources adds a map of name and {@link Tap} pairs.
   * <p/>
   * A {@code null} map is ignored.
   *
   * @param sources of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a source
   */
  public FlowDef addSources( Map<String, Tap> sources )
  {
    if( sources != null )
    {
      for( Map.Entry<String, Tap> entry : sources.entrySet() )
        addSource( entry.getKey(), entry.getValue() );
    }

    return this;
  }

  /**
   * Method addDescription adds a user readable description to the flowDescriptor.
   * <p/>
   * This uses the {@link FlowDescriptors#DESCRIPTION} key.
   *
   * @param description The description as a String; ignored when empty.
   * @return FlowDef
   */
  public FlowDef addDescription( String description )
  {
    addDescription( FlowDescriptors.DESCRIPTION, description );

    return this;
  }

  /**
   * Method addDescription adds a description to the flowDescriptor.
   * <p/>
   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
   * For known description types, see {@link FlowDescriptors}.
   * <p/>
   * If an existing key exists, it will be appended to the original value using
   * {@link FlowDescriptors#VALUE_SEPARATOR}.
   * <p/>
   * A value considered empty by {@link Util#isEmpty(String)} is ignored.
   *
   * @param key   The key as a String.
   * @param value The value as a String.
   * @return FlowDef
   */
  public FlowDef addDescription( String key, String value )
  {
    if( Util.isEmpty( value ) ) // do nothing
      return this;

    if( flowDescriptor.containsKey( key ) )
    {
      String original = flowDescriptor.get( key );

      // only join when there is something to join with, avoids a leading separator
      if( !Util.isEmpty( original ) )
        value = original + FlowDescriptors.VALUE_SEPARATOR + value;
    }

    flowDescriptor.put( key, value );

    return this;
  }

  /**
   * Method addDescriptions adds all descriptions in the given map in order to the flowDescriptor. If the given
   * Map has an explicit order, it will be preserved.
   * <p/>
   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
   * For known description types, see {@link FlowDescriptors}.
   * <p/>
   * A {@code null} map is ignored, as with the other {@code add*( Map )} methods of this class.
   *
   * @param descriptions The descriptions to be added to the map.
   * @return FlowDef
   */
  public FlowDef addDescriptions( Map<String, String> descriptions )
  {
    if( descriptions == null ) // nothing to merge
      return this;

    for( Map.Entry<String, String> entry : descriptions.entrySet() )
      addDescription( entry.getKey(), entry.getValue() );

    return this;
  }

  /**
   * Method getSinks returns the sinks of this FlowDef object.
   *
   * @return the sinks (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSinks()
  {
    return sinks;
  }

  /**
   * Method getSinksCopy returns a copy of the sink Map.
   *
   * @return the sinksCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSinksCopy()
  {
    return new HashMap<String, Tap>( sinks );
  }

  /**
   * Method addSink adds a new named sink {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name of String
   * @param sink of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a sink with the given name was already added
   */
  public FlowDef addSink( String name, Tap sink )
  {
    if( sinks.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate sink: " + name );

    sinks.put( name, sink );
    return this;
  }

  /**
   * Method addSink adds a new sink {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
   *
   * @param tail of Pipe
   * @param sink of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a sink with the tail's name was already added
   */
  public FlowDef addSink( Pipe tail, Tap sink )
  {
    addSink( tail.getName(), sink );
    return this;
  }

  /**
   * Method addTailSink adds the tail {@link Pipe} and sink {@link Tap} to this FlowDef.
   * <p/>
   * This is a convenience method for adding both a tail and sink simultaneously. There isn't a similar method
   * for heads and sources as the head Pipe can always be derived.
   *
   * @param tail of Pipe
   * @param sink of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a sink with the tail's name was already added; the tail is not added
   *                                  in that case
   */
  public FlowDef addTailSink( Pipe tail, Tap sink )
  {
    addSink( tail.getName(), sink );
    addTail( tail );
    return this;
  }

  /**
   * Method addSinks adds a Map of the named and {@link Tap} pairs.
   * <p/>
   * A {@code null} map is ignored.
   *
   * @param sinks of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a sink
   */
  public FlowDef addSinks( Map<String, Tap> sinks )
  {
    if( sinks != null )
    {
      for( Map.Entry<String, Tap> entry : sinks.entrySet() )
        addSink( entry.getKey(), entry.getValue() );
    }

    return this;
  }

  /**
   * Method getTraps returns the traps of this FlowDef object.
   *
   * @return the traps (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getTraps()
  {
    return traps;
  }

  /**
   * Method getTrapsCopy returns a copy of the trap Map.
   *
   * @return the trapsCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getTrapsCopy()
  {
    return new HashMap<String, Tap>( traps );
  }

  /**
   * Method addTrap adds a new named trap {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name of String
   * @param trap of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a trap with the given name was already added
   */
  public FlowDef addTrap( String name, Tap trap )
  {
    if( traps.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate trap: " + name );

    traps.put( name, trap );
    return this;
  }

  /**
   * Method addTrap adds a new trap {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
   *
   * @param pipe of Pipe
   * @param trap of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a trap with the pipe's name was already added
   */
  public FlowDef addTrap( Pipe pipe, Tap trap )
  {
    addTrap( pipe.getName(), trap );
    return this;
  }

  /**
   * Method addTraps adds a Map of the names and {@link Tap} pairs.
   * <p/>
   * A {@code null} map is ignored.
   *
   * @param traps of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a trap
   */
  public FlowDef addTraps( Map<String, Tap> traps )
  {
    if( traps != null )
    {
      for( Map.Entry<String, Tap> entry : traps.entrySet() )
        addTrap( entry.getKey(), entry.getValue() );
    }

    return this;
  }

  /**
   * Method getCheckpoints returns the checkpoint taps of this FlowDef object.
   *
   * @return the checkpoints (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getCheckpoints()
  {
    return checkpoints;
  }

  /**
   * Method getCheckpointsCopy returns a copy of the checkpoint tap Map.
   *
   * @return the checkpointsCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getCheckpointsCopy()
  {
    return new HashMap<String, Tap>( checkpoints );
  }

  /**
   * Method addCheckpoint adds a new named checkpoint {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name       of String
   * @param checkpoint of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a checkpoint with the given name was already added
   */
  public FlowDef addCheckpoint( String name, Tap checkpoint )
  {
    if( checkpoints.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate checkpoint: " + name );

    checkpoints.put( name, checkpoint );
    return this;
  }

  /**
   * Method addCheckpoint adds a new checkpoint {@link Tap} named after the given {@link Checkpoint} for use in the resulting {@link Flow}.
   *
   * @param pipe       of Checkpoint
   * @param checkpoint of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a checkpoint with the pipe's name was already added
   */
  public FlowDef addCheckpoint( Checkpoint pipe, Tap checkpoint )
  {
    addCheckpoint( pipe.getName(), checkpoint );
    return this;
  }

  /**
   * Method addCheckpoints adds a Map of the names and {@link Tap} pairs.
   * <p/>
   * A {@code null} map is ignored.
   *
   * @param checkpoints of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a checkpoint
   */
  public FlowDef addCheckpoints( Map<String, Tap> checkpoints )
  {
    if( checkpoints != null )
    {
      for( Map.Entry<String, Tap> entry : checkpoints.entrySet() )
        addCheckpoint( entry.getKey(), entry.getValue() );
    }

    return this;
  }

  /**
   * Method getTails returns all the current pipe assembly tails the FlowDef holds.
   *
   * @return the tails (type {@code List<Pipe>}) of this FlowDef object.
   */
  public List<Pipe> getTails()
  {
    return tails;
  }

  /**
   * Method getTailsArray returns all the current pipe assembly tails the FlowDef holds.
   *
   * @return the tailsArray (type Pipe[]) of this FlowDef object.
   */
  public Pipe[] getTailsArray()
  {
    return tails.toArray( new Pipe[ tails.size() ] );
  }

  /**
   * Method addTail adds a new {@link Pipe} to this FlowDef that represents a tail in a pipe assembly.
   * <p/>
   * Be sure to add a sink tap that has the same name as this tail.
   * <p/>
   * A {@code null} tail is ignored.
   *
   * @param tail of Pipe
   * @return FlowDef
   */
  public FlowDef addTail( Pipe tail )
  {
    if( tail != null )
      this.tails.add( tail );

    return this;
  }

  /**
   * Method addTails adds a Collection of tails.
   * <p/>
   * A {@code null} collection, and any {@code null} element within it, is ignored.
   *
   * @param tails of {@code Collection<Pipe>}
   * @return FlowDef
   */
  public FlowDef addTails( Collection<Pipe> tails )
  {
    if( tails == null ) // consistent with addTail ignoring a null tail
      return this;

    for( Pipe tail : tails )
      addTail( tail );

    return this;
  }

  /**
   * Method addTails adds an array of tails.
   * <p/>
   * A {@code null} array, and any {@code null} element within it, is ignored.
   *
   * @param tails of Pipe...
   * @return FlowDef
   */
  public FlowDef addTails( Pipe... tails )
  {
    if( tails == null ) // consistent with addTail ignoring a null tail
      return this;

    for( Pipe tail : tails )
      addTail( tail );

    return this;
  }

  /**
   * Method setAssertionLevel sets the {@link AssertionLevel} held by this FlowDef.
   *
   * @param assertionLevel of type AssertionLevel
   * @return FlowDef
   */
  public FlowDef setAssertionLevel( AssertionLevel assertionLevel )
  {
    this.assertionLevel = assertionLevel;

    return this;
  }

  /**
   * Method getAssertionLevel returns the {@link AssertionLevel} held by this FlowDef.
   *
   * @return the assertionLevel, or {@code null} if none was set
   */
  public AssertionLevel getAssertionLevel()
  {
    return assertionLevel;
  }

  /**
   * Method setDebugLevel sets the {@link DebugLevel} held by this FlowDef.
   *
   * @param debugLevel of type DebugLevel
   * @return FlowDef
   */
  public FlowDef setDebugLevel( DebugLevel debugLevel )
  {
    this.debugLevel = debugLevel;

    return this;
  }

  /**
   * Method getDebugLevel returns the {@link DebugLevel} held by this FlowDef.
   *
   * @return the debugLevel, or {@code null} if none was set
   */
  public DebugLevel getDebugLevel()
  {
    return debugLevel;
  }

  /**
   * Method setRunID sets the checkpoint run or execution ID to be used to find prior failed runs against
   * this runID.
   * <p/>
   * When given, and a {@link Flow} fails to execute, a subsequent attempt to run the same Flow with the same
   * runID will allow the Flow instance to start where it left off.
   * <p/>
   * Not all planners support this feature.
   * <p/>
   * A Flow name is required when using a runID.
   * <p/>
   * An empty String is ignored and leaves any previously set runID in place; {@code null} clears the runID.
   *
   * @param runID of type String
   * @return FlowDef
   */
  public FlowDef setRunID( String runID )
  {
    if( runID != null && runID.isEmpty() )
      return this;

    this.runID = runID;

    return this;
  }

  /**
   * Method getRunID returns the runID held by this FlowDef.
   *
   * @return the runID, or {@code null} if none was set
   */
  public String getRunID()
  {
    return runID;
  }

  /**
   * Method getClassPath returns the artifacts added via {@link #addToClassPath(String)}.
   *
   * @return the classPath (type {@code List<String>}) of this FlowDef object.
   */
  public List<String> getClassPath()
  {
    return classPath;
  }

  /**
   * Adds the given artifact to the classpath the assembly will execute under allowing
   * {@link cascading.pipe.Operator}s to dynamically load classes and resources from a {@link ClassLoader}.
   * <p/>
   * A {@code null} or empty artifact is ignored.
   *
   * @param artifact a jar or other file String path
   * @return FlowDef
   */
  public FlowDef addToClassPath( String artifact )
  {
    if( artifact == null || artifact.isEmpty() )
      return this;

    classPath.add( artifact );

    return this;
  }
}