001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple.hadoop;
023
024import java.io.DataInputStream;
025import java.io.DataOutputStream;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.Comparator;
031import java.util.HashMap;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035
036import cascading.CascadingException;
037import cascading.flow.FlowProcess;
038import cascading.flow.FlowProps;
039import cascading.flow.hadoop.util.HadoopUtil;
040import cascading.tuple.Comparison;
041import cascading.tuple.Fields;
042import cascading.tuple.Tuple;
043import cascading.tuple.TupleException;
044import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
045import cascading.tuple.hadoop.io.IndexTupleDeserializer;
046import cascading.tuple.hadoop.io.IndexTupleSerializer;
047import cascading.tuple.hadoop.io.KeyIndexTupleDeserializer;
048import cascading.tuple.hadoop.io.KeyIndexTupleSerializer;
049import cascading.tuple.hadoop.io.KeyTupleDeserializer;
050import cascading.tuple.hadoop.io.KeyTupleSerializer;
051import cascading.tuple.hadoop.io.TupleDeserializer;
052import cascading.tuple.hadoop.io.TuplePairDeserializer;
053import cascading.tuple.hadoop.io.TuplePairSerializer;
054import cascading.tuple.hadoop.io.TupleSerializer;
055import cascading.tuple.hadoop.io.ValueIndexTupleDeserializer;
056import cascading.tuple.hadoop.io.ValueIndexTupleSerializer;
057import cascading.tuple.hadoop.io.ValueTupleDeserializer;
058import cascading.tuple.hadoop.io.ValueTupleSerializer;
059import cascading.tuple.io.IndexTuple;
060import cascading.tuple.io.KeyIndexTuple;
061import cascading.tuple.io.KeyTuple;
062import cascading.tuple.io.TupleInputStream;
063import cascading.tuple.io.TupleOutputStream;
064import cascading.tuple.io.TuplePair;
065import cascading.tuple.io.ValueIndexTuple;
066import cascading.tuple.io.ValueTuple;
067import cascading.util.Util;
068import org.apache.hadoop.conf.Configuration;
069import org.apache.hadoop.conf.Configured;
070import org.apache.hadoop.io.WritableUtils;
071import org.apache.hadoop.io.serializer.Deserializer;
072import org.apache.hadoop.io.serializer.Serialization;
073import org.apache.hadoop.io.serializer.SerializationFactory;
074import org.apache.hadoop.io.serializer.Serializer;
075import org.apache.hadoop.io.serializer.WritableSerialization;
076import org.apache.hadoop.util.ReflectionUtils;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS;
081
082/**
083 * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface.
084 * <p>
085 * Typically developers will not use this implementation directly as it is automatically added
086 * to any relevant MapReduce jobs via the {@link org.apache.hadoop.conf.Configuration}.
087 * <p>
088 * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable}
089 * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}.
090 * <p>
091 * To add or manipulate Hadoop serializations or Cascading serializations tokens, see
092 * {@link TupleSerializationProps} for a fluent property builder class.
093 * <p>
094 * By default this Serialization interface registers the class {@link org.apache.hadoop.io.ByteWritable} as
095 * token 127.
096 */
097@SerializationToken(
098  tokens = {127},
099  classNames = {"org.apache.hadoop.io.BytesWritable"})
100public class TupleSerialization extends Configured implements Serialization
101  {
102
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class );

  /** Field defaultComparator - lazily resolved from {@link FlowProps#DEFAULT_ELEMENT_COMPARATOR} in {@link #setConf(Configuration)} */
  private Comparator defaultComparator;
  /** Field classCache - classname to Class cache used by {@link #getClass(String)} */
  private final Map<String, Class> classCache = new HashMap<String, Class>();
  /** Field serializationFactory - lazily created against the current Configuration */
  private SerializationFactory serializationFactory;

  /** Field tokenClassesMap - token to classname, populated by {@link #initTokenMaps()} */
  private HashMap<Integer, String> tokenClassesMap;
  /** Field classesTokensMap - classname to token, inverse of tokenClassesMap */
  private HashMap<String, Integer> classesTokensMap;
  /** Field tokensSize - number of registered tokens, zero until {@link #initTokenMaps()} runs */
  private long tokensSize = 0;

  /** Node ordinals parsed lazily from the "cascading.node.ordinals" property. */
  List<Integer> ordinals;

  /** Per-ordinal key Fields parsed from "cascading.node.key.fields". */
  Map<Integer, Fields> keyFieldsMap;
  /** Per-ordinal sort Fields parsed from "cascading.node.sort.fields". */
  Map<Integer, Fields> sortFieldsMap;
  /** Per-ordinal value Fields parsed from "cascading.node.value.fields". */
  Map<Integer, Fields> valueFieldsMap;

  /** Cached key Fields for the first ordinal. */
  Fields keyFields;
  /** Cached sort Fields for the first ordinal. */
  Fields sortFields;
  /** Cached value Fields for the first ordinal. */
  Fields valueFields;

  Boolean typesRequired; // for testing purposes
  Boolean typesIgnored; // for testing purposes
132
133  static String getSerializationTokens( Configuration jobConf )
134    {
135    return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS );
136    }
137
138  /**
139   * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly.
140   * <p>
141   * This method will guarantee  and {@link WritableSerialization} are
142   * first in the list, as both are required.
143   *
144   * @param jobConf of type JobConf
145   */
146  public static void setSerializations( Configuration jobConf )
147    {
148    setSerializations( jobConf, Collections.emptySet() );
149    }
150
151  public static void setSerializations( Configuration jobConf, Collection<String> provided )
152    {
153    String serializations = getSerializations( jobConf );
154
155    LinkedList<String> list = new LinkedList<String>();
156
157    list.addAll( provided );
158
159    if( serializations != null && !serializations.isEmpty() )
160      Collections.addAll( list, serializations.split( "," ) );
161
162    // required by MultiInputSplit
163    String writable = WritableSerialization.class.getName();
164    String tuple = TupleSerialization.class.getName();
165
166    list.remove( writable );
167    list.remove( tuple );
168
169    list.addFirst( writable );
170    list.addFirst( tuple );
171
172    // make writable last
173    jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) );
174    }
175
176  static String getSerializations( Configuration jobConf )
177    {
178    return jobConf.get( HADOOP_IO_SERIALIZATIONS, null );
179    }
180
181  public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf )
182    {
183    String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
184
185    if( Util.isEmpty( typeName ) )
186      return null;
187
188    if( comparator == null )
189      return createComparator( jobConf, typeName );
190
191    if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) )
192      return comparator;
193
194    return createComparator( jobConf, typeName );
195    }
196
197  public static Comparator getDefaultComparator( Configuration jobConf )
198    {
199    String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
200
201    if( Util.isEmpty( typeName ) )
202      return null;
203
204    return createComparator( jobConf, typeName );
205    }
206
207  private static Comparator createComparator( Configuration jobConf, String typeName )
208    {
209    LOG.debug( "using default comparator: {}", typeName );
210
211    try
212      {
213      Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName );
214
215      return ReflectionUtils.newInstance( type, jobConf );
216      }
217    catch( ClassNotFoundException exception )
218      {
219      throw new CascadingException( "unable to load class: " + typeName, exception );
220      }
221    }
222
  /** Constructor TupleSerialization creates a new TupleSerialization instance. */
  public TupleSerialization()
    {
    }

  /**
   * Constructor TupleSerialization creates a new TupleSerialization instance backed by the
   * given FlowProcess.
   * <p>
   * The anonymous Configuration delegates all lookups to {@link FlowProcess#getProperty(String)},
   * so both the one and two argument {@code get} forms resolve against the flow properties.
   *
   * @param flowProcess of type FlowProcess
   */
  public TupleSerialization( final FlowProcess<? extends Configuration> flowProcess )
    {
    super( new Configuration()
      {
      @Override
      public String get( String name )
        {
        // route through the overridden two-arg form so the flowProcess is consulted
        return get( name, null );
        }

      @Override
      public String get( String name, String defaultValue )
        {
        Object value = flowProcess.getProperty( name );
        return value == null ? defaultValue : String.valueOf( value );
        }
      } );
    }

  /**
   * Constructor TupleSerialization creates a new TupleSerialization instance.
   *
   * @param conf of type Configuration
   */
  public TupleSerialization( Configuration conf )
    {
    super( conf );
    }
256
257  @Override
258  public void setConf( Configuration conf )
259    {
260    super.setConf( conf );
261
262    if( conf != null )
263      defaultComparator = getDefaultComparator( conf );
264    }
265
266  @Override
267  public Configuration getConf()
268    {
269    if( super.getConf() == null )
270      setConf( new Configuration() );
271
272    return super.getConf();
273    }
274
275  public boolean areTypesIgnored()
276    {
277    if( typesIgnored == null )
278      {
279      typesIgnored = getConf().getBoolean( TupleSerializationProps.IGNORE_TYPES, false );
280
281      if( typesIgnored )
282        LOG.info( "types are being ignored during serialization" );
283      }
284
285    return typesIgnored;
286    }
287
288  public boolean areTypesRequired()
289    {
290    if( typesRequired == null )
291      {
292      typesRequired = getConf().getBoolean( TupleSerializationProps.REQUIRE_TYPES, false );
293
294      if( typesRequired )
295        LOG.info( "types are being enforced during serialization" );
296      }
297
298    return typesRequired;
299    }
300
301  SerializationFactory getSerializationFactory()
302    {
303    if( serializationFactory == null )
304      serializationFactory = new SerializationFactory( getConf() );
305
306    return serializationFactory;
307    }
308
309  public Fields getKeyFields()
310    {
311    if( keyFields == null && getFirstOrdinal() != null )
312      keyFields = getKeyFieldsMap().get( getFirstOrdinal() );
313
314    return keyFields;
315    }
316
317  public Class[] getKeyTypes()
318    {
319    Fields fields = getKeyFields();
320
321    return getTypesFor( fields );
322    }
323
324  public Class[] getTypesFor( Fields fields )
325    {
326    if( areTypesIgnored() || fields == null )
327      return null;
328
329    return fields.getTypesClasses();
330    }
331
332  public Fields getSortFields()
333    {
334    if( sortFields == null && getFirstOrdinal() != null )
335      sortFields = getSortFieldsMap().get( getFirstOrdinal() );
336
337    return sortFields;
338    }
339
340  public Class[] getSortTypes()
341    {
342    return getTypesFor( getSortFields() );
343    }
344
345  public Fields getValueFields()
346    {
347    if( valueFields == null && getFirstOrdinal() != null )
348      valueFields = getValueFieldsMap().get( getFirstOrdinal() );
349
350    return valueFields;
351    }
352
353  public Fields getMaskedValueFields()
354    {
355    return maskVoid( getValueFields(), getKeyFields() );
356    }
357
358  public Class[] getValueTypes()
359    {
360    return getTypesFor( getValueFields() );
361    }
362
363  public Map<Integer, Class[]> getKeyTypeMap()
364    {
365    if( areTypesIgnored() || getKeyFieldsMap() == null )
366      return Collections.emptyMap();
367
368    Map<Integer, Class[]> map = new HashMap<>();
369
370    for( Map.Entry<Integer, Fields> entry : getKeyFieldsMap().entrySet() )
371      map.put( entry.getKey(), entry.getValue().getTypesClasses() );
372
373    return map;
374    }
375
376  public Map<Integer, Class[]> getValueTypeMap()
377    {
378    if( areTypesIgnored() || getValueFieldsMap() == null )
379      return Collections.emptyMap();
380
381    Map<Integer, Class[]> map = new HashMap<>();
382
383    for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() )
384      map.put( entry.getKey(), entry.getValue().getTypesClasses() );
385
386    return map;
387    }
388
389  public Map<Integer, Class[]> getMaskedValueTypeMap()
390    {
391    if( areTypesIgnored() || getValueFieldsMap() == null )
392      return Collections.emptyMap();
393
394    Map<Integer, Fields> keyFieldsMap = getKeyFieldsMap();
395
396    if( keyFieldsMap == null || keyFieldsMap.isEmpty() )
397      return getValueTypeMap();
398
399    Map<Integer, Class[]> map = new HashMap<>();
400
401    for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() )
402      {
403      Integer ordinal = entry.getKey();
404      Fields valueFields = entry.getValue();
405      Fields keyFields = keyFieldsMap.get( ordinal );
406
407      map.put( ordinal, maskVoid( valueFields, keyFields ).getTypesClasses() );
408      }
409
410    return map;
411    }
412
413  public List<Integer> getOrdinals()
414    {
415    if( ordinals == null )
416      ordinals = Util.split( Integer.class, ",", getConf().get( "cascading.node.ordinals" ) );
417
418    return ordinals;
419    }
420
421  public Integer getFirstOrdinal()
422    {
423    if( getOrdinals().isEmpty() )
424      return null;
425
426    return Util.getFirst( getOrdinals() );
427    }
428
429  public Map<Integer, Fields> getKeyFieldsMap()
430    {
431    if( keyFieldsMap == null )
432      keyFieldsMap = getFields( getConf(), "cascading.node.key.fields" );
433
434    return keyFieldsMap;
435    }
436
437  public Map<Integer, Fields> getSortFieldsMap()
438    {
439    if( sortFields == null )
440      sortFieldsMap = getFields( getConf(), "cascading.node.sort.fields" );
441
442    return sortFieldsMap;
443    }
444
445  public Map<Integer, Fields> getValueFieldsMap()
446    {
447    if( valueFieldsMap == null )
448      valueFieldsMap = getFields( getConf(), "cascading.node.value.fields" );
449
450    return valueFieldsMap;
451    }
452
453  /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. */
454  void initTokenMaps()
455    {
456    if( tokenClassesMap != null )
457      return;
458
459    tokenClassesMap = new HashMap<>();
460    classesTokensMap = new HashMap<>();
461
462    String tokenProperty = getSerializationTokens( getConf() );
463
464    if( tokenProperty != null )
465      {
466      tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set
467
468      for( String pair : tokenProperty.split( "," ) )
469        {
470        String[] elements = pair.split( "=" );
471        addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] );
472        }
473      }
474
475    String serializationsString = getSerializations( getConf() );
476
477    LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString );
478
479    if( serializationsString == null )
480      return;
481
482    String[] serializations = serializationsString.split( "," );
483
484    for( String serializationName : serializations )
485      {
486      try
487        {
488        Class type = getConf().getClassByName( serializationName );
489
490        SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class );
491
492        if( tokenAnnotation == null )
493          continue;
494
495        if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length )
496          throw new CascadingException( "serialization annotation tokens and classNames must be the same length" );
497
498        int[] tokens = tokenAnnotation.tokens();
499
500        for( int i = 0; i < tokens.length; i++ )
501          addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] );
502        }
503      catch( ClassNotFoundException exception )
504        {
505        LOG.warn( "unable to load serialization class: {}", serializationName, exception );
506        }
507      }
508
509    tokensSize = tokenClassesMap.size();
510    }
511
512  private void addToken( Class type, int token, String className )
513    {
514    if( type != null && !type.getName().startsWith( "cascading." ) && token < 128 )
515      throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token );
516
517    if( tokenClassesMap.containsKey( token ) )
518      {
519      if( type == null )
520        throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" );
521
522      throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() );
523      }
524
525    if( classesTokensMap.containsKey( className ) )
526      {
527      if( type == null )
528        throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " );
529
530      throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() );
531      }
532
533    LOG.debug( "adding serialization token: {}, for classname: {}", token, className );
534
535    tokenClassesMap.put( token, className );
536    classesTokensMap.put( className, token );
537    }
538
539  /**
540   * Returns the className for the given token.
541   *
542   * @param token of type int
543   * @return a String
544   */
545  final String getClassNameFor( int token )
546    {
547    initTokenMaps();
548
549    if( tokensSize == 0 )
550      return null;
551
552    return tokenClassesMap.get( token );
553    }
554
555  final long getTokensMapSize()
556    {
557    return tokensSize;
558    }
559
560  /**
561   * Returns the token for the given className.
562   *
563   * @param className of type String
564   * @return an Integer
565   */
566  final Integer getTokenFor( String className )
567    {
568    initTokenMaps();
569
570    if( tokensSize == 0 )
571      return null;
572
573    return classesTokensMap.get( className );
574    }
575
576  public Comparator getDefaultComparator()
577    {
578    return defaultComparator;
579    }
580
581  public Comparator getComparator( Class type )
582    {
583    Serialization serialization = getSerialization( type );
584
585    Comparator comparator = null;
586
587    if( serialization instanceof Comparison )
588      comparator = ( (Comparison) serialization ).getComparator( type );
589
590    if( comparator != null )
591      return comparator;
592
593    return defaultComparator;
594    }
595
596  Serialization getSerialization( String className )
597    {
598    return getSerialization( getClass( className ) );
599    }
600
601  Serialization getSerialization( Class type )
602    {
603    return getSerializationFactory().getSerialization( type );
604    }
605
606  Serializer getNewSerializer( Class type )
607    {
608    try
609      {
610      Serializer serializer = getSerializationFactory().getSerializer( type );
611
612      if( serializer == null )
613        throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
614
615      return serializer;
616      }
617    catch( NullPointerException exception )
618      {
619      throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
620      }
621    }
622
623  Deserializer getNewDeserializer( String className )
624    {
625    try
626      {
627      Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) );
628
629      if( deserializer == null )
630        throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
631
632      return deserializer;
633      }
634    catch( NullPointerException exception )
635      {
636      throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
637      }
638    }
639
640  KeyTupleDeserializer getKeyTupleDeserializer()
641    {
642    return new KeyTupleDeserializer( getElementReader() );
643    }
644
645  ValueTupleDeserializer getValueTupleDeserializer()
646    {
647    return new ValueTupleDeserializer( getElementReader() );
648    }
649
650  TuplePairDeserializer getTuplePairDeserializer()
651    {
652    return new TuplePairDeserializer( getElementReader() );
653    }
654
655  /**
656   * Method getElementReader returns the elementReader of this TupleSerialization object.
657   *
658   * @return the elementReader (type SerializationElementReader) of this TupleSerialization object.
659   */
660  public SerializationElementReader getElementReader()
661    {
662    return new SerializationElementReader( this );
663    }
664
665  TupleDeserializer getTupleDeserializer()
666    {
667    return new TupleDeserializer( getElementReader() );
668    }
669
670  private KeyTupleSerializer getKeyTupleSerializer()
671    {
672    return new KeyTupleSerializer( getElementWriter() );
673    }
674
675  private ValueTupleSerializer getValueTupleSerializer()
676    {
677    return new ValueTupleSerializer( getElementWriter() );
678    }
679
680  private TuplePairSerializer getTuplePairSerializer()
681    {
682    return new TuplePairSerializer( getElementWriter() );
683    }
684
685  KeyIndexTupleDeserializer getKeyIndexTupleDeserializer()
686    {
687    return new KeyIndexTupleDeserializer( getElementReader() );
688    }
689
690  ValueIndexTupleDeserializer getValueIndexTupleDeserializer()
691    {
692    return new ValueIndexTupleDeserializer( getElementReader() );
693    }
694
695  IndexTupleDeserializer getIndexTupleDeserializer()
696    {
697    return new IndexTupleDeserializer( getElementReader() );
698    }
699
700  /**
701   * Method getElementWriter returns the elementWriter of this TupleSerialization object.
702   *
703   * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object.
704   */
705  public SerializationElementWriter getElementWriter()
706    {
707    return new SerializationElementWriter( this );
708    }
709
710  private TupleSerializer getTupleSerializer()
711    {
712    return new TupleSerializer( getElementWriter() );
713    }
714
715  private KeyIndexTupleSerializer getKeyIndexTupleSerializer()
716    {
717    return new KeyIndexTupleSerializer( getElementWriter() );
718    }
719
720  private ValueIndexTupleSerializer getValueIndexTupleSerializer()
721    {
722    return new ValueIndexTupleSerializer( getElementWriter() );
723    }
724
725  private IndexTupleSerializer getIndexTupleSerializer()
726    {
727    return new IndexTupleSerializer( getElementWriter() );
728    }
729
730  public boolean accept( Class c )
731    {
732    return Tuple.class == c ||
733      KeyTuple.class == c || ValueTuple.class == c ||
734      KeyIndexTuple.class == c || ValueIndexTuple.class == c ||
735      TuplePair.class == c || IndexTuple.class == c;
736    }
737
738  public Deserializer getDeserializer( Class c )
739    {
740    if( c == Tuple.class )
741      return getTupleDeserializer();
742    else if( c == KeyTuple.class )
743      return getKeyTupleDeserializer();
744    else if( c == ValueTuple.class )
745      return getValueTupleDeserializer();
746    else if( c == KeyIndexTuple.class )
747      return getKeyIndexTupleDeserializer();
748    else if( c == ValueIndexTuple.class )
749      return getValueIndexTupleDeserializer();
750    else if( c == TuplePair.class )
751      return getTuplePairDeserializer();
752    else if( c == IndexTuple.class )
753      return getIndexTupleDeserializer();
754
755    throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() );
756    }
757
758  public Serializer getSerializer( Class c )
759    {
760    if( c == Tuple.class )
761      return getTupleSerializer();
762    else if( c == KeyTuple.class )
763      return getKeyTupleSerializer();
764    else if( c == ValueTuple.class )
765      return getValueTupleSerializer();
766    else if( c == KeyIndexTuple.class )
767      return getKeyIndexTupleSerializer();
768    else if( c == ValueIndexTuple.class )
769      return getValueIndexTupleSerializer();
770    else if( c == TuplePair.class )
771      return getTuplePairSerializer();
772    else if( c == IndexTuple.class )
773      return getIndexTupleSerializer();
774
775    throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() );
776    }
777
778  public Class getClass( String className )
779    {
780    Class type = classCache.get( className );
781
782    if( type != null )
783      return type;
784
785    try
786      {
787      if( className.charAt( 0 ) == '[' )
788        type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() );
789      else
790        type = Thread.currentThread().getContextClassLoader().loadClass( className );
791      }
792    catch( ClassNotFoundException exception )
793      {
794      throw new TupleException( "unable to load class named: " + className, exception );
795      }
796
797    classCache.put( className, type );
798
799    return type;
800    }
801
802  public static Map<Integer, Fields> getFields( Configuration conf, String property )
803    {
804    try
805      {
806      return HadoopUtil.getFields( conf, property );
807      }
808    catch( IOException exception )
809      {
810      LOG.warn( "unable to get fields for: " + property );
811
812      return Collections.emptyMap();
813      }
814    }
815
816  private static Fields maskVoid( Fields fields, Fields mask )
817    {
818    if( fields == null )
819      return null;
820
821    if( mask == null || !fields.hasTypes() || !mask.hasTypes() )
822      return fields;
823
824    Fields voidedKey = mask.applyTypes( Fields.size( mask.size(), Void.class ) );
825
826    fields = fields.applyTypes( voidedKey );
827
828    return fields;
829    }
830
  public static class SerializationElementReader implements TupleInputStream.ElementReader
    {
    /** Field LOG */
    private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class );

    /** Field tupleSerialization - provides token resolution, class loading, and deserializer creation */
    private final TupleSerialization tupleSerialization;

    /** Field deserializers - cache keyed by classname; each instance is already opened on the current stream */
    final Map<String, Deserializer> deserializers = new HashMap<String, Deserializer>();

    /**
     * Constructor SerializationElementReader creates a new SerializationElementReader instance.
     *
     * @param tupleSerialization of type TupleSerialization
     */
    public SerializationElementReader( TupleSerialization tupleSerialization )
      {
      this.tupleSerialization = tupleSerialization;
      }

    public TupleSerialization getTupleSerialization()
      {
      return tupleSerialization;
      }

    /**
     * Reads the next element from the stream, resolving its concrete type from the given token.
     *
     * @param token       of type int
     * @param inputStream of type DataInputStream
     * @return the deserialized Object
     * @throws IOException when deserialization fails
     */
    public Object read( int token, DataInputStream inputStream ) throws IOException
      {
      String className = getClassNameFor( token, inputStream );
      Deserializer deserializer = getDeserializerFor( inputStream, className );

      // a null target forces the deserializer to allocate a fresh instance
      Object foundObject = null;
      Object object;

      try
        {
        object = deserializer.deserialize( foundObject );
        }
      catch( IOException exception )
        {
        LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception );

        throw exception;
        }

      return object;
      }

    /**
     * Reads the next element from the stream using the given concrete type directly.
     *
     * @param type        of type Class
     * @param inputStream of type DataInputStream
     * @return the deserialized Object
     * @throws IOException when deserialization fails
     */
    public Object read( Class type, DataInputStream inputStream ) throws IOException
      {
      String className = type.getName();
      Deserializer deserializer = getDeserializerFor( inputStream, className );

      // a null target forces the deserializer to allocate a fresh instance
      Object foundObject = null;
      Object object;

      try
        {
        object = deserializer.deserialize( foundObject );
        }
      catch( IOException exception )
        {
        LOG.error( "failed deserializing: " + className, exception );

        throw exception;
        }

      return object;
      }

    @Override
    public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException
      {
      Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) );

      return tupleSerialization.getComparator( type );
      }

    /** Returns a deserializer for the classname, opening and caching one on first use for this stream. */
    private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException
      {
      Deserializer deserializer = deserializers.get( className );

      if( deserializer == null )
        {
        deserializer = tupleSerialization.getNewDeserializer( className );
        deserializer.open( inputStream );
        deserializers.put( className, deserializer );
        }

      return deserializer;
      }

    /**
     * Resolves the classname for the given token; unknown tokens fall back to reading the
     * classname inline from the stream, as written by SerializationElementWriter.
     *
     * @param token       of type int
     * @param inputStream of type DataInputStream
     * @return a String classname
     * @throws IOException when the inline classname cannot be read
     */
    public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException
      {
      String className = tupleSerialization.getClassNameFor( token );

      try
        {
        if( className == null )
          className = WritableUtils.readString( inputStream );
        }
      catch( IOException exception )
        {
        LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() );
        throw exception;
        }

      return className;
      }

    /** Closes all cached deserializers, ignoring close failures; the cache is cleared first to prevent reuse. */
    public void close()
      {
      if( deserializers.size() == 0 )
        return;

      Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() );

      deserializers.clear();

      for( Deserializer deserializer : clone )
        {
        try
          {
          deserializer.close();
          }
        catch( IOException exception )
          {
          // do nothing
          }
        }
      }
    }
963
  public static class SerializationElementWriter implements TupleOutputStream.ElementWriter
    {
    /** Field LOG */
    private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class );

    /** Field tupleSerialization - provides token lookup and serializer creation */
    private final TupleSerialization tupleSerialization;

    /** Field serializers - cache keyed by type; each instance is already opened on the current stream */
    final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>();

    /**
     * Constructor SerializationElementWriter creates a new SerializationElementWriter instance.
     *
     * @param tupleSerialization of type TupleSerialization
     */
    public SerializationElementWriter( TupleSerialization tupleSerialization )
      {
      this.tupleSerialization = tupleSerialization;
      }

    public TupleSerialization getTupleSerialization()
      {
      return tupleSerialization;
      }

    /**
     * Writes the object to the stream, preceded by its serialization token when one is
     * registered, otherwise by the WRITABLE_TOKEN marker and the inline classname.
     *
     * @param outputStream of type DataOutputStream
     * @param object       the Object to serialize
     * @throws IOException when serialization fails
     */
    public void write( DataOutputStream outputStream, Object object ) throws IOException
      {
      Class<?> type = object.getClass();
      String className = type.getName();
      Integer token = tupleSerialization.getTokenFor( className );

      if( token == null )
        {
        LOG.debug( "no serialization token found for classname: {}", className );

        WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization
        WritableUtils.writeString( outputStream, className );
        }
      else
        {
        WritableUtils.writeVInt( outputStream, token );
        }

      Serializer serializer = getSerializer( outputStream, type );

      try
        {
        serializer.serialize( object );
        }
      catch( IOException exception )
        {
        LOG.error( "failed serializing token: " + token + " with classname: " + className, exception );

        throw exception;
        }
      }

    /** Returns a serializer for the type, opening and caching one on first use for this stream. */
    private Serializer getSerializer( DataOutputStream outputStream, Class<?> type ) throws IOException
      {
      Serializer serializer = serializers.get( type );

      if( serializer == null )
        {
        serializer = tupleSerialization.getNewSerializer( type );
        serializer.open( outputStream );
        serializers.put( type, serializer );
        }

      return serializer;
      }

    /**
     * Writes the object to the stream using the given type directly, without a token prefix.
     *
     * @param outputStream of type DataOutputStream
     * @param type         the Class to serialize as
     * @param object       the Object to serialize
     * @throws IOException when serialization fails
     */
    public void write( DataOutputStream outputStream, Class<?> type, Object object ) throws IOException
      {
      Serializer serializer = getSerializer( outputStream, type );

      try
        {
        serializer.serialize( object );
        }
      catch( IOException exception )
        {
        LOG.error( "failed serializing type: " + type.getName(), exception );

        throw exception;
        }
      }

    /** Closes all cached serializers, ignoring close failures; the cache is cleared first to prevent reuse. */
    public void close()
      {
      if( serializers.size() == 0 )
        return;

      Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() );

      serializers.clear();

      for( Serializer serializer : clone )
        {
        try
          {
          serializer.close();
          }
        catch( IOException exception )
          {
          // do nothing
          }
        }
      }
    }
1069  }