001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tuple.hadoop; 023 024import java.io.DataInputStream; 025import java.io.DataOutputStream; 026import java.io.IOException; 027import java.io.InputStream; 028import java.io.OutputStream; 029import java.util.Comparator; 030 031import cascading.tuple.Comparison; 032import cascading.tuple.hadoop.util.BytesComparator; 033import org.apache.hadoop.conf.Configured; 034import org.apache.hadoop.io.serializer.Deserializer; 035import org.apache.hadoop.io.serializer.Serialization; 036import org.apache.hadoop.io.serializer.Serializer; 037 038/** 039 * Class BytesSerialization is an implementation of Hadoop's {@link Serialization} interface for use 040 * by {@code byte} arrays ({@code byte[]}). 041 * <p> 042 * To use, call<br> 043 * {@code TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName() );} 044 * <p> 045 * This class also implements {@link Comparison} so it is not required to set a {@link cascading.tuple.hadoop.util.BytesComparator} 046 * when attempting to group on a byte array via GroupBy or CoGroup. 047 * 048 * @see TupleSerializationProps#addSerialization(java.util.Map, String) 049 * @see cascading.tuple.hadoop.util.BytesComparator 050 * @see Comparison 051 */ 052@SerializationToken(tokens = {126}, classNames = {"[B"}) 053public class BytesSerialization extends Configured implements Comparison<byte[]>, Serialization<byte[]> 054 { 055 public static class RawBytesDeserializer implements Deserializer<byte[]> 056 { 057 private DataInputStream in; 058 059 @Override 060 public void open( InputStream in ) throws IOException 061 { 062 if( in instanceof DataInputStream ) 063 this.in = (DataInputStream) in; 064 else 065 this.in = new DataInputStream( in ); 066 } 067 068 @Override 069 public byte[] deserialize( byte[] existing ) throws IOException 070 { 071 int len = in.readInt(); 072 073 byte[] bytes = existing != null && existing.length == len ? existing : new byte[ len ]; 074 075 in.readFully( bytes ); 076 077 return bytes; 078 } 079 080 @Override 081 public void close() throws IOException 082 { 083 in.close(); 084 } 085 } 086 087 public static class RawBytesSerializer implements Serializer<byte[]> 088 { 089 private DataOutputStream out; 090 091 @Override 092 public void open( OutputStream out ) throws IOException 093 { 094 if( out instanceof DataOutputStream ) 095 this.out = (DataOutputStream) out; 096 else 097 this.out = new DataOutputStream( out ); 098 } 099 100 @Override 101 public void serialize( byte[] bytes ) throws IOException 102 { 103 out.writeInt( bytes.length ); 104 out.write( bytes ); 105 } 106 107 @Override 108 public void close() throws IOException 109 { 110 out.close(); 111 } 112 } 113 114 public BytesSerialization() 115 { 116 } 117 118 @Override 119 public boolean accept( Class<?> c ) 120 { 121 return byte[].class == c; 122 } 123 124 @Override 125 public Serializer<byte[]> getSerializer( Class<byte[]> c ) 126 { 127 return new RawBytesSerializer(); 128 } 129 130 @Override 131 public Deserializer<byte[]> getDeserializer( Class<byte[]> c ) 132 { 133 return new RawBytesDeserializer(); 134 } 135 136 @Override 137 public Comparator<byte[]> getComparator( Class<byte[]> type ) 138 { 139 return new BytesComparator(); 140 } 141 }