001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.hadoop; 022 023 import java.io.DataInputStream; 024 import java.io.DataOutputStream; 025 import java.io.IOException; 026 import java.util.ArrayList; 027 import java.util.Collection; 028 import java.util.Collections; 029 import java.util.Comparator; 030 import java.util.HashMap; 031 import java.util.LinkedList; 032 import java.util.Map; 033 034 import cascading.CascadingException; 035 import cascading.flow.FlowProcess; 036 import cascading.flow.FlowProps; 037 import cascading.tuple.Comparison; 038 import cascading.tuple.Tuple; 039 import cascading.tuple.TupleException; 040 import cascading.tuple.hadoop.io.HadoopTupleOutputStream; 041 import cascading.tuple.hadoop.io.IndexTupleDeserializer; 042 import cascading.tuple.hadoop.io.IndexTupleSerializer; 043 import cascading.tuple.hadoop.io.TupleDeserializer; 044 import cascading.tuple.hadoop.io.TuplePairDeserializer; 045 import cascading.tuple.hadoop.io.TuplePairSerializer; 046 import cascading.tuple.hadoop.io.TupleSerializer; 047 import cascading.tuple.io.IndexTuple; 048 import cascading.tuple.io.TupleInputStream; 049 import cascading.tuple.io.TupleOutputStream; 050 import cascading.tuple.io.TuplePair; 051 
import cascading.util.Util; 052 import org.apache.hadoop.conf.Configuration; 053 import org.apache.hadoop.conf.Configured; 054 import org.apache.hadoop.io.WritableUtils; 055 import org.apache.hadoop.io.serializer.Deserializer; 056 import org.apache.hadoop.io.serializer.Serialization; 057 import org.apache.hadoop.io.serializer.SerializationFactory; 058 import org.apache.hadoop.io.serializer.Serializer; 059 import org.apache.hadoop.io.serializer.WritableSerialization; 060 import org.apache.hadoop.mapred.JobConf; 061 import org.apache.hadoop.util.ReflectionUtils; 062 import org.slf4j.Logger; 063 import org.slf4j.LoggerFactory; 064 065 import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS; 066 067 /** 068 * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface. 069 * <p/> 070 * Typically developers will not use this implementation directly as it is automatically added 071 * to any relevant MapReduce jobs via the {@link JobConf}. 072 * <p/> 073 * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable} 074 * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}. 075 * <p/> 076 * To add or manipulate Hadoop serializations or Cascading serializations tokens, see 077 * {@link TupleSerializationProps} for a fluent property builder class. 078 * <p/> 079 * By default this Serialization interface registers the class {@link org.apache.hadoop.io.BytesWritable} as 080 * token 127. 
081 */ 082 @SerializationToken( 083 tokens = {127}, 084 classNames = {"org.apache.hadoop.io.BytesWritable"}) 085 public class TupleSerialization extends Configured implements Serialization 086 { 087 /** Field LOG */ 088 private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class ); 089 090 /** Field defaultComparator * */ 091 private Comparator defaultComparator; 092 /** Field classCache */ 093 private final Map<String, Class> classCache = new HashMap<String, Class>(); 094 /** Field serializationFactory */ 095 private SerializationFactory serializationFactory; 096 097 /** Field tokenClassesMap */ 098 private HashMap<Integer, String> tokenClassesMap; 099 /** Field classesTokensMap */ 100 private HashMap<String, Integer> classesTokensMap; 101 /** Field tokenMapSize */ 102 private long tokensSize = 0; 103 104 /** 105 * Adds the given token and className pair as a serialization token property. During object serialization and deserialization, 106 * the given token will be used instead of the className when an instance of the className is encountered. 107 * <p/> 108 * This method has moved to {@link TupleSerializationProps#addSerializationToken(java.util.Map, int, String)}. 109 * 110 * @param properties of type Map 111 * @param token of type int 112 * @param className of type String 113 */ 114 @Deprecated 115 public static void addSerializationToken( Map<Object, Object> properties, int token, String className ) 116 { 117 TupleSerializationProps.addSerializationToken( properties, token, className ); 118 } 119 120 /** 121 * Returns the serialization tokens property. 122 * <p/> 123 * This method has moved to {@link TupleSerializationProps#getSerializationTokens(java.util.Map)}. 
124 * 125 * @param properties of type Map 126 * @return returns a String 127 */ 128 @Deprecated 129 public static String getSerializationTokens( Map<Object, Object> properties ) 130 { 131 return TupleSerializationProps.getSerializationTokens( properties ); 132 } 133 134 static String getSerializationTokens( Configuration jobConf ) 135 { 136 return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS ); 137 } 138 139 /** 140 * Adds the given className as a Hadoop IO serialization class. 141 * <p/> 142 * This method has moved to {@link TupleSerializationProps#addSerialization(java.util.Map, String)}. 143 * 144 * @param properties of type Map 145 * @param className of type String 146 */ 147 @Deprecated 148 public static void addSerialization( Map<Object, Object> properties, String className ) 149 { 150 TupleSerializationProps.addSerialization( properties, className ); 151 } 152 153 /** 154 * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly. 155 * <p/> 156 * This method will guarantee {@link TupleSerialization} and {@link WritableSerialization} are 157 * first in the list, as both are required. 
158 * 159 * @param jobConf of type JobConf 160 */ 161 public static void setSerializations( JobConf jobConf ) 162 { 163 String serializations = getSerializations( jobConf ); 164 165 LinkedList<String> list = new LinkedList<String>(); 166 167 if( serializations != null && !serializations.isEmpty() ) 168 Collections.addAll( list, serializations.split( "," ) ); 169 170 // required by MultiInputSplit 171 String writable = WritableSerialization.class.getName(); 172 String tuple = TupleSerialization.class.getName(); 173 174 list.remove( writable ); 175 list.remove( tuple ); 176 177 list.addFirst( writable ); 178 list.addFirst( tuple ); 179 180 // make writable last 181 jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) ); 182 } 183 184 static String getSerializations( Configuration jobConf ) 185 { 186 return jobConf.get( HADOOP_IO_SERIALIZATIONS, null ); 187 } 188 189 public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf ) 190 { 191 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 192 193 if( Util.isEmpty( typeName ) ) 194 return null; 195 196 if( comparator == null ) 197 return createComparator( jobConf, typeName ); 198 199 if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) ) 200 return comparator; 201 202 return createComparator( jobConf, typeName ); 203 } 204 205 public static Comparator getDefaultComparator( Configuration jobConf ) 206 { 207 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 208 209 if( Util.isEmpty( typeName ) ) 210 return null; 211 212 return createComparator( jobConf, typeName ); 213 } 214 215 private static Comparator createComparator( Configuration jobConf, String typeName ) 216 { 217 LOG.debug( "using default comparator: {}", typeName ); 218 219 try 220 { 221 Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName ); 222 223 return ReflectionUtils.newInstance( 
type, jobConf ); 224 } 225 catch( ClassNotFoundException exception ) 226 { 227 throw new CascadingException( "unable to load class: " + typeName, exception ); 228 } 229 } 230 231 /** Constructor TupleSerialization creates a new TupleSerialization instance. */ 232 public TupleSerialization() 233 { 234 } 235 236 public TupleSerialization( final FlowProcess<JobConf> flowProcess ) 237 { 238 super( new Configuration() 239 { 240 @Override 241 public String get( String name ) 242 { 243 return get( name, null ); 244 } 245 246 @Override 247 public String get( String name, String defaultValue ) 248 { 249 Object value = flowProcess.getProperty( name ); 250 return value == null ? defaultValue : String.valueOf( value ); 251 } 252 } ); 253 } 254 255 /** 256 * Constructor TupleSerialization creates a new TupleSerialization instance. 257 * 258 * @param conf of type Configuration 259 */ 260 public TupleSerialization( Configuration conf ) 261 { 262 super( conf ); 263 } 264 265 @Override 266 public void setConf( Configuration conf ) 267 { 268 super.setConf( conf ); 269 270 if( conf != null ) 271 defaultComparator = getDefaultComparator( conf ); 272 } 273 274 @Override 275 public Configuration getConf() 276 { 277 if( super.getConf() == null ) 278 setConf( new JobConf() ); 279 280 return super.getConf(); 281 } 282 283 SerializationFactory getSerializationFactory() 284 { 285 if( serializationFactory == null ) 286 serializationFactory = new SerializationFactory( getConf() ); 287 288 return serializationFactory; 289 } 290 291 /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. 
*/ 292 void initTokenMaps() 293 { 294 if( tokenClassesMap != null ) 295 return; 296 297 tokenClassesMap = new HashMap<Integer, String>(); 298 classesTokensMap = new HashMap<String, Integer>(); 299 300 String tokenProperty = getSerializationTokens( getConf() ); 301 302 if( tokenProperty != null ) 303 { 304 tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set 305 306 for( String pair : tokenProperty.split( "," ) ) 307 { 308 String[] elements = pair.split( "=" ); 309 addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] ); 310 } 311 } 312 313 String serializationsString = getSerializations( getConf() ); 314 315 LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString ); 316 317 if( serializationsString == null ) 318 return; 319 320 String[] serializations = serializationsString.split( "," ); 321 322 for( String serializationName : serializations ) 323 { 324 try 325 { 326 Class type = getConf().getClassByName( serializationName ); 327 328 SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class ); 329 330 if( tokenAnnotation == null ) 331 continue; 332 333 if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length ) 334 throw new CascadingException( "serialization annotation tokens and classNames must be the same length" ); 335 336 int[] tokens = tokenAnnotation.tokens(); 337 338 for( int i = 0; i < tokens.length; i++ ) 339 addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] ); 340 } 341 catch( ClassNotFoundException exception ) 342 { 343 LOG.warn( "unable to load serialization class: {}", serializationName, exception ); 344 } 345 } 346 347 tokensSize = tokenClassesMap.size(); 348 } 349 350 private void addToken( Class type, int token, String className ) 351 { 352 if( type != null && !type.getName().startsWith( "cascading." 
) && token < 128 ) 353 throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token ); 354 355 if( tokenClassesMap.containsKey( token ) ) 356 { 357 if( type == null ) 358 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" ); 359 360 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() ); 361 } 362 363 if( classesTokensMap.containsKey( className ) ) 364 { 365 if( type == null ) 366 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " ); 367 368 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() ); 369 } 370 371 LOG.debug( "adding serialization token: {}, for classname: {}", token, className ); 372 373 tokenClassesMap.put( token, className ); 374 classesTokensMap.put( className, token ); 375 } 376 377 /** 378 * Returns the className for the given token. 379 * 380 * @param token of type int 381 * @return a String 382 */ 383 final String getClassNameFor( int token ) 384 { 385 if( tokensSize == 0 ) 386 return null; 387 388 return tokenClassesMap.get( token ); 389 } 390 391 final long getTokensMapSize() 392 { 393 return tokensSize; 394 } 395 396 /** 397 * Returns the token for the given className. 
398 * 399 * @param className of type String 400 * @return an Integer 401 */ 402 final Integer getTokenFor( String className ) 403 { 404 if( tokensSize == 0 ) 405 return null; 406 407 return classesTokensMap.get( className ); 408 } 409 410 public Comparator getDefaultComparator() 411 { 412 return defaultComparator; 413 } 414 415 public Comparator getComparator( Class type ) 416 { 417 Serialization serialization = getSerialization( type ); 418 419 Comparator comparator = null; 420 421 if( serialization instanceof Comparison ) 422 comparator = ( (Comparison) serialization ).getComparator( type ); 423 424 if( comparator != null ) 425 return comparator; 426 427 return defaultComparator; 428 } 429 430 Serialization getSerialization( String className ) 431 { 432 return getSerialization( getClass( className ) ); 433 } 434 435 Serialization getSerialization( Class type ) 436 { 437 return getSerializationFactory().getSerialization( type ); 438 } 439 440 Serializer getNewSerializer( Class type ) 441 { 442 try 443 { 444 Serializer serializer = getSerializationFactory().getSerializer( type ); 445 446 if( serializer == null ) 447 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 448 449 return serializer; 450 } 451 catch( NullPointerException exception ) 452 { 453 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 454 } 455 } 456 457 Deserializer getNewDeserializer( String className ) 458 { 459 try 460 { 461 Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) ); 462 463 if( deserializer == null ) 464 throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 465 466 return deserializer; 467 } 468 catch( NullPointerException exception ) 469 { 470 throw new 
CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 471 } 472 } 473 474 TuplePairDeserializer getTuplePairDeserializer() 475 { 476 return new TuplePairDeserializer( getElementReader() ); 477 } 478 479 /** 480 * Method getElementReader returns the elementReader of this TupleSerialization object. 481 * 482 * @return the elementReader (type SerializationElementReader) of this TupleSerialization object. 483 */ 484 public SerializationElementReader getElementReader() 485 { 486 return new SerializationElementReader( this ); 487 } 488 489 TupleDeserializer getTupleDeserializer() 490 { 491 return new TupleDeserializer( getElementReader() ); 492 } 493 494 private TuplePairSerializer getTuplePairSerializer() 495 { 496 return new TuplePairSerializer( getElementWriter() ); 497 } 498 499 IndexTupleDeserializer getIndexTupleDeserializer() 500 { 501 return new IndexTupleDeserializer( getElementReader() ); 502 } 503 504 /** 505 * Method getElementWriter returns the elementWriter of this TupleSerialization object. 506 * 507 * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object. 508 */ 509 public SerializationElementWriter getElementWriter() 510 { 511 return new SerializationElementWriter( this ); 512 } 513 514 private TupleSerializer getTupleSerializer() 515 { 516 return new TupleSerializer( getElementWriter() ); 517 } 518 519 private IndexTupleSerializer getIndexTupleSerializer() 520 { 521 return new IndexTupleSerializer( getElementWriter() ); 522 } 523 524 /** 525 * Method accept implements {@link Serialization#accept(Class)}. 526 * 527 * @param c of type Class 528 * @return boolean 529 */ 530 public boolean accept( Class c ) 531 { 532 return Tuple.class == c || TuplePair.class == c || IndexTuple.class == c; 533 } 534 535 /** 536 * Method getDeserializer implements {@link Serialization#getDeserializer(Class)}. 
537 * 538 * @param c of type Class 539 * @return Deserializer 540 */ 541 public Deserializer getDeserializer( Class c ) 542 { 543 if( c == Tuple.class ) 544 return getTupleDeserializer(); 545 else if( c == TuplePair.class ) 546 return getTuplePairDeserializer(); 547 else if( c == IndexTuple.class ) 548 return getIndexTupleDeserializer(); 549 550 throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() ); 551 } 552 553 /** 554 * Method getSerializer implements {@link Serialization#getSerializer(Class)}. 555 * 556 * @param c of type Class 557 * @return Serializer 558 */ 559 public Serializer getSerializer( Class c ) 560 { 561 if( c == Tuple.class ) 562 return getTupleSerializer(); 563 else if( c == TuplePair.class ) 564 return getTuplePairSerializer(); 565 else if( c == IndexTuple.class ) 566 return getIndexTupleSerializer(); 567 568 throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() ); 569 } 570 571 public Class getClass( String className ) 572 { 573 Class type = classCache.get( className ); 574 575 if( type != null ) 576 return type; 577 578 try 579 { 580 if( className.charAt( 0 ) == '[' ) 581 type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() ); 582 else 583 type = Thread.currentThread().getContextClassLoader().loadClass( className ); 584 } 585 catch( ClassNotFoundException exception ) 586 { 587 throw new TupleException( "unable to load class named: " + className, exception ); 588 } 589 590 classCache.put( className, type ); 591 592 return type; 593 } 594 595 public static class SerializationElementReader implements TupleInputStream.ElementReader 596 { 597 /** Field LOG */ 598 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class ); 599 600 /** Field tupleSerialization */ 601 private final TupleSerialization tupleSerialization; 602 603 /** Field deserializers */ 604 final Map<String, Deserializer> deserializers = new 
HashMap<String, Deserializer>(); 605 606 /** 607 * Constructor SerializationElementReader creates a new SerializationElementReader instance. 608 * 609 * @param tupleSerialization of type TupleSerialization 610 */ 611 public SerializationElementReader( TupleSerialization tupleSerialization ) 612 { 613 this.tupleSerialization = tupleSerialization; 614 615 tupleSerialization.initTokenMaps(); 616 } 617 618 public Object read( int token, DataInputStream inputStream ) throws IOException 619 { 620 String className = getClassNameFor( token, inputStream ); 621 Deserializer deserializer = getDeserializerFor( inputStream, className ); 622 623 Object foundObject = null; 624 Object object; 625 626 try 627 { 628 object = deserializer.deserialize( foundObject ); 629 } 630 catch( IOException exception ) 631 { 632 LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception ); 633 634 throw exception; 635 } 636 637 return object; 638 } 639 640 @Override 641 public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException 642 { 643 Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) ); 644 645 return tupleSerialization.getComparator( type ); 646 } 647 648 private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException 649 { 650 Deserializer deserializer = deserializers.get( className ); 651 652 if( deserializer == null ) 653 { 654 deserializer = tupleSerialization.getNewDeserializer( className ); 655 deserializer.open( inputStream ); 656 deserializers.put( className, deserializer ); 657 } 658 659 return deserializer; 660 } 661 662 public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException 663 { 664 String className = tupleSerialization.getClassNameFor( token ); 665 666 try 667 { 668 if( className == null ) 669 className = WritableUtils.readString( inputStream ); 670 } 671 catch( IOException exception ) 672 { 673 
LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() ); 674 throw exception; 675 } 676 677 return className; 678 } 679 680 public void close() 681 { 682 if( deserializers.size() == 0 ) 683 return; 684 685 Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() ); 686 687 deserializers.clear(); 688 689 for( Deserializer deserializer : clone ) 690 { 691 try 692 { 693 deserializer.close(); 694 } 695 catch( IOException exception ) 696 { 697 // do nothing 698 } 699 } 700 } 701 } 702 703 public static class SerializationElementWriter implements TupleOutputStream.ElementWriter 704 { 705 /** Field LOG */ 706 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class ); 707 708 /** Field tupleSerialization */ 709 private final TupleSerialization tupleSerialization; 710 711 /** Field serializers */ 712 final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>(); 713 714 public SerializationElementWriter( TupleSerialization tupleSerialization ) 715 { 716 this.tupleSerialization = tupleSerialization; 717 718 tupleSerialization.initTokenMaps(); 719 } 720 721 public void write( DataOutputStream outputStream, Object object ) throws IOException 722 { 723 Class<?> type = object.getClass(); 724 String className = type.getName(); 725 Integer token = tupleSerialization.getTokenFor( className ); 726 727 if( token == null ) 728 { 729 LOG.debug( "no serialization token found for classname: {}", className ); 730 731 WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization 732 WritableUtils.writeString( outputStream, className ); 733 } 734 else 735 { 736 WritableUtils.writeVInt( outputStream, token ); 737 } 738 739 Serializer serializer = serializers.get( type ); 740 741 if( serializer == null ) 742 { 743 serializer = 
tupleSerialization.getNewSerializer( type ); 744 serializer.open( outputStream ); 745 serializers.put( type, serializer ); 746 } 747 748 try 749 { 750 serializer.serialize( object ); 751 } 752 catch( IOException exception ) 753 { 754 LOG.error( "failed serializing token: " + token + " with classname: " + className, exception ); 755 756 throw exception; 757 } 758 } 759 760 public void close() 761 { 762 if( serializers.size() == 0 ) 763 return; 764 765 Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() ); 766 767 serializers.clear(); 768 769 for( Serializer serializer : clone ) 770 { 771 try 772 { 773 serializer.close(); 774 } 775 catch( IOException exception ) 776 { 777 // do nothing 778 } 779 } 780 } 781 } 782 }