001/*
002 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple.hadoop.io;
023
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.IdentityHashMap;
import java.util.Map;

import cascading.tuple.Tuple;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TupleOutputStream;
import cascading.tuple.io.TuplePair;
import org.apache.hadoop.io.WritableUtils;
034
035/**
036 *
037 */
038public class HadoopTupleOutputStream extends TupleOutputStream
039  {
040  /** Field WRITABLE_TOKEN */
041  public static final int WRITABLE_TOKEN = 32;
042
043  private static final Map<Class, TupleElementWriter> staticTupleUnTypedElementWriters = new IdentityHashMap<>();
044  private static final Map<Class, TupleElementWriter> staticTupleTypedElementWriters = new IdentityHashMap<>();
045
046  static
047    {
048    // untyped
049
050    staticTupleUnTypedElementWriters.put( String.class, ( stream, element ) ->
051    {
052    WritableUtils.writeVInt( stream, 1 );
053    WritableUtils.writeString( stream, (String) element );
054    } );
055
056    staticTupleUnTypedElementWriters.put( Float.class, ( stream, element ) ->
057    {
058    WritableUtils.writeVInt( stream, 2 );
059    stream.writeFloat( (Float) element );
060    } );
061
062    staticTupleUnTypedElementWriters.put( Double.class, ( stream, element ) ->
063    {
064    WritableUtils.writeVInt( stream, 3 );
065    stream.writeDouble( (Double) element );
066    } );
067
068    staticTupleUnTypedElementWriters.put( Integer.class, ( stream, element ) ->
069    {
070    WritableUtils.writeVInt( stream, 4 );
071    WritableUtils.writeVInt( stream, (Integer) element );
072    } );
073
074    staticTupleUnTypedElementWriters.put( Long.class, ( stream, element ) ->
075    {
076    WritableUtils.writeVInt( stream, 5 );
077    WritableUtils.writeVLong( stream, (Long) element );
078    } );
079
080    staticTupleUnTypedElementWriters.put( Boolean.class, ( stream, element ) ->
081    {
082    WritableUtils.writeVInt( stream, 6 );
083    stream.writeBoolean( (Boolean) element );
084    } );
085
086    staticTupleUnTypedElementWriters.put( Short.class, ( stream, element ) ->
087    {
088    WritableUtils.writeVInt( stream, 7 );
089    stream.writeShort( (Short) element );
090    } );
091
092    staticTupleUnTypedElementWriters.put( Tuple.class, ( stream, element ) ->
093    {
094    WritableUtils.writeVInt( stream, 8 );
095    stream.writeTuple( (Tuple) element );
096    } );
097
098    staticTupleUnTypedElementWriters.put( TuplePair.class, ( stream, element ) ->
099    {
100    WritableUtils.writeVInt( stream, 9 );
101    stream.writeTuplePair( (TuplePair) element );
102    } );
103
104    staticTupleUnTypedElementWriters.put( IndexTuple.class, ( stream, element ) ->
105    {
106    WritableUtils.writeVInt( stream, 10 );
107    stream.writeIndexTuple( (IndexTuple) element );
108    } );
109
110    // typed
111
112    staticTupleTypedElementWriters.put( Void.class, ( stream, element ) ->
113    {
114    // do nothing
115    } );
116
117    staticTupleTypedElementWriters.put( String.class, ( stream, element ) -> WritableUtils.writeString( stream, (String) element ) );
118
119    staticTupleTypedElementWriters.put( Float.class, ( stream, element ) ->
120    {
121    if( element == null )
122      {
123      stream.writeByte( 0 );
124      return;
125      }
126
127    stream.writeByte( 1 );
128    stream.writeFloat( (Float) element );
129    } );
130
131    staticTupleTypedElementWriters.put( Double.class, ( stream, element ) ->
132    {
133    if( element == null )
134      {
135      stream.writeByte( 0 );
136      return;
137      }
138
139    stream.writeByte( 1 );
140    stream.writeDouble( (Double) element );
141    } );
142
143    staticTupleTypedElementWriters.put( Integer.class, ( stream, element ) ->
144    {
145    if( element == null )
146      {
147      stream.writeByte( 0 );
148      return;
149      }
150
151    stream.writeByte( 1 );
152    WritableUtils.writeVInt( stream, (Integer) element );
153    } );
154
155    staticTupleTypedElementWriters.put( Long.class, ( stream, element ) ->
156    {
157    if( element == null )
158      {
159      stream.writeByte( 0 );
160      return;
161      }
162
163    stream.writeByte( 1 );
164    WritableUtils.writeVLong( stream, (Long) element );
165    } );
166
167    staticTupleTypedElementWriters.put( Boolean.class, ( stream, element ) ->
168    {
169    if( element == null )
170      {
171      stream.writeByte( 0 );
172      return;
173      }
174
175    stream.writeByte( 1 );
176    stream.writeBoolean( (Boolean) element );
177    } );
178
179    staticTupleTypedElementWriters.put( Short.class, ( stream, element ) ->
180    {
181    if( element == null )
182      {
183      stream.writeByte( 0 );
184      return;
185      }
186
187    stream.writeByte( 1 );
188    stream.writeShort( (Short) element );
189    } );
190
191    staticTupleTypedElementWriters.put( Float.TYPE, ( stream, element ) ->
192    {
193    if( element == null )
194      stream.writeFloat( 0 );
195    else
196      stream.writeFloat( (Float) element );
197    } );
198
199    staticTupleTypedElementWriters.put( Double.TYPE, ( stream, element ) ->
200    {
201    if( element == null )
202      stream.writeDouble( 0 );
203    else
204      stream.writeDouble( (Double) element );
205    } );
206
207    staticTupleTypedElementWriters.put( Integer.TYPE, ( stream, element ) ->
208    {
209    if( element == null )
210      WritableUtils.writeVInt( stream, 0 );
211    else
212      WritableUtils.writeVInt( stream, (Integer) element );
213    } );
214
215    staticTupleTypedElementWriters.put( Long.TYPE, ( stream, element ) ->
216    {
217    if( element == null )
218      WritableUtils.writeVLong( stream, 0 );
219    else
220      WritableUtils.writeVLong( stream, (Long) element );
221    } );
222
223    staticTupleTypedElementWriters.put( Boolean.TYPE, ( stream, element ) ->
224    {
225    if( element == null )
226      stream.writeBoolean( false );
227    else
228      stream.writeBoolean( (Boolean) element );
229    } );
230
231    staticTupleTypedElementWriters.put( Short.TYPE, ( stream, element ) ->
232    {
233    if( element == null )
234      stream.writeShort( 0 );
235    else
236      stream.writeShort( (Short) element );
237    } );
238
239    staticTupleTypedElementWriters.put( Tuple.class, ( stream, element ) -> stream.writeTuple( (Tuple) element ) );
240
241    staticTupleTypedElementWriters.put( TuplePair.class, ( stream, element ) -> stream.writeTuplePair( (TuplePair) element ) );
242
243    staticTupleTypedElementWriters.put( IndexTuple.class, ( stream, element ) -> stream.writeIndexTuple( (IndexTuple) element ) );
244    }
245
246  public static TupleElementWriter[] getWritersFor( final ElementWriter elementWriter, final Class[] keyClasses )
247    {
248    if( keyClasses == null || keyClasses.length == 0 )
249      return null;
250
251    TupleElementWriter[] writers = new TupleElementWriter[ keyClasses.length ];
252
253    for( int i = 0; i < keyClasses.length; i++ )
254      {
255      TupleElementWriter writer = staticTupleTypedElementWriters.get( keyClasses[ i ] );
256
257      if( writer != null )
258        {
259        writers[ i ] = writer;
260        }
261      else
262        {
263        final int index = i;
264        writers[ i ] = ( stream, element ) -> elementWriter.write( stream, keyClasses[ index ], element );
265        }
266      }
267
268    return writers;
269    }
270
271  public HadoopTupleOutputStream( OutputStream outputStream, ElementWriter elementWriter )
272    {
273    super( staticTupleUnTypedElementWriters, staticTupleTypedElementWriters, outputStream, elementWriter );
274    }
275
276  @Override
277  protected void writeIntInternal( int value ) throws IOException
278    {
279    WritableUtils.writeVInt( this, value );
280    }
281
282  public void writeIndexTuple( IndexTuple indexTuple ) throws IOException
283    {
284    writeIntInternal( indexTuple.getIndex() );
285    writeTuple( indexTuple.getTuple() );
286    }
287  }