001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.io;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.util.IdentityHashMap;
026import java.util.Map;
027
028import cascading.tuple.Tuple;
029import cascading.tuple.io.IndexTuple;
030import cascading.tuple.io.TupleInputStream;
031import cascading.tuple.io.TuplePair;
032import org.apache.hadoop.io.WritableUtils;
033
034/**
035 *
036 */
037public class HadoopTupleInputStream extends TupleInputStream
038  {
039  private static final Map<Class, TupleElementReader> staticTupleUnTypedElementReaders = new IdentityHashMap<>();
040  private static final Map<Class, TupleElementReader> staticTupleTypedElementReaders = new IdentityHashMap<>();
041
042  static
043    {
044    // typed
045
046    staticTupleTypedElementReaders.put( Void.class, new TupleElementReader<HadoopTupleInputStream>()
047    {
048    @Override
049    public Object read( HadoopTupleInputStream stream ) throws IOException
050      {
051      return null;
052      }
053    } );
054
055    staticTupleTypedElementReaders.put( String.class, new TupleElementReader<HadoopTupleInputStream>()
056    {
057    @Override
058    public Object read( HadoopTupleInputStream stream ) throws IOException
059      {
060      return stream.readString();
061      }
062    } );
063
064    staticTupleTypedElementReaders.put( Float.class, new TupleElementReader<HadoopTupleInputStream>()
065    {
066    @Override
067    public Object read( HadoopTupleInputStream stream ) throws IOException
068      {
069      return stream.readNullFloat();
070      }
071    } );
072
073    staticTupleTypedElementReaders.put( Double.class, new TupleElementReader<HadoopTupleInputStream>()
074    {
075    @Override
076    public Object read( HadoopTupleInputStream stream ) throws IOException
077      {
078      return stream.readNullDouble();
079      }
080    } );
081
082    staticTupleTypedElementReaders.put( Integer.class, new TupleElementReader<HadoopTupleInputStream>()
083    {
084    @Override
085    public Object read( HadoopTupleInputStream stream ) throws IOException
086      {
087      return stream.readNullVInt();
088      }
089    } );
090
091    staticTupleTypedElementReaders.put( Long.class, new TupleElementReader<HadoopTupleInputStream>()
092    {
093    @Override
094    public Object read( HadoopTupleInputStream stream ) throws IOException
095      {
096      return stream.readNullVLong();
097      }
098    } );
099
100    staticTupleTypedElementReaders.put( Boolean.class, new TupleElementReader<HadoopTupleInputStream>()
101    {
102    @Override
103    public Object read( HadoopTupleInputStream stream ) throws IOException
104      {
105      return stream.readNullBoolean();
106      }
107    } );
108
109    staticTupleTypedElementReaders.put( Short.class, new TupleElementReader<HadoopTupleInputStream>()
110    {
111    @Override
112    public Object read( HadoopTupleInputStream stream ) throws IOException
113      {
114      return stream.readNullShort();
115      }
116    } );
117
118    staticTupleTypedElementReaders.put( Float.TYPE, new TupleElementReader<HadoopTupleInputStream>()
119    {
120    @Override
121    public Object read( HadoopTupleInputStream stream ) throws IOException
122      {
123      return stream.readFloat();
124      }
125    } );
126
127    staticTupleTypedElementReaders.put( Double.TYPE, new TupleElementReader<HadoopTupleInputStream>()
128    {
129    @Override
130    public Object read( HadoopTupleInputStream stream ) throws IOException
131      {
132      return stream.readDouble();
133      }
134    } );
135
136    staticTupleTypedElementReaders.put( Integer.TYPE, new TupleElementReader<HadoopTupleInputStream>()
137    {
138    @Override
139    public Object read( HadoopTupleInputStream stream ) throws IOException
140      {
141      return stream.readVInt();
142      }
143    } );
144
145    staticTupleTypedElementReaders.put( Long.TYPE, new TupleElementReader<HadoopTupleInputStream>()
146    {
147    @Override
148    public Object read( HadoopTupleInputStream stream ) throws IOException
149      {
150      return stream.readVLong();
151      }
152    } );
153
154    staticTupleTypedElementReaders.put( Boolean.TYPE, new TupleElementReader<HadoopTupleInputStream>()
155    {
156    @Override
157    public Object read( HadoopTupleInputStream stream ) throws IOException
158      {
159      return stream.readBoolean();
160      }
161    } );
162
163    staticTupleTypedElementReaders.put( Short.TYPE, new TupleElementReader<HadoopTupleInputStream>()
164    {
165    @Override
166    public Object read( HadoopTupleInputStream stream ) throws IOException
167      {
168      return stream.readShort();
169      }
170    } );
171
172    staticTupleTypedElementReaders.put( Tuple.class, new TupleElementReader<HadoopTupleInputStream>()
173    {
174    @Override
175    public Object read( HadoopTupleInputStream stream ) throws IOException
176      {
177      return stream.readTuple();
178      }
179    } );
180
181    staticTupleTypedElementReaders.put( TuplePair.class, new TupleElementReader<HadoopTupleInputStream>()
182    {
183    @Override
184    public Object read( HadoopTupleInputStream stream ) throws IOException
185      {
186      return stream.readTuplePair();
187      }
188    } );
189
190    staticTupleTypedElementReaders.put( IndexTuple.class, new TupleElementReader<HadoopTupleInputStream>()
191    {
192    @Override
193    public Object read( HadoopTupleInputStream stream ) throws IOException
194      {
195      return stream.readIndexTuple();
196      }
197    } );
198    }
199
200  public static TupleElementReader[] getReadersFor( final ElementReader elementReader, final Class[] classes )
201    {
202    if( classes == null || classes.length == 0 )
203      return null;
204
205    TupleElementReader[] readers = new TupleElementReader[ classes.length ];
206
207    for( int i = 0; i < classes.length; i++ )
208      {
209      TupleElementReader reader = staticTupleTypedElementReaders.get( classes[ i ] );
210
211      if( reader != null )
212        {
213        readers[ i ] = reader;
214        }
215      else
216        {
217        final int index = i;
218        readers[ i ] = new TupleElementReader()
219        {
220        @Override
221        public Object read( TupleInputStream stream ) throws IOException
222          {
223          return elementReader.read( classes[ index ], stream );
224          }
225        };
226        }
227      }
228
229    return readers;
230    }
231
232  public HadoopTupleInputStream( InputStream inputStream, ElementReader elementReader )
233    {
234    super( inputStream, elementReader );
235    }
236
237  public int getNumElements() throws IOException
238    {
239    return readVInt();
240    }
241
242  public int readToken() throws IOException
243    {
244    return readVInt();
245    }
246
247  public Object getNextElement() throws IOException
248    {
249    return readType( readToken() );
250    }
251
252  public IndexTuple readIndexTuple( IndexTuple tuple ) throws IOException
253    {
254    tuple.setIndex( readVInt() );
255    tuple.setTuple( readTuple() );
256
257    return tuple;
258    }
259
260  public Long readNullVLong() throws IOException
261    {
262    byte b = this.readByte();
263
264    if( b == 0 )
265      return null;
266
267    return WritableUtils.readVLong( this );
268    }
269
270  public long readVLong() throws IOException
271    {
272    return WritableUtils.readVLong( this );
273    }
274
275  public Integer readNullVInt() throws IOException
276    {
277    byte b = this.readByte();
278
279    if( b == 0 )
280      return null;
281
282    return WritableUtils.readVInt( this );
283    }
284
285  public int readVInt() throws IOException
286    {
287    return WritableUtils.readVInt( this );
288    }
289
290  public String readString() throws IOException
291    {
292    return WritableUtils.readString( this );
293    }
294
295  private Short readNullShort() throws IOException
296    {
297    byte b = this.readByte();
298
299    if( b == 0 )
300      return null;
301
302    return readShort();
303    }
304
305  private Object readNullBoolean() throws IOException
306    {
307    byte b = this.readByte();
308
309    if( b == 0 )
310      return null;
311
312    return readBoolean();
313    }
314
315  private Object readNullDouble() throws IOException
316    {
317    byte b = this.readByte();
318
319    if( b == 0 )
320      return null;
321
322    return readDouble();
323    }
324
325  private Object readNullFloat() throws IOException
326    {
327    byte b = this.readByte();
328
329    if( b == 0 )
330      return null;
331
332    return readFloat();
333    }
334
335  protected final Object readType( int type ) throws IOException
336    {
337    switch( type )
338      {
339      case 0:
340        return null;
341      case 1:
342        return readString();
343      case 2:
344        return readFloat();
345      case 3:
346        return readDouble();
347      case 4:
348        return readVInt();
349      case 5:
350        return readVLong();
351      case 6:
352        return readBoolean();
353      case 7:
354        return readShort();
355      case 8:
356        return readTuple();
357      case 9:
358        return readTuplePair();
359      case 10:
360        return readIndexTuple();
361      default:
362        return elementReader.read( type, this );
363      }
364    }
365
366  public final Object readType( Class type ) throws IOException
367    {
368    if( type == Void.class )
369      return null;
370
371    if( type == String.class )
372      return readString();
373
374    if( type == Float.class )
375      return readNullFloat();
376    if( type == Double.class )
377      return readNullDouble();
378    if( type == Integer.class )
379      return readNullVInt();
380    if( type == Long.class )
381      return readNullVLong();
382    if( type == Boolean.class )
383      return readNullBoolean();
384    if( type == Short.class )
385      return readNullShort();
386
387    if( type == Float.TYPE )
388      return readFloat();
389    if( type == Double.TYPE )
390      return readDouble();
391    if( type == Integer.TYPE )
392      return readVInt();
393    if( type == Long.TYPE )
394      return readVLong();
395    if( type == Boolean.TYPE )
396      return readBoolean();
397    if( type == Short.TYPE )
398      return readShort();
399
400    if( type == Tuple.class )
401      return readTuple();
402    if( type == TuplePair.class )
403      return readTuplePair();
404    if( type == IndexTuple.class )
405      return readIndexTuple();
406    else
407      return elementReader.read( type, this );
408    }
409  }