001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.beans.ConstructorProperties;
024import java.io.Serializable;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028
029import cascading.flow.FlowElement;
030import cascading.flow.planner.Scope;
031import cascading.flow.planner.ScopedElement;
032import cascading.property.ConfigDef;
033import cascading.tuple.Fields;
034import cascading.util.TraceUtil;
035import cascading.util.Traceable;
036import cascading.util.Util;
037
038import static java.util.Arrays.asList;
039
040/**
041 * Class Pipe is used to name branches in pipe assemblies, and as a base class for core
042 * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy},
043 * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}.
044 * <p/>
045 * Pipes are chained together through their constructors.
046 * <p/>
047 * To effect a split in the pipe,
048 * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.
049 * </p>
050 * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe.
051 * <p/>
052 * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe.
053 *
054 * @see Each
055 * @see Every
056 * @see GroupBy
057 * @see Merge
058 * @see CoGroup
059 * @see HashJoin
060 * @see SubAssembly
061 */
062public class Pipe implements ScopedElement, FlowElement, Serializable, Traceable
063  {
064  /** Field serialVersionUID */
065  private static final long serialVersionUID = 1L;
066  /** Field name */
067  protected String name;
068  /** Field previous */
069  protected Pipe previous;
070  /** Field parent */
071  protected Pipe parent;
072
073  protected ConfigDef configDef;
074
075  protected ConfigDef stepConfigDef;
076
077  protected ConfigDef nodeConfigDef;
078
079  /** Field id */
080  private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
081  /** Field trace */
082  private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
083
084  public static synchronized String id( Pipe pipe )
085    {
086    return pipe.id;
087    }
088
089  /**
090   * Convenience method to create an array of Pipe instances.
091   *
092   * @param pipes vararg list of pipes
093   * @return array of pipes
094   */
095  public static Pipe[] pipes( Pipe... pipes )
096    {
097    return pipes;
098    }
099
100  /**
101   * Convenience method for finding all Pipe names in an assembly.
102   *
103   * @param tails vararg list of all tails in given assembly
104   * @return array of Pipe names
105   */
106  public static String[] names( Pipe... tails )
107    {
108    Set<String> names = new HashSet<String>();
109
110    collectNames( tails, names );
111
112    return names.toArray( new String[ names.size() ] );
113    }
114
115  private static void collectNames( Pipe[] pipes, Set<String> names )
116    {
117    for( Pipe pipe : pipes )
118      {
119      if( pipe instanceof SubAssembly )
120        names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
121      else
122        names.add( pipe.getName() );
123
124      collectNames( SubAssembly.unwind( pipe.getPrevious() ), names );
125      }
126    }
127
128  public static Pipe[] named( String name, Pipe... tails )
129    {
130    Set<Pipe> pipes = new HashSet<Pipe>();
131
132    collectPipes( name, tails, pipes );
133
134    return pipes.toArray( new Pipe[ pipes.size() ] );
135    }
136
137  private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes )
138    {
139    for( Pipe tail : tails )
140      {
141      if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
142        pipes.add( tail );
143
144      collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes );
145      }
146    }
147
148  static Pipe[] resolvePreviousAll( Pipe... pipes )
149    {
150    Pipe[] resolved = new Pipe[ pipes.length ];
151
152    for( int i = 0; i < pipes.length; i++ )
153      resolved[ i ] = resolvePrevious( pipes[ i ] );
154
155    return resolved;
156    }
157
158  static Pipe resolvePrevious( Pipe pipe )
159    {
160    if( pipe instanceof Splice || pipe instanceof Operator )
161      return pipe;
162
163    Pipe[] pipes = pipe.getPrevious();
164
165    if( pipes.length > 1 )
166      throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );
167
168    for( Pipe previous : pipes )
169      {
170      if( previous instanceof Splice || previous instanceof Operator )
171        return previous;
172
173      return resolvePrevious( previous );
174      }
175
176    return pipe;
177    }
178
179  protected Pipe()
180    {
181    }
182
183  @ConstructorProperties({"previous"})
184  protected Pipe( Pipe previous )
185    {
186    this.previous = previous;
187
188    verifyPipe();
189    }
190
191  /**
192   * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head
193   * of a pipe assembly.
194   *
195   * @param name name for this branch of Pipes
196   */
197  @ConstructorProperties({"name"})
198  public Pipe( String name )
199    {
200    this.name = name;
201    }
202
203  /**
204   * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for
205   * naming a branch in a pipe assembly. Or renaming the branch mid-way down.
206   *
207   * @param name     name for this branch of Pipes
208   * @param previous previous Pipe to receive input Tuples from
209   */
210  @ConstructorProperties({"name", "previous"})
211  public Pipe( String name, Pipe previous )
212    {
213    this.name = name;
214    this.previous = previous;
215
216    verifyPipe();
217    }
218
219  private void verifyPipe()
220    {
221    if( !( previous instanceof SubAssembly ) )
222      return;
223
224    String[] strings = ( (SubAssembly) previous ).getTailNames();
225    if( strings.length != 1 )
226      throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) );
227    }
228
229  /**
230   * Get the name of this pipe. Guaranteed non-null.
231   *
232   * @return String the name of this pipe
233   */
234  public String getName()
235    {
236    if( name != null )
237      return name;
238
239    if( previous != null )
240      {
241      name = previous.getName();
242
243      return name;
244      }
245
246    return "ANONYMOUS";
247    }
248
249  /**
250   * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances
251   * passed on the constructors as inputs to this Pipe instance.
252   *
253   * @return all the upstream pipes this pipe is connected to.
254   */
255  public Pipe[] getPrevious()
256    {
257    if( previous == null )
258      return new Pipe[ 0 ];
259
260    return new Pipe[]{previous};
261    }
262
263  protected void setParent( Pipe parent )
264    {
265    this.parent = parent;
266    }
267
268  /**
269   * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps
270   * this instance.
271   *
272   * @return of type Pipe
273   */
274  public Pipe getParent()
275    {
276    return parent;
277    }
278
279  /**
280   * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via
281   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
282   * <p/>
283   * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or
284   * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the
285   * current Pipe instance.
286   *
287   * @return an instance of ConfigDef
288   */
289  @Override
290  public ConfigDef getConfigDef()
291    {
292    if( configDef == null )
293      configDef = new ConfigDef();
294
295    return configDef;
296    }
297
298  /**
299   * Returns {@code true} if there are properties in the configDef instance.
300   *
301   * @return true if there are configDef properties
302   */
303  @Override
304  public boolean hasConfigDef()
305    {
306    return configDef != null && !configDef.isEmpty();
307    }
308
309  /**
310   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
311   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
312   * <p/>
313   * Any properties set on the nodeConfigDef will not show up in any Flow configuration, but will show up in
314   * the current process {@link cascading.flow.FlowNode} (in Apache Tez the Vertex configuration). Any value set in the
315   * nodeConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
316   * </p>
317   * Use this method to tweak properties in the process node this pipe instance is planned into. In the case of the
318   * Apache Tez platform, when set on a {@link GroupBy} instance, the number of gather partitions can be modified.
319   * <p/>
320   * In the case of any Pipe that spans FlowNode boundaries, like GroupBy and CoGroup may on some platforms,
321   * any ConfigDef properties will be applied to the downstream FlowNode. That is, if a GroupBy is the source
322   * to a node, any node ConfigDef properties will be applied. If the GroupBy encountered when applying properties
323   * is on the sink side of a node, the properties will be ignored.
324   *
325   * @return an instance of ConfigDef
326   */
327  @Override
328  public ConfigDef getNodeConfigDef()
329    {
330    if( nodeConfigDef == null )
331      nodeConfigDef = new ConfigDef();
332
333    return nodeConfigDef;
334    }
335
336  /**
337   * Returns {@code true} if there are properties in the nodeConfigDef instance.
338   *
339   * @return true if there are nodeConfigDef properties
340   */
341  @Override
342  public boolean hasNodeConfigDef()
343    {
344    return nodeConfigDef != null && !nodeConfigDef.isEmpty();
345    }
346
347  /**
348   * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
349   * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
350   * <p/>
351   * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
352   * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
353   * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
354   * </p>
355   * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the
356   * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified.
357   *
358   * @return an instance of ConfigDef
359   */
360  @Override
361  public ConfigDef getStepConfigDef()
362    {
363    if( stepConfigDef == null )
364      stepConfigDef = new ConfigDef();
365
366    return stepConfigDef;
367    }
368
369  /**
370   * Returns {@code true} if there are properties in the stepConfigDef instance.
371   *
372   * @return true if there are stepConfigDef properties
373   */
374  @Override
375  public boolean hasStepConfigDef()
376    {
377    return stepConfigDef != null && !stepConfigDef.isEmpty();
378    }
379
380  /**
381   * Method getHeads returns the first Pipe instances in this pipe assembly.
382   *
383   * @return the first (type Pipe[]) of this Pipe object.
384   */
385  public Pipe[] getHeads()
386    {
387    Pipe[] pipes = getPrevious();
388
389    if( pipes.length == 0 )
390      return new Pipe[]{this};
391
392    if( pipes.length == 1 )
393      return pipes[ 0 ].getHeads();
394
395    Set<Pipe> heads = new HashSet<Pipe>();
396
397    for( Pipe pipe : pipes )
398      Collections.addAll( heads, pipe.getHeads() );
399
400    return heads.toArray( new Pipe[ heads.size() ] );
401    }
402
403  @Override
404  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
405    {
406    return incomingScopes.iterator().next();
407    }
408
409  @Override
410  public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
411    {
412    throw new IllegalStateException( "resolveIncomingOperationFields should never be called" );
413    }
414
415  @Override
416  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
417    {
418    throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" );
419    }
420
421  @Override
422  public String getTrace()
423    {
424    return trace;
425    }
426
427  @Override
428  public String toString()
429    {
430    return getClass().getSimpleName() + "(" + getName() + ")";
431    }
432
433  Scope getFirst( Set<Scope> incomingScopes )
434    {
435    return incomingScopes.iterator().next();
436    }
437
438  @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
439  @Override
440  public boolean equals( Object object )
441    {
442    // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails
443    return this == object;
444    }
445
446  @Override
447  public int hashCode()
448    {
449    return 31 * getName().hashCode() + getClass().hashCode();
450    }
451
452  /**
453   * Method print is used internally.
454   *
455   * @param scope of type Scope
456   * @return String
457   */
458  public String print( Scope scope )
459    {
460    StringBuffer buffer = new StringBuffer();
461
462    printInternal( buffer, scope );
463
464    return buffer.toString();
465    }
466
467  protected void printInternal( StringBuffer buffer, Scope scope )
468    {
469    buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" );
470    }
471  }