001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.io; 022 023 import java.io.DataInputStream; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.util.Comparator; 027 import java.util.List; 028 029 import cascading.tuple.Tuple; 030 import org.slf4j.Logger; 031 import org.slf4j.LoggerFactory; 032 033 /** Class TupleInputStream is used internally to read Tuples from storage. */ 034 public abstract class TupleInputStream extends DataInputStream 035 { 036 /** Field LOG */ 037 private static final Logger LOG = LoggerFactory.getLogger( TupleInputStream.class ); 038 039 /** Field inputStream */ 040 protected final InputStream inputStream; 041 /** Field elementReader */ 042 protected final ElementReader elementReader; 043 044 public interface ElementReader 045 { 046 Object read( int token, DataInputStream inputStream ) throws IOException; 047 048 Comparator getComparatorFor( int type, DataInputStream inputStream ) throws IOException; 049 050 void close(); 051 } 052 053 public TupleInputStream( InputStream inputStream, ElementReader elementReader ) 054 { 055 super( inputStream ); 056 this.inputStream = inputStream; 057 this.elementReader = elementReader; 058 } 059 060 public InputStream getInputStream() 061 { 062 return inputStream; 063 } 064 065 public Tuple readTuple() throws IOException 066 { 067 return readTuple( new Tuple() ); 068 } 069 070 public Tuple readTuple( Tuple tuple ) throws IOException 071 { 072 List<Object> elements = Tuple.elements( tuple ); 073 074 elements.clear(); 075 int len = getNumElements(); 076 077 for( int i = 0; i < len; i++ ) 078 elements.add( getNextElement() ); 079 080 return tuple; 081 } 082 083 public abstract int getNumElements() throws IOException; 084 085 public abstract int readToken() throws IOException; 086 087 public abstract Object getNextElement() throws IOException; 088 089 public TuplePair readTuplePair() throws IOException 090 { 091 return readTuplePair( new TuplePair() ); 092 } 093 094 public TuplePair readTuplePair( TuplePair tuplePair ) throws IOException 095 { 096 Tuple[] tuples = TuplePair.tuples( tuplePair ); 097 098 readTuple( tuples[ 0 ] ); // guaranteed to not be null 099 readTuple( tuples[ 1 ] ); // guaranteed to not be null 100 101 return tuplePair; 102 } 103 104 public IndexTuple readIndexTuple() throws IOException 105 { 106 return readIndexTuple( new IndexTuple() ); 107 } 108 109 public abstract IndexTuple readIndexTuple( IndexTuple indexTuple ) throws IOException; 110 111 protected abstract Object readType( int type ) throws IOException; 112 113 public Comparator getComparatorFor( int type ) throws IOException 114 { 115 if( type >= 0 && type <= 10 ) 116 return null; 117 118 return elementReader.getComparatorFor( type, this ); 119 } 120 121 @Override 122 public void close() throws IOException 123 { 124 LOG.debug( "closing tuple input stream" ); 125 126 try 127 { 128 super.close(); 129 } 130 finally 131 { 132 if( elementReader != null ) 133 elementReader.close(); 134 } 135 } 136 }