001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe; 022 023import java.beans.ConstructorProperties; 024import java.io.Serializable; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Set; 028 029import cascading.flow.FlowElement; 030import cascading.flow.planner.Scope; 031import cascading.flow.planner.ScopedElement; 032import cascading.property.ConfigDef; 033import cascading.tuple.Fields; 034import cascading.util.TraceUtil; 035import cascading.util.Traceable; 036import cascading.util.Util; 037 038import static java.util.Arrays.asList; 039 040/** 041 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core 042 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy}, 043 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}. 044 * <p/> 045 * Pipes are chained together through their constructors. 046 * <p/> 047 * To effect a split in the pipe, 048 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances. 049 * </p> 050 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe. 051 * <p/> 052 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe. 053 * 054 * @see Each 055 * @see Every 056 * @see GroupBy 057 * @see Merge 058 * @see CoGroup 059 * @see HashJoin 060 * @see SubAssembly 061 */ 062public class Pipe implements ScopedElement, FlowElement, Serializable, Traceable 063 { 064 /** Field serialVersionUID */ 065 private static final long serialVersionUID = 1L; 066 /** Field name */ 067 protected String name; 068 /** Field previous */ 069 protected Pipe previous; 070 /** Field parent */ 071 protected Pipe parent; 072 073 protected ConfigDef configDef; 074 075 protected ConfigDef stepConfigDef; 076 077 protected ConfigDef nodeConfigDef; 078 079 /** Field id */ 080 private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent 081 /** Field trace */ 082 private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override 083 084 public static synchronized String id( Pipe pipe ) 085 { 086 return pipe.id; 087 } 088 089 /** 090 * Convenience method to create an array of Pipe instances. 091 * 092 * @param pipes vararg list of pipes 093 * @return array of pipes 094 */ 095 public static Pipe[] pipes( Pipe... pipes ) 096 { 097 return pipes; 098 } 099 100 /** 101 * Convenience method for finding all Pipe names in an assembly. 102 * 103 * @param tails vararg list of all tails in given assembly 104 * @return array of Pipe names 105 */ 106 public static String[] names( Pipe... tails ) 107 { 108 Set<String> names = new HashSet<String>(); 109 110 collectNames( tails, names ); 111 112 return names.toArray( new String[ names.size() ] ); 113 } 114 115 private static void collectNames( Pipe[] pipes, Set<String> names ) 116 { 117 for( Pipe pipe : pipes ) 118 { 119 if( pipe instanceof SubAssembly ) 120 names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) ); 121 else 122 names.add( pipe.getName() ); 123 124 collectNames( SubAssembly.unwind( pipe.getPrevious() ), names ); 125 } 126 } 127 128 public static Pipe[] named( String name, Pipe... tails ) 129 { 130 Set<Pipe> pipes = new HashSet<Pipe>(); 131 132 collectPipes( name, tails, pipes ); 133 134 return pipes.toArray( new Pipe[ pipes.size() ] ); 135 } 136 137 private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes ) 138 { 139 for( Pipe tail : tails ) 140 { 141 if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) ) 142 pipes.add( tail ); 143 144 collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes ); 145 } 146 } 147 148 static Pipe[] resolvePreviousAll( Pipe... pipes ) 149 { 150 Pipe[] resolved = new Pipe[ pipes.length ]; 151 152 for( int i = 0; i < pipes.length; i++ ) 153 resolved[ i ] = resolvePrevious( pipes[ i ] ); 154 155 return resolved; 156 } 157 158 static Pipe resolvePrevious( Pipe pipe ) 159 { 160 if( pipe instanceof Splice || pipe instanceof Operator ) 161 return pipe; 162 163 Pipe[] pipes = pipe.getPrevious(); 164 165 if( pipes.length > 1 ) 166 throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" ); 167 168 for( Pipe previous : pipes ) 169 { 170 if( previous instanceof Splice || previous instanceof Operator ) 171 return previous; 172 173 return resolvePrevious( previous ); 174 } 175 176 return pipe; 177 } 178 179 protected Pipe() 180 { 181 } 182 183 @ConstructorProperties({"previous"}) 184 protected Pipe( Pipe previous ) 185 { 186 this.previous = previous; 187 188 verifyPipe(); 189 } 190 191 /** 192 * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head 193 * of a pipe assembly. 194 * 195 * @param name name for this branch of Pipes 196 */ 197 @ConstructorProperties({"name"}) 198 public Pipe( String name ) 199 { 200 this.name = name; 201 } 202 203 /** 204 * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for 205 * naming a branch in a pipe assembly. Or renaming the branch mid-way down. 206 * 207 * @param name name for this branch of Pipes 208 * @param previous previous Pipe to receive input Tuples from 209 */ 210 @ConstructorProperties({"name", "previous"}) 211 public Pipe( String name, Pipe previous ) 212 { 213 this.name = name; 214 this.previous = previous; 215 216 verifyPipe(); 217 } 218 219 private void verifyPipe() 220 { 221 if( !( previous instanceof SubAssembly ) ) 222 return; 223 224 String[] strings = ( (SubAssembly) previous ).getTailNames(); 225 if( strings.length != 1 ) 226 throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) ); 227 } 228 229 /** 230 * Get the name of this pipe. Guaranteed non-null. 231 * 232 * @return String the name of this pipe 233 */ 234 public String getName() 235 { 236 if( name != null ) 237 return name; 238 239 if( previous != null ) 240 { 241 name = previous.getName(); 242 243 return name; 244 } 245 246 return "ANONYMOUS"; 247 } 248 249 /** 250 * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances 251 * passed on the constructors as inputs to this Pipe instance. 252 * 253 * @return all the upstream pipes this pipe is connected to. 254 */ 255 public Pipe[] getPrevious() 256 { 257 if( previous == null ) 258 return new Pipe[ 0 ]; 259 260 return new Pipe[]{previous}; 261 } 262 263 protected void setParent( Pipe parent ) 264 { 265 this.parent = parent; 266 } 267 268 /** 269 * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps 270 * this instance. 271 * 272 * @return of type Pipe 273 */ 274 public Pipe getParent() 275 { 276 return parent; 277 } 278 279 /** 280 * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via 281 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 282 * <p/> 283 * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or 284 * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the 285 * current Pipe instance. 286 * 287 * @return an instance of ConfigDef 288 */ 289 @Override 290 public ConfigDef getConfigDef() 291 { 292 if( configDef == null ) 293 configDef = new ConfigDef(); 294 295 return configDef; 296 } 297 298 /** 299 * Returns {@code true} if there are properties in the configDef instance. 300 * 301 * @return true if there are configDef properties 302 */ 303 @Override 304 public boolean hasConfigDef() 305 { 306 return configDef != null && !configDef.isEmpty(); 307 } 308 309 /** 310 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 311 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 312 * <p/> 313 * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in 314 * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the 315 * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 316 * </p> 317 * Use this method to tweak properties in the process node this pipe instance is planned into. In the case of the 318 * Apache Tez platform, when set on a {@link GroupBy} instance, the number of gather partitions can be modified. 319 * <p/> 320 * In the case of any Pipe that spans FlowNode boundaries, like GroupBy and CoGroup may on some platforms, 321 * any ConfigDef properties will be applied to the downstream FlowNode. That is, if a GroupBy is the source 322 * to a node, any node ConfigDef properties will be applied. If the GroupBy encountered when applying properties 323 * is on the sink side of a node, the properties will be ignored. 324 * 325 * @return an instance of ConfigDef 326 */ 327 @Override 328 public ConfigDef getNodeConfigDef() 329 { 330 if( nodeConfigDef == null ) 331 nodeConfigDef = new ConfigDef(); 332 333 return nodeConfigDef; 334 } 335 336 /** 337 * Returns {@code true} if there are properties in the nodeConfigDef instance. 338 * 339 * @return true if there are nodeConfigDef properties 340 */ 341 @Override 342 public boolean hasNodeConfigDef() 343 { 344 return nodeConfigDef != null && !nodeConfigDef.isEmpty(); 345 } 346 347 /** 348 * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via 349 * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked. 350 * <p/> 351 * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in 352 * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the 353 * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance. 354 * </p> 355 * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the 356 * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified. 357 * 358 * @return an instance of ConfigDef 359 */ 360 @Override 361 public ConfigDef getStepConfigDef() 362 { 363 if( stepConfigDef == null ) 364 stepConfigDef = new ConfigDef(); 365 366 return stepConfigDef; 367 } 368 369 /** 370 * Returns {@code true} if there are properties in the stepConfigDef instance. 371 * 372 * @return true if there are stepConfigDef properties 373 */ 374 @Override 375 public boolean hasStepConfigDef() 376 { 377 return stepConfigDef != null && !stepConfigDef.isEmpty(); 378 } 379 380 /** 381 * Method getHeads returns the first Pipe instances in this pipe assembly. 382 * 383 * @return the first (type Pipe[]) of this Pipe object. 384 */ 385 public Pipe[] getHeads() 386 { 387 Pipe[] pipes = getPrevious(); 388 389 if( pipes.length == 0 ) 390 return new Pipe[]{this}; 391 392 if( pipes.length == 1 ) 393 return pipes[ 0 ].getHeads(); 394 395 Set<Pipe> heads = new HashSet<Pipe>(); 396 397 for( Pipe pipe : pipes ) 398 Collections.addAll( heads, pipe.getHeads() ); 399 400 return heads.toArray( new Pipe[ heads.size() ] ); 401 } 402 403 @Override 404 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 405 { 406 return incomingScopes.iterator().next(); 407 } 408 409 @Override 410 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 411 { 412 throw new IllegalStateException( "resolveIncomingOperationFields should never be called" ); 413 } 414 415 @Override 416 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 417 { 418 throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" ); 419 } 420 421 @Override 422 public String getTrace() 423 { 424 return trace; 425 } 426 427 @Override 428 public String toString() 429 { 430 return getClass().getSimpleName() + "(" + getName() + ")"; 431 } 432 433 Scope getFirst( Set<Scope> incomingScopes ) 434 { 435 return incomingScopes.iterator().next(); 436 } 437 438 @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"}) 439 @Override 440 public boolean equals( Object object ) 441 { 442 // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails 443 return this == object; 444 } 445 446 @Override 447 public int hashCode() 448 { 449 return 31 * getName().hashCode() + getClass().hashCode(); 450 } 451 452 /** 453 * Method print is used internally. 454 * 455 * @param scope of type Scope 456 * @return String 457 */ 458 public String print( Scope scope ) 459 { 460 StringBuffer buffer = new StringBuffer(); 461 462 printInternal( buffer, scope ); 463 464 return buffer.toString(); 465 } 466 467 protected void printInternal( StringBuffer buffer, Scope scope ) 468 { 469 buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" ); 470 } 471 }