001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.io;
022    
023    import java.io.DataInputStream;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.util.Comparator;
027    import java.util.List;
028    
029    import cascading.tuple.Tuple;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /** Class TupleInputStream is used internally to read Tuples from storage. */
034    public abstract class TupleInputStream extends DataInputStream
035      {
036      /** Field LOG */
037      private static final Logger LOG = LoggerFactory.getLogger( TupleInputStream.class );
038    
039      /** Field inputStream */
040      protected final InputStream inputStream;
041      /** Field elementReader */
042      protected final ElementReader elementReader;
043    
044      public interface ElementReader
045        {
046        Object read( int token, DataInputStream inputStream ) throws IOException;
047    
048        Comparator getComparatorFor( int type, DataInputStream inputStream ) throws IOException;
049    
050        void close();
051        }
052    
053      public TupleInputStream( InputStream inputStream, ElementReader elementReader )
054        {
055        super( inputStream );
056        this.inputStream = inputStream;
057        this.elementReader = elementReader;
058        }
059    
060      public InputStream getInputStream()
061        {
062        return inputStream;
063        }
064    
065      public Tuple readTuple() throws IOException
066        {
067        return readTuple( new Tuple() );
068        }
069    
070      public Tuple readTuple( Tuple tuple ) throws IOException
071        {
072        List<Object> elements = Tuple.elements( tuple );
073    
074        elements.clear();
075        int len = getNumElements();
076    
077        for( int i = 0; i < len; i++ )
078          elements.add( getNextElement() );
079    
080        return tuple;
081        }
082    
083      public abstract int getNumElements() throws IOException;
084    
085      public abstract int readToken() throws IOException;
086    
087      public abstract Object getNextElement() throws IOException;
088    
089      public TuplePair readTuplePair() throws IOException
090        {
091        return readTuplePair( new TuplePair() );
092        }
093    
094      public TuplePair readTuplePair( TuplePair tuplePair ) throws IOException
095        {
096        Tuple[] tuples = TuplePair.tuples( tuplePair );
097    
098        readTuple( tuples[ 0 ] ); // guaranteed to not be null
099        readTuple( tuples[ 1 ] ); // guaranteed to not be null
100    
101        return tuplePair;
102        }
103    
104      public IndexTuple readIndexTuple() throws IOException
105        {
106        return readIndexTuple( new IndexTuple() );
107        }
108    
109      public abstract IndexTuple readIndexTuple( IndexTuple indexTuple ) throws IOException;
110    
111      protected abstract Object readType( int type ) throws IOException;
112    
113      public Comparator getComparatorFor( int type ) throws IOException
114        {
115        if( type >= 0 && type <= 10 )
116          return null;
117    
118        return elementReader.getComparatorFor( type, this );
119        }
120    
121      @Override
122      public void close() throws IOException
123        {
124        LOG.debug( "closing tuple input stream" );
125    
126        try
127          {
128          super.close();
129          }
130        finally
131          {
132          if( elementReader != null )
133            elementReader.close();
134          }
135        }
136      }