001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe;
023
024import java.beans.ConstructorProperties;
025import java.io.Serializable;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Set;
029
030import cascading.flow.FlowElement;
031import cascading.flow.planner.Scope;
032import cascading.flow.planner.ScopedElement;
033import cascading.property.ConfigDef;
034import cascading.tuple.Fields;
035import cascading.util.TraceUtil;
036import cascading.util.Traceable;
037import cascading.util.Util;
038
039import static java.util.Arrays.asList;
040
041/**
042 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core
043 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy},
044 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}.
045 * <p>
046 * Pipes are chained together through their constructors.
047 * <p>
048 * To effect a split in the pipe,
049 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.
050 * <p>
051 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe.
052 * <p>
053 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe.
054 *
055 * @see Each
056 * @see Every
057 * @see GroupBy
058 * @see Merge
059 * @see CoGroup
060 * @see HashJoin
061 * @see SubAssembly
062 */
063public class Pipe implements ScopedElement, FlowElement, Serializable, Traceable
064  {
065  /** Field serialVersionUID */
066  private static final long serialVersionUID = 1L;
067  /** Field name */
068  protected String name;
069  /** Field previous */
070  protected Pipe previous;
071  /** Field parent */
072  protected Pipe parent;
073
074  protected ConfigDef configDef;
075
076  protected ConfigDef stepConfigDef;
077
078  protected ConfigDef nodeConfigDef;
079
080  /** Field id */
081  private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
082  /** Field trace */
083  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
084
085  public static synchronized String id( Pipe pipe )
086    {
087    return pipe.id;
088    }
089
090  /**
091   * Convenience method to create an array of Pipe instances.
092   *
093   * @param pipes vararg list of pipes
094   * @return array of pipes
095   */
096  public static Pipe[] pipes( Pipe... pipes )
097    {
098    return pipes;
099    }
100
101  /**
102   * Convenience method for finding all Pipe names in an assembly.
103   *
104   * @param tails vararg list of all tails in given assembly
105   * @return array of Pipe names
106   */
107  public static String[] names( Pipe... tails )
108    {
109    Set<String> names = new HashSet<String>();
110
111    collectNames( tails, names );
112
113    return names.toArray( new String[ names.size() ] );
114    }
115
116  private static void collectNames( Pipe[] pipes, Set<String> names )
117    {
118    for( Pipe pipe : pipes )
119      {
120      if( pipe instanceof SubAssembly )
121        names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
122      else
123        names.add( pipe.getName() );
124
125      collectNames( SubAssembly.unwind( pipe.getPrevious() ), names );
126      }
127    }
128
129  public static Pipe[] named( String name, Pipe... tails )
130    {
131    Set<Pipe> pipes = new HashSet<Pipe>();
132
133    collectPipes( name, tails, pipes );
134
135    return pipes.toArray( new Pipe[ pipes.size() ] );
136    }
137
138  private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes )
139    {
140    for( Pipe tail : tails )
141      {
142      if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
143        pipes.add( tail );
144
145      collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes );
146      }
147    }
148
149  static Pipe[] resolvePreviousAll( Pipe... pipes )
150    {
151    Pipe[] resolved = new Pipe[ pipes.length ];
152
153    for( int i = 0; i < pipes.length; i++ )
154      resolved[ i ] = resolvePrevious( pipes[ i ] );
155
156    return resolved;
157    }
158
159  static Pipe resolvePrevious( Pipe pipe )
160    {
161    if( pipe instanceof Splice || pipe instanceof Operator )
162      return pipe;
163
164    Pipe[] pipes = pipe.getPrevious();
165
166    if( pipes.length > 1 )
167      throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );
168
169    for( Pipe previous : pipes )
170      {
171      if( previous instanceof Splice || previous instanceof Operator )
172        return previous;
173
174      return resolvePrevious( previous );
175      }
176
177    return pipe;
178    }
179
180  protected Pipe()
181    {
182    }
183
184  @ConstructorProperties({"previous"})
185  protected Pipe( Pipe previous )
186    {
187    this.previous = previous;
188
189    verifyPipe();
190    }
191
192  /**
193   * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head
194   * of a pipe assembly.
195   *
196   * @param name name for this branch of Pipes
197   */
198  @ConstructorProperties({"name"})
199  public Pipe( String name )
200    {
201    this.name = name;
202    }
203
204  /**
205   * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for
206   * naming a branch in a pipe assembly. Or renaming the branch mid-way down.
207   *
208   * @param name     name for this branch of Pipes
209   * @param previous previous Pipe to receive input Tuples from
210   */
211  @ConstructorProperties({"name", "previous"})
212  public Pipe( String name, Pipe previous )
213    {
214    this.name = name;
215    this.previous = previous;
216
217    verifyPipe();
218    }
219
220  private void verifyPipe()
221    {
222    if( !( previous instanceof SubAssembly ) )
223      return;
224
225    String[] strings = ( (SubAssembly) previous ).getTailNames();
226    if( strings.length != 1 )
227      throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) );
228    }
229
230  /**
231   * Get the name of this pipe. Guaranteed non-null.
232   *
233   * @return String the name of this pipe
234   */
235  public String getName()
236    {
237    if( name != null )
238      return name;
239
240    if( previous != null )
241      {
242      name = previous.getName();
243
244      return name;
245      }
246
247    return "ANONYMOUS";
248    }
249
250  /**
251   * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances
252   * passed on the constructors as inputs to this Pipe instance.
253   *
254   * @return all the upstream pipes this pipe is connected to.
255   */
256  public Pipe[] getPrevious()
257    {
258    if( previous == null )
259      return new Pipe[ 0 ];
260
261    return new Pipe[]{previous};
262    }
263
264  protected void setParent( Pipe parent )
265    {
266    this.parent = parent;
267    }
268
269  /**
270   * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps
271   * this instance.
272   *
273   * @return of type Pipe
274   */
275  public Pipe getParent()
276    {
277    return parent;
278    }
279
280  /**
281   * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via
282   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
283   * <p>
284   * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or
285   * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the
286   * current Pipe instance.
287   *
288   * @return an instance of ConfigDef
289   */
290  @Override
291  public ConfigDef getConfigDef()
292    {
293    if( configDef == null )
294      configDef = new ConfigDef();
295
296    return configDef;
297    }
298
299  /**
300   * Returns {@code true} if there are properties in the configDef instance.
301   *
302   * @return true if there are configDef properties
303   */
304  @Override
305  public boolean hasConfigDef()
306    {
307    return configDef != null && !configDef.isEmpty();
308    }
309
310  /**
311   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
312   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
313   * <p>
314   * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in
315   * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the
316   * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
317   * <p>
318   * Use this method to tweak properties in the process node this pipe instance is planned into. In the case of the
319   * Apache Tez platform, when set on a {@link GroupBy} instance, the number of gather partitions can be modified.
320   * <p>
321   * In the case of any Pipe that spans FlowNode boundaries, like GroupBy and CoGroup may on some platforms,
322   * any ConfigDef properties will be applied to the downstream FlowNode. That is, if a GroupBy is the source
323   * to a node, any node ConfigDef properties will be applied. If the GroupBy encountered when applying properties
324   * is on the sink side of a node, the properties will be ignored.
325   *
326   * @return an instance of ConfigDef
327   */
328  @Override
329  public ConfigDef getNodeConfigDef()
330    {
331    if( nodeConfigDef == null )
332      nodeConfigDef = new ConfigDef();
333
334    return nodeConfigDef;
335    }
336
337  /**
338   * Returns {@code true} if there are properties in the nodeConfigDef instance.
339   *
340   * @return true if there are nodeConfigDef properties
341   */
342  @Override
343  public boolean hasNodeConfigDef()
344    {
345    return nodeConfigDef != null && !nodeConfigDef.isEmpty();
346    }
347
348  /**
349   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
350   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
351   * <p>
352   * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
353   * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
354   * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
355   * <p>
356   * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the
357   * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified.
358   *
359   * @return an instance of ConfigDef
360   */
361  @Override
362  public ConfigDef getStepConfigDef()
363    {
364    if( stepConfigDef == null )
365      stepConfigDef = new ConfigDef();
366
367    return stepConfigDef;
368    }
369
370  /**
371   * Returns {@code true} if there are properties in the stepConfigDef instance.
372   *
373   * @return true if there are stepConfigDef properties
374   */
375  @Override
376  public boolean hasStepConfigDef()
377    {
378    return stepConfigDef != null && !stepConfigDef.isEmpty();
379    }
380
381  /**
382   * Method getHeads returns the first Pipe instances in this pipe assembly.
383   *
384   * @return the first (type Pipe[]) of this Pipe object.
385   */
386  public Pipe[] getHeads()
387    {
388    Pipe[] pipes = getPrevious();
389
390    if( pipes.length == 0 )
391      return new Pipe[]{this};
392
393    if( pipes.length == 1 )
394      return pipes[ 0 ].getHeads();
395
396    Set<Pipe> heads = new HashSet<Pipe>();
397
398    for( Pipe pipe : pipes )
399      Collections.addAll( heads, pipe.getHeads() );
400
401    return heads.toArray( new Pipe[ heads.size() ] );
402    }
403
404  @Override
405  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
406    {
407    return incomingScopes.iterator().next();
408    }
409
410  @Override
411  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
412    {
413    throw new IllegalStateException( "resolveIncomingOperationFields should never be called" );
414    }
415
416  @Override
417  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
418    {
419    throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" );
420    }
421
422  @Override
423  public String getTrace()
424    {
425    return trace;
426    }
427
428  @Override
429  public String toString()
430    {
431    return getClass().getSimpleName() + "(" + getName() + ")";
432    }
433
434  Scope getFirst( Set<Scope> incomingScopes )
435    {
436    return incomingScopes.iterator().next();
437    }
438
439  @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
440  @Override
441  public boolean equals( Object object )
442    {
443    // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails
444    return this == object;
445    }
446
447  @Override
448  public int hashCode()
449    {
450    return 31 * getName().hashCode() + getClass().hashCode();
451    }
452
453  /**
454   * Method print is used internally.
455   *
456   * @param scope of type Scope
457   * @return String
458   */
459  public String print( Scope scope )
460    {
461    StringBuffer buffer = new StringBuffer();
462
463    printInternal( buffer, scope );
464
465    return buffer.toString();
466    }
467
468  protected void printInternal( StringBuffer buffer, Scope scope )
469    {
470    buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" );
471    }
472  }