001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe; 022 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.Set; 027 028import cascading.util.TraceUtil; 029import cascading.util.Util; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Subclasses of SubAssembly encapsulate complex assemblies of {@link Pipe}s so they my be reused in the same manner 035 * a Pipe is used. 036 * <p/> 037 * That is, a typical SubAssembly subclass will accept a 'previous' Pipe instance, and a few 038 * arguments for configuring the resulting sub-assembly. 039 * <p/> 040 * The previous pipe (or pipes) must be passed on the super constructor, or set via {@link #setPrevious(Pipe...)}. This 041 * allows the current SubAssembly to become the parent of any Pipe instances between the previous and the tails, 042 * exclusive of the previous, and inclusive of the tails. 043 * <p/> 044 * Subsequently all tail Pipes must be set via the {@link #setTails(Pipe...)} method. 045 * <p/> 046 * Note if the SubAssembly represents a split in the pipeline process, 047 * all the 'tails' of the assembly must be passed to {@link #setTails(Pipe...)}. It is up the the developer to 048 * provide any other access to the tails so they may be chained into any subsequent Pipes. 049 * <p/> 050 * Any {@link cascading.property.ConfigDef} values on this SubAssembly will be honored by child Pipe instances via the 051 * {@link cascading.pipe.Pipe#getParent()} back link described above. 052 */ 053public abstract class SubAssembly extends Pipe 054 { 055 private static final Logger LOG = LoggerFactory.getLogger( SubAssembly.class ); 056 057 /** Field previous */ 058 private Pipe[] previous; // actual previous pipes instances 059 /** Field tails */ 060 private Pipe[] tails; 061 062 private transient String[] names; 063 064 /** 065 * Is responsible for unwinding nested SubAssembly instances. 066 * 067 * @param tails of type Pipe[] 068 * @return a Pipe[] 069 */ 070 public static Pipe[] unwind( Pipe... tails ) 071 { 072 Set<Pipe> previous = new HashSet<Pipe>(); 073 074 for( Pipe pipe : tails ) 075 { 076 if( pipe instanceof SubAssembly ) 077 Collections.addAll( previous, unwind( pipe.getPrevious() ) ); 078 else 079 previous.add( pipe ); 080 } 081 082 return previous.toArray( new Pipe[ previous.size() ] ); 083 } 084 085 protected SubAssembly() 086 { 087 } 088 089 protected SubAssembly( Pipe... previous ) 090 { 091 setPrevious( previous ); 092 } 093 094 protected SubAssembly( String name, Pipe[] previous ) 095 { 096 super( name ); 097 setPrevious( previous ); 098 } 099 100 /** 101 * Must be called by subclasses to set the start end points of the assembly the subclass represents. 102 * 103 * @param previous of type Pipe 104 */ 105 protected void setPrevious( Pipe... previous ) 106 { 107 this.previous = previous; 108 } 109 110 /** 111 * Must be called by subclasses to set the final end points of the assembly the subclass represents. 112 * 113 * @param tails of type Pipe 114 */ 115 protected void setTails( Pipe... tails ) 116 { 117 this.tails = tails; 118 119 if( previous == null ) 120 { 121 LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", this ); 122 return; 123 } 124 125 Set<Pipe> stopSet = new HashSet<Pipe>(); 126 127 Collections.addAll( stopSet, previous ); 128 129 setParent( stopSet, tails ); 130 } 131 132 private void setParent( Set<Pipe> stopSet, Pipe[] tails ) 133 { 134 if( tails == null ) 135 return; 136 137 for( Pipe tail : tails ) 138 { 139 if( stopSet.contains( tail ) ) 140 continue; 141 142 tail.setParent( this ); 143 144 Pipe[] current; 145 146 if( tail instanceof SubAssembly ) 147 current = ( (SubAssembly) tail ).previous; 148 else 149 current = tail.getPrevious(); 150 151 if( current == null && tail instanceof SubAssembly ) 152 LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", tail ); 153 154 setParent( stopSet, current ); 155 } 156 } 157 158 /** 159 * Method getTails returns all the tails of this SubAssembly object. These values are set by {@link #setTails(Pipe...)}. 160 * 161 * @return the tails (type Pipe[]) of this SubAssembly object. 162 */ 163 public Pipe[] getTails() 164 { 165 return getPrevious(); // just returns a clone of tails 166 } 167 168 /** 169 * Method getTailNames returns the tailNames of this SubAssembly object. 170 * 171 * @return the tailNames (type String[]) of this SubAssembly object. 172 */ 173 public String[] getTailNames() 174 { 175 if( tails == null ) 176 throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called in the constructor" ) ); 177 178 if( names != null ) 179 return names; 180 181 names = new String[ tails.length ]; 182 183 for( int i = 0; i < tails.length; i++ ) 184 names[ i ] = tails[ i ].getName(); 185 186 return names; 187 } 188 189 @Override 190 public String getName() 191 { 192 if( name == null ) 193 name = Util.join( getTailNames(), "+" ); 194 195 return name; 196 } 197 198 @Override 199 public Pipe[] getPrevious() 200 { 201 // returns the semantically equivalent to Pipe#previous to simplify logic in the planner 202 // SubAssemblies are really aliases for their tails 203 if( tails == null ) 204 throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called after the sub-assembly is assembled" ) ); 205 206 return Arrays.copyOf( tails, tails.length ); 207 } 208 }