001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
import java.beans.ConstructorProperties;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
028    
029    import cascading.flow.FlowElement;
030    import cascading.flow.planner.Scope;
031    import cascading.property.ConfigDef;
032    import cascading.tuple.Fields;
033    import cascading.util.TraceUtil;
034    import cascading.util.Traceable;
035    import cascading.util.Util;
036    
037    import static java.util.Arrays.asList;
038    
039    /**
040     * Class Pipe is used to name branches in pipe assemblies, and as a base class for core
041     * processing model types, specifically {@link Each}, {@link Every}, {@link GroupBy},
042     * {@link CoGroup}, {@link Merge}, {@link HashJoin}, and {@link SubAssembly}.
043     * <p/>
044     * Pipes are chained together through their constructors.
045     * <p/>
046     * To effect a split in the pipe,
047     * simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.
048     * </p>
049     * A join can be achieved by passing two or more Pipe instances to a {@link CoGroup} or {@link HashJoin} pipe.
050     * <p/>
051     * A merge can be achieved by passing two or more Pipe instances to a {@link GroupBy} or {@link Merge} pipe.
052     *
053     * @see Each
054     * @see Every
055     * @see GroupBy
056     * @see Merge
057     * @see CoGroup
058     * @see HashJoin
059     * @see SubAssembly
060     */
061    public class Pipe implements FlowElement, Serializable, Traceable
062      {
063      /** Field serialVersionUID */
064      private static final long serialVersionUID = 1L;
065      /** Field name */
066      protected String name;
067      /** Field previous */
068      protected Pipe previous;
069      /** Field parent */
070      protected Pipe parent;
071    
072      protected ConfigDef configDef;
073    
074      protected ConfigDef stepConfigDef;
075    
076      /** Field id */
077      private final String id = Util.createUniqueID(); // 3.0 planner relies on this being consistent
078      /** Field trace */
079      private String trace = TraceUtil.captureDebugTrace( this ); // see TraceUtil.setTrace() to override
080    
081      public static synchronized String id( Pipe pipe )
082        {
083        return pipe.id;
084        }
085    
086      /**
087       * Convenience method to create an array of Pipe instances.
088       *
089       * @param pipes vararg list of pipes
090       * @return array of pipes
091       */
092      public static Pipe[] pipes( Pipe... pipes )
093        {
094        return pipes;
095        }
096    
097      /**
098       * Convenience method for finding all Pipe names in an assembly.
099       *
100       * @param tails vararg list of all tails in given assembly
101       * @return array of Pipe names
102       */
103      public static String[] names( Pipe... tails )
104        {
105        Set<String> names = new HashSet<String>();
106    
107        collectNames( tails, names );
108    
109        return names.toArray( new String[ names.size() ] );
110        }
111    
112      private static void collectNames( Pipe[] pipes, Set<String> names )
113        {
114        for( Pipe pipe : pipes )
115          {
116          if( pipe instanceof SubAssembly )
117            names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
118          else
119            names.add( pipe.getName() );
120    
121          collectNames( SubAssembly.unwind( pipe.getPrevious() ), names );
122          }
123        }
124    
125      public static Pipe[] named( String name, Pipe... tails )
126        {
127        Set<Pipe> pipes = new HashSet<Pipe>();
128    
129        collectPipes( name, tails, pipes );
130    
131        return pipes.toArray( new Pipe[ pipes.size() ] );
132        }
133    
134      private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes )
135        {
136        for( Pipe tail : tails )
137          {
138          if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
139            pipes.add( tail );
140    
141          collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes );
142          }
143        }
144    
145      static Pipe[] resolvePreviousAll( Pipe... pipes )
146        {
147        Pipe[] resolved = new Pipe[ pipes.length ];
148    
149        for( int i = 0; i < pipes.length; i++ )
150          resolved[ i ] = resolvePrevious( pipes[ i ] );
151    
152        return resolved;
153        }
154    
155      static Pipe resolvePrevious( Pipe pipe )
156        {
157        if( pipe instanceof Splice || pipe instanceof Operator )
158          return pipe;
159    
160        Pipe[] pipes = pipe.getPrevious();
161    
162        if( pipes.length > 1 )
163          throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );
164    
165        for( Pipe previous : pipes )
166          {
167          if( previous instanceof Splice || previous instanceof Operator )
168            return previous;
169    
170          return resolvePrevious( previous );
171          }
172    
173        return pipe;
174        }
175    
176      protected Pipe()
177        {
178        }
179    
180      @ConstructorProperties({"previous"})
181      protected Pipe( Pipe previous )
182        {
183        this.previous = previous;
184    
185        verifyPipe();
186        }
187    
188      /**
189       * Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head
190       * of a pipe assembly.
191       *
192       * @param name name for this branch of Pipes
193       */
194      @ConstructorProperties({"name"})
195      public Pipe( String name )
196        {
197        this.name = name;
198        }
199    
200      /**
201       * Constructor Pipe creates a new Pipe instance with the given name and previous Pipe instance. This is useful for
202       * naming a branch in a pipe assembly. Or renaming the branch mid-way down.
203       *
204       * @param name     name for this branch of Pipes
205       * @param previous previous Pipe to receive input Tuples from
206       */
207      @ConstructorProperties({"name", "previous"})
208      public Pipe( String name, Pipe previous )
209        {
210        this.name = name;
211        this.previous = previous;
212    
213        verifyPipe();
214        }
215    
216      private void verifyPipe()
217        {
218        if( !( previous instanceof SubAssembly ) )
219          return;
220    
221        String[] strings = ( (SubAssembly) previous ).getTailNames();
222        if( strings.length != 1 )
223          throw new IllegalArgumentException( "pipe assembly must not return more than one tail pipe instance, found " + Util.join( strings, ", " ) );
224        }
225    
226      /**
227       * Get the name of this pipe. Guaranteed non-null.
228       *
229       * @return String the name of this pipe
230       */
231      public String getName()
232        {
233        if( name != null )
234          return name;
235    
236        if( previous != null )
237          {
238          name = previous.getName();
239    
240          return name;
241          }
242    
243        return "ANONYMOUS";
244        }
245    
246      /**
247       * Get all the upstream pipes this pipe is connected to. This method will return the Pipe instances
248       * passed on the constructors as inputs to this Pipe instance.
249       *
250       * @return all the upstream pipes this pipe is connected to.
251       */
252      public Pipe[] getPrevious()
253        {
254        if( previous == null )
255          return new Pipe[ 0 ];
256    
257        return new Pipe[]{previous};
258        }
259    
260      protected void setParent( Pipe parent )
261        {
262        this.parent = parent;
263        }
264    
265      /**
266       * Returns the enclosing parent Pipe instance, if any. A parent is typically a {@link SubAssembly} that wraps
267       * this instance.
268       *
269       * @return of type Pipe
270       */
271      public Pipe getParent()
272        {
273        return parent;
274        }
275    
276      /**
277       * Returns a {@link ConfigDef} instance that allows for local properties to be set and made available via
278       * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
279       * <p/>
280       * Any properties set on the configDef will not show up in any {@link cascading.flow.Flow} or
281       * {@link cascading.flow.FlowStep} process level configuration, but will override any of those values as seen by the
282       * current Pipe instance.
283       *
284       * @return an instance of ConfigDef
285       */
286      @Override
287      public ConfigDef getConfigDef()
288        {
289        if( configDef == null )
290          configDef = new ConfigDef();
291    
292        return configDef;
293        }
294    
295      /**
296       * Returns {@code true} if there are properties in the configDef instance.
297       *
298       * @return true if there are configDef properties
299       */
300      @Override
301      public boolean hasConfigDef()
302        {
303        return configDef != null && !configDef.isEmpty();
304        }
305    
306      /**
307       * Returns a {@link ConfigDef} instance that allows for process level properties to be set and made available via
308       * a resulting {@link cascading.flow.FlowProcess} instance when the pipe is invoked.
309       * <p/>
310       * Any properties set on the stepConfigDef will not show up in any Flow configuration, but will show up in
311       * the current process {@link cascading.flow.FlowStep} (in Hadoop the MapReduce jobconf). Any value set in the
312       * stepConfigDef will be overridden by the pipe local {@code #getConfigDef} instance.
313       * </p>
314       * Use this method to tweak properties in the process step this pipe instance is planned into. In the case of the
315       * Hadoop platform, when set on a {@link GroupBy} instance, the number of reducers can be modified.
316       *
317       * @return an instance of ConfigDef
318       */
319      @Override
320      public ConfigDef getStepConfigDef()
321        {
322        if( stepConfigDef == null )
323          stepConfigDef = new ConfigDef();
324    
325        return stepConfigDef;
326        }
327    
328      /**
329       * Returns {@code true} if there are properties in the processConfigDef instance.
330       *
331       * @return true if there are processConfigDef properties
332       */
333      @Override
334      public boolean hasStepConfigDef()
335        {
336        return stepConfigDef != null && !stepConfigDef.isEmpty();
337        }
338    
339      /**
340       * Method getHeads returns the first Pipe instances in this pipe assembly.
341       *
342       * @return the first (type Pipe[]) of this Pipe object.
343       */
344      public Pipe[] getHeads()
345        {
346        Pipe[] pipes = getPrevious();
347    
348        if( pipes.length == 0 )
349          return new Pipe[]{this};
350    
351        if( pipes.length == 1 )
352          return pipes[ 0 ].getHeads();
353    
354        Set<Pipe> heads = new HashSet<Pipe>();
355    
356        for( Pipe pipe : pipes )
357          Collections.addAll( heads, pipe.getHeads() );
358    
359        return heads.toArray( new Pipe[ heads.size() ] );
360        }
361    
362      @Override
363      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
364        {
365        return incomingScopes.iterator().next();
366        }
367    
368      @Override
369      public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
370        {
371        throw new IllegalStateException( "resolveIncomingOperationFields should never be called" );
372        }
373    
374      @Override
375      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
376        {
377        throw new IllegalStateException( "resolveIncomingOperationPassThroughFields should never be called" );
378        }
379    
380      @Override
381      public String getTrace()
382        {
383        return trace;
384        }
385    
386      @Override
387      public String toString()
388        {
389        return getClass().getSimpleName() + "(" + getName() + ")";
390        }
391    
392      Scope getFirst( Set<Scope> incomingScopes )
393        {
394        return incomingScopes.iterator().next();
395        }
396    
397      @Override
398      public boolean isEquivalentTo( FlowElement element )
399        {
400        if( element == null )
401          return false;
402    
403        if( this == element )
404          return true;
405    
406        return getClass() == element.getClass();
407        }
408    
409      @SuppressWarnings({"EqualsWhichDoesntCheckParameterClass"})
410      @Override
411      public boolean equals( Object object )
412        {
413        // we cannot test equality by names for this class, prevents detection of dupe names in heads or tails
414        return this == object;
415        }
416    
417      @Override
418      public int hashCode()
419        {
420        return 31 * getName().hashCode() + getClass().hashCode();
421        }
422    
423      /**
424       * Method print is used internally.
425       *
426       * @param scope of type Scope
427       * @return String
428       */
429      public String print( Scope scope )
430        {
431        StringBuffer buffer = new StringBuffer();
432    
433        printInternal( buffer, scope );
434    
435        return buffer.toString();
436        }
437    
438      protected void printInternal( StringBuffer buffer, Scope scope )
439        {
440        buffer.append( getClass().getSimpleName() ).append( "('" ).append( getName() ).append( "')" );
441        }
442      }