/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import cascading.util.TraceUtil;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subclasses of SubAssembly encapsulate complex assemblies of {@link Pipe}s so they may be reused in the same manner
 * a Pipe is used.
 * <p>
 * That is, a typical SubAssembly subclass will accept a 'previous' Pipe instance, and a few
 * arguments for configuring the resulting sub-assembly.
 * <p>
 * The previous pipe (or pipes) must be passed on the super constructor, or set via {@link #setPrevious(Pipe...)}. This
 * allows the current SubAssembly to become the parent of any Pipe instances between the previous and the tails,
 * exclusive of the previous, and inclusive of the tails.
 * <p>
 * Subsequently all tail Pipes must be set via the {@link #setTails(Pipe...)} method.
 * <p>
 * Note if the SubAssembly represents a split in the pipeline process,
 * all the 'tails' of the assembly must be passed to {@link #setTails(Pipe...)}. It is up to the developer to
 * provide any other access to the tails so they may be chained into any subsequent Pipes.
 * <p>
 * Any {@link cascading.property.ConfigDef} values on this SubAssembly will be honored by child Pipe instances via the
 * {@link cascading.pipe.Pipe#getParent()} back link described above.
 */
public abstract class SubAssembly extends Pipe
  {
  private static final Logger LOG = LoggerFactory.getLogger( SubAssembly.class );

  /** Field previous */
  private Pipe[] previous; // actual previous pipes instances
  /** Field tails */
  private Pipe[] tails;

  /** Lazily built cache of the tail names, see {@link #getTailNames()}. */
  private transient String[] names;

  /**
   * Is responsible for unwinding nested SubAssembly instances.
   * <p>
   * Every SubAssembly found in the given array is replaced, recursively, by the pipes returned from its
   * {@link #getPrevious()} method (a copy of its tails); all other pipes are kept as is. The result is collected
   * through a {@link HashSet}, so duplicates are collapsed and the order of the returned array is unspecified.
   *
   * @param tails of type Pipe[]
   * @return a Pipe[]
   */
  public static Pipe[] unwind( Pipe... tails )
    {
    Set<Pipe> previous = new HashSet<Pipe>();

    for( Pipe pipe : tails )
      {
      if( pipe instanceof SubAssembly )
        Collections.addAll( previous, unwind( pipe.getPrevious() ) );
      else
        previous.add( pipe );
      }

    return previous.toArray( new Pipe[ previous.size() ] );
    }

  /**
   * Creates a SubAssembly without previous pipes; subclasses are expected to call
   * {@link #setPrevious(Pipe...)} before {@link #setTails(Pipe...)}.
   */
  protected SubAssembly()
    {
    }

  /**
   * Creates a SubAssembly with the given previous pipes.
   *
   * @param previous of type Pipe
   */
  protected SubAssembly( Pipe... previous )
    {
    setPrevious( previous );
    }

  /**
   * Creates a named SubAssembly with the given previous pipes.
   *
   * @param name     of type String
   * @param previous of type Pipe[]
   */
  protected SubAssembly( String name, Pipe[] previous )
    {
    super( name );
    setPrevious( previous );
    }

  /**
   * Must be called by subclasses to set the starting points of the assembly the subclass represents.
   *
   * @param previous of type Pipe
   * @throws IllegalStateException if the previous pipes have already been set
   */
  protected void setPrevious( Pipe... previous )
    {
    if( this.previous != null )
      throw new IllegalStateException( "this method may only be called once" );

    this.previous = previous;
    }

  /**
   * Must be called by subclasses to set the final end points of the assembly the subclass represents.
   * <p>
   * If the previous pipes were set, this SubAssembly is registered as the parent of every pipe reachable
   * backwards from the given tails, stopping at (and excluding) the previous pipes. If the previous pipes were
   * not set, a warning is logged and no parent links are established.
   *
   * @param tails of type Pipe
   * @throws IllegalStateException if the tails have already been set
   */
  protected void setTails( Pipe... tails )
    {
    if( this.tails != null )
      throw new IllegalStateException( "this method may only be called once" );

    this.tails = tails;

    if( previous == null )
      {
      LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", this );
      return;
      }

    // the previous pipes bound the walk, they belong to whatever precedes this assembly
    Set<Pipe> stopSet = new HashSet<Pipe>();

    Collections.addAll( stopSet, previous );

    setParent( stopSet, tails );
    }

  /**
   * Recursively marks this SubAssembly as the parent of the given pipes and of their predecessors, skipping
   * any pipe contained in the stop set.
   *
   * @param stopSet pipes at which the backwards walk ends
   * @param tails   pipes to visit, may be null in which case nothing is done
   */
  private void setParent( Set<Pipe> stopSet, Pipe[] tails )
    {
    if( tails == null )
      return;

    for( Pipe tail : tails )
      {
      if( stopSet.contains( tail ) )
        continue;

      tail.setParent( this );

      Pipe[] current;

      // a nested SubAssembly reports its tails from getPrevious(), so read its actual previous pipes directly
      // in order to continue the walk on the far side of the nested assembly
      if( tail instanceof SubAssembly )
        current = ( (SubAssembly) tail ).previous;
      else
        current = tail.getPrevious();

      if( current == null && tail instanceof SubAssembly )
        LOG.warn( "previous pipes not set via setPrevious or constructor on: {}", tail );

      setParent( stopSet, current );
      }
    }

  /**
   * Method getTails returns all the tails of this SubAssembly object. These values are set by {@link #setTails(Pipe...)}.
   *
   * @return the tails (type Pipe[]) of this SubAssembly object.
   * @throws IllegalStateException if the tails have not been set
   */
  public Pipe[] getTails()
    {
    return getPrevious(); // just returns a clone of tails
    }

  /**
   * Method getTailNames returns the tailNames of this SubAssembly object.
   * <p>
   * The names are resolved from the tails on first use and cached. A copy of the cached array is returned so
   * changes made by the caller cannot affect later results.
   *
   * @return the tailNames (type String[]) of this SubAssembly object.
   * @throws IllegalStateException if the tails have not been set
   */
  public String[] getTailNames()
    {
    if( tails == null )
      throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called in the constructor" ) );

    if( names == null )
      {
      String[] resolved = new String[ tails.length ];

      for( int i = 0; i < tails.length; i++ )
        resolved[ i ] = tails[ i ].getName();

      names = resolved;
      }

    // defensive copy, consistent with getPrevious(), so the cache is never exposed
    return Arrays.copyOf( names, names.length );
    }

  /**
   * Returns the name of this SubAssembly. If no name was given, the tail names joined by "+" are used and
   * remembered as the name.
   *
   * @return the name (type String) of this SubAssembly object.
   * @throws IllegalStateException if no name was given and the tails have not been set
   */
  @Override
  public String getName()
    {
    if( name == null )
      name = Util.join( getTailNames(), "+" );

    return name;
    }

  /**
   * Returns a copy of the tails of this SubAssembly.
   *
   * @return a copy of the tails (type Pipe[]) of this SubAssembly object.
   * @throws IllegalStateException if the tails have not been set
   */
  @Override
  public Pipe[] getPrevious()
    {
    // returns the semantically equivalent to Pipe#previous to simplify logic in the planner
    // SubAssemblies are really aliases for their tails
    if( tails == null )
      throw new IllegalStateException( TraceUtil.formatRawTrace( this, "setTails must be called after the sub-assembly is assembled" ) );

    return Arrays.copyOf( tails, tails.length );
    }
  }