001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop;
022
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.HashMap;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034
035import cascading.CascadingException;
036import cascading.flow.FlowProcess;
037import cascading.flow.FlowProps;
038import cascading.flow.hadoop.util.HadoopUtil;
039import cascading.tuple.Comparison;
040import cascading.tuple.Fields;
041import cascading.tuple.Tuple;
042import cascading.tuple.TupleException;
043import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
044import cascading.tuple.hadoop.io.IndexTupleDeserializer;
045import cascading.tuple.hadoop.io.IndexTupleSerializer;
046import cascading.tuple.hadoop.io.KeyIndexTupleDeserializer;
047import cascading.tuple.hadoop.io.KeyIndexTupleSerializer;
048import cascading.tuple.hadoop.io.KeyTupleDeserializer;
049import cascading.tuple.hadoop.io.KeyTupleSerializer;
050import cascading.tuple.hadoop.io.TupleDeserializer;
051import cascading.tuple.hadoop.io.TuplePairDeserializer;
052import cascading.tuple.hadoop.io.TuplePairSerializer;
053import cascading.tuple.hadoop.io.TupleSerializer;
054import cascading.tuple.hadoop.io.ValueIndexTupleDeserializer;
055import cascading.tuple.hadoop.io.ValueIndexTupleSerializer;
056import cascading.tuple.hadoop.io.ValueTupleDeserializer;
057import cascading.tuple.hadoop.io.ValueTupleSerializer;
058import cascading.tuple.io.IndexTuple;
059import cascading.tuple.io.KeyIndexTuple;
060import cascading.tuple.io.KeyTuple;
061import cascading.tuple.io.TupleInputStream;
062import cascading.tuple.io.TupleOutputStream;
063import cascading.tuple.io.TuplePair;
064import cascading.tuple.io.ValueIndexTuple;
065import cascading.tuple.io.ValueTuple;
066import cascading.util.Util;
067import org.apache.hadoop.conf.Configuration;
068import org.apache.hadoop.conf.Configured;
069import org.apache.hadoop.io.WritableUtils;
070import org.apache.hadoop.io.serializer.Deserializer;
071import org.apache.hadoop.io.serializer.Serialization;
072import org.apache.hadoop.io.serializer.SerializationFactory;
073import org.apache.hadoop.io.serializer.Serializer;
074import org.apache.hadoop.io.serializer.WritableSerialization;
075import org.apache.hadoop.util.ReflectionUtils;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS;
080
081/**
082 * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface.
083 * <p/>
084 * Typically developers will not use this implementation directly as it is automatically added
085 * to any relevant MapReduce jobs via the {@link org.apache.hadoop.conf.Configuration}.
086 * <p/>
087 * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable}
088 * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}.
089 * <p/>
090 * To add or manipulate Hadoop serializations or Cascading serializations tokens, see
091 * {@link TupleSerializationProps} for a fluent property builder class.
092 * <p/>
093 * By default this Serialization interface registers the class {@link org.apache.hadoop.io.ByteWritable} as
094 * token 127.
095 */
096@SerializationToken(
097  tokens = {127},
098  classNames = {"org.apache.hadoop.io.BytesWritable"})
099public class TupleSerialization extends Configured implements Serialization
100  {
101
102  /** Field LOG */
103  private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class );
104
  /** Field defaultComparator — resolved from {@link FlowProps#DEFAULT_ELEMENT_COMPARATOR} in {@link #setConf(Configuration)} */
  private Comparator defaultComparator;
  /** Field classCache — classname to Class cache populated by {@link #getClass(String)} */
  private final Map<String, Class> classCache = new HashMap<String, Class>();
  /** Field serializationFactory — lazily created Hadoop serialization factory */
  private SerializationFactory serializationFactory;

  /** Field tokenClassesMap — serialization token to classname, populated by initTokenMaps() */
  private HashMap<Integer, String> tokenClassesMap;
  /** Field classesTokensMap — classname to serialization token, the inverse of tokenClassesMap */
  private HashMap<String, Integer> classesTokensMap;
  /** Field tokensSize — number of registered tokens, remains zero until initTokenMaps() runs */
  private long tokensSize = 0;

  // node ordinals parsed lazily from the "cascading.node.ordinals" property
  List<Integer> ordinals;

  // per-ordinal fields maps, loaded lazily from the "cascading.node.*.fields" properties
  Map<Integer, Fields> keyFieldsMap;
  Map<Integer, Fields> sortFieldsMap;
  Map<Integer, Fields> valueFieldsMap;

  // fields for the first ordinal, cached by the corresponding getters
  Fields keyFields;
  Fields sortFields;
  Fields valueFields;

  Boolean typesRequired; // for testing purposes
  Boolean typesIgnored; // for testing purposes
131
132  static String getSerializationTokens( Configuration jobConf )
133    {
134    return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS );
135    }
136
137  /**
138   * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly.
139   * <p/>
140   * This method will guarantee {@link TupleSerialization} and {@link WritableSerialization} are
141   * first in the list, as both are required.
142   *
143   * @param jobConf of type JobConf
144   */
145  public static void setSerializations( Configuration jobConf )
146    {
147    String serializations = getSerializations( jobConf );
148
149    LinkedList<String> list = new LinkedList<String>();
150
151    if( serializations != null && !serializations.isEmpty() )
152      Collections.addAll( list, serializations.split( "," ) );
153
154    // required by MultiInputSplit
155    String writable = WritableSerialization.class.getName();
156    String tuple = TupleSerialization.class.getName();
157
158    list.remove( writable );
159    list.remove( tuple );
160
161    list.addFirst( writable );
162    list.addFirst( tuple );
163
164    // make writable last
165    jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) );
166    }
167
168  static String getSerializations( Configuration jobConf )
169    {
170    return jobConf.get( HADOOP_IO_SERIALIZATIONS, null );
171    }
172
173  public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf )
174    {
175    String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
176
177    if( Util.isEmpty( typeName ) )
178      return null;
179
180    if( comparator == null )
181      return createComparator( jobConf, typeName );
182
183    if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) )
184      return comparator;
185
186    return createComparator( jobConf, typeName );
187    }
188
189  public static Comparator getDefaultComparator( Configuration jobConf )
190    {
191    String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
192
193    if( Util.isEmpty( typeName ) )
194      return null;
195
196    return createComparator( jobConf, typeName );
197    }
198
199  private static Comparator createComparator( Configuration jobConf, String typeName )
200    {
201    LOG.debug( "using default comparator: {}", typeName );
202
203    try
204      {
205      Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName );
206
207      return ReflectionUtils.newInstance( type, jobConf );
208      }
209    catch( ClassNotFoundException exception )
210      {
211      throw new CascadingException( "unable to load class: " + typeName, exception );
212      }
213    }
214
  /**
   * Constructor TupleSerialization creates a new TupleSerialization instance.
   * The backing configuration is created lazily by {@link #getConf()} when needed.
   */
  public TupleSerialization()
    {
    }
219
  /**
   * Constructor TupleSerialization creates a new TupleSerialization instance backed by the
   * given flow process, exposing its properties through a read-only {@link Configuration} view.
   *
   * @param flowProcess supplies the property values returned by the configuration view
   */
  public TupleSerialization( final FlowProcess<? extends Configuration> flowProcess )
    {
    super( new Configuration()
    {
    @Override
    public String get( String name )
      {
      return get( name, null );
      }

    @Override
    public String get( String name, String defaultValue )
      {
      // delegate every lookup to the flow process properties instead of the Configuration backing store
      Object value = flowProcess.getProperty( name );
      return value == null ? defaultValue : String.valueOf( value );
      }
    } );
    }
238
239  /**
240   * Constructor TupleSerialization creates a new TupleSerialization instance.
241   *
242   * @param conf of type Configuration
243   */
244  public TupleSerialization( Configuration conf )
245    {
246    super( conf );
247    }
248
249  @Override
250  public void setConf( Configuration conf )
251    {
252    super.setConf( conf );
253
254    if( conf != null )
255      defaultComparator = getDefaultComparator( conf );
256    }
257
258  @Override
259  public Configuration getConf()
260    {
261    if( super.getConf() == null )
262      setConf( new Configuration() );
263
264    return super.getConf();
265    }
266
267  public boolean areTypesIgnored()
268    {
269    if( typesIgnored == null )
270      {
271      typesIgnored = getConf().getBoolean( TupleSerializationProps.IGNORE_TYPES, false );
272
273      if( typesIgnored )
274        LOG.info( "types are being ignored during serialization" );
275      }
276
277    return typesIgnored;
278    }
279
280  public boolean areTypesRequired()
281    {
282    if( typesRequired == null )
283      {
284      typesRequired = getConf().getBoolean( TupleSerializationProps.REQUIRE_TYPES, false );
285
286      if( typesRequired )
287        LOG.info( "types are being enforced during serialization" );
288      }
289
290    return typesRequired;
291    }
292
293  SerializationFactory getSerializationFactory()
294    {
295    if( serializationFactory == null )
296      serializationFactory = new SerializationFactory( getConf() );
297
298    return serializationFactory;
299    }
300
301  public Fields getKeyFields()
302    {
303    if( keyFields == null && getFirstOrdinal() != null )
304      keyFields = getKeyFieldsMap().get( getFirstOrdinal() );
305
306    return keyFields;
307    }
308
309  public Class[] getKeyTypes()
310    {
311    Fields fields = getKeyFields();
312
313    return getTypesFor( fields );
314    }
315
316  public Class[] getTypesFor( Fields fields )
317    {
318    if( areTypesIgnored() || fields == null )
319      return null;
320
321    return fields.getTypesClasses();
322    }
323
324  public Fields getSortFields()
325    {
326    if( sortFields == null && getFirstOrdinal() != null )
327      sortFields = getSortFieldsMap().get( getFirstOrdinal() );
328
329    return sortFields;
330    }
331
332  public Class[] getSortTypes()
333    {
334    return getTypesFor( getSortFields() );
335    }
336
337  public Fields getValueFields()
338    {
339    if( valueFields == null && getFirstOrdinal() != null )
340      valueFields = getValueFieldsMap().get( getFirstOrdinal() );
341
342    return valueFields;
343    }
344
345  public Fields getMaskedValueFields()
346    {
347    return maskVoid( getValueFields(), getKeyFields() );
348    }
349
350  public Class[] getValueTypes()
351    {
352    return getTypesFor( getValueFields() );
353    }
354
355  public Map<Integer, Class[]> getKeyTypeMap()
356    {
357    if( areTypesIgnored() || getKeyFieldsMap() == null )
358      return Collections.emptyMap();
359
360    Map<Integer, Class[]> map = new HashMap<>();
361
362    for( Map.Entry<Integer, Fields> entry : getKeyFieldsMap().entrySet() )
363      map.put( entry.getKey(), entry.getValue().getTypesClasses() );
364
365    return map;
366    }
367
368  public Map<Integer, Class[]> getValueTypeMap()
369    {
370    if( areTypesIgnored() || getValueFieldsMap() == null )
371      return Collections.emptyMap();
372
373    Map<Integer, Class[]> map = new HashMap<>();
374
375    for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() )
376      map.put( entry.getKey(), entry.getValue().getTypesClasses() );
377
378    return map;
379    }
380
381  public Map<Integer, Class[]> getMaskedValueTypeMap()
382    {
383    if( areTypesIgnored() || getValueFieldsMap() == null )
384      return Collections.emptyMap();
385
386    Map<Integer, Fields> keyFieldsMap = getKeyFieldsMap();
387
388    if( keyFieldsMap == null || keyFieldsMap.isEmpty() )
389      return getValueTypeMap();
390
391    Map<Integer, Class[]> map = new HashMap<>();
392
393    for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() )
394      {
395      Integer ordinal = entry.getKey();
396      Fields valueFields = entry.getValue();
397      Fields keyFields = keyFieldsMap.get( ordinal );
398
399      map.put( ordinal, maskVoid( valueFields, keyFields ).getTypesClasses() );
400      }
401
402    return map;
403    }
404
405  public List<Integer> getOrdinals()
406    {
407    if( ordinals == null )
408      ordinals = Util.split( Integer.class, ",", getConf().get( "cascading.node.ordinals" ) );
409
410    return ordinals;
411    }
412
413  public Integer getFirstOrdinal()
414    {
415    if( getOrdinals().isEmpty() )
416      return null;
417
418    return Util.getFirst( getOrdinals() );
419    }
420
421  public Map<Integer, Fields> getKeyFieldsMap()
422    {
423    if( keyFieldsMap == null )
424      keyFieldsMap = getFields( getConf(), "cascading.node.key.fields" );
425
426    return keyFieldsMap;
427    }
428
429  public Map<Integer, Fields> getSortFieldsMap()
430    {
431    if( sortFields == null )
432      sortFieldsMap = getFields( getConf(), "cascading.node.sort.fields" );
433
434    return sortFieldsMap;
435    }
436
437  public Map<Integer, Fields> getValueFieldsMap()
438    {
439    if( valueFieldsMap == null )
440      valueFieldsMap = getFields( getConf(), "cascading.node.value.fields" );
441
442    return valueFieldsMap;
443    }
444
445  /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. */
446  void initTokenMaps()
447    {
448    if( tokenClassesMap != null )
449      return;
450
451    tokenClassesMap = new HashMap<>();
452    classesTokensMap = new HashMap<>();
453
454    String tokenProperty = getSerializationTokens( getConf() );
455
456    if( tokenProperty != null )
457      {
458      tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set
459
460      for( String pair : tokenProperty.split( "," ) )
461        {
462        String[] elements = pair.split( "=" );
463        addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] );
464        }
465      }
466
467    String serializationsString = getSerializations( getConf() );
468
469    LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString );
470
471    if( serializationsString == null )
472      return;
473
474    String[] serializations = serializationsString.split( "," );
475
476    for( String serializationName : serializations )
477      {
478      try
479        {
480        Class type = getConf().getClassByName( serializationName );
481
482        SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class );
483
484        if( tokenAnnotation == null )
485          continue;
486
487        if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length )
488          throw new CascadingException( "serialization annotation tokens and classNames must be the same length" );
489
490        int[] tokens = tokenAnnotation.tokens();
491
492        for( int i = 0; i < tokens.length; i++ )
493          addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] );
494        }
495      catch( ClassNotFoundException exception )
496        {
497        LOG.warn( "unable to load serialization class: {}", serializationName, exception );
498        }
499      }
500
501    tokensSize = tokenClassesMap.size();
502    }
503
504  private void addToken( Class type, int token, String className )
505    {
506    if( type != null && !type.getName().startsWith( "cascading." ) && token < 128 )
507      throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token );
508
509    if( tokenClassesMap.containsKey( token ) )
510      {
511      if( type == null )
512        throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" );
513
514      throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() );
515      }
516
517    if( classesTokensMap.containsKey( className ) )
518      {
519      if( type == null )
520        throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " );
521
522      throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() );
523      }
524
525    LOG.debug( "adding serialization token: {}, for classname: {}", token, className );
526
527    tokenClassesMap.put( token, className );
528    classesTokensMap.put( className, token );
529    }
530
531  /**
532   * Returns the className for the given token.
533   *
534   * @param token of type int
535   * @return a String
536   */
537  final String getClassNameFor( int token )
538    {
539    initTokenMaps();
540
541    if( tokensSize == 0 )
542      return null;
543
544    return tokenClassesMap.get( token );
545    }
546
  /** Returns the number of registered serialization tokens; zero until {@link #initTokenMaps()} has run. */
  final long getTokensMapSize()
    {
    return tokensSize;
    }
551
552  /**
553   * Returns the token for the given className.
554   *
555   * @param className of type String
556   * @return an Integer
557   */
558  final Integer getTokenFor( String className )
559    {
560    initTokenMaps();
561
562    if( tokensSize == 0 )
563      return null;
564
565    return classesTokensMap.get( className );
566    }
567
  /** Returns the default element comparator resolved in {@link #setConf(Configuration)}, may be null. */
  public Comparator getDefaultComparator()
    {
    return defaultComparator;
    }
572
573  public Comparator getComparator( Class type )
574    {
575    Serialization serialization = getSerialization( type );
576
577    Comparator comparator = null;
578
579    if( serialization instanceof Comparison )
580      comparator = ( (Comparison) serialization ).getComparator( type );
581
582    if( comparator != null )
583      return comparator;
584
585    return defaultComparator;
586    }
587
  /** Returns the Hadoop Serialization registered for the named class, resolving the class first. */
  Serialization getSerialization( String className )
    {
    return getSerialization( getClass( className ) );
    }

  /** Returns the Hadoop Serialization registered for the given type via the serialization factory. */
  Serialization getSerialization( Class type )
    {
    return getSerializationFactory().getSerialization( type );
    }
597
598  Serializer getNewSerializer( Class type )
599    {
600    try
601      {
602      Serializer serializer = getSerializationFactory().getSerializer( type );
603
604      if( serializer == null )
605        throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
606
607      return serializer;
608      }
609    catch( NullPointerException exception )
610      {
611      throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
612      }
613    }
614
615  Deserializer getNewDeserializer( String className )
616    {
617    try
618      {
619      Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) );
620
621      if( deserializer == null )
622        throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
623
624      return deserializer;
625      }
626    catch( NullPointerException exception )
627      {
628      throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
629      }
630    }
631
  /** Returns a new KeyTupleDeserializer bound to a fresh element reader. */
  KeyTupleDeserializer getKeyTupleDeserializer()
    {
    return new KeyTupleDeserializer( getElementReader() );
    }

  /** Returns a new ValueTupleDeserializer bound to a fresh element reader. */
  ValueTupleDeserializer getValueTupleDeserializer()
    {
    return new ValueTupleDeserializer( getElementReader() );
    }

  /** Returns a new TuplePairDeserializer bound to a fresh element reader. */
  TuplePairDeserializer getTuplePairDeserializer()
    {
    return new TuplePairDeserializer( getElementReader() );
    }
646
647  /**
648   * Method getElementReader returns the elementReader of this TupleSerialization object.
649   *
650   * @return the elementReader (type SerializationElementReader) of this TupleSerialization object.
651   */
652  public SerializationElementReader getElementReader()
653    {
654    return new SerializationElementReader( this );
655    }
656
657  TupleDeserializer getTupleDeserializer()
658    {
659    return new TupleDeserializer( getElementReader() );
660    }
661
  /** Returns a new KeyTupleSerializer bound to a fresh element writer. */
  private KeyTupleSerializer getKeyTupleSerializer()
    {
    return new KeyTupleSerializer( getElementWriter() );
    }

  /** Returns a new ValueTupleSerializer bound to a fresh element writer. */
  private ValueTupleSerializer getValueTupleSerializer()
    {
    return new ValueTupleSerializer( getElementWriter() );
    }

  /** Returns a new TuplePairSerializer bound to a fresh element writer. */
  private TuplePairSerializer getTuplePairSerializer()
    {
    return new TuplePairSerializer( getElementWriter() );
    }
676
  /** Returns a new KeyIndexTupleDeserializer bound to a fresh element reader. */
  KeyIndexTupleDeserializer getKeyIndexTupleDeserializer()
    {
    return new KeyIndexTupleDeserializer( getElementReader() );
    }

  /** Returns a new ValueIndexTupleDeserializer bound to a fresh element reader. */
  ValueIndexTupleDeserializer getValueIndexTupleDeserializer()
    {
    return new ValueIndexTupleDeserializer( getElementReader() );
    }

  /** Returns a new IndexTupleDeserializer bound to a fresh element reader. */
  IndexTupleDeserializer getIndexTupleDeserializer()
    {
    return new IndexTupleDeserializer( getElementReader() );
    }
691
692  /**
693   * Method getElementWriter returns the elementWriter of this TupleSerialization object.
694   *
695   * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object.
696   */
697  public SerializationElementWriter getElementWriter()
698    {
699    return new SerializationElementWriter( this );
700    }
701
702  private TupleSerializer getTupleSerializer()
703    {
704    return new TupleSerializer( getElementWriter() );
705    }
706
  /** Returns a new KeyIndexTupleSerializer bound to a fresh element writer. */
  private KeyIndexTupleSerializer getKeyIndexTupleSerializer()
    {
    return new KeyIndexTupleSerializer( getElementWriter() );
    }

  /** Returns a new ValueIndexTupleSerializer bound to a fresh element writer. */
  private ValueIndexTupleSerializer getValueIndexTupleSerializer()
    {
    return new ValueIndexTupleSerializer( getElementWriter() );
    }

  /** Returns a new IndexTupleSerializer bound to a fresh element writer. */
  private IndexTupleSerializer getIndexTupleSerializer()
    {
    return new IndexTupleSerializer( getElementWriter() );
    }
721
722  /**
723   * Method accept implements {@link Serialization#accept(Class)}.
724   *
725   * @param c of type Class
726   * @return boolean
727   */
728  public boolean accept( Class c )
729    {
730    return Tuple.class == c ||
731      KeyTuple.class == c || ValueTuple.class == c ||
732      KeyIndexTuple.class == c || ValueIndexTuple.class == c ||
733      TuplePair.class == c || IndexTuple.class == c;
734    }
735
736  /**
737   * Method getDeserializer implements {@link Serialization#getDeserializer(Class)}.
738   *
739   * @param c of type Class
740   * @return Deserializer
741   */
742  public Deserializer getDeserializer( Class c )
743    {
744    if( c == Tuple.class )
745      return getTupleDeserializer();
746    else if( c == KeyTuple.class )
747      return getKeyTupleDeserializer();
748    else if( c == ValueTuple.class )
749      return getValueTupleDeserializer();
750    else if( c == KeyIndexTuple.class )
751      return getKeyIndexTupleDeserializer();
752    else if( c == ValueIndexTuple.class )
753      return getValueIndexTupleDeserializer();
754    else if( c == TuplePair.class )
755      return getTuplePairDeserializer();
756    else if( c == IndexTuple.class )
757      return getIndexTupleDeserializer();
758
759    throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() );
760    }
761
762  /**
763   * Method getSerializer implements {@link Serialization#getSerializer(Class)}.
764   *
765   * @param c of type Class
766   * @return Serializer
767   */
768  public Serializer getSerializer( Class c )
769    {
770    if( c == Tuple.class )
771      return getTupleSerializer();
772    else if( c == KeyTuple.class )
773      return getKeyTupleSerializer();
774    else if( c == ValueTuple.class )
775      return getValueTupleSerializer();
776    else if( c == KeyIndexTuple.class )
777      return getKeyIndexTupleSerializer();
778    else if( c == ValueIndexTuple.class )
779      return getValueIndexTupleSerializer();
780    else if( c == TuplePair.class )
781      return getTuplePairSerializer();
782    else if( c == IndexTuple.class )
783      return getIndexTupleSerializer();
784
785    throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() );
786    }
787
788  public Class getClass( String className )
789    {
790    Class type = classCache.get( className );
791
792    if( type != null )
793      return type;
794
795    try
796      {
797      if( className.charAt( 0 ) == '[' )
798        type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() );
799      else
800        type = Thread.currentThread().getContextClassLoader().loadClass( className );
801      }
802    catch( ClassNotFoundException exception )
803      {
804      throw new TupleException( "unable to load class named: " + className, exception );
805      }
806
807    classCache.put( className, type );
808
809    return type;
810    }
811
812  public static Map<Integer, Fields> getFields( Configuration conf, String property )
813    {
814    try
815      {
816      return HadoopUtil.getFields( conf, property );
817      }
818    catch( IOException exception )
819      {
820      LOG.warn( "unable to get fields for: " + property );
821
822      return Collections.emptyMap();
823      }
824    }
825
826  private static Fields maskVoid( Fields fields, Fields mask )
827    {
828    if( fields == null )
829      return null;
830
831    if( mask == null || !fields.hasTypes() || !mask.hasTypes() )
832      return fields;
833
834    Fields voidedKey = mask.applyTypes( Fields.size( mask.size(), Void.class ) );
835
836    fields = fields.applyTypes( voidedKey );
837
838    return fields;
839    }
840
841  public static class SerializationElementReader implements TupleInputStream.ElementReader
842    {
843    /** Field LOG */
844    private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class );
845
846    /** Field tupleSerialization */
847    private final TupleSerialization tupleSerialization;
848
849    /** Field deserializers */
850    final Map<String, Deserializer> deserializers = new HashMap<String, Deserializer>();
851
852    /**
853     * Constructor SerializationElementReader creates a new SerializationElementReader instance.
854     *
855     * @param tupleSerialization of type TupleSerialization
856     */
857    public SerializationElementReader( TupleSerialization tupleSerialization )
858      {
859      this.tupleSerialization = tupleSerialization;
860      }
861
862    public TupleSerialization getTupleSerialization()
863      {
864      return tupleSerialization;
865      }
866
867    public Object read( int token, DataInputStream inputStream ) throws IOException
868      {
869      String className = getClassNameFor( token, inputStream );
870      Deserializer deserializer = getDeserializerFor( inputStream, className );
871
872      Object foundObject = null;
873      Object object;
874
875      try
876        {
877        object = deserializer.deserialize( foundObject );
878        }
879      catch( IOException exception )
880        {
881        LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception );
882
883        throw exception;
884        }
885
886      return object;
887      }
888
889    public Object read( Class type, DataInputStream inputStream ) throws IOException
890      {
891      String className = type.getName();
892      Deserializer deserializer = getDeserializerFor( inputStream, className );
893
894      Object foundObject = null;
895      Object object;
896
897      try
898        {
899        object = deserializer.deserialize( foundObject );
900        }
901      catch( IOException exception )
902        {
903        LOG.error( "failed deserializing: " + className, exception );
904
905        throw exception;
906        }
907
908      return object;
909      }
910
911    @Override
912    public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException
913      {
914      Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) );
915
916      return tupleSerialization.getComparator( type );
917      }
918
919    private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException
920      {
921      Deserializer deserializer = deserializers.get( className );
922
923      if( deserializer == null )
924        {
925        deserializer = tupleSerialization.getNewDeserializer( className );
926        deserializer.open( inputStream );
927        deserializers.put( className, deserializer );
928        }
929
930      return deserializer;
931      }
932
933    public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException
934      {
935      String className = tupleSerialization.getClassNameFor( token );
936
937      try
938        {
939        if( className == null )
940          className = WritableUtils.readString( inputStream );
941        }
942      catch( IOException exception )
943        {
944        LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() );
945        throw exception;
946        }
947
948      return className;
949      }
950
951    public void close()
952      {
953      if( deserializers.size() == 0 )
954        return;
955
956      Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() );
957
958      deserializers.clear();
959
960      for( Deserializer deserializer : clone )
961        {
962        try
963          {
964          deserializer.close();
965          }
966        catch( IOException exception )
967          {
968          // do nothing
969          }
970        }
971      }
972    }
973
974  public static class SerializationElementWriter implements TupleOutputStream.ElementWriter
975    {
976    /** Field LOG */
977    private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class );
978
979    /** Field tupleSerialization */
980    private final TupleSerialization tupleSerialization;
981
982    /** Field serializers */
983    final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>();
984
985    public SerializationElementWriter( TupleSerialization tupleSerialization )
986      {
987      this.tupleSerialization = tupleSerialization;
988      }
989
990    public TupleSerialization getTupleSerialization()
991      {
992      return tupleSerialization;
993      }
994
995    public void write( DataOutputStream outputStream, Object object ) throws IOException
996      {
997      Class<?> type = object.getClass();
998      String className = type.getName();
999      Integer token = tupleSerialization.getTokenFor( className );
1000
1001      if( token == null )
1002        {
1003        LOG.debug( "no serialization token found for classname: {}", className );
1004
1005        WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization
1006        WritableUtils.writeString( outputStream, className );
1007        }
1008      else
1009        {
1010        WritableUtils.writeVInt( outputStream, token );
1011        }
1012
1013      Serializer serializer = getSerializer( outputStream, type );
1014
1015      try
1016        {
1017        serializer.serialize( object );
1018        }
1019      catch( IOException exception )
1020        {
1021        LOG.error( "failed serializing token: " + token + " with classname: " + className, exception );
1022
1023        throw exception;
1024        }
1025      }
1026
1027    private Serializer getSerializer( DataOutputStream outputStream, Class<?> type ) throws IOException
1028      {
1029      Serializer serializer = serializers.get( type );
1030
1031      if( serializer == null )
1032        {
1033        serializer = tupleSerialization.getNewSerializer( type );
1034        serializer.open( outputStream );
1035        serializers.put( type, serializer );
1036        }
1037
1038      return serializer;
1039      }
1040
1041    public void write( DataOutputStream outputStream, Class<?> type, Object object ) throws IOException
1042      {
1043      Serializer serializer = getSerializer( outputStream, type );
1044
1045      try
1046        {
1047        serializer.serialize( object );
1048        }
1049      catch( IOException exception )
1050        {
1051        LOG.error( "failed serializing type: " + type.getName(), exception );
1052
1053        throw exception;
1054        }
1055      }
1056
1057    public void close()
1058      {
1059      if( serializers.size() == 0 )
1060        return;
1061
1062      Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() );
1063
1064      serializers.clear();
1065
1066      for( Serializer serializer : clone )
1067        {
1068        try
1069          {
1070          serializer.close();
1071          }
1072        catch( IOException exception )
1073          {
1074          // do nothing
1075          }
1076        }
1077      }
1078    }
1079  }