001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tap;
023
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tuple.TupleEntryChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Trie;
import cascading.util.Util;

import static java.util.Arrays.copyOf;
038
039/**
 * Class MultiSourceTap is used to tie multiple {@link cascading.tap.Tap} instances into a single resource. Effectively this will allow
 * multiple files to be concatenated into the requesting pipe assembly, provided all the taps are of the same Tap type and have
 * equivalent (equal) {@link Scheme} instances.
042 * <p>
043 * Note that order is not maintained by virtue of the underlying model. If order is necessary, use a unique sequence key
044 * to span the resources, like a line number.
045 * <p>
046 * Note that if multiple input files have the same Scheme (like {@link cascading.scheme.hadoop.TextLine}), they may not contain
047 * the same semi-structure internally. For example, one file might be an Apache log file, and another might be a Log4J
048 * log file. If each one should be parsed differently, then they must be handled by different pipe assembly branches.
049 * <p>
 * As of 3.3, MultiSourceTap can aggregate {@link cascading.tap.hadoop.PartitionTap} and
 * {@link cascading.tap.local.PartitionTap} instances.
 * <p>
 * However, it may not be safe to aggregate {@link cascading.tap.hadoop.GlobHfs} and {@link cascading.tap.hadoop.PartitionTap}
 * instances, as GlobHfs identifiers cannot be fully resolved, which prevents the cluster-side runtime from determining which
 * child Tap instance to open for reading.
056 */
057public class MultiSourceTap<Child extends Tap, Config, Input> extends SourceTap<Config, Input> implements CompositeTap<Child>
058  {
059  private final String identifier = "__multisource_placeholder_" + Util.createUniqueID();
060  protected Child[] taps;
061
062  protected transient Trie<Child> prefixMap;
063  protected transient String commonPrefix;
064
065  private class TupleIterator implements Iterator
066    {
067    final TupleEntryIterator iterator;
068
069    private TupleIterator( TupleEntryIterator iterator )
070      {
071      this.iterator = iterator;
072      }
073
074    @Override
075    public boolean hasNext()
076      {
077      return iterator.hasNext();
078      }
079
080    @Override
081    public Object next()
082      {
083      return iterator.next().getTuple();
084      }
085
086    @Override
087    public void remove()
088      {
089      iterator.remove();
090      }
091    }
092
093  protected MultiSourceTap( Scheme<Config, Input, ?, ?, ?> scheme )
094    {
095    super( scheme );
096    }
097
098  /**
099   * Constructor MultiSourceTap creates a new MultiSourceTap instance.
100   *
101   * @param taps of type Tap...
102   */
103  @ConstructorProperties({"taps"})
104  public MultiSourceTap( Child... taps )
105    {
106    this.taps = copyOf( taps, taps.length );
107
108    verifyTaps();
109    }
110
111  private void verifyTaps()
112    {
113    Tap tap = taps[ 0 ];
114
115    for( int i = 1; i < taps.length; i++ )
116      {
117      if( tap.getClass() != taps[ i ].getClass() )
118        throw new TapException( "all taps must be of the same type" );
119
120      if( !tap.getScheme().equals( taps[ i ].getScheme() ) )
121        throw new TapException( "all tap schemes must be equivalent" );
122      }
123    }
124
125  /**
126   * Method getTaps returns the taps of this MultiTap object.
127   *
128   * @return the taps (type Tap[]) of this MultiTap object.
129   */
130  protected Child[] getTaps()
131    {
132    return taps;
133    }
134
135  @Override
136  public Iterator<Child> getChildTaps()
137    {
138    Child[] taps = getTaps();
139
140    if( taps == null )
141      return Collections.EMPTY_LIST.iterator();
142
143    return Arrays.asList( taps ).iterator();
144    }
145
146  @Override
147  public long getNumChildTaps()
148    {
149    return getTaps().length;
150    }
151
152  @Override
153  public String getIdentifier()
154    {
155    return identifier;
156    }
157
158  @Override
159  public Scheme getScheme()
160    {
161    Scheme scheme = super.getScheme();
162
163    if( scheme != null )
164      return scheme;
165
166    return taps[ 0 ].getScheme(); // they should all be equivalent per verifyTaps
167    }
168
169  @Override
170  public boolean isReplace()
171    {
172    return false; // cannot be used as sink
173    }
174
175  @Override
176  public void sourceConfInit( FlowProcess<? extends Config> process, Config conf )
177    {
178    for( Tap tap : getTaps() )
179      tap.sourceConfInit( process, conf );
180    }
181
182  public boolean resourceExists( Config conf ) throws IOException
183    {
184    for( Tap tap : getTaps() )
185      {
186      if( !tap.resourceExists( conf ) )
187        return false;
188      }
189
190    return true;
191    }
192
193  /** Returns the most current modified time. */
194  @Override
195  public long getModifiedTime( Config conf ) throws IOException
196    {
197    Tap[] taps = getTaps();
198
199    if( taps == null || taps.length == 0 )
200      return 0;
201
202    long modified = taps[ 0 ].getModifiedTime( conf );
203
204    for( int i = 1; i < getTaps().length; i++ )
205      modified = Math.max( getTaps()[ i ].getModifiedTime( conf ), modified );
206
207    return modified;
208    }
209
210  @Override
211  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
212    {
213    if( input != null )
214      return findMatchingTap( flowProcess ).openForRead( flowProcess, input );
215
216    Iterator iterators[] = new Iterator[ getTaps().length ];
217
218    for( int i = 0; i < getTaps().length; i++ )
219      iterators[ i ] = new TupleIterator( getTaps()[ i ].openForRead( flowProcess ) );
220
221    return new TupleEntryChainIterator( getSourceFields(), iterators );
222    }
223
224  protected Child findMatchingTap( FlowProcess<? extends Config> flowProcess )
225    {
226    // todo: in Cascading4, this value can be pulled from the FlowProcessContext
227    String identifier = flowProcess.getStringProperty( "cascading.source.path" );
228
229    // we cannot determine the actual file being processed -- this was the default until 3.3
230    if( identifier == null )
231      return taps[ 0 ];
232
233    Trie<Child> prefixMap = getTapPrefixMap( flowProcess );
234    int index = identifier.indexOf( getTapsCommonPrefix( flowProcess ) );
235
236    // the child taps represent relative paths that cannot be resolved -- eg. globs
237    if( index == -1 )
238      return taps[ 0 ];
239
240    Child child = prefixMap.get( identifier.substring( index ) );
241
242    if( child == null )
243      throw new IllegalStateException( "unable to find child having a prefix that matches: " + identifier );
244
245    return child;
246    }
247
248  protected String getTapsCommonPrefix( FlowProcess<? extends Config> flowProcess )
249    {
250    if( commonPrefix == null )
251      commonPrefix = getTapPrefixMap( flowProcess ).getCommonPrefix();
252
253    return commonPrefix;
254    }
255
256  protected Trie<Child> getTapPrefixMap( FlowProcess<? extends Config> flowProcess )
257    {
258    if( prefixMap != null )
259      return prefixMap;
260
261    prefixMap = new Trie<>();
262
263    for( Child tap : taps )
264      prefixMap.put( tap.getFullIdentifier( flowProcess ), tap );
265
266    return prefixMap;
267    }
268
269  public boolean equals( Object object )
270    {
271    if( this == object )
272      return true;
273    if( object == null || getClass() != object.getClass() )
274      return false;
275    if( !super.equals( object ) )
276      return false;
277
278    MultiSourceTap multiTap = (MultiSourceTap) object;
279
280    if( !Arrays.equals( getTaps(), multiTap.getTaps() ) )
281      return false;
282
283    return true;
284    }
285
286  public int hashCode()
287    {
288    int result = super.hashCode();
289    result = 31 * result + ( getTaps() != null ? Arrays.hashCode( getTaps() ) : 0 );
290    return result;
291    }
292
293  public String toString()
294    {
295    Tap[] printableTaps = getTaps();
296
297    if( printableTaps == null )
298      return "MultiSourceTap[none]";
299
300    String printedTaps;
301
302    if( printableTaps.length > 10 )
303      printedTaps = Arrays.toString( copyOf( printableTaps, 10 ) ) + ",...";
304    else
305      printedTaps = Arrays.toString( printableTaps );
306
307    return "MultiSourceTap[" + printableTaps.length + ':' + printedTaps + ']';
308    }
309  }