001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.util;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.ObjectInputStream;
027import java.io.ObjectOutputStream;
028import java.io.ObjectStreamClass;
029import java.io.Serializable;
030import java.util.ArrayList;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.zip.GZIPInputStream;
035import java.util.zip.GZIPOutputStream;
036
037import cascading.flow.FlowException;
038
039/** Class JavaObjectSerializer is the default implementation of {@link ObjectSerializer}. */
040public class JavaObjectSerializer implements ObjectSerializer
041  {
042  @Override
043  public <T> byte[] serialize( T object, boolean compress ) throws IOException
044    {
045    if( object instanceof Map )
046      return serializeMap( (Map<String, ?>) object, compress );
047
048    if( object instanceof List )
049      return serializeList( (List<?>) object, compress );
050
051    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
052
053    ObjectOutputStream out = new ObjectOutputStream( compress ? new GZIPOutputStream( bytes ) : bytes );
054
055    try
056      {
057      out.writeObject( object );
058      }
059    finally
060      {
061      out.close();
062      }
063
064    return bytes.toByteArray();
065    }
066
067  @Override
068  public <T> T deserialize( byte[] bytes, Class<T> type, boolean decompress ) throws IOException
069    {
070
071    if( Map.class.isAssignableFrom( type ) )
072      return (T) deserializeMap( bytes, decompress );
073
074    if( List.class.isAssignableFrom( type ) )
075      {
076      return (T) deserializeList( bytes, decompress );
077      }
078
079    ObjectInputStream in = null;
080
081    try
082      {
083      ByteArrayInputStream byteStream = new ByteArrayInputStream( bytes );
084
085      in = new ObjectInputStream( decompress ? new GZIPInputStream( byteStream ) : byteStream )
086      {
087      @Override
088      protected Class<?> resolveClass( ObjectStreamClass desc ) throws IOException, ClassNotFoundException
089        {
090        try
091          {
092          return Class.forName( desc.getName(), false, Thread.currentThread().getContextClassLoader() );
093          }
094        catch( ClassNotFoundException exception )
095          {
096          return super.resolveClass( desc );
097          }
098        }
099      };
100
101      return (T) in.readObject();
102      }
103    catch( ClassNotFoundException exception )
104      {
105      throw new FlowException( "unable to deserialize data", exception );
106      }
107    finally
108      {
109      if( in != null )
110        in.close();
111      }
112    }
113
114  @Override
115  public <T> boolean accepts( Class<T> type )
116    {
117    return Serializable.class.isAssignableFrom( type )
118      || Map.class.isAssignableFrom( type )
119      || List.class.isAssignableFrom( type );
120    }
121
122  public <T> byte[] serializeMap( Map<String, T> map, boolean compress ) throws IOException
123    {
124    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
125    ObjectOutputStream out = new ObjectOutputStream( compress ? new GZIPOutputStream( bytes ) : bytes );
126
127    Class<T> tClass;
128
129    if( map.size() == 0 )
130      tClass = (Class<T>) Object.class;
131    else
132      tClass = (Class<T>) map.values().iterator().next().getClass();
133    try
134      {
135      out.writeInt( map.size() );
136      out.writeUTF( tClass.getName() );
137
138      for( Map.Entry<String, T> entry : map.entrySet() )
139        {
140        out.writeUTF( entry.getKey() );
141        byte[] itemBytes = serialize( entry.getValue(), false );
142        out.writeInt( itemBytes.length );
143        out.write( itemBytes );
144        }
145      }
146    finally
147      {
148      out.close();
149      }
150
151    return bytes.toByteArray();
152    }
153
154  public <T> Map<String, T> deserializeMap( byte[] bytes, boolean decompress ) throws IOException
155    {
156    ObjectInputStream in = null;
157
158    try
159      {
160      ByteArrayInputStream byteStream = new ByteArrayInputStream( bytes );
161
162      in = new ObjectInputStream( decompress ? new GZIPInputStream( byteStream ) : byteStream );
163
164      int mapSize = in.readInt();
165      Class<T> tClass = (Class<T>) Class.forName( in.readUTF() );
166
167      Map<String, T> map = new HashMap<String, T>( mapSize );
168
169      for( int j = 0; j < mapSize; j++ )
170        {
171        String key = in.readUTF();
172        byte[] valBytes = new byte[ in.readInt() ];
173        in.readFully( valBytes );
174        map.put( key, deserialize( valBytes, tClass, false ) );
175        }
176
177      return map;
178      }
179    catch( ClassNotFoundException e )
180      {
181      throw new IOException( e );
182      }
183    finally
184      {
185      if( in != null )
186        in.close();
187      }
188    }
189
190  public <T> byte[] serializeList( List<T> list, boolean compress ) throws IOException
191    {
192    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
193
194    ObjectOutputStream out = new ObjectOutputStream( compress ? new GZIPOutputStream( bytes ) : bytes );
195
196    Class<T> tClass;
197
198    if( list.size() == 0 )
199      tClass = (Class<T>) Object.class;
200    else
201      tClass = (Class<T>) list.get( 0 ).getClass();
202
203    try
204      {
205      out.writeInt( list.size() );
206      out.writeUTF( tClass.getName() );
207
208      for( T item : list )
209        {
210        byte[] itemBytes = serialize( item, false );
211        out.writeInt( itemBytes.length );
212        out.write( itemBytes );
213        }
214      }
215    finally
216      {
217      out.close();
218      }
219
220    return bytes.toByteArray();
221    }
222
223  public <T> List<T> deserializeList( byte[] bytes, boolean decompress ) throws IOException
224    {
225    ObjectInputStream in = null;
226
227    try
228      {
229      ByteArrayInputStream byteStream = new ByteArrayInputStream( bytes );
230
231      in = new ObjectInputStream( decompress ? new GZIPInputStream( byteStream ) : byteStream );
232
233      int listSize = in.readInt();
234      Class<T> tClass = (Class<T>) Class.forName( in.readUTF() );
235
236      List<T> list = new ArrayList<T>( listSize );
237
238      for( int i = 0; i < listSize; i++ )
239        {
240        byte[] itemBytes = new byte[ in.readInt() ];
241        in.readFully( itemBytes );
242        list.add( deserialize( itemBytes, tClass, false ) );
243        }
244
245      return list;
246      }
247    catch( ClassNotFoundException e )
248      {
249      throw new IOException( e );
250      }
251    finally
252      {
253      if( in != null )
254        in.close();
255      }
256    }
257  }