001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap; 022 023import java.beans.ConstructorProperties; 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.Iterator; 028 029import cascading.flow.FlowProcess; 030import cascading.scheme.Scheme; 031import cascading.tuple.TupleEntryChainIterator; 032import cascading.tuple.TupleEntryIterator; 033import cascading.util.Util; 034 035import static java.util.Arrays.copyOf; 036 037/** 038 * Class MultiSourceTap is used to tie multiple {@link cascading.tap.Tap} instances into a single resource. Effectively this will allow 039 * multiple files to be concatenated into the requesting pipe assembly, if they all share the same {@link Scheme} instance. 040 * <p/> 041 * Note that order is not maintained by virtue of the underlying model. If order is necessary, use a unique sequence key 042 * to span the resources, like a line number. 043 * </p> 044 * Note that if multiple input files have the same Scheme (like {@link cascading.scheme.hadoop.TextLine}), they may not contain 045 * the same semi-structure internally. For example, one file might be an Apache log file, and another might be a Log4J 046 * log file. If each one should be parsed differently, then they must be handled by different pipe assembly branches. 047 */ 048public class MultiSourceTap<Child extends Tap, Config, Input> extends SourceTap<Config, Input> implements CompositeTap<Child> 049 { 050 private final String identifier = "__multisource_placeholder_" + Util.createUniqueID(); 051 protected Child[] taps; 052 053 private class TupleIterator implements Iterator 054 { 055 final TupleEntryIterator iterator; 056 057 private TupleIterator( TupleEntryIterator iterator ) 058 { 059 this.iterator = iterator; 060 } 061 062 @Override 063 public boolean hasNext() 064 { 065 return iterator.hasNext(); 066 } 067 068 @Override 069 public Object next() 070 { 071 return iterator.next().getTuple(); 072 } 073 074 @Override 075 public void remove() 076 { 077 iterator.remove(); 078 } 079 } 080 081 protected MultiSourceTap( Scheme<Config, Input, ?, ?, ?> scheme ) 082 { 083 super( scheme ); 084 } 085 086 /** 087 * Constructor MultiSourceTap creates a new MultiSourceTap instance. 088 * 089 * @param taps of type Tap... 090 */ 091 @ConstructorProperties({"taps"}) 092 public MultiSourceTap( Child... taps ) 093 { 094 this.taps = copyOf( taps, taps.length ); 095 096 verifyTaps(); 097 } 098 099 private void verifyTaps() 100 { 101 Tap tap = taps[ 0 ]; 102 103 for( int i = 1; i < taps.length; i++ ) 104 { 105 if( tap.getClass() != taps[ i ].getClass() ) 106 throw new TapException( "all taps must be of the same type" ); 107 108 if( !tap.getScheme().equals( taps[ i ].getScheme() ) ) 109 throw new TapException( "all tap schemes must be equivalent" ); 110 } 111 } 112 113 /** 114 * Method getTaps returns the taps of this MultiTap object. 115 * 116 * @return the taps (type Tap[]) of this MultiTap object. 117 */ 118 protected Child[] getTaps() 119 { 120 return taps; 121 } 122 123 @Override 124 public Iterator<Child> getChildTaps() 125 { 126 Child[] taps = getTaps(); 127 128 if( taps == null ) 129 return Collections.EMPTY_LIST.iterator(); 130 131 return Arrays.asList( taps ).iterator(); 132 } 133 134 @Override 135 public long getNumChildTaps() 136 { 137 return getTaps().length; 138 } 139 140 @Override 141 public String getIdentifier() 142 { 143 return identifier; 144 } 145 146 @Override 147 public Scheme getScheme() 148 { 149 Scheme scheme = super.getScheme(); 150 151 if( scheme != null ) 152 return scheme; 153 154 return taps[ 0 ].getScheme(); // they should all be equivalent per verifyTaps 155 } 156 157 @Override 158 public boolean isReplace() 159 { 160 return false; // cannot be used as sink 161 } 162 163 @Override 164 public void sourceConfInit( FlowProcess<? extends Config> process, Config conf ) 165 { 166 for( Tap tap : getTaps() ) 167 tap.sourceConfInit( process, conf ); 168 } 169 170 public boolean resourceExists( Config conf ) throws IOException 171 { 172 for( Tap tap : getTaps() ) 173 { 174 if( !tap.resourceExists( conf ) ) 175 return false; 176 } 177 178 return true; 179 } 180 181 /** Returns the most current modified time. */ 182 @Override 183 public long getModifiedTime( Config conf ) throws IOException 184 { 185 Tap[] taps = getTaps(); 186 187 if( taps == null || taps.length == 0 ) 188 return 0; 189 190 long modified = taps[ 0 ].getModifiedTime( conf ); 191 192 for( int i = 1; i < getTaps().length; i++ ) 193 modified = Math.max( getTaps()[ i ].getModifiedTime( conf ), modified ); 194 195 return modified; 196 } 197 198 @Override 199 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException 200 { 201 if( input != null ) 202 return taps[ 0 ].openForRead( flowProcess, input ); 203 204 Iterator iterators[] = new Iterator[ getTaps().length ]; 205 206 for( int i = 0; i < getTaps().length; i++ ) 207 iterators[ i ] = new TupleIterator( getTaps()[ i ].openForRead( flowProcess ) ); 208 209 return new TupleEntryChainIterator( getSourceFields(), iterators ); 210 } 211 212 public boolean equals( Object object ) 213 { 214 if( this == object ) 215 return true; 216 if( object == null || getClass() != object.getClass() ) 217 return false; 218 if( !super.equals( object ) ) 219 return false; 220 221 MultiSourceTap multiTap = (MultiSourceTap) object; 222 223 if( !Arrays.equals( getTaps(), multiTap.getTaps() ) ) 224 return false; 225 226 return true; 227 } 228 229 public int hashCode() 230 { 231 int result = super.hashCode(); 232 result = 31 * result + ( getTaps() != null ? Arrays.hashCode( getTaps() ) : 0 ); 233 return result; 234 } 235 236 public String toString() 237 { 238 Tap[] printableTaps = getTaps(); 239 240 if( printableTaps == null ) 241 return "MultiSourceTap[none]"; 242 243 String printedTaps; 244 245 if( printableTaps.length > 10 ) 246 printedTaps = Arrays.toString( copyOf( printableTaps, 10 ) ) + ",..."; 247 else 248 printedTaps = Arrays.toString( printableTaps ); 249 250 return "MultiSourceTap[" + printableTaps.length + ':' + printedTaps + ']'; 251 } 252 }