001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap;
022
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tuple.TupleEntryChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;

import static java.util.Arrays.copyOf;
036
/**
 * Class MultiSourceTap is used to tie multiple {@link cascading.tap.Tap} instances into a single resource. Effectively this will allow
 * multiple files to be concatenated into the requesting pipe assembly, if they all share an equivalent {@link Scheme}.
 * <p>
 * Note that order is not maintained by virtue of the underlying model. If order is necessary, use a unique sequence key
 * to span the resources, like a line number.
 * <p>
 * Note that if multiple input files have the same Scheme (like {@link cascading.scheme.hadoop.TextLine}), they may not contain
 * the same semi-structure internally. For example, one file might be an Apache log file, and another might be a Log4J
 * log file. If each one should be parsed differently, then they must be handled by different pipe assembly branches.
 */
048public class MultiSourceTap<Child extends Tap, Config, Input> extends SourceTap<Config, Input> implements CompositeTap<Child>
049  {
050  private final String identifier = "__multisource_placeholder_" + Util.createUniqueID();
051  protected Child[] taps;
052
053  private class TupleIterator implements Iterator
054    {
055    final TupleEntryIterator iterator;
056
057    private TupleIterator( TupleEntryIterator iterator )
058      {
059      this.iterator = iterator;
060      }
061
062    @Override
063    public boolean hasNext()
064      {
065      return iterator.hasNext();
066      }
067
068    @Override
069    public Object next()
070      {
071      return iterator.next().getTuple();
072      }
073
074    @Override
075    public void remove()
076      {
077      iterator.remove();
078      }
079    }
080
081  protected MultiSourceTap( Scheme<Config, Input, ?, ?, ?> scheme )
082    {
083    super( scheme );
084    }
085
086  /**
087   * Constructor MultiSourceTap creates a new MultiSourceTap instance.
088   *
089   * @param taps of type Tap...
090   */
091  @ConstructorProperties({"taps"})
092  public MultiSourceTap( Child... taps )
093    {
094    this.taps = copyOf( taps, taps.length );
095
096    verifyTaps();
097    }
098
099  private void verifyTaps()
100    {
101    Tap tap = taps[ 0 ];
102
103    for( int i = 1; i < taps.length; i++ )
104      {
105      if( tap.getClass() != taps[ i ].getClass() )
106        throw new TapException( "all taps must be of the same type" );
107
108      if( !tap.getScheme().equals( taps[ i ].getScheme() ) )
109        throw new TapException( "all tap schemes must be equivalent" );
110      }
111    }
112
113  /**
114   * Method getTaps returns the taps of this MultiTap object.
115   *
116   * @return the taps (type Tap[]) of this MultiTap object.
117   */
118  protected Child[] getTaps()
119    {
120    return taps;
121    }
122
123  @Override
124  public Iterator<Child> getChildTaps()
125    {
126    Child[] taps = getTaps();
127
128    if( taps == null )
129      return Collections.EMPTY_LIST.iterator();
130
131    return Arrays.asList( taps ).iterator();
132    }
133
134  @Override
135  public long getNumChildTaps()
136    {
137    return getTaps().length;
138    }
139
140  @Override
141  public String getIdentifier()
142    {
143    return identifier;
144    }
145
146  @Override
147  public Scheme getScheme()
148    {
149    Scheme scheme = super.getScheme();
150
151    if( scheme != null )
152      return scheme;
153
154    return taps[ 0 ].getScheme(); // they should all be equivalent per verifyTaps
155    }
156
157  @Override
158  public boolean isReplace()
159    {
160    return false; // cannot be used as sink
161    }
162
163  @Override
164  public void sourceConfInit( FlowProcess<? extends Config> process, Config conf )
165    {
166    for( Tap tap : getTaps() )
167      tap.sourceConfInit( process, conf );
168    }
169
170  public boolean resourceExists( Config conf ) throws IOException
171    {
172    for( Tap tap : getTaps() )
173      {
174      if( !tap.resourceExists( conf ) )
175        return false;
176      }
177
178    return true;
179    }
180
181  /** Returns the most current modified time. */
182  @Override
183  public long getModifiedTime( Config conf ) throws IOException
184    {
185    Tap[] taps = getTaps();
186
187    if( taps == null || taps.length == 0 )
188      return 0;
189
190    long modified = taps[ 0 ].getModifiedTime( conf );
191
192    for( int i = 1; i < getTaps().length; i++ )
193      modified = Math.max( getTaps()[ i ].getModifiedTime( conf ), modified );
194
195    return modified;
196    }
197
198  @Override
199  public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
200    {
201    if( input != null )
202      return taps[ 0 ].openForRead( flowProcess, input );
203
204    Iterator iterators[] = new Iterator[ getTaps().length ];
205
206    for( int i = 0; i < getTaps().length; i++ )
207      iterators[ i ] = new TupleIterator( getTaps()[ i ].openForRead( flowProcess ) );
208
209    return new TupleEntryChainIterator( getSourceFields(), iterators );
210    }
211
212  public boolean equals( Object object )
213    {
214    if( this == object )
215      return true;
216    if( object == null || getClass() != object.getClass() )
217      return false;
218    if( !super.equals( object ) )
219      return false;
220
221    MultiSourceTap multiTap = (MultiSourceTap) object;
222
223    if( !Arrays.equals( getTaps(), multiTap.getTaps() ) )
224      return false;
225
226    return true;
227    }
228
229  public int hashCode()
230    {
231    int result = super.hashCode();
232    result = 31 * result + ( getTaps() != null ? Arrays.hashCode( getTaps() ) : 0 );
233    return result;
234    }
235
236  public String toString()
237    {
238    Tap[] printableTaps = getTaps();
239
240    if( printableTaps == null )
241      return "MultiSourceTap[none]";
242
243    String printedTaps;
244
245    if( printableTaps.length > 10 )
246      printedTaps = Arrays.toString( copyOf( printableTaps, 10 ) ) + ",...";
247    else
248      printedTaps = Arrays.toString( printableTaps );
249
250    return "MultiSourceTap[" + printableTaps.length + ':' + printedTaps + ']';
251    }
252  }