001/* 002 * Copyright (c) 2016-2017 Chris K Wensel. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tuple.hadoop.io; 023 024import java.io.IOException; 025import java.io.OutputStream; 026import java.util.IdentityHashMap; 027import java.util.Map; 028 029import cascading.tuple.Tuple; 030import cascading.tuple.io.IndexTuple; 031import cascading.tuple.io.TupleOutputStream; 032import cascading.tuple.io.TuplePair; 033import org.apache.hadoop.io.WritableUtils; 034 035/** 036 * 037 */ 038public class HadoopTupleOutputStream extends TupleOutputStream 039 { 040 /** Field WRITABLE_TOKEN */ 041 public static final int WRITABLE_TOKEN = 32; 042 043 private static final Map<Class, TupleElementWriter> staticTupleUnTypedElementWriters = new IdentityHashMap<>(); 044 private static final Map<Class, TupleElementWriter> staticTupleTypedElementWriters = new IdentityHashMap<>(); 045 046 static 047 { 048 // untyped 049 050 staticTupleUnTypedElementWriters.put( String.class, ( stream, element ) -> 051 { 052 WritableUtils.writeVInt( stream, 1 ); 053 WritableUtils.writeString( stream, (String) element ); 054 } ); 055 056 staticTupleUnTypedElementWriters.put( Float.class, ( stream, element ) -> 057 { 058 WritableUtils.writeVInt( stream, 2 ); 059 stream.writeFloat( (Float) element ); 060 } ); 061 062 staticTupleUnTypedElementWriters.put( Double.class, ( stream, element ) -> 063 { 064 WritableUtils.writeVInt( stream, 3 ); 065 stream.writeDouble( (Double) element ); 066 } ); 067 068 staticTupleUnTypedElementWriters.put( Integer.class, ( stream, element ) -> 069 { 070 WritableUtils.writeVInt( stream, 4 ); 071 WritableUtils.writeVInt( stream, (Integer) element ); 072 } ); 073 074 staticTupleUnTypedElementWriters.put( Long.class, ( stream, element ) -> 075 { 076 WritableUtils.writeVInt( stream, 5 ); 077 WritableUtils.writeVLong( stream, (Long) element ); 078 } ); 079 080 staticTupleUnTypedElementWriters.put( Boolean.class, ( stream, element ) -> 081 { 082 WritableUtils.writeVInt( stream, 6 ); 083 stream.writeBoolean( (Boolean) element ); 084 } ); 085 086 staticTupleUnTypedElementWriters.put( Short.class, ( stream, element ) -> 087 { 088 WritableUtils.writeVInt( stream, 7 ); 089 stream.writeShort( (Short) element ); 090 } ); 091 092 staticTupleUnTypedElementWriters.put( Tuple.class, ( stream, element ) -> 093 { 094 WritableUtils.writeVInt( stream, 8 ); 095 stream.writeTuple( (Tuple) element ); 096 } ); 097 098 staticTupleUnTypedElementWriters.put( TuplePair.class, ( stream, element ) -> 099 { 100 WritableUtils.writeVInt( stream, 9 ); 101 stream.writeTuplePair( (TuplePair) element ); 102 } ); 103 104 staticTupleUnTypedElementWriters.put( IndexTuple.class, ( stream, element ) -> 105 { 106 WritableUtils.writeVInt( stream, 10 ); 107 stream.writeIndexTuple( (IndexTuple) element ); 108 } ); 109 110 // typed 111 112 staticTupleTypedElementWriters.put( Void.class, ( stream, element ) -> 113 { 114 // do nothing 115 } ); 116 117 staticTupleTypedElementWriters.put( String.class, ( stream, element ) -> WritableUtils.writeString( stream, (String) element ) ); 118 119 staticTupleTypedElementWriters.put( Float.class, ( stream, element ) -> 120 { 121 if( element == null ) 122 { 123 stream.writeByte( 0 ); 124 return; 125 } 126 127 stream.writeByte( 1 ); 128 stream.writeFloat( (Float) element ); 129 } ); 130 131 staticTupleTypedElementWriters.put( Double.class, ( stream, element ) -> 132 { 133 if( element == null ) 134 { 135 stream.writeByte( 0 ); 136 return; 137 } 138 139 stream.writeByte( 1 ); 140 stream.writeDouble( (Double) element ); 141 } ); 142 143 staticTupleTypedElementWriters.put( Integer.class, ( stream, element ) -> 144 { 145 if( element == null ) 146 { 147 stream.writeByte( 0 ); 148 return; 149 } 150 151 stream.writeByte( 1 ); 152 WritableUtils.writeVInt( stream, (Integer) element ); 153 } ); 154 155 staticTupleTypedElementWriters.put( Long.class, ( stream, element ) -> 156 { 157 if( element == null ) 158 { 159 stream.writeByte( 0 ); 160 return; 161 } 162 163 stream.writeByte( 1 ); 164 WritableUtils.writeVLong( stream, (Long) element ); 165 } ); 166 167 staticTupleTypedElementWriters.put( Boolean.class, ( stream, element ) -> 168 { 169 if( element == null ) 170 { 171 stream.writeByte( 0 ); 172 return; 173 } 174 175 stream.writeByte( 1 ); 176 stream.writeBoolean( (Boolean) element ); 177 } ); 178 179 staticTupleTypedElementWriters.put( Short.class, ( stream, element ) -> 180 { 181 if( element == null ) 182 { 183 stream.writeByte( 0 ); 184 return; 185 } 186 187 stream.writeByte( 1 ); 188 stream.writeShort( (Short) element ); 189 } ); 190 191 staticTupleTypedElementWriters.put( Float.TYPE, ( stream, element ) -> 192 { 193 if( element == null ) 194 stream.writeFloat( 0 ); 195 else 196 stream.writeFloat( (Float) element ); 197 } ); 198 199 staticTupleTypedElementWriters.put( Double.TYPE, ( stream, element ) -> 200 { 201 if( element == null ) 202 stream.writeDouble( 0 ); 203 else 204 stream.writeDouble( (Double) element ); 205 } ); 206 207 staticTupleTypedElementWriters.put( Integer.TYPE, ( stream, element ) -> 208 { 209 if( element == null ) 210 WritableUtils.writeVInt( stream, 0 ); 211 else 212 WritableUtils.writeVInt( stream, (Integer) element ); 213 } ); 214 215 staticTupleTypedElementWriters.put( Long.TYPE, ( stream, element ) -> 216 { 217 if( element == null ) 218 WritableUtils.writeVLong( stream, 0 ); 219 else 220 WritableUtils.writeVLong( stream, (Long) element ); 221 } ); 222 223 staticTupleTypedElementWriters.put( Boolean.TYPE, ( stream, element ) -> 224 { 225 if( element == null ) 226 stream.writeBoolean( false ); 227 else 228 stream.writeBoolean( (Boolean) element ); 229 } ); 230 231 staticTupleTypedElementWriters.put( Short.TYPE, ( stream, element ) -> 232 { 233 if( element == null ) 234 stream.writeShort( 0 ); 235 else 236 stream.writeShort( (Short) element ); 237 } ); 238 239 staticTupleTypedElementWriters.put( Tuple.class, ( stream, element ) -> stream.writeTuple( (Tuple) element ) ); 240 241 staticTupleTypedElementWriters.put( TuplePair.class, ( stream, element ) -> stream.writeTuplePair( (TuplePair) element ) ); 242 243 staticTupleTypedElementWriters.put( IndexTuple.class, ( stream, element ) -> stream.writeIndexTuple( (IndexTuple) element ) ); 244 } 245 246 public static TupleElementWriter[] getWritersFor( final ElementWriter elementWriter, final Class[] keyClasses ) 247 { 248 if( keyClasses == null || keyClasses.length == 0 ) 249 return null; 250 251 TupleElementWriter[] writers = new TupleElementWriter[ keyClasses.length ]; 252 253 for( int i = 0; i < keyClasses.length; i++ ) 254 { 255 TupleElementWriter writer = staticTupleTypedElementWriters.get( keyClasses[ i ] ); 256 257 if( writer != null ) 258 { 259 writers[ i ] = writer; 260 } 261 else 262 { 263 final int index = i; 264 writers[ i ] = ( stream, element ) -> elementWriter.write( stream, keyClasses[ index ], element ); 265 } 266 } 267 268 return writers; 269 } 270 271 public HadoopTupleOutputStream( OutputStream outputStream, ElementWriter elementWriter ) 272 { 273 super( staticTupleUnTypedElementWriters, staticTupleTypedElementWriters, outputStream, elementWriter ); 274 } 275 276 @Override 277 protected void writeIntInternal( int value ) throws IOException 278 { 279 WritableUtils.writeVInt( this, value ); 280 } 281 282 public void writeIndexTuple( IndexTuple indexTuple ) throws IOException 283 { 284 writeIntInternal( indexTuple.getIndex() ); 285 writeTuple( indexTuple.getTuple() ); 286 } 287 }