001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow; 022 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Map; 028import java.util.Properties; 029import java.util.Set; 030 031import cascading.CascadingException; 032import cascading.flow.planner.FlowPlanner; 033import cascading.flow.planner.PlatformInfo; 034import cascading.flow.planner.rule.RuleRegistrySet; 035import cascading.pipe.Pipe; 036import cascading.property.AppProps; 037import cascading.property.PropertyUtil; 038import cascading.scheme.Scheme; 039import cascading.tap.Tap; 040import cascading.util.Util; 041 042import static cascading.flow.FlowDef.flowDef; 043 044/** 045 * Class FlowConnector is the base class for all platform planners. 046 * <p/> 047 * See the {@link FlowDef} class for a fluent way to define a new Flow. 048 * <p/> 049 * Use the FlowConnector to link source and sink {@link Tap} instances with an assembly of {@link Pipe} instances into 050 * an executable {@link cascading.flow.Flow}. 051 * <p/> 052 * FlowConnector invokes a planner for the target execution environment. 053 * <p/> 054 * For executing Flows in local memory against local files, see {@link cascading.flow.local.LocalFlowConnector}. 055 * <p/> 056 * For Apache Hadoop, see the {@link cascading.flow.hadoop.HadoopFlowConnector}. 057 * Or if you have a pre-existing custom Hadoop job to execute, see {@link cascading.flow.hadoop.MapReduceFlow}, which 058 * doesn't require a planner. 059 * <p/> 060 * Note that all {@code connect} methods take a single {@code tail} or an array of {@code tail} Pipe instances. "tail" 061 * refers to the last connected Pipe instances in a pipe-assembly. Pipe-assemblies are graphs of object with "heads" 062 * and "tails". From a given "tail", all connected heads can be found, but not the reverse. So "tails" must be 063 * supplied by the user. 064 * <p/> 065 * The FlowConnector and the underlying execution framework (Hadoop or local mode) can be configured via a 066 * {@link Map} or {@link Properties} instance given to the constructor. 067 * <p/> 068 * This properties map must be populated before constructing a FlowConnector instance. Many planner specific 069 * properties can be set through the {@link FlowConnectorProps} fluent interface. 070 * <p/> 071 * Some planners have required properties. Hadoop expects {@link AppProps#setApplicationJarPath(java.util.Map, String)} or 072 * {@link AppProps#setApplicationJarClass(java.util.Map, Class)} to be set. 073 * <p/> 074 * Any properties set and passed through the FlowConnector constructor will be global to all Flow instances created through 075 * the that FlowConnector instance. Some properties are on the {@link FlowDef} and would only be applicable to the 076 * resulting Flow instance. 077 * <p/> 078 * These properties are used to influence the current planner and are also passed down to the 079 * execution framework to override any default values. For example when using the Hadoop planner, the number of reducers 080 * or mappers can be set by using platform specific properties. 081 * <p/> 082 * Custom operations (Functions, Filter, etc) may also retrieve these property values at runtime through calls to 083 * {@link cascading.flow.FlowProcess#getProperty(String)} or {@link FlowProcess#getStringProperty(String)}. 084 * <p/> 085 * Most applications will need to call {@link cascading.property.AppProps#setApplicationJarClass(java.util.Map, Class)} or 086 * {@link cascading.property.AppProps#setApplicationJarPath(java.util.Map, String)} so that 087 * the correct application jar file is passed through to all child processes. The Class or path must reference 088 * the custom application jar, not a Cascading library class or jar. The easiest thing to do is give setApplicationJarClass 089 * the Class with your static main function and let Cascading figure out which jar to use. 090 * <p/> 091 * Note that Map<Object,Object> is compatible with the {@link Properties} class, so properties can be loaded at 092 * runtime from a configuration file. 093 * <p/> 094 * By default, all {@link cascading.operation.Assertion}s are planned into the resulting Flow instance. This can be 095 * changed for a given Flow by calling {@link FlowDef#setAssertionLevel(cascading.operation.AssertionLevel)} or globally 096 * via {@link FlowConnectorProps#setAssertionLevel(cascading.operation.AssertionLevel)}. 097 * <p/> 098 * Also by default, all {@link cascading.operation.Debug}s are planned into the resulting Flow instance. This can be 099 * changed for a given flow by calling {@link FlowDef#setDebugLevel(cascading.operation.DebugLevel)} or globally via 100 * {@link FlowConnectorProps#setDebugLevel(cascading.operation.DebugLevel)}. 101 * <p/> 102 * As of version 3.0, custom {@link cascading.flow.planner.rule.RuleRegistry} instances can be provided to customize 103 * a given planner. 104 * 105 * @see cascading.flow.local.LocalFlowConnector 106 * @see cascading.flow.hadoop.HadoopFlowConnector 107 * @see cascading.flow.hadoop2.Hadoop2MR1FlowConnector 108 */ 109public abstract class FlowConnector 110 { 111 /** Field properties */ 112 protected Map<Object, Object> properties; // may be a Map or Properties instance. see PropertyUtil 113 114 private RuleRegistrySet ruleRegistrySet; 115 116 /** 117 * Method getIntermediateSchemeClass is used for debugging. 118 * 119 * @param properties of type Map<Object, Object> 120 * @return Class 121 */ 122 public Class getIntermediateSchemeClass( Map<Object, Object> properties ) 123 { 124 // supporting stuffed classes to overcome classloading issue 125 Object type = PropertyUtil.getProperty( properties, FlowConnectorProps.INTERMEDIATE_SCHEME_CLASS, null ); 126 127 if( type == null ) 128 return getDefaultIntermediateSchemeClass(); 129 130 if( type instanceof Class ) 131 return (Class) type; 132 133 try 134 { 135 return FlowConnector.class.getClassLoader().loadClass( type.toString() ); 136 } 137 catch( ClassNotFoundException exception ) 138 { 139 throw new CascadingException( "unable to load class: " + type.toString(), exception ); 140 } 141 } 142 143 protected abstract Class<? extends Scheme> getDefaultIntermediateSchemeClass(); 144 145 protected FlowConnector() 146 { 147 this.properties = new HashMap<>(); 148 } 149 150 protected FlowConnector( RuleRegistrySet ruleRegistrySet ) 151 { 152 this(); 153 this.ruleRegistrySet = ruleRegistrySet; 154 } 155 156 protected FlowConnector( Map<Object, Object> properties ) 157 { 158 if( properties == null ) 159 this.properties = new HashMap<>(); 160 else if( properties instanceof Properties ) 161 this.properties = new Properties( (Properties) properties ); 162 else 163 this.properties = new HashMap<>( properties ); 164 } 165 166 protected FlowConnector( Map<Object, Object> properties, RuleRegistrySet ruleRegistrySet ) 167 { 168 this( properties ); 169 this.ruleRegistrySet = ruleRegistrySet; 170 } 171 172 /** 173 * Method getProperties returns the properties of this FlowConnector object. The returned Map instance 174 * is immutable to prevent changes to the underlying property values in this FlowConnector instance. 175 * <p/> 176 * If a {@link Properties} instance was passed to the constructor, the returned object will be a flattened 177 * {@link Map} instance. 178 * 179 * @return the properties (type Map<Object, Object>) of this FlowConnector object. 180 */ 181 public Map<Object, Object> getProperties() 182 { 183 // Sub-classes of FlowConnector should rely on PropertyUtil to manage access to properties objects internally. 184 return Collections.unmodifiableMap( PropertyUtil.asFlatMap( properties ) ); 185 } 186 187 /** 188 * Method connect links the given source and sink Taps to the given pipe assembly. 189 * 190 * @param source source Tap to bind to the head of the given tail Pipe 191 * @param sink sink Tap to bind to the given tail Pipe 192 * @param tail tail end of a pipe assembly 193 * @return Flow 194 */ 195 public Flow connect( Tap source, Tap sink, Pipe tail ) 196 { 197 return connect( null, source, sink, tail ); 198 } 199 200 /** 201 * Method connect links the given source and sink Taps to the given pipe assembly. 202 * 203 * @param name name to give the resulting Flow 204 * @param source source Tap to bind to the head of the given tail Pipe 205 * @param sink sink Tap to bind to the given tail Pipe 206 * @param tail tail end of a pipe assembly 207 * @return Flow 208 */ 209 public Flow connect( String name, Tap source, Tap sink, Pipe tail ) 210 { 211 Map<String, Tap> sources = new HashMap<String, Tap>(); 212 213 sources.put( tail.getHeads()[ 0 ].getName(), source ); 214 215 return connect( name, sources, sink, tail ); 216 } 217 218 /** 219 * Method connect links the given source, sink, and trap Taps to the given pipe assembly. The given trap will 220 * be linked to the assembly head along with the source. 221 * 222 * @param name name to give the resulting Flow 223 * @param source source Tap to bind to the head of the given tail Pipe 224 * @param sink sink Tap to bind to the given tail Pipe 225 * @param trap trap Tap to sink all failed Tuples into 226 * @param tail tail end of a pipe assembly 227 * @return Flow 228 */ 229 public Flow connect( String name, Tap source, Tap sink, Tap trap, Pipe tail ) 230 { 231 Map<String, Tap> sources = new HashMap<String, Tap>(); 232 233 sources.put( tail.getHeads()[ 0 ].getName(), source ); 234 235 Map<String, Tap> traps = new HashMap<String, Tap>(); 236 237 traps.put( tail.getHeads()[ 0 ].getName(), trap ); 238 239 return connect( name, sources, sink, traps, tail ); 240 } 241 242 /** 243 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 244 * 245 * @param sources all head names and source Taps to bind to the heads of the given tail Pipe 246 * @param sink sink Tap to bind to the given tail Pipe 247 * @param tail tail end of a pipe assembly 248 * @return Flow 249 */ 250 public Flow connect( Map<String, Tap> sources, Tap sink, Pipe tail ) 251 { 252 return connect( null, sources, sink, tail ); 253 } 254 255 /** 256 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 257 * 258 * @param name name to give the resulting Flow 259 * @param sources all head names and source Taps to bind to the heads of the given tail Pipe 260 * @param sink sink Tap to bind to the given tail Pipe 261 * @param tail tail end of a pipe assembly 262 * @return Flow 263 */ 264 public Flow connect( String name, Map<String, Tap> sources, Tap sink, Pipe tail ) 265 { 266 Map<String, Tap> sinks = new HashMap<String, Tap>(); 267 268 sinks.put( tail.getName(), sink ); 269 270 return connect( name, sources, sinks, tail ); 271 } 272 273 /** 274 * Method connect links the named source and trap Taps and sink Tap to the given pipe assembly. 275 * 276 * @param name name to give the resulting Flow 277 * @param sources all head names and source Taps to bind to the heads of the given tail Pipe 278 * @param sink sink Tap to bind to the given tail Pipe 279 * @param traps all pipe names and trap Taps to sink all failed Tuples into 280 * @param tail tail end of a pipe assembly 281 * @return Flow 282 */ 283 public Flow connect( String name, Map<String, Tap> sources, Tap sink, Map<String, Tap> traps, Pipe tail ) 284 { 285 Map<String, Tap> sinks = new HashMap<String, Tap>(); 286 287 sinks.put( tail.getName(), sink ); 288 289 return connect( name, sources, sinks, traps, tail ); 290 } 291 292 /** 293 * Method connect links the named trap Taps, source and sink Tap to the given pipe assembly. 294 * 295 * @param name name to give the resulting Flow 296 * @param source source Tap to bind to the head of the given tail Pipe 297 * @param sink sink Tap to bind to the given tail Pipe 298 * @param traps all pipe names and trap Taps to sink all failed Tuples into 299 * @param tail tail end of a pipe assembly 300 * @return Flow 301 */ 302 public Flow connect( String name, Tap source, Tap sink, Map<String, Tap> traps, Pipe tail ) 303 { 304 Map<String, Tap> sources = new HashMap<String, Tap>(); 305 306 sources.put( tail.getHeads()[ 0 ].getName(), source ); 307 308 Map<String, Tap> sinks = new HashMap<String, Tap>(); 309 310 sinks.put( tail.getName(), sink ); 311 312 return connect( name, sources, sinks, traps, tail ); 313 } 314 315 /** 316 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 317 * <p/> 318 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 319 * So the head pipe does not need to be included as an argument. 320 * 321 * @param source source Tap to bind to the head of the given tail Pipes 322 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 323 * @param tails all tail ends of a pipe assembly 324 * @return Flow 325 */ 326 public Flow connect( Tap source, Map<String, Tap> sinks, Collection<Pipe> tails ) 327 { 328 return connect( null, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) ); 329 } 330 331 /** 332 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 333 * <p/> 334 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 335 * So the head pipe does not need to be included as an argument. 336 * 337 * @param name name to give the resulting Flow 338 * @param source source Tap to bind to the head of the given tail Pipes 339 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 340 * @param tails all tail ends of a pipe assembly 341 * @return Flow 342 */ 343 public Flow connect( String name, Tap source, Map<String, Tap> sinks, Collection<Pipe> tails ) 344 { 345 return connect( name, source, sinks, tails.toArray( new Pipe[ tails.size() ] ) ); 346 } 347 348 /** 349 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 350 * <p/> 351 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 352 * So the head pipe does not need to be included as an argument. 353 * 354 * @param source source Tap to bind to the head of the given tail Pipes 355 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 356 * @param tails all tail ends of a pipe assembly 357 * @return Flow 358 */ 359 public Flow connect( Tap source, Map<String, Tap> sinks, Pipe... tails ) 360 { 361 return connect( null, source, sinks, tails ); 362 } 363 364 /** 365 * Method connect links the named source Taps and sink Tap to the given pipe assembly. 366 * <p/> 367 * Since only once source Tap is given, it is assumed to be associated with the 'head' pipe. 368 * So the head pipe does not need to be included as an argument. 369 * 370 * @param name name to give the resulting Flow 371 * @param source source Tap to bind to the head of the given tail Pipes 372 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 373 * @param tails all tail ends of a pipe assembly 374 * @return Flow 375 */ 376 public Flow connect( String name, Tap source, Map<String, Tap> sinks, Pipe... tails ) 377 { 378 Set<Pipe> heads = new HashSet<Pipe>(); 379 380 for( Pipe pipe : tails ) 381 Collections.addAll( heads, pipe.getHeads() ); 382 383 if( heads.isEmpty() ) 384 throw new IllegalArgumentException( "no pipe instance found" ); 385 386 if( heads.size() != 1 ) 387 throw new IllegalArgumentException( "there may be only 1 head pipe instance, found " + heads.size() ); 388 389 Map<String, Tap> sources = new HashMap<String, Tap>(); 390 391 for( Pipe pipe : heads ) 392 sources.put( pipe.getName(), source ); 393 394 return connect( name, sources, sinks, tails ); 395 } 396 397 /** 398 * Method connect links the named sources and sinks to the given pipe assembly. 399 * 400 * @param sources all head names and source Taps to bind to the heads of the given tail Pipes 401 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 402 * @param tails all tail ends of a pipe assembly 403 * @return Flow 404 */ 405 public Flow connect( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails ) 406 { 407 return connect( null, sources, sinks, tails ); 408 } 409 410 /** 411 * Method connect links the named sources and sinks to the given pipe assembly. 412 * 413 * @param name name to give the resulting Flow 414 * @param sources all head names and source Taps to bind to the heads of the given tail Pipes 415 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 416 * @param tails all tail ends of a pipe assembly 417 * @return Flow 418 */ 419 public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails ) 420 { 421 return connect( name, sources, sinks, new HashMap<String, Tap>(), tails ); 422 } 423 424 /** 425 * Method connect links the named sources, sinks and traps to the given pipe assembly. 426 * 427 * @param name name to give the resulting Flow 428 * @param sources all head names and source Taps to bind to the heads of the given tail Pipes 429 * @param sinks all tail names and sink Taps to bind to the given tail Pipes 430 * @param traps all pipe names and trap Taps to sink all failed Tuples into 431 * @param tails all tail ends of a pipe assembly 432 * @return Flow 433 */ 434 public Flow connect( String name, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Pipe... tails ) 435 { 436 name = name == null ? makeName( tails ) : name; 437 438 FlowDef flowDef = flowDef() 439 .setName( name ) 440 .addSources( sources ) 441 .addSinks( sinks ) 442 .addTraps( traps ) 443 .addTails( tails ); 444 445 return connect( flowDef ); 446 } 447 448 public Flow connect( FlowDef flowDef ) 449 { 450 FlowPlanner flowPlanner = createFlowPlanner(); 451 452 flowPlanner.initialize( this, properties ); 453 454 RuleRegistrySet ruleRegistrySet = getRuleRegistrySet(); 455 456 return flowPlanner.buildFlow( flowDef, ruleRegistrySet ); 457 } 458 459 protected abstract FlowPlanner createFlowPlanner(); 460 461 /** 462 * Returns the configured RuleRegistry, or the default for this platform. 463 * <p/> 464 * The registry is mutable, and will be applied to all subsequent planner operations via {@link #connect(FlowDef)}. 465 * 466 * @return the current RuleRegistry instance 467 */ 468 public RuleRegistrySet getRuleRegistrySet() 469 { 470 if( ruleRegistrySet != null ) 471 return ruleRegistrySet; 472 473 ruleRegistrySet = createDefaultRuleRegistrySet(); 474 475 return ruleRegistrySet; 476 } 477 478 protected abstract RuleRegistrySet createDefaultRuleRegistrySet(); 479 480 /** 481 * Method getPlatformInfo returns an instance of {@link PlatformInfo} for the underlying platform. 482 * 483 * @return of type PlatformInfo 484 */ 485 public PlatformInfo getPlatformInfo() 486 { 487 return createFlowPlanner().getPlatformInfo(); 488 } 489 490 ///////// 491 // UTIL 492 ///////// 493 494 private String makeName( Pipe[] pipes ) 495 { 496 String[] names = new String[ pipes.length ]; 497 498 for( int i = 0; i < pipes.length; i++ ) 499 names[ i ] = pipes[ i ].getName(); 500 501 String name = Util.join( names, "+" ); 502 503 if( name.length() > 32 ) 504 name = name.substring( 0, 32 ); 505 506 return name; 507 } 508 }