001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop;
022
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.math.BigDecimal;
029import java.math.BigInteger;
030
031import org.apache.hadoop.conf.Configured;
032import org.apache.hadoop.io.serializer.Deserializer;
033import org.apache.hadoop.io.serializer.Serialization;
034import org.apache.hadoop.io.serializer.Serializer;
035
036/**
037 * Class BigDecimalSerialization is an implementation of Hadoop's {@link org.apache.hadoop.io.serializer.Serialization} interface for use
038 * by {@link BigDecimal} instances.
039 * <p/>
040 * To use, call<br/>
041 * {@code TupleSerializationProps.addSerialization(properties, BigDecimalSerialization.class.getName());}
042 * <p/>
043 *
044 * @see cascading.tuple.hadoop.TupleSerializationProps#addSerialization(java.util.Map, String)
045 */
046@SerializationToken(tokens = {125}, classNames = {"java.math.BigDecimal"})
047public class BigDecimalSerialization extends Configured implements Serialization<BigDecimal>
048  {
049  public static class BigDecimalDeserializer implements Deserializer<BigDecimal>
050    {
051    private DataInputStream in;
052
053    @Override
054    public void open( InputStream in ) throws IOException
055      {
056      if( in instanceof DataInputStream )
057        this.in = (DataInputStream) in;
058      else
059        this.in = new DataInputStream( in );
060      }
061
062    @Override
063    public BigDecimal deserialize( BigDecimal existing ) throws IOException
064      {
065      int len = in.readInt();
066      byte[] valueBytes = new byte[ len ];
067
068      in.readFully( valueBytes );
069
070      BigInteger value = new BigInteger( valueBytes );
071
072      return new BigDecimal( value, in.readInt() );
073      }
074
075    @Override
076    public void close() throws IOException
077      {
078      in.close();
079      }
080    }
081
082  public static class BigDecimalSerializer implements Serializer<BigDecimal>
083    {
084    private DataOutputStream out;
085
086    @Override
087    public void open( OutputStream out ) throws IOException
088      {
089      if( out instanceof DataOutputStream )
090        this.out = (DataOutputStream) out;
091      else
092        this.out = new DataOutputStream( out );
093      }
094
095    @Override
096    public void serialize( BigDecimal bigDecimal ) throws IOException
097      {
098      BigInteger value = bigDecimal.unscaledValue();
099      byte[] valueBytes = value.toByteArray();
100
101      out.writeInt( valueBytes.length );
102      out.write( valueBytes );
103      out.writeInt( bigDecimal.scale() );
104      }
105
106    @Override
107    public void close() throws IOException
108      {
109      out.close();
110      }
111    }
112
113  public BigDecimalSerialization()
114    {
115    }
116
117  @Override
118  public boolean accept( Class<?> c )
119    {
120    return BigDecimal.class == c;
121    }
122
123  @Override
124  public Serializer<BigDecimal> getSerializer( Class<BigDecimal> c )
125    {
126    return new BigDecimalSerializer();
127    }
128
129  @Override
130  public Deserializer<BigDecimal> getDeserializer( Class<BigDecimal> c )
131    {
132    return new BigDecimalDeserializer();
133    }
134  }