001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tap; 023 024import java.beans.ConstructorProperties; 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.Iterator; 029 030import cascading.flow.FlowProcess; 031import cascading.scheme.Scheme; 032import cascading.tuple.TupleEntryChainIterator; 033import cascading.tuple.TupleEntryIterator; 034import cascading.util.Trie; 035import cascading.util.Util; 036 037import static java.util.Arrays.copyOf; 038 039/** 040 * Class MultiSourceTap is used to tie multiple {@link cascading.tap.Tap} instances into a single resource. Effectively this will allow 041 * multiple files to be concatenated into the requesting pipe assembly, if they all share the same {@link Scheme} instance. 042 * <p> 043 * Note that order is not maintained by virtue of the underlying model. If order is necessary, use a unique sequence key 044 * to span the resources, like a line number. 045 * <p> 046 * Note that if multiple input files have the same Scheme (like {@link cascading.scheme.hadoop.TextLine}), they may not contain 047 * the same semi-structure internally. For example, one file might be an Apache log file, and another might be a Log4J 048 * log file. If each one should be parsed differently, then they must be handled by different pipe assembly branches. 049 * <p> 050 * As of 3.3, MultiSourceTap can aggregate {@link cascading.scheme.hadoop.PartitionTap} and 051 * {@link cascading.scheme.local.PartitionTap} instances. 052 * <p> 053 * But it may not be safe to aggregate {@link cascading.scheme.hadoop.GlobHfs} and {@link cascading.scheme.hadoop.PartitionTap} 054 * instances as GlobHfs identifiers cannot be fully resolved, preventing the cluster side runtime from distinguishing which 055 * Tap instance to open for reading. 056 */ 057public class MultiSourceTap<Child extends Tap, Config, Input> extends SourceTap<Config, Input> implements CompositeTap<Child> 058 { 059 private final String identifier = "__multisource_placeholder_" + Util.createUniqueID(); 060 protected Child[] taps; 061 062 protected transient Trie<Child> prefixMap; 063 protected transient String commonPrefix; 064 065 private class TupleIterator implements Iterator 066 { 067 final TupleEntryIterator iterator; 068 069 private TupleIterator( TupleEntryIterator iterator ) 070 { 071 this.iterator = iterator; 072 } 073 074 @Override 075 public boolean hasNext() 076 { 077 return iterator.hasNext(); 078 } 079 080 @Override 081 public Object next() 082 { 083 return iterator.next().getTuple(); 084 } 085 086 @Override 087 public void remove() 088 { 089 iterator.remove(); 090 } 091 } 092 093 protected MultiSourceTap( Scheme<Config, Input, ?, ?, ?> scheme ) 094 { 095 super( scheme ); 096 } 097 098 /** 099 * Constructor MultiSourceTap creates a new MultiSourceTap instance. 100 * 101 * @param taps of type Tap... 102 */ 103 @ConstructorProperties({"taps"}) 104 public MultiSourceTap( Child... taps ) 105 { 106 this.taps = copyOf( taps, taps.length ); 107 108 verifyTaps(); 109 } 110 111 private void verifyTaps() 112 { 113 Tap tap = taps[ 0 ]; 114 115 for( int i = 1; i < taps.length; i++ ) 116 { 117 if( tap.getClass() != taps[ i ].getClass() ) 118 throw new TapException( "all taps must be of the same type" ); 119 120 if( !tap.getScheme().equals( taps[ i ].getScheme() ) ) 121 throw new TapException( "all tap schemes must be equivalent" ); 122 } 123 } 124 125 /** 126 * Method getTaps returns the taps of this MultiTap object. 127 * 128 * @return the taps (type Tap[]) of this MultiTap object. 129 */ 130 protected Child[] getTaps() 131 { 132 return taps; 133 } 134 135 @Override 136 public Iterator<Child> getChildTaps() 137 { 138 Child[] taps = getTaps(); 139 140 if( taps == null ) 141 return Collections.EMPTY_LIST.iterator(); 142 143 return Arrays.asList( taps ).iterator(); 144 } 145 146 @Override 147 public long getNumChildTaps() 148 { 149 return getTaps().length; 150 } 151 152 @Override 153 public String getIdentifier() 154 { 155 return identifier; 156 } 157 158 @Override 159 public Scheme getScheme() 160 { 161 Scheme scheme = super.getScheme(); 162 163 if( scheme != null ) 164 return scheme; 165 166 return taps[ 0 ].getScheme(); // they should all be equivalent per verifyTaps 167 } 168 169 @Override 170 public boolean isReplace() 171 { 172 return false; // cannot be used as sink 173 } 174 175 @Override 176 public void sourceConfInit( FlowProcess<? extends Config> process, Config conf ) 177 { 178 for( Tap tap : getTaps() ) 179 tap.sourceConfInit( process, conf ); 180 } 181 182 public boolean resourceExists( Config conf ) throws IOException 183 { 184 for( Tap tap : getTaps() ) 185 { 186 if( !tap.resourceExists( conf ) ) 187 return false; 188 } 189 190 return true; 191 } 192 193 /** Returns the most current modified time. */ 194 @Override 195 public long getModifiedTime( Config conf ) throws IOException 196 { 197 Tap[] taps = getTaps(); 198 199 if( taps == null || taps.length == 0 ) 200 return 0; 201 202 long modified = taps[ 0 ].getModifiedTime( conf ); 203 204 for( int i = 1; i < getTaps().length; i++ ) 205 modified = Math.max( getTaps()[ i ].getModifiedTime( conf ), modified ); 206 207 return modified; 208 } 209 210 @Override 211 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException 212 { 213 if( input != null ) 214 return findMatchingTap( flowProcess ).openForRead( flowProcess, input ); 215 216 Iterator iterators[] = new Iterator[ getTaps().length ]; 217 218 for( int i = 0; i < getTaps().length; i++ ) 219 iterators[ i ] = new TupleIterator( getTaps()[ i ].openForRead( flowProcess ) ); 220 221 return new TupleEntryChainIterator( getSourceFields(), iterators ); 222 } 223 224 protected Child findMatchingTap( FlowProcess<? extends Config> flowProcess ) 225 { 226 // todo: in Cascading4, this value can be pulled from the FlowProcessContext 227 String identifier = flowProcess.getStringProperty( "cascading.source.path" ); 228 229 // we cannot determine the actual file being processed -- this was the default until 3.3 230 if( identifier == null ) 231 return taps[ 0 ]; 232 233 Trie<Child> prefixMap = getTapPrefixMap( flowProcess ); 234 int index = identifier.indexOf( getTapsCommonPrefix( flowProcess ) ); 235 236 // the child taps represent relative paths that cannot be resolved -- eg. globs 237 if( index == -1 ) 238 return taps[ 0 ]; 239 240 Child child = prefixMap.get( identifier.substring( index ) ); 241 242 if( child == null ) 243 throw new IllegalStateException( "unable to find child having a prefix that matches: " + identifier ); 244 245 return child; 246 } 247 248 protected String getTapsCommonPrefix( FlowProcess<? extends Config> flowProcess ) 249 { 250 if( commonPrefix == null ) 251 commonPrefix = getTapPrefixMap( flowProcess ).getCommonPrefix(); 252 253 return commonPrefix; 254 } 255 256 protected Trie<Child> getTapPrefixMap( FlowProcess<? extends Config> flowProcess ) 257 { 258 if( prefixMap != null ) 259 return prefixMap; 260 261 prefixMap = new Trie<>(); 262 263 for( Child tap : taps ) 264 prefixMap.put( tap.getFullIdentifier( flowProcess ), tap ); 265 266 return prefixMap; 267 } 268 269 public boolean equals( Object object ) 270 { 271 if( this == object ) 272 return true; 273 if( object == null || getClass() != object.getClass() ) 274 return false; 275 if( !super.equals( object ) ) 276 return false; 277 278 MultiSourceTap multiTap = (MultiSourceTap) object; 279 280 if( !Arrays.equals( getTaps(), multiTap.getTaps() ) ) 281 return false; 282 283 return true; 284 } 285 286 public int hashCode() 287 { 288 int result = super.hashCode(); 289 result = 31 * result + ( getTaps() != null ? Arrays.hashCode( getTaps() ) : 0 ); 290 return result; 291 } 292 293 public String toString() 294 { 295 Tap[] printableTaps = getTaps(); 296 297 if( printableTaps == null ) 298 return "MultiSourceTap[none]"; 299 300 String printedTaps; 301 302 if( printableTaps.length > 10 ) 303 printedTaps = Arrays.toString( copyOf( printableTaps, 10 ) ) + ",..."; 304 else 305 printedTaps = Arrays.toString( printableTaps ); 306 307 return "MultiSourceTap[" + printableTaps.length + ':' + printedTaps + ']'; 308 } 309 }