001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap;
022    
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tuple.TupleEntryChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;

import static java.util.Arrays.copyOf;
036    
037    /**
038     * Class MultiSourceTap is used to tie multiple {@link cascading.tap.Tap} instances into a single resource. Effectively this will allow
039     * multiple files to be concatenated into the requesting pipe assembly, if they all share the same {@link Scheme} instance.
040     * <p/>
041     * Note that order is not maintained by virtue of the underlying model. If order is necessary, use a unique sequence key
042     * to span the resources, like a line number.
043     * </p>
044     * Note that if multiple input files have the same Scheme (like {@link cascading.scheme.hadoop.TextLine}), they may not contain
045     * the same semi-structure internally. For example, one file might be an Apache log file, and another might be a Log4J
046     * log file. If each one should be parsed differently, then they must be handled by different pipe assembly branches.
047     */
048    public class MultiSourceTap<Child extends Tap, Config, Input> extends SourceTap<Config, Input> implements CompositeTap<Child>
049      {
050      private final String identifier = "__multisource_placeholder_" + Util.createUniqueID();
051      protected Child[] taps;
052    
053      private class TupleIterator implements Iterator
054        {
055        final TupleEntryIterator iterator;
056    
057        private TupleIterator( TupleEntryIterator iterator )
058          {
059          this.iterator = iterator;
060          }
061    
062        @Override
063        public boolean hasNext()
064          {
065          return iterator.hasNext();
066          }
067    
068        @Override
069        public Object next()
070          {
071          return iterator.next().getTuple();
072          }
073    
074        @Override
075        public void remove()
076          {
077          iterator.remove();
078          }
079        }
080    
081      protected MultiSourceTap( Scheme<Config, Input, ?, ?, ?> scheme )
082        {
083        super( scheme );
084        }
085    
086      /**
087       * Constructor MultiSourceTap creates a new MultiSourceTap instance.
088       *
089       * @param taps of type Tap...
090       */
091      @ConstructorProperties({"taps"})
092      public MultiSourceTap( Child... taps )
093        {
094        this.taps = copyOf( taps, taps.length );
095    
096        verifyTaps();
097        }
098    
099      private void verifyTaps()
100        {
101        Tap tap = taps[ 0 ];
102    
103        for( int i = 1; i < taps.length; i++ )
104          {
105          if( tap.getClass() != taps[ i ].getClass() )
106            throw new TapException( "all taps must be of the same type" );
107    
108          if( !tap.getScheme().equals( taps[ i ].getScheme() ) )
109            throw new TapException( "all tap schemes must be equivalent" );
110          }
111        }
112    
113      /**
114       * Method getTaps returns the taps of this MultiTap object.
115       *
116       * @return the taps (type Tap[]) of this MultiTap object.
117       */
118      protected Child[] getTaps()
119        {
120        return taps;
121        }
122    
123      @Override
124      public Iterator<Child> getChildTaps()
125        {
126        Child[] taps = getTaps();
127    
128        if( taps == null )
129          return Collections.EMPTY_LIST.iterator();
130    
131        return Arrays.asList( taps ).iterator();
132        }
133    
134      @Override
135      public long getNumChildTaps()
136        {
137        return getTaps().length;
138        }
139    
140      @Override
141      public String getIdentifier()
142        {
143        return identifier;
144        }
145    
146      @Override
147      public Scheme getScheme()
148        {
149        Scheme scheme = super.getScheme();
150    
151        if( scheme != null )
152          return scheme;
153    
154        return taps[ 0 ].getScheme(); // they should all be equivalent per verifyTaps
155        }
156    
157      @Override
158      public boolean isReplace()
159        {
160        return false; // cannot be used as sink
161        }
162    
163      @Override
164      public void sourceConfInit( FlowProcess<Config> process, Config conf )
165        {
166        for( Tap tap : getTaps() )
167          tap.sourceConfInit( process, conf );
168        }
169    
170      public boolean resourceExists( Config conf ) throws IOException
171        {
172        for( Tap tap : getTaps() )
173          {
174          if( !tap.resourceExists( conf ) )
175            return false;
176          }
177    
178        return true;
179        }
180    
181      /** Returns the most current modified time. */
182      @Override
183      public long getModifiedTime( Config conf ) throws IOException
184        {
185        Tap[] taps = getTaps();
186    
187        if( taps == null || taps.length == 0 )
188          return 0;
189    
190        long modified = taps[ 0 ].getModifiedTime( conf );
191    
192        for( int i = 1; i < getTaps().length; i++ )
193          modified = Math.max( getTaps()[ i ].getModifiedTime( conf ), modified );
194    
195        return modified;
196        }
197    
198      @Override
199      public TupleEntryIterator openForRead( FlowProcess<Config> flowProcess, Input input ) throws IOException
200        {
201        if( input != null )
202          return taps[ 0 ].openForRead( flowProcess, input );
203    
204        Iterator iterators[] = new Iterator[ getTaps().length ];
205    
206        for( int i = 0; i < getTaps().length; i++ )
207          iterators[ i ] = new TupleIterator( getTaps()[ i ].openForRead( flowProcess ) );
208    
209        return new TupleEntryChainIterator( getSourceFields(), iterators );
210        }
211    
212      public boolean equals( Object object )
213        {
214        if( this == object )
215          return true;
216        if( object == null || getClass() != object.getClass() )
217          return false;
218        if( !super.equals( object ) )
219          return false;
220    
221        MultiSourceTap multiTap = (MultiSourceTap) object;
222    
223        if( !Arrays.equals( getTaps(), multiTap.getTaps() ) )
224          return false;
225    
226        return true;
227        }
228    
229      public int hashCode()
230        {
231        int result = super.hashCode();
232        result = 31 * result + ( getTaps() != null ? Arrays.hashCode( getTaps() ) : 0 );
233        return result;
234        }
235    
236      public String toString()
237        {
238        Tap[] printableTaps = getTaps();
239    
240        if( printableTaps == null )
241          return "MultiSourceTap[none]";
242    
243        String printedTaps;
244    
245        if( printableTaps.length > 10 )
246          printedTaps = Arrays.toString( copyOf( printableTaps, 10 ) ) + ",...";
247        else
248          printedTaps = Arrays.toString( printableTaps );
249    
250        return "MultiSourceTap[" + printableTaps.length + ':' + printedTaps + ']';
251        }
252      }