001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop;
022    
023    import java.io.DataInputStream;
024    import java.io.DataOutputStream;
025    import java.io.IOException;
026    import java.util.ArrayList;
027    import java.util.Collection;
028    import java.util.Collections;
029    import java.util.Comparator;
030    import java.util.HashMap;
031    import java.util.LinkedList;
032    import java.util.Map;
033    
034    import cascading.CascadingException;
035    import cascading.flow.FlowProcess;
036    import cascading.flow.FlowProps;
037    import cascading.tuple.Comparison;
038    import cascading.tuple.Tuple;
039    import cascading.tuple.TupleException;
040    import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
041    import cascading.tuple.hadoop.io.IndexTupleDeserializer;
042    import cascading.tuple.hadoop.io.IndexTupleSerializer;
043    import cascading.tuple.hadoop.io.TupleDeserializer;
044    import cascading.tuple.hadoop.io.TuplePairDeserializer;
045    import cascading.tuple.hadoop.io.TuplePairSerializer;
046    import cascading.tuple.hadoop.io.TupleSerializer;
047    import cascading.tuple.io.IndexTuple;
048    import cascading.tuple.io.TupleInputStream;
049    import cascading.tuple.io.TupleOutputStream;
050    import cascading.tuple.io.TuplePair;
051    import cascading.util.Util;
052    import org.apache.hadoop.conf.Configuration;
053    import org.apache.hadoop.conf.Configured;
054    import org.apache.hadoop.io.WritableUtils;
055    import org.apache.hadoop.io.serializer.Deserializer;
056    import org.apache.hadoop.io.serializer.Serialization;
057    import org.apache.hadoop.io.serializer.SerializationFactory;
058    import org.apache.hadoop.io.serializer.Serializer;
059    import org.apache.hadoop.io.serializer.WritableSerialization;
060    import org.apache.hadoop.mapred.JobConf;
061    import org.apache.hadoop.util.ReflectionUtils;
062    import org.slf4j.Logger;
063    import org.slf4j.LoggerFactory;
064    
065    import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS;
066    
067    /**
068     * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface.
069     * <p/>
070     * Typically developers will not use this implementation directly as it is automatically added
071     * to any relevant MapReduce jobs via the {@link JobConf}.
072     * <p/>
073     * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable}
074     * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}.
075     * <p/>
076     * To add or manipulate Hadoop serializations or Cascading serializations tokens, see
077     * {@link TupleSerializationProps} for a fluent property builder class.
078     * <p/>
079     * By default this Serialization interface registers the class {@link org.apache.hadoop.io.ByteWritable} as
080     * token 127.
081     */
082    @SerializationToken(
083      tokens = {127},
084      classNames = {"org.apache.hadoop.io.BytesWritable"})
085    public class TupleSerialization extends Configured implements Serialization
086      {
087      /** Field LOG */
088      private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class );
089    
090      /** Field defaultComparator * */
091      private Comparator defaultComparator;
092      /** Field classCache */
093      private final Map<String, Class> classCache = new HashMap<String, Class>();
094      /** Field serializationFactory */
095      private SerializationFactory serializationFactory;
096    
097      /** Field tokenClassesMap */
098      private HashMap<Integer, String> tokenClassesMap;
099      /** Field classesTokensMap */
100      private HashMap<String, Integer> classesTokensMap;
101      /** Field tokenMapSize */
102      private long tokensSize = 0;
103    
104      /**
105       * Adds the given token and className pair as a serialization token property. During object serialization and deserialization,
106       * the given token will be used instead of the className when an instance of the className is encountered.
107       * <p/>
108       * This method has moved to {@link TupleSerializationProps#addSerializationToken(java.util.Map, int, String)}.
109       *
110       * @param properties of type Map
111       * @param token      of type int
112       * @param className  of type String
113       */
114      @Deprecated
115      public static void addSerializationToken( Map<Object, Object> properties, int token, String className )
116        {
117        TupleSerializationProps.addSerializationToken( properties, token, className );
118        }
119    
120      /**
121       * Returns the serialization tokens property.
122       * <p/>
123       * This method has moved to {@link TupleSerializationProps#getSerializationTokens(java.util.Map)}.
124       *
125       * @param properties of type Map
126       * @return returns a String
127       */
128      @Deprecated
129      public static String getSerializationTokens( Map<Object, Object> properties )
130        {
131        return TupleSerializationProps.getSerializationTokens( properties );
132        }
133    
134      static String getSerializationTokens( Configuration jobConf )
135        {
136        return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS );
137        }
138    
139      /**
140       * Adds the given className as a Hadoop IO serialization class.
141       * <p/>
142       * This method has moved to {@link TupleSerializationProps#addSerialization(java.util.Map, String)}.
143       *
144       * @param properties of type Map
145       * @param className  of type String
146       */
147      @Deprecated
148      public static void addSerialization( Map<Object, Object> properties, String className )
149        {
150        TupleSerializationProps.addSerialization( properties, className );
151        }
152    
153      /**
154       * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly.
155       * <p/>
156       * This method will guarantee {@link TupleSerialization} and {@link WritableSerialization} are
157       * first in the list, as both are required.
158       *
159       * @param jobConf of type JobConf
160       */
161      public static void setSerializations( JobConf jobConf )
162        {
163        String serializations = getSerializations( jobConf );
164    
165        LinkedList<String> list = new LinkedList<String>();
166    
167        if( serializations != null && !serializations.isEmpty() )
168          Collections.addAll( list, serializations.split( "," ) );
169    
170        // required by MultiInputSplit
171        String writable = WritableSerialization.class.getName();
172        String tuple = TupleSerialization.class.getName();
173    
174        list.remove( writable );
175        list.remove( tuple );
176    
177        list.addFirst( writable );
178        list.addFirst( tuple );
179    
180        // make writable last
181        jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) );
182        }
183    
184      static String getSerializations( Configuration jobConf )
185        {
186        return jobConf.get( HADOOP_IO_SERIALIZATIONS, null );
187        }
188    
189      public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf )
190        {
191        String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
192    
193        if( Util.isEmpty( typeName ) )
194          return null;
195    
196        if( comparator == null )
197          return createComparator( jobConf, typeName );
198    
199        if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) )
200          return comparator;
201    
202        return createComparator( jobConf, typeName );
203        }
204    
205      public static Comparator getDefaultComparator( Configuration jobConf )
206        {
207        String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR );
208    
209        if( Util.isEmpty( typeName ) )
210          return null;
211    
212        return createComparator( jobConf, typeName );
213        }
214    
215      private static Comparator createComparator( Configuration jobConf, String typeName )
216        {
217        LOG.debug( "using default comparator: {}", typeName );
218    
219        try
220          {
221          Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName );
222    
223          return ReflectionUtils.newInstance( type, jobConf );
224          }
225        catch( ClassNotFoundException exception )
226          {
227          throw new CascadingException( "unable to load class: " + typeName, exception );
228          }
229        }
230    
231      /** Constructor TupleSerialization creates a new TupleSerialization instance. */
232      public TupleSerialization()
233        {
234        }
235    
236      public TupleSerialization( final FlowProcess<JobConf> flowProcess )
237        {
238        super( new Configuration()
239        {
240        @Override
241        public String get( String name )
242          {
243          return get( name, null );
244          }
245    
246        @Override
247        public String get( String name, String defaultValue )
248          {
249          Object value = flowProcess.getProperty( name );
250          return value == null ? defaultValue : String.valueOf( value );
251          }
252        } );
253        }
254    
255      /**
256       * Constructor TupleSerialization creates a new TupleSerialization instance.
257       *
258       * @param conf of type Configuration
259       */
260      public TupleSerialization( Configuration conf )
261        {
262        super( conf );
263        }
264    
265      @Override
266      public void setConf( Configuration conf )
267        {
268        super.setConf( conf );
269    
270        if( conf != null )
271          defaultComparator = getDefaultComparator( conf );
272        }
273    
274      @Override
275      public Configuration getConf()
276        {
277        if( super.getConf() == null )
278          setConf( new JobConf() );
279    
280        return super.getConf();
281        }
282    
283      SerializationFactory getSerializationFactory()
284        {
285        if( serializationFactory == null )
286          serializationFactory = new SerializationFactory( getConf() );
287    
288        return serializationFactory;
289        }
290    
291      /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. */
292      void initTokenMaps()
293        {
294        if( tokenClassesMap != null )
295          return;
296    
297        tokenClassesMap = new HashMap<Integer, String>();
298        classesTokensMap = new HashMap<String, Integer>();
299    
300        String tokenProperty = getSerializationTokens( getConf() );
301    
302        if( tokenProperty != null )
303          {
304          tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set
305    
306          for( String pair : tokenProperty.split( "," ) )
307            {
308            String[] elements = pair.split( "=" );
309            addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] );
310            }
311          }
312    
313        String serializationsString = getSerializations( getConf() );
314    
315        LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString );
316    
317        if( serializationsString == null )
318          return;
319    
320        String[] serializations = serializationsString.split( "," );
321    
322        for( String serializationName : serializations )
323          {
324          try
325            {
326            Class type = getConf().getClassByName( serializationName );
327    
328            SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class );
329    
330            if( tokenAnnotation == null )
331              continue;
332    
333            if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length )
334              throw new CascadingException( "serialization annotation tokens and classNames must be the same length" );
335    
336            int[] tokens = tokenAnnotation.tokens();
337    
338            for( int i = 0; i < tokens.length; i++ )
339              addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] );
340            }
341          catch( ClassNotFoundException exception )
342            {
343            LOG.warn( "unable to load serialization class: {}", serializationName, exception );
344            }
345          }
346    
347        tokensSize = tokenClassesMap.size();
348        }
349    
350      private void addToken( Class type, int token, String className )
351        {
352        if( type != null && !type.getName().startsWith( "cascading." ) && token < 128 )
353          throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token );
354    
355        if( tokenClassesMap.containsKey( token ) )
356          {
357          if( type == null )
358            throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" );
359    
360          throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() );
361          }
362    
363        if( classesTokensMap.containsKey( className ) )
364          {
365          if( type == null )
366            throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " );
367    
368          throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() );
369          }
370    
371        LOG.debug( "adding serialization token: {}, for classname: {}", token, className );
372    
373        tokenClassesMap.put( token, className );
374        classesTokensMap.put( className, token );
375        }
376    
377      /**
378       * Returns the className for the given token.
379       *
380       * @param token of type int
381       * @return a String
382       */
383      final String getClassNameFor( int token )
384        {
385        if( tokensSize == 0 )
386          return null;
387    
388        return tokenClassesMap.get( token );
389        }
390    
391      final long getTokensMapSize()
392        {
393        return tokensSize;
394        }
395    
396      /**
397       * Returns the token for the given className.
398       *
399       * @param className of type String
400       * @return an Integer
401       */
402      final Integer getTokenFor( String className )
403        {
404        if( tokensSize == 0 )
405          return null;
406    
407        return classesTokensMap.get( className );
408        }
409    
410      public Comparator getDefaultComparator()
411        {
412        return defaultComparator;
413        }
414    
415      public Comparator getComparator( Class type )
416        {
417        Serialization serialization = getSerialization( type );
418    
419        Comparator comparator = null;
420    
421        if( serialization instanceof Comparison )
422          comparator = ( (Comparison) serialization ).getComparator( type );
423    
424        if( comparator != null )
425          return comparator;
426    
427        return defaultComparator;
428        }
429    
430      Serialization getSerialization( String className )
431        {
432        return getSerialization( getClass( className ) );
433        }
434    
435      Serialization getSerialization( Class type )
436        {
437        return getSerializationFactory().getSerialization( type );
438        }
439    
440      Serializer getNewSerializer( Class type )
441        {
442        try
443          {
444          Serializer serializer = getSerializationFactory().getSerializer( type );
445    
446          if( serializer == null )
447            throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
448    
449          return serializer;
450          }
451        catch( NullPointerException exception )
452          {
453          throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() );
454          }
455        }
456    
457      Deserializer getNewDeserializer( String className )
458        {
459        try
460          {
461          Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) );
462    
463          if( deserializer == null )
464            throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
465    
466          return deserializer;
467          }
468        catch( NullPointerException exception )
469          {
470          throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() );
471          }
472        }
473    
474      TuplePairDeserializer getTuplePairDeserializer()
475        {
476        return new TuplePairDeserializer( getElementReader() );
477        }
478    
479      /**
480       * Method getElementReader returns the elementReader of this TupleSerialization object.
481       *
482       * @return the elementReader (type SerializationElementReader) of this TupleSerialization object.
483       */
484      public SerializationElementReader getElementReader()
485        {
486        return new SerializationElementReader( this );
487        }
488    
489      TupleDeserializer getTupleDeserializer()
490        {
491        return new TupleDeserializer( getElementReader() );
492        }
493    
494      private TuplePairSerializer getTuplePairSerializer()
495        {
496        return new TuplePairSerializer( getElementWriter() );
497        }
498    
499      IndexTupleDeserializer getIndexTupleDeserializer()
500        {
501        return new IndexTupleDeserializer( getElementReader() );
502        }
503    
504      /**
505       * Method getElementWriter returns the elementWriter of this TupleSerialization object.
506       *
507       * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object.
508       */
509      public SerializationElementWriter getElementWriter()
510        {
511        return new SerializationElementWriter( this );
512        }
513    
514      private TupleSerializer getTupleSerializer()
515        {
516        return new TupleSerializer( getElementWriter() );
517        }
518    
519      private IndexTupleSerializer getIndexTupleSerializer()
520        {
521        return new IndexTupleSerializer( getElementWriter() );
522        }
523    
524      /**
525       * Method accept implements {@link Serialization#accept(Class)}.
526       *
527       * @param c of type Class
528       * @return boolean
529       */
530      public boolean accept( Class c )
531        {
532        return Tuple.class == c || TuplePair.class == c || IndexTuple.class == c;
533        }
534    
535      /**
536       * Method getDeserializer implements {@link Serialization#getDeserializer(Class)}.
537       *
538       * @param c of type Class
539       * @return Deserializer
540       */
541      public Deserializer getDeserializer( Class c )
542        {
543        if( c == Tuple.class )
544          return getTupleDeserializer();
545        else if( c == TuplePair.class )
546          return getTuplePairDeserializer();
547        else if( c == IndexTuple.class )
548          return getIndexTupleDeserializer();
549    
550        throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() );
551        }
552    
553      /**
554       * Method getSerializer implements {@link Serialization#getSerializer(Class)}.
555       *
556       * @param c of type Class
557       * @return Serializer
558       */
559      public Serializer getSerializer( Class c )
560        {
561        if( c == Tuple.class )
562          return getTupleSerializer();
563        else if( c == TuplePair.class )
564          return getTuplePairSerializer();
565        else if( c == IndexTuple.class )
566          return getIndexTupleSerializer();
567    
568        throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() );
569        }
570    
571      public Class getClass( String className )
572        {
573        Class type = classCache.get( className );
574    
575        if( type != null )
576          return type;
577    
578        try
579          {
580          if( className.charAt( 0 ) == '[' )
581            type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() );
582          else
583            type = Thread.currentThread().getContextClassLoader().loadClass( className );
584          }
585        catch( ClassNotFoundException exception )
586          {
587          throw new TupleException( "unable to load class named: " + className, exception );
588          }
589    
590        classCache.put( className, type );
591    
592        return type;
593        }
594    
595      public static class SerializationElementReader implements TupleInputStream.ElementReader
596        {
597        /** Field LOG */
598        private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class );
599    
600        /** Field tupleSerialization */
601        private final TupleSerialization tupleSerialization;
602    
603        /** Field deserializers */
604        final Map<String, Deserializer> deserializers = new HashMap<String, Deserializer>();
605    
606        /**
607         * Constructor SerializationElementReader creates a new SerializationElementReader instance.
608         *
609         * @param tupleSerialization of type TupleSerialization
610         */
611        public SerializationElementReader( TupleSerialization tupleSerialization )
612          {
613          this.tupleSerialization = tupleSerialization;
614    
615          tupleSerialization.initTokenMaps();
616          }
617    
618        public Object read( int token, DataInputStream inputStream ) throws IOException
619          {
620          String className = getClassNameFor( token, inputStream );
621          Deserializer deserializer = getDeserializerFor( inputStream, className );
622    
623          Object foundObject = null;
624          Object object;
625    
626          try
627            {
628            object = deserializer.deserialize( foundObject );
629            }
630          catch( IOException exception )
631            {
632            LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception );
633    
634            throw exception;
635            }
636    
637          return object;
638          }
639    
640        @Override
641        public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException
642          {
643          Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) );
644    
645          return tupleSerialization.getComparator( type );
646          }
647    
648        private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException
649          {
650          Deserializer deserializer = deserializers.get( className );
651    
652          if( deserializer == null )
653            {
654            deserializer = tupleSerialization.getNewDeserializer( className );
655            deserializer.open( inputStream );
656            deserializers.put( className, deserializer );
657            }
658    
659          return deserializer;
660          }
661    
662        public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException
663          {
664          String className = tupleSerialization.getClassNameFor( token );
665    
666          try
667            {
668            if( className == null )
669              className = WritableUtils.readString( inputStream );
670            }
671          catch( IOException exception )
672            {
673            LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() );
674            throw exception;
675            }
676    
677          return className;
678          }
679    
680        public void close()
681          {
682          if( deserializers.size() == 0 )
683            return;
684    
685          Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() );
686    
687          deserializers.clear();
688    
689          for( Deserializer deserializer : clone )
690            {
691            try
692              {
693              deserializer.close();
694              }
695            catch( IOException exception )
696              {
697              // do nothing
698              }
699            }
700          }
701        }
702    
703      public static class SerializationElementWriter implements TupleOutputStream.ElementWriter
704        {
705        /** Field LOG */
706        private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class );
707    
708        /** Field tupleSerialization */
709        private final TupleSerialization tupleSerialization;
710    
711        /** Field serializers */
712        final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>();
713    
714        public SerializationElementWriter( TupleSerialization tupleSerialization )
715          {
716          this.tupleSerialization = tupleSerialization;
717    
718          tupleSerialization.initTokenMaps();
719          }
720    
721        public void write( DataOutputStream outputStream, Object object ) throws IOException
722          {
723          Class<?> type = object.getClass();
724          String className = type.getName();
725          Integer token = tupleSerialization.getTokenFor( className );
726    
727          if( token == null )
728            {
729            LOG.debug( "no serialization token found for classname: {}", className );
730    
731            WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization
732            WritableUtils.writeString( outputStream, className );
733            }
734          else
735            {
736            WritableUtils.writeVInt( outputStream, token );
737            }
738    
739          Serializer serializer = serializers.get( type );
740    
741          if( serializer == null )
742            {
743            serializer = tupleSerialization.getNewSerializer( type );
744            serializer.open( outputStream );
745            serializers.put( type, serializer );
746            }
747    
748          try
749            {
750            serializer.serialize( object );
751            }
752          catch( IOException exception )
753            {
754            LOG.error( "failed serializing token: " + token + " with classname: " + className, exception );
755    
756            throw exception;
757            }
758          }
759    
760        public void close()
761          {
762          if( serializers.size() == 0 )
763            return;
764    
765          Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() );
766    
767          serializers.clear();
768    
769          for( Serializer serializer : clone )
770            {
771            try
772              {
773              serializer.close();
774              }
775            catch( IOException exception )
776              {
777              // do nothing
778              }
779            }
780          }
781        }
782      }