001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe;
023
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028
029import cascading.util.TraceUtil;
030import cascading.util.Util;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Subclasses of SubAssembly encapsulate complex assemblies of {@link Pipe}s so they my be reused in the same manner
036 * a Pipe is used.
037 * <p>
038 * That is, a typical SubAssembly subclass will accept a 'previous' Pipe instance, and a few
039 * arguments for configuring the resulting sub-assembly.
040 * <p>
041 * The previous pipe (or pipes) must be passed on the super constructor, or set via {@link #setPrevious(Pipe...)}. This
042 * allows the current SubAssembly to become the parent of any Pipe instances between the previous and the tails,
043 * exclusive of the previous, and inclusive of the tails.
044 * <p>
045 * Subsequently all tail Pipes must be set via the {@link #setTails(Pipe...)} method.
046 * <p>
047 * Note if the SubAssembly represents a split in the pipeline process,
048 * all the 'tails' of the assembly must be passed to {@link #setTails(Pipe...)}. It is up the the developer to
049 * provide any other access to the tails so they may be chained into any subsequent Pipes.
050 * <p>
051 * Any {@link cascading.property.ConfigDef} values on this SubAssembly will be honored by child Pipe instances via the
052 * {@link cascading.pipe.Pipe#getParent()} back link described above.
053 */
054public abstract class SubAssembly extends Pipe
055  {
056  private static final Logger LOG = LoggerFactory.getLogger( SubAssembly.class );
057
058  /** Field previous */
059  private Pipe[] previous; // actual previous pipes instances
060  /** Field tails */
061  private Pipe[] tails;
062
063  private transient String[] names;
064
065  /**
066   * Is responsible for unwinding nested SubAssembly instances.
067   *
068   * @param tails of type Pipe[]
069   * @return a Pipe[]
070   */
071  public static Pipe[] unwind( Pipe... tails )
072    {
073    Set<Pipe> previous = new HashSet<Pipe>();
074
075    for( Pipe pipe : tails )
076      {
077      if( pipe instanceof SubAssembly )
078        Collections.addAll( previous, unwind( pipe.getPrevious() ) );
079      else
080        previous.add( pipe );
081      }
082
083    return previous.toArray( new Pipe[ previous.size() ] );
084    }
085
086  protected SubAssembly()
087    {
088    }
089
090  protected SubAssembly( Pipe... previous )
091    {
092    setPrevious( previous );
093    }
094
095  protected SubAssembly( String name, Pipe[] previous )
096    {
097    super( name );
098    setPrevious( previous );
099    }
100
101  /**
102   * Must be called by subclasses to set the start end points of the assembly the subclass represents.
103   *
104   * @param previous of type Pipe
105   */
106  protected void setPrevious( Pipe... previous )
107    {
108    if( this.previous != null )
109      throw new IllegalStateException( "this method may only be called once" );
110
111    this.previous = previous;
112    }
113
114  /**
115   * Must be called by subclasses to set the final end points of the assembly the subclass represents.
116   *
117   * @param tails of type Pipe
118   */
119  protected void setTails( Pipe... tails )
120    {
121    if( this.tails != null )
122      throw new IllegalStateException( "this method may only be called once" );
123
124    this.tails = tails;
125
126    if( previous == null )
127      {
128      LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", this );
129      return;
130      }
131
132    Set<Pipe> stopSet = new HashSet<Pipe>();
133
134    Collections.addAll( stopSet, previous );
135
136    setParent( stopSet, tails );
137    }
138
139  private void setParent( Set<Pipe> stopSet, Pipe[] tails )
140    {
141    if( tails == null )
142      return;
143
144    for( Pipe tail : tails )
145      {
146      if( stopSet.contains( tail ) )
147        continue;
148
149      tail.setParent( this );
150
151      Pipe[] current;
152
153      if( tail instanceof SubAssembly )
154        current = ( (SubAssembly) tail ).previous;
155      else
156        current = tail.getPrevious();
157
158      if( current == null && tail instanceof SubAssembly )
159        LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", tail );
160
161      setParent( stopSet, current );
162      }
163    }
164
165  /**
166   * Method getTails returns all the tails of this SubAssembly object. These values are set by {@link #setTails(Pipe...)}.
167   *
168   * @return the tails (type Pipe[]) of this SubAssembly object.
169   */
170  public Pipe[] getTails()
171    {
172    return getPrevious(); // just returns a clone of tails
173    }
174
175  /**
176   * Method getTailNames returns the tailNames of this SubAssembly object.
177   *
178   * @return the tailNames (type String[]) of this SubAssembly object.
179   */
180  public String[] getTailNames()
181    {
182    if( tails == null )
183      throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called in the constructor" ) );
184
185    if( names != null )
186      return names;
187
188    names = new String[ tails.length ];
189
190    for( int i = 0; i < tails.length; i++ )
191      names[ i ] = tails[ i ].getName();
192
193    return names;
194    }
195
196  @Override
197  public String getName()
198    {
199    if( name == null )
200      name = Util.join( getTailNames(), "+" );
201
202    return name;
203    }
204
205  @Override
206  public Pipe[] getPrevious()
207    {
208    // returns the semantically equivalent to Pipe#previous to simplify logic in the planner
209    // SubAssemblies are really aliases for their tails
210    if( tails == null )
211      throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called after the sub-assembly is assembled" ) );
212
213    return Arrays.copyOf( tails, tails.length );
214    }
215  }