001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Set;
027
028import cascading.util.TraceUtil;
029import cascading.util.Util;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Subclasses of SubAssembly encapsulate complex assemblies of {@link Pipe}s so they my be reused in the same manner
035 * a Pipe is used.
036 * <p/>
037 * That is, a typical SubAssembly subclass will accept a 'previous' Pipe instance, and a few
038 * arguments for configuring the resulting sub-assembly.
039 * <p/>
040 * The previous pipe (or pipes) must be passed on the super constructor, or set via {@link #setPrevious(Pipe...)}. This
041 * allows the current SubAssembly to become the parent of any Pipe instances between the previous and the tails,
042 * exclusive of the previous, and inclusive of the tails.
043 * <p/>
044 * Subsequently all tail Pipes must be set via the {@link #setTails(Pipe...)} method.
045 * <p/>
046 * Note if the SubAssembly represents a split in the pipeline process,
047 * all the 'tails' of the assembly must be passed to {@link #setTails(Pipe...)}. It is up the the developer to
048 * provide any other access to the tails so they may be chained into any subsequent Pipes.
049 * <p/>
050 * Any {@link cascading.property.ConfigDef} values on this SubAssembly will be honored by child Pipe instances via the
051 * {@link cascading.pipe.Pipe#getParent()} back link described above.
052 */
053public abstract class SubAssembly extends Pipe
054  {
055  private static final Logger LOG = LoggerFactory.getLogger( SubAssembly.class );
056
057  /** Field previous */
058  private Pipe[] previous; // actual previous pipes instances
059  /** Field tails */
060  private Pipe[] tails;
061
062  private transient String[] names;
063
064  /**
065   * Is responsible for unwinding nested SubAssembly instances.
066   *
067   * @param tails of type Pipe[]
068   * @return a Pipe[]
069   */
070  public static Pipe[] unwind( Pipe... tails )
071    {
072    Set<Pipe> previous = new HashSet<Pipe>();
073
074    for( Pipe pipe : tails )
075      {
076      if( pipe instanceof SubAssembly )
077        Collections.addAll( previous, unwind( pipe.getPrevious() ) );
078      else
079        previous.add( pipe );
080      }
081
082    return previous.toArray( new Pipe[ previous.size() ] );
083    }
084
085  protected SubAssembly()
086    {
087    }
088
089  protected SubAssembly( Pipe... previous )
090    {
091    setPrevious( previous );
092    }
093
094  protected SubAssembly( String name, Pipe[] previous )
095    {
096    super( name );
097    setPrevious( previous );
098    }
099
100  /**
101   * Must be called by subclasses to set the start end points of the assembly the subclass represents.
102   *
103   * @param previous of type Pipe
104   */
105  protected void setPrevious( Pipe... previous )
106    {
107    this.previous = previous;
108    }
109
110  /**
111   * Must be called by subclasses to set the final end points of the assembly the subclass represents.
112   *
113   * @param tails of type Pipe
114   */
115  protected void setTails( Pipe... tails )
116    {
117    this.tails = tails;
118
119    if( previous == null )
120      {
121      LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", this );
122      return;
123      }
124
125    Set<Pipe> stopSet = new HashSet<Pipe>();
126
127    Collections.addAll( stopSet, previous );
128
129    setParent( stopSet, tails );
130    }
131
132  private void setParent( Set<Pipe> stopSet, Pipe[] tails )
133    {
134    if( tails == null )
135      return;
136
137    for( Pipe tail : tails )
138      {
139      if( stopSet.contains( tail ) )
140        continue;
141
142      tail.setParent( this );
143
144      Pipe[] current;
145
146      if( tail instanceof SubAssembly )
147        current = ( (SubAssembly) tail ).previous;
148      else
149        current = tail.getPrevious();
150
151      if( current == null && tail instanceof SubAssembly )
152        LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", tail );
153
154      setParent( stopSet, current );
155      }
156    }
157
158  /**
159   * Method getTails returns all the tails of this SubAssembly object. These values are set by {@link #setTails(Pipe...)}.
160   *
161   * @return the tails (type Pipe[]) of this SubAssembly object.
162   */
163  public Pipe[] getTails()
164    {
165    return getPrevious(); // just returns a clone of tails
166    }
167
168  /**
169   * Method getTailNames returns the tailNames of this SubAssembly object.
170   *
171   * @return the tailNames (type String[]) of this SubAssembly object.
172   */
173  public String[] getTailNames()
174    {
175    if( tails == null )
176      throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called in the constructor" ) );
177
178    if( names != null )
179      return names;
180
181    names = new String[ tails.length ];
182
183    for( int i = 0; i < tails.length; i++ )
184      names[ i ] = tails[ i ].getName();
185
186    return names;
187    }
188
189  @Override
190  public String getName()
191    {
192    if( name == null )
193      name = Util.join( getTailNames(), "+" );
194
195    return name;
196    }
197
198  @Override
199  public Pipe[] getPrevious()
200    {
201    // returns the semantically equivalent to Pipe#previous to simplify logic in the planner
202    // SubAssemblies are really aliases for their tails
203    if( tails == null )
204      throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called after the sub-assembly is assembled" ) );
205
206    return Arrays.copyOf( tails, tails.length );
207    }
208  }