001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop.io;
022    
023    import java.io.IOException;
024    import java.io.OutputStream;
025    import java.util.IdentityHashMap;
026    import java.util.Map;
027    
028    import cascading.tuple.Tuple;
029    import cascading.tuple.io.IndexTuple;
030    import cascading.tuple.io.TupleOutputStream;
031    import cascading.tuple.io.TuplePair;
032    import org.apache.hadoop.io.WritableUtils;
033    
034    /**
035     *
036     */
037    public class HadoopTupleOutputStream extends TupleOutputStream
038      {
039      /** Field WRITABLE_TOKEN */
040      public static final int WRITABLE_TOKEN = 32;
041    
042      private static final Map<Class, TupleElementWriter> staticTupleElementWriters = new IdentityHashMap<Class, TupleElementWriter>();
043    
044      static
045        {
046        staticTupleElementWriters.put( String.class, new TupleElementWriter()
047        {
048        @Override
049        public void write( TupleOutputStream stream, Object element ) throws IOException
050          {
051          WritableUtils.writeVInt( stream, 1 );
052          WritableUtils.writeString( stream, (String) element );
053          }
054        } );
055    
056        staticTupleElementWriters.put( Float.class, new TupleElementWriter()
057        {
058        @Override
059        public void write( TupleOutputStream stream, Object element ) throws IOException
060          {
061          WritableUtils.writeVInt( stream, 2 );
062          stream.writeFloat( (Float) element );
063          }
064        } );
065    
066        staticTupleElementWriters.put( Double.class, new TupleElementWriter()
067        {
068        @Override
069        public void write( TupleOutputStream stream, Object element ) throws IOException
070          {
071          WritableUtils.writeVInt( stream, 3 );
072          stream.writeDouble( (Double) element );
073          }
074        } );
075    
076        staticTupleElementWriters.put( Integer.class, new TupleElementWriter()
077        {
078        @Override
079        public void write( TupleOutputStream stream, Object element ) throws IOException
080          {
081          WritableUtils.writeVInt( stream, 4 );
082          WritableUtils.writeVInt( stream, (Integer) element );
083          }
084        } );
085    
086        staticTupleElementWriters.put( Long.class, new TupleElementWriter()
087        {
088        @Override
089        public void write( TupleOutputStream stream, Object element ) throws IOException
090          {
091          WritableUtils.writeVInt( stream, 5 );
092          WritableUtils.writeVLong( stream, (Long) element );
093          }
094        } );
095    
096        staticTupleElementWriters.put( Boolean.class, new TupleElementWriter()
097        {
098        @Override
099        public void write( TupleOutputStream stream, Object element ) throws IOException
100          {
101          WritableUtils.writeVInt( stream, 6 );
102          stream.writeBoolean( (Boolean) element );
103          }
104        } );
105    
106        staticTupleElementWriters.put( Short.class, new TupleElementWriter()
107        {
108        @Override
109        public void write( TupleOutputStream stream, Object element ) throws IOException
110          {
111          WritableUtils.writeVInt( stream, 7 );
112          stream.writeShort( (Short) element );
113          }
114        } );
115    
116        staticTupleElementWriters.put( Tuple.class, new TupleElementWriter()
117        {
118        @Override
119        public void write( TupleOutputStream stream, Object element ) throws IOException
120          {
121          WritableUtils.writeVInt( stream, 8 );
122          stream.writeTuple( (Tuple) element );
123          }
124        } );
125    
126        staticTupleElementWriters.put( TuplePair.class, new TupleElementWriter()
127        {
128        @Override
129        public void write( TupleOutputStream stream, Object element ) throws IOException
130          {
131          WritableUtils.writeVInt( stream, 9 );
132          stream.writeTuplePair( (TuplePair) element );
133          }
134        } );
135    
136        staticTupleElementWriters.put( IndexTuple.class, new TupleElementWriter()
137        {
138        @Override
139        public void write( TupleOutputStream stream, Object element ) throws IOException
140          {
141          WritableUtils.writeVInt( stream, 10 );
142          stream.writeIndexTuple( (IndexTuple) element );
143          }
144        } );
145        }
146    
147      public HadoopTupleOutputStream( OutputStream outputStream, ElementWriter elementWriter )
148        {
149        super( staticTupleElementWriters, outputStream, elementWriter );
150        }
151    
152      @Override
153      protected void writeIntInternal( int value ) throws IOException
154        {
155        WritableUtils.writeVInt( this, value );
156        }
157    
158      public void writeIndexTuple( IndexTuple indexTuple ) throws IOException
159        {
160        writeIntInternal( indexTuple.getIndex() );
161        writeTuple( indexTuple.getTuple() );
162        }
163      }