001/*
002 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple.hadoop.io;
023
024import java.io.DataInputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.IdentityHashMap;
028import java.util.Map;
029
030import cascading.tuple.Tuple;
031import cascading.tuple.io.IndexTuple;
032import cascading.tuple.io.TupleInputStream;
033import cascading.tuple.io.TuplePair;
034import org.apache.hadoop.io.WritableUtils;
035
036/**
037 *
038 */
039public class HadoopTupleInputStream extends TupleInputStream
040  {
041  private static final Map<Class, TupleElementReader> staticTupleUnTypedElementReaders = new IdentityHashMap<>();
042  private static final Map<Class, TupleElementReader> staticTupleTypedElementReaders = new IdentityHashMap<>();
043
044  static
045    {
046    // typed
047
048    staticTupleTypedElementReaders.put( Void.class, (TupleElementReader<HadoopTupleInputStream>) stream -> null );
049
050    staticTupleTypedElementReaders.put( String.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readString );
051
052    staticTupleTypedElementReaders.put( Float.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullFloat );
053
054    staticTupleTypedElementReaders.put( Double.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullDouble );
055
056    staticTupleTypedElementReaders.put( Integer.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullVInt );
057
058    staticTupleTypedElementReaders.put( Long.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullVLong );
059
060    staticTupleTypedElementReaders.put( Boolean.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullBoolean );
061
062    staticTupleTypedElementReaders.put( Short.class, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readNullShort );
063
064    staticTupleTypedElementReaders.put( Float.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readFloat );
065
066    staticTupleTypedElementReaders.put( Double.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readDouble );
067
068    staticTupleTypedElementReaders.put( Integer.TYPE, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readVInt );
069
070    staticTupleTypedElementReaders.put( Long.TYPE, (TupleElementReader<HadoopTupleInputStream>) HadoopTupleInputStream::readVLong );
071
072    staticTupleTypedElementReaders.put( Boolean.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readBoolean );
073
074    staticTupleTypedElementReaders.put( Short.TYPE, (TupleElementReader<HadoopTupleInputStream>) DataInputStream::readShort );
075
076    staticTupleTypedElementReaders.put( Tuple.class, (TupleElementReader<HadoopTupleInputStream>) TupleInputStream::readTuple );
077
078    staticTupleTypedElementReaders.put( TuplePair.class, (TupleElementReader<HadoopTupleInputStream>) TupleInputStream::readTuplePair );
079
080    staticTupleTypedElementReaders.put( IndexTuple.class, (TupleElementReader<HadoopTupleInputStream>) TupleInputStream::readIndexTuple );
081    }
082
083  public static TupleElementReader[] getReadersFor( final ElementReader elementReader, final Class[] classes )
084    {
085    if( classes == null || classes.length == 0 )
086      return null;
087
088    TupleElementReader[] readers = new TupleElementReader[ classes.length ];
089
090    for( int i = 0; i < classes.length; i++ )
091      {
092      TupleElementReader reader = staticTupleTypedElementReaders.get( classes[ i ] );
093
094      if( reader != null )
095        {
096        readers[ i ] = reader;
097        }
098      else
099        {
100        final int index = i;
101        readers[ i ] = stream -> elementReader.read( classes[ index ], stream );
102        }
103      }
104
105    return readers;
106    }
107
108  public HadoopTupleInputStream( InputStream inputStream, ElementReader elementReader )
109    {
110    super( inputStream, elementReader );
111    }
112
113  public int getNumElements() throws IOException
114    {
115    return readVInt();
116    }
117
118  public int readToken() throws IOException
119    {
120    return readVInt();
121    }
122
123  public Object getNextElement() throws IOException
124    {
125    return readType( readToken() );
126    }
127
128  public IndexTuple readIndexTuple( IndexTuple tuple ) throws IOException
129    {
130    tuple.setIndex( readVInt() );
131    tuple.setTuple( readTuple() );
132
133    return tuple;
134    }
135
136  public Long readNullVLong() throws IOException
137    {
138    byte b = this.readByte();
139
140    if( b == 0 )
141      return null;
142
143    return WritableUtils.readVLong( this );
144    }
145
146  public long readVLong() throws IOException
147    {
148    return WritableUtils.readVLong( this );
149    }
150
151  public Integer readNullVInt() throws IOException
152    {
153    byte b = this.readByte();
154
155    if( b == 0 )
156      return null;
157
158    return WritableUtils.readVInt( this );
159    }
160
161  public int readVInt() throws IOException
162    {
163    return WritableUtils.readVInt( this );
164    }
165
166  public String readString() throws IOException
167    {
168    return WritableUtils.readString( this );
169    }
170
171  private Short readNullShort() throws IOException
172    {
173    byte b = this.readByte();
174
175    if( b == 0 )
176      return null;
177
178    return readShort();
179    }
180
181  private Object readNullBoolean() throws IOException
182    {
183    byte b = this.readByte();
184
185    if( b == 0 )
186      return null;
187
188    return readBoolean();
189    }
190
191  private Object readNullDouble() throws IOException
192    {
193    byte b = this.readByte();
194
195    if( b == 0 )
196      return null;
197
198    return readDouble();
199    }
200
201  private Object readNullFloat() throws IOException
202    {
203    byte b = this.readByte();
204
205    if( b == 0 )
206      return null;
207
208    return readFloat();
209    }
210
211  protected final Object readType( int type ) throws IOException
212    {
213    switch( type )
214      {
215      case 0:
216        return null;
217      case 1:
218        return readString();
219      case 2:
220        return readFloat();
221      case 3:
222        return readDouble();
223      case 4:
224        return readVInt();
225      case 5:
226        return readVLong();
227      case 6:
228        return readBoolean();
229      case 7:
230        return readShort();
231      case 8:
232        return readTuple();
233      case 9:
234        return readTuplePair();
235      case 10:
236        return readIndexTuple();
237      default:
238        return elementReader.read( type, this );
239      }
240    }
241
242  public final Object readType( Class type ) throws IOException
243    {
244    if( type == Void.class )
245      return null;
246
247    if( type == String.class )
248      return readString();
249
250    if( type == Float.class )
251      return readNullFloat();
252    if( type == Double.class )
253      return readNullDouble();
254    if( type == Integer.class )
255      return readNullVInt();
256    if( type == Long.class )
257      return readNullVLong();
258    if( type == Boolean.class )
259      return readNullBoolean();
260    if( type == Short.class )
261      return readNullShort();
262
263    if( type == Float.TYPE )
264      return readFloat();
265    if( type == Double.TYPE )
266      return readDouble();
267    if( type == Integer.TYPE )
268      return readVInt();
269    if( type == Long.TYPE )
270      return readVLong();
271    if( type == Boolean.TYPE )
272      return readBoolean();
273    if( type == Short.TYPE )
274      return readShort();
275
276    if( type == Tuple.class )
277      return readTuple();
278    if( type == TuplePair.class )
279      return readTuplePair();
280    if( type == IndexTuple.class )
281      return readIndexTuple();
282    else
283      return elementReader.read( type, this );
284    }
285  }