001/* 002 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tuple.hadoop.io; 023 024import java.io.DataInputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.util.IdentityHashMap; 028import java.util.Map; 029 030import cascading.tuple.Tuple; 031import cascading.tuple.io.IndexTuple; 032import cascading.tuple.io.TupleInputStream; 033import cascading.tuple.io.TuplePair; 034import org.apache.hadoop.io.WritableUtils; 035 036/** 037 * 038 */ 039public class HadoopTupleInputStream extends TupleInputStream 040 { 041 private static final Map<Class, TupleElementReader> staticTupleUnTypedElementReaders = new IdentityHashMap<>(); 042 private static final Map<Class, TupleElementReader> staticTupleTypedElementReaders = new IdentityHashMap<>(); 043 044 static 045 { 046 // typed 047 048 staticTupleTypedElementReaders.put( Void.class, (TupleElementReader<HadoopTupleInputStream>) stream -> null ); 049 050 staticTupleTypedElementReaders.put( String.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readString ); 051 052 staticTupleTypedElementReaders.put( Float.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullFloat ); 053 054 staticTupleTypedElementReaders.put( Double.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullDouble ); 055 056 staticTupleTypedElementReaders.put( Integer.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullVInt ); 057 058 staticTupleTypedElementReaders.put( Long.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullVLong ); 059 060 staticTupleTypedElementReaders.put( Boolean.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullBoolean ); 061 062 staticTupleTypedElementReaders.put( Short.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullShort ); 063 064 staticTupleTypedElementReaders.put( Float.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readFloat ); 065 066 staticTupleTypedElementReaders.put( Double.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readDouble ); 067 068 staticTupleTypedElementReaders.put( Integer.TYPE, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readVInt ); 069 070 staticTupleTypedElementReaders.put( Long.TYPE, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readVLong ); 071 072 staticTupleTypedElementReaders.put( Boolean.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readBoolean ); 073 074 staticTupleTypedElementReaders.put( Short.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readShort ); 075 076 staticTupleTypedElementReaders.put( Tuple.class, (TupleElementReader<HadoopTupleInputStream>) TupleInputStream::readTuple ); 077 078 staticTupleTypedElementReaders.put( TuplePair.class, (TupleElementReader<HadoopTupleInputStream>) TupleInputStream::readTuplePair ); 079 080 staticTupleTypedElementReaders.put( IndexTuple.class, (TupleElementReader<HadoopTupleInputStream>) TupleInputStream::readIndexTuple ); 081 } 082 083 public static TupleElementReader[] getReadersFor( final ElementReader elementReader, final Class[] classes ) 084 { 085 if( classes == null || classes.length == 0 ) 086 return null; 087 088 TupleElementReader[] readers = new TupleElementReader[ classes.length ]; 089 090 for( int i = 0; i < classes.length; i++ ) 091 { 092 TupleElementReader reader = staticTupleTypedElementReaders.get( classes[ i ] ); 093 094 if( reader != null ) 095 { 096 readers[ i ] = reader; 097 } 098 else 099 { 100 final int index = i; 101 readers[ i ] = stream -> elementReader.read( classes[ index ], stream ); 102 } 103 } 104 105 return readers; 106 } 107 108 public HadoopTupleInputStream( InputStream inputStream, ElementReader elementReader ) 109 { 110 super( inputStream, elementReader ); 111 } 112 113 public int getNumElements() throws IOException 114 { 115 return readVInt(); 116 } 117 118 public int readToken() throws IOException 119 { 120 return readVInt(); 121 } 122 123 public Object getNextElement() throws IOException 124 { 125 return readType( readToken() ); 126 } 127 128 public IndexTuple readIndexTuple( IndexTuple tuple ) throws IOException 129 { 130 tuple.setIndex( readVInt() ); 131 tuple.setTuple( readTuple() ); 132 133 return tuple; 134 } 135 136 public Long readNullVLong() throws IOException 137 { 138 byte b = this.readByte(); 139 140 if( b == 0 ) 141 return null; 142 143 return WritableUtils.readVLong( this ); 144 } 145 146 public long readVLong() throws IOException 147 { 148 return WritableUtils.readVLong( this ); 149 } 150 151 public Integer readNullVInt() throws IOException 152 { 153 byte b = this.readByte(); 154 155 if( b == 0 ) 156 return null; 157 158 return WritableUtils.readVInt( this ); 159 } 160 161 public int readVInt() throws IOException 162 { 163 return WritableUtils.readVInt( this ); 164 } 165 166 public String readString() throws IOException 167 { 168 return WritableUtils.readString( this ); 169 } 170 171 private Short readNullShort() throws IOException 172 { 173 byte b = this.readByte(); 174 175 if( b == 0 ) 176 return null; 177 178 return readShort(); 179 } 180 181 private Object readNullBoolean() throws IOException 182 { 183 byte b = this.readByte(); 184 185 if( b == 0 ) 186 return null; 187 188 return readBoolean(); 189 } 190 191 private Object readNullDouble() throws IOException 192 { 193 byte b = this.readByte(); 194 195 if( b == 0 ) 196 return null; 197 198 return readDouble(); 199 } 200 201 private Object readNullFloat() throws IOException 202 { 203 byte b = this.readByte(); 204 205 if( b == 0 ) 206 return null; 207 208 return readFloat(); 209 } 210 211 protected final Object readType( int type ) throws IOException 212 { 213 switch( type ) 214 { 215 case 0: 216 return null; 217 case 1: 218 return readString(); 219 case 2: 220 return readFloat(); 221 case 3: 222 return readDouble(); 223 case 4: 224 return readVInt(); 225 case 5: 226 return readVLong(); 227 case 6: 228 return readBoolean(); 229 case 7: 230 return readShort(); 231 case 8: 232 return readTuple(); 233 case 9: 234 return readTuplePair(); 235 case 10: 236 return readIndexTuple(); 237 default: 238 return elementReader.read( type, this ); 239 } 240 } 241 242 public final Object readType( Class type ) throws IOException 243 { 244 if( type == Void.class ) 245 return null; 246 247 if( type == String.class ) 248 return readString(); 249 250 if( type == Float.class ) 251 return readNullFloat(); 252 if( type == Double.class ) 253 return readNullDouble(); 254 if( type == Integer.class ) 255 return readNullVInt(); 256 if( type == Long.class ) 257 return readNullVLong(); 258 if( type == Boolean.class ) 259 return readNullBoolean(); 260 if( type == Short.class ) 261 return readNullShort(); 262 263 if( type == Float.TYPE ) 264 return readFloat(); 265 if( type == Double.TYPE ) 266 return readDouble(); 267 if( type == Integer.TYPE ) 268 return readVInt(); 269 if( type == Long.TYPE ) 270 return readVLong(); 271 if( type == Boolean.TYPE ) 272 return readBoolean(); 273 if( type == Short.TYPE ) 274 return readShort(); 275 276 if( type == Tuple.class ) 277 return readTuple(); 278 if( type == TuplePair.class ) 279 return readTuplePair(); 280 if( type == IndexTuple.class ) 281 return readIndexTuple(); 282 else 283 return elementReader.read( type, this ); 284 } 285 }