/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe;

import java.beans.ConstructorProperties;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.planner.Scope;
import cascading.flow.planner.ScopedElement;
import cascading.property.ConfigDef;
import cascading.tuple.Fields;
import cascading.util.TraceUtil;
import cascading.util.Traceable;
import cascading.util.Util;

import static java.util.Arrays.asList;

/**
 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core
 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy},
 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}.
 * <p>
 * Pipes are chained together through their constructors.
 * <p>
 * To effect a split in the pipe,
 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.
 * <p>
 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe.
 * <p>
 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe.
 *
 * @see Each
 * @see Every
 * @see GroupBy
 * @see Merge
 * @see CoGroup
 * @see HashJoin
 * @see SubAssembly
 */
public class Pipe implements ScopedElement, FlowElement, Serializable, Traceable
  {
  /** Field serialVersionUID */
  private static final long serialVersionUID = 1L;
  /** Field name */
  protected String name;
  /** Field previous */
  protected Pipe previous;
  /** Field parent */
  protected Pipe parent;

  /** Field configDef, lazily created by {@link #getConfigDef()} */
  protected ConfigDef configDef;

  /** Field stepConfigDef, lazily created by {@link #getStepConfigDef()} */
  protected ConfigDef stepConfigDef;

  /** Field nodeConfigDef, lazily created by {@link #getNodeConfigDef()} */
  protected ConfigDef nodeConfigDef;

  /** Field id */
  private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
  /** Field trace */
  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override

  /**
   * Returns the unique id assigned to the given Pipe instance when it was constructed.
   *
   * @param pipe the pipe to return the id of
   * @return the id of the given pipe
   */
  public static synchronized String id( Pipe pipe )
    {
    return pipe.id;
    }

  /**
   * Convenience method to create an array of Pipe instances.
   *
   * @param pipes vararg list of pipes
   * @return array of pipes
   */
  public static Pipe[] pipes( Pipe... pipes )
    {
    return pipes;
    }

  /**
   * Convenience method for finding all Pipe names in an assembly.
   *
   * @param tails vararg list of all tails in given assembly
   * @return array of Pipe names
   */
  public static String[] names( Pipe... tails )
    {
    Set<String> names = new HashSet<String>();

    // identity based so that membership tests never invoke Pipe#hashCode(), which resolves (and may cache) names
    Set<Pipe> visited = Collections.newSetFromMap( new IdentityHashMap<Pipe, Boolean>() );

    collectNames( tails, names, visited );

    return names.toArray( new String[ names.size() ] );
    }

  /**
   * Walks upstream from the given pipes, adding every name found to {@code names}.
   * <p>
   * A {@link SubAssembly} contributes its tail names, any other pipe contributes {@link #getName()}.
   *
   * @param pipes   the pipes to start walking from
   * @param names   the set receiving the names found
   * @param visited the pipes already walked, so shared upstream branches are only traversed once
   */
  private static void collectNames( Pipe[] pipes, Set<String> names, Set<Pipe> visited )
    {
    for( Pipe pipe : pipes )
      {
      // a pipe reachable through more than one branch contributes the same names every time, skip repeats
      if( !visited.add( pipe ) )
        continue;

      if( pipe instanceof SubAssembly )
        names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
      else
        names.add( pipe.getName() );

      collectNames( SubAssembly.unwind( pipe.getPrevious() ), names, visited );
      }
    }

  /**
   * Convenience method for finding all Pipe instances in an assembly having the given name.
   * <p>
   * {@link SubAssembly} instances are traversed but never returned themselves.
   *
   * @param name  the name to match against {@link #getName()}
   * @param tails vararg list of all tails in given assembly
   * @return array of Pipe instances with the given name
   */
  public static Pipe[] named( String name, Pipe... tails )
    {
    Set<Pipe> pipes = new HashSet<Pipe>();

    // identity based so that membership tests never invoke Pipe#hashCode(), which resolves (and may cache) names
    Set<Pipe> visited = Collections.newSetFromMap( new IdentityHashMap<Pipe, Boolean>() );

    collectPipes( name, tails, pipes, visited );

    return pipes.toArray( new Pipe[ pipes.size() ] );
    }

  /**
   * Walks upstream from the given tails, adding every non-SubAssembly pipe whose name equals {@code name}
   * to {@code pipes}.
   *
   * @param name    the name to match
   * @param tails   the pipes to start walking from
   * @param pipes   the set receiving the matching pipes
   * @param visited the pipes already walked, so shared upstream branches are only traversed once
   */
  private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes, Set<Pipe> visited )
    {
    for( Pipe tail : tails )
      {
      // a pipe reachable through more than one branch yields the same matches every time, skip repeats
      if( !visited.add( tail ) )
        continue;

      if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
        pipes.add( tail );

      collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes, visited );
      }
    }

  /**
   * Applies {@link #resolvePrevious(Pipe)} to every given pipe.
   *
   * @param pipes the pipes to resolve
   * @return a new array, of the same length and order, holding the resolved pipes
   */
  static Pipe[] resolvePreviousAll( Pipe... pipes )
    {
    Pipe[] resolved = new Pipe[ pipes.length ];

    for( int i = 0; i < pipes.length; i++ )
      resolved[ i ] = resolvePrevious( pipes[ i ] );

    return resolved;
    }

  /**
   * Returns the given pipe if it is a {@link Splice} or {@link Operator}, otherwise the closest upstream
   * Splice or Operator. If the head of the branch is reached without finding one, that head pipe is returned.
   *
   * @param pipe the pipe to start resolving from
   * @return the resolved pipe
   * @throws IllegalStateException if a pipe with more than one previous pipe is encountered before resolving
   */
  static Pipe resolvePrevious( Pipe pipe )
    {
    if( pipe instanceof Splice || pipe instanceof Operator )
      return pipe;

    Pipe[] pipes = pipe.getPrevious();

    if( pipes.length > 1 )
      throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );

    // nothing further upstream, the given pipe is the head of the branch
    if( pipes.length == 0 )
      return pipe;

    // the recursive call applies the Splice/Operator test to the single predecessor
    return resolvePrevious( pipes[ 0 ] );
    }

  /** Constructor Pipe creates a new Pipe instance having neither a name nor a previous Pipe instance. */
  protected Pipe()
    {
    }

  /**
   * Constructor Pipe creates a new Pipe instance with the given previous Pipe instance.
   *
   * @param previous previous Pipe to receive input Tuples from
   * @throws IllegalArgumentException if previous is a {@link SubAssembly} that does not have exactly one tail
   */
  @ConstructorProperties({"previous"})
  protected Pipe( Pipe previous )
    {
    this.previous = previous;

    verifyPipe();
    }

  /**
   * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head
   * of a pipe assembly.
   *
   * @param name name for this branch of Pipes
   */
  @ConstructorProperties({"name"})
  public Pipe( String name )
    {
    this.name = name;
    }

  /**
   * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for
   * naming a branch in a pipe assembly. Or renaming the branch mid-way down.
   *
   * @param name     name for this branch of Pipes
   * @param previous previous Pipe to receive input Tuples from
   * @throws IllegalArgumentException if previous is a {@link SubAssembly} that does not have exactly one tail
   */
  @ConstructorProperties({"name", "previous"})
  public Pipe( String name, Pipe previous )
    {
    this.name = name;
    this.previous = previous;

    verifyPipe();
    }

  /**
   * Verifies that, when the previous pipe is a {@link SubAssembly}, it exposes exactly one tail.
   *
   * @throws IllegalArgumentException if the previous SubAssembly does not have exactly one tail name
   */
  private void verifyPipe()
    {
    if( !( previous instanceof SubAssembly ) )
      return;

    String[] strings = ( (SubAssembly) previous ).getTailNames();
    if( strings.length != 1 )
      throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) );
    }

  /**
   * Get the name of this pipe. Guaranteed non-null.
   * <p>
   * If no name was given, the name of the previous pipe is adopted and remembered. If there is no previous
   * pipe either, {@code "ANONYMOUS"} is returned.
   *
   * @return String the name of this pipe
   */
  public String getName()
    {
    if( name != null )
      return name;

    if( previous != null )
      {
      name = previous.getName();

      return name;
      }

    return "ANONYMOUS";
    }

  /**
   * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances
   * passed on the constructors as inputs to this Pipe instance.
   *
   * @return all the upstream pipes this pipe is connected to.
   */
  public Pipe[] getPrevious()
    {
    if( previous == null )
      return new Pipe[ 0 ];

    return new Pipe[]{previous};
    }

  /**
   * Sets the enclosing parent Pipe instance.
   *
   * @param parent the parent of this pipe
   */
  protected void setParent( Pipe parent )
    {
    this.parent = parent;
    }

  /**
   * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps
   * this instance.
   *
   * @return of type Pipe
   */
  public Pipe getParent()
    {
    return parent;
    }

  /**
   * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via
   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
   * <p>
   * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or
   * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the
   * current Pipe instance.
   *
   * @return an instance of ConfigDef
   */
  @Override
  public ConfigDef getConfigDef()
    {
    if( configDef == null )
      configDef = new ConfigDef();

    return configDef;
    }

  /**
   * Returns {@code true} if there are properties in the configDef instance.
   *
   * @return true if there are configDef properties
   */
  @Override
  public boolean hasConfigDef()
    {
    return configDef != null && !configDef.isEmpty();
    }

  /**
   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
   * <p>
   * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in
   * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the
   * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
   * <p>
   * Use this method to tweak properties in the process node this pipe instance is planned into. In the case of the
   * Apache Tez platform, when set on a {@link GroupBy} instance, the number of gather partitions can be modified.
   * <p>
   * In the case of any Pipe that spans FlowNode boundaries, like GroupBy and CoGroup may on some platforms,
   * any ConfigDef properties will be applied to the downstream FlowNode. That is, if a GroupBy is the source
   * to a node, any node ConfigDef properties will be applied. If the GroupBy encountered when applying properties
   * is on the sink side of a node, the properties will be ignored.
   *
   * @return an instance of ConfigDef
   */
  @Override
  public ConfigDef getNodeConfigDef()
    {
    if( nodeConfigDef == null )
      nodeConfigDef = new ConfigDef();

    return nodeConfigDef;
    }

  /**
   * Returns {@code true} if there are properties in the nodeConfigDef instance.
   *
   * @return true if there are nodeConfigDef properties
   */
  @Override
  public boolean hasNodeConfigDef()
    {
    return nodeConfigDef != null && !nodeConfigDef.isEmpty();
    }

  /**
   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
   * <p>
   * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
   * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
   * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
   * <p>
   * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the
   * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified.
   *
   * @return an instance of ConfigDef
   */
  @Override
  public ConfigDef getStepConfigDef()
    {
    if( stepConfigDef == null )
      stepConfigDef = new ConfigDef();

    return stepConfigDef;
    }

  /**
   * Returns {@code true} if there are properties in the stepConfigDef instance.
   *
   * @return true if there are stepConfigDef properties
   */
  @Override
  public boolean hasStepConfigDef()
    {
    return stepConfigDef != null && !stepConfigDef.isEmpty();
    }

  /**
   * Method getHeads returns the first Pipe instances in this pipe assembly.
   *
   * @return the first (type Pipe[]) of this Pipe object.
   */
  public Pipe[] getHeads()
    {
    Pipe[] pipes = getPrevious();

    // nothing upstream, this pipe is itself a head
    if( pipes.length == 0 )
      return new Pipe[]{this};

    if( pipes.length == 1 )
      return pipes[ 0 ].getHeads();

    // a set, as branches may share heads
    Set<Pipe> heads = new HashSet<Pipe>();

    for( Pipe pipe : pipes )
      Collections.addAll( heads, pipe.getHeads() );

    return heads.toArray( new Pipe[ heads.size() ] );
    }

  /**
   * Returns the first Scope provided by the given set's iterator, unchanged.
   *
   * @param incomingScopes the incoming scopes, must not be empty
   * @return the first incoming Scope
   */
  @Override
  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
    {
    return incomingScopes.iterator().next();
    }

  /**
   * Not supported by this class.
   *
   * @param incomingScope of type Scope
   * @return never returns normally
   * @throws IllegalStateException always
   */
  @Override
  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
    {
    // message names this method, it previously referred to a method that does not exist in this class
    throw new IllegalStateException( "resolveIncomingOperationArgumentFields should never be called" );
    }

  /**
   * Not supported by this class.
   *
   * @param incomingScope of type Scope
   * @return never returns normally
   * @throws IllegalStateException always
   */
  @Override
  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
    {
    throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" );
    }

  /**
   * Returns the trace value captured when this instance was constructed.
   *
   * @return the trace, as a String
   */
  @Override
  public String getTrace()
    {
    return trace;
    }

  @Override
  public String toString()
    {
    return getClass().getSimpleName() + "(" + getName() + ")";
    }

  /**
   * Returns the first Scope provided by the given set's iterator.
   *
   * @param incomingScopes the incoming scopes, must not be empty
   * @return the first incoming Scope
   */
  Scope getFirst( Set<Scope> incomingScopes )
    {
    return incomingScopes.iterator().next();
    }

  /**
   * Pipe instances are only equal to themselves.
   *
   * @param object the object to compare against
   * @return true if the given object is this instance
   */
  @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
  @Override
  public boolean equals( Object object )
    {
    // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails
    return this == object;
    }

  /**
   * Returns a hash derived from the current {@link #getName()} value and the runtime class.
   *
   * @return the hash code
   */
  @Override
  public int hashCode()
    {
    return 31 * getName().hashCode() + getClass().hashCode();
    }

  /**
   * Method print is used internally.
   *
   * @param scope of type Scope
   * @return String
   */
  public String print( Scope scope )
    {
    StringBuffer buffer = new StringBuffer();

    printInternal( buffer, scope );

    return buffer.toString();
    }

  /**
   * Appends the simple class name followed by the quoted pipe name, in the form {@code Type('name')},
   * to the given buffer. The given scope is not used by this implementation.
   *
   * @param buffer the buffer to append to
   * @param scope  of type Scope
   */
  protected void printInternal( StringBuffer buffer, Scope scope )
    {
    buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" );
    }
  }