001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop; 022 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.HashMap; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034 035import cascading.CascadingException; 036import cascading.flow.FlowProcess; 037import cascading.flow.FlowProps; 038import cascading.flow.hadoop.util.HadoopUtil; 039import cascading.tuple.Comparison; 040import cascading.tuple.Fields; 041import cascading.tuple.Tuple; 042import cascading.tuple.TupleException; 043import cascading.tuple.hadoop.io.HadoopTupleOutputStream; 044import cascading.tuple.hadoop.io.IndexTupleDeserializer; 045import cascading.tuple.hadoop.io.IndexTupleSerializer; 046import cascading.tuple.hadoop.io.KeyIndexTupleDeserializer; 047import cascading.tuple.hadoop.io.KeyIndexTupleSerializer; 048import cascading.tuple.hadoop.io.KeyTupleDeserializer; 049import cascading.tuple.hadoop.io.KeyTupleSerializer; 050import cascading.tuple.hadoop.io.TupleDeserializer; 051import cascading.tuple.hadoop.io.TuplePairDeserializer; 052import cascading.tuple.hadoop.io.TuplePairSerializer; 053import cascading.tuple.hadoop.io.TupleSerializer; 054import cascading.tuple.hadoop.io.ValueIndexTupleDeserializer; 055import cascading.tuple.hadoop.io.ValueIndexTupleSerializer; 056import cascading.tuple.hadoop.io.ValueTupleDeserializer; 057import cascading.tuple.hadoop.io.ValueTupleSerializer; 058import cascading.tuple.io.IndexTuple; 059import cascading.tuple.io.KeyIndexTuple; 060import cascading.tuple.io.KeyTuple; 061import cascading.tuple.io.TupleInputStream; 062import cascading.tuple.io.TupleOutputStream; 063import cascading.tuple.io.TuplePair; 064import cascading.tuple.io.ValueIndexTuple; 065import cascading.tuple.io.ValueTuple; 066import cascading.util.Util; 067import org.apache.hadoop.conf.Configuration; 068import org.apache.hadoop.conf.Configured; 069import org.apache.hadoop.io.WritableUtils; 070import org.apache.hadoop.io.serializer.Deserializer; 071import org.apache.hadoop.io.serializer.Serialization; 072import org.apache.hadoop.io.serializer.SerializationFactory; 073import org.apache.hadoop.io.serializer.Serializer; 074import org.apache.hadoop.io.serializer.WritableSerialization; 075import org.apache.hadoop.util.ReflectionUtils; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS; 080 081/** 082 * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface. 083 * <p/> 084 * Typically developers will not use this implementation directly as it is automatically added 085 * to any relevant MapReduce jobs via the {@link org.apache.hadoop.conf.Configuration}. 086 * <p/> 087 * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable} 088 * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}. 089 * <p/> 090 * To add or manipulate Hadoop serializations or Cascading serializations tokens, see 091 * {@link TupleSerializationProps} for a fluent property builder class. 092 * <p/> 093 * By default this Serialization interface registers the class {@link org.apache.hadoop.io.ByteWritable} as 094 * token 127. 095 */ 096@SerializationToken( 097 tokens = {127}, 098 classNames = {"org.apache.hadoop.io.BytesWritable"}) 099public class TupleSerialization extends Configured implements Serialization 100 { 101 102 /** Field LOG */ 103 private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class ); 104 105 /** Field defaultComparator * */ 106 private Comparator defaultComparator; 107 /** Field classCache */ 108 private final Map<String, Class> classCache = new HashMap<String, Class>(); 109 /** Field serializationFactory */ 110 private SerializationFactory serializationFactory; 111 112 /** Field tokenClassesMap */ 113 private HashMap<Integer, String> tokenClassesMap; 114 /** Field classesTokensMap */ 115 private HashMap<String, Integer> classesTokensMap; 116 /** Field tokenMapSize */ 117 private long tokensSize = 0; 118 119 List<Integer> ordinals; 120 121 Map<Integer, Fields> keyFieldsMap; 122 Map<Integer, Fields> sortFieldsMap; 123 Map<Integer, Fields> valueFieldsMap; 124 125 Fields keyFields; 126 Fields sortFields; 127 Fields valueFields; 128 129 Boolean typesRequired; // for testing purposes 130 Boolean typesIgnored; // for testing purposes 131 132 static String getSerializationTokens( Configuration jobConf ) 133 { 134 return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS ); 135 } 136 137 /** 138 * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly. 139 * <p/> 140 * This method will guarantee {@link TupleSerialization} and {@link WritableSerialization} are 141 * first in the list, as both are required. 142 * 143 * @param jobConf of type JobConf 144 */ 145 public static void setSerializations( Configuration jobConf ) 146 { 147 String serializations = getSerializations( jobConf ); 148 149 LinkedList<String> list = new LinkedList<String>(); 150 151 if( serializations != null && !serializations.isEmpty() ) 152 Collections.addAll( list, serializations.split( "," ) ); 153 154 // required by MultiInputSplit 155 String writable = WritableSerialization.class.getName(); 156 String tuple = TupleSerialization.class.getName(); 157 158 list.remove( writable ); 159 list.remove( tuple ); 160 161 list.addFirst( writable ); 162 list.addFirst( tuple ); 163 164 // make writable last 165 jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) ); 166 } 167 168 static String getSerializations( Configuration jobConf ) 169 { 170 return jobConf.get( HADOOP_IO_SERIALIZATIONS, null ); 171 } 172 173 public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf ) 174 { 175 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 176 177 if( Util.isEmpty( typeName ) ) 178 return null; 179 180 if( comparator == null ) 181 return createComparator( jobConf, typeName ); 182 183 if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) ) 184 return comparator; 185 186 return createComparator( jobConf, typeName ); 187 } 188 189 public static Comparator getDefaultComparator( Configuration jobConf ) 190 { 191 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 192 193 if( Util.isEmpty( typeName ) ) 194 return null; 195 196 return createComparator( jobConf, typeName ); 197 } 198 199 private static Comparator createComparator( Configuration jobConf, String typeName ) 200 { 201 LOG.debug( "using default comparator: {}", typeName ); 202 203 try 204 { 205 Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName ); 206 207 return ReflectionUtils.newInstance( type, jobConf ); 208 } 209 catch( ClassNotFoundException exception ) 210 { 211 throw new CascadingException( "unable to load class: " + typeName, exception ); 212 } 213 } 214 215 /** Constructor TupleSerialization creates a new TupleSerialization instance. */ 216 public TupleSerialization() 217 { 218 } 219 220 public TupleSerialization( final FlowProcess<? extends Configuration> flowProcess ) 221 { 222 super( new Configuration() 223 { 224 @Override 225 public String get( String name ) 226 { 227 return get( name, null ); 228 } 229 230 @Override 231 public String get( String name, String defaultValue ) 232 { 233 Object value = flowProcess.getProperty( name ); 234 return value == null ? defaultValue : String.valueOf( value ); 235 } 236 } ); 237 } 238 239 /** 240 * Constructor TupleSerialization creates a new TupleSerialization instance. 241 * 242 * @param conf of type Configuration 243 */ 244 public TupleSerialization( Configuration conf ) 245 { 246 super( conf ); 247 } 248 249 @Override 250 public void setConf( Configuration conf ) 251 { 252 super.setConf( conf ); 253 254 if( conf != null ) 255 defaultComparator = getDefaultComparator( conf ); 256 } 257 258 @Override 259 public Configuration getConf() 260 { 261 if( super.getConf() == null ) 262 setConf( new Configuration() ); 263 264 return super.getConf(); 265 } 266 267 public boolean areTypesIgnored() 268 { 269 if( typesIgnored == null ) 270 { 271 typesIgnored = getConf().getBoolean( TupleSerializationProps.IGNORE_TYPES, false ); 272 273 if( typesIgnored ) 274 LOG.info( "types are being ignored during serialization" ); 275 } 276 277 return typesIgnored; 278 } 279 280 public boolean areTypesRequired() 281 { 282 if( typesRequired == null ) 283 { 284 typesRequired = getConf().getBoolean( TupleSerializationProps.REQUIRE_TYPES, false ); 285 286 if( typesRequired ) 287 LOG.info( "types are being enforced during serialization" ); 288 } 289 290 return typesRequired; 291 } 292 293 SerializationFactory getSerializationFactory() 294 { 295 if( serializationFactory == null ) 296 serializationFactory = new SerializationFactory( getConf() ); 297 298 return serializationFactory; 299 } 300 301 public Fields getKeyFields() 302 { 303 if( keyFields == null && getFirstOrdinal() != null ) 304 keyFields = getKeyFieldsMap().get( getFirstOrdinal() ); 305 306 return keyFields; 307 } 308 309 public Class[] getKeyTypes() 310 { 311 Fields fields = getKeyFields(); 312 313 return getTypesFor( fields ); 314 } 315 316 public Class[] getTypesFor( Fields fields ) 317 { 318 if( areTypesIgnored() || fields == null ) 319 return null; 320 321 return fields.getTypesClasses(); 322 } 323 324 public Fields getSortFields() 325 { 326 if( sortFields == null && getFirstOrdinal() != null ) 327 sortFields = getSortFieldsMap().get( getFirstOrdinal() ); 328 329 return sortFields; 330 } 331 332 public Class[] getSortTypes() 333 { 334 return getTypesFor( getSortFields() ); 335 } 336 337 public Fields getValueFields() 338 { 339 if( valueFields == null && getFirstOrdinal() != null ) 340 valueFields = getValueFieldsMap().get( getFirstOrdinal() ); 341 342 return valueFields; 343 } 344 345 public Fields getMaskedValueFields() 346 { 347 return maskVoid( getValueFields(), getKeyFields() ); 348 } 349 350 public Class[] getValueTypes() 351 { 352 return getTypesFor( getValueFields() ); 353 } 354 355 public Map<Integer, Class[]> getKeyTypeMap() 356 { 357 if( areTypesIgnored() || getKeyFieldsMap() == null ) 358 return Collections.emptyMap(); 359 360 Map<Integer, Class[]> map = new HashMap<>(); 361 362 for( Map.Entry<Integer, Fields> entry : getKeyFieldsMap().entrySet() ) 363 map.put( entry.getKey(), entry.getValue().getTypesClasses() ); 364 365 return map; 366 } 367 368 public Map<Integer, Class[]> getValueTypeMap() 369 { 370 if( areTypesIgnored() || getValueFieldsMap() == null ) 371 return Collections.emptyMap(); 372 373 Map<Integer, Class[]> map = new HashMap<>(); 374 375 for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() ) 376 map.put( entry.getKey(), entry.getValue().getTypesClasses() ); 377 378 return map; 379 } 380 381 public Map<Integer, Class[]> getMaskedValueTypeMap() 382 { 383 if( areTypesIgnored() || getValueFieldsMap() == null ) 384 return Collections.emptyMap(); 385 386 Map<Integer, Fields> keyFieldsMap = getKeyFieldsMap(); 387 388 if( keyFieldsMap == null || keyFieldsMap.isEmpty() ) 389 return getValueTypeMap(); 390 391 Map<Integer, Class[]> map = new HashMap<>(); 392 393 for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() ) 394 { 395 Integer ordinal = entry.getKey(); 396 Fields valueFields = entry.getValue(); 397 Fields keyFields = keyFieldsMap.get( ordinal ); 398 399 map.put( ordinal, maskVoid( valueFields, keyFields ).getTypesClasses() ); 400 } 401 402 return map; 403 } 404 405 public List<Integer> getOrdinals() 406 { 407 if( ordinals == null ) 408 ordinals = Util.split( Integer.class, ",", getConf().get( "cascading.node.ordinals" ) ); 409 410 return ordinals; 411 } 412 413 public Integer getFirstOrdinal() 414 { 415 if( getOrdinals().isEmpty() ) 416 return null; 417 418 return Util.getFirst( getOrdinals() ); 419 } 420 421 public Map<Integer, Fields> getKeyFieldsMap() 422 { 423 if( keyFieldsMap == null ) 424 keyFieldsMap = getFields( getConf(), "cascading.node.key.fields" ); 425 426 return keyFieldsMap; 427 } 428 429 public Map<Integer, Fields> getSortFieldsMap() 430 { 431 if( sortFields == null ) 432 sortFieldsMap = getFields( getConf(), "cascading.node.sort.fields" ); 433 434 return sortFieldsMap; 435 } 436 437 public Map<Integer, Fields> getValueFieldsMap() 438 { 439 if( valueFieldsMap == null ) 440 valueFieldsMap = getFields( getConf(), "cascading.node.value.fields" ); 441 442 return valueFieldsMap; 443 } 444 445 /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. */ 446 void initTokenMaps() 447 { 448 if( tokenClassesMap != null ) 449 return; 450 451 tokenClassesMap = new HashMap<>(); 452 classesTokensMap = new HashMap<>(); 453 454 String tokenProperty = getSerializationTokens( getConf() ); 455 456 if( tokenProperty != null ) 457 { 458 tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set 459 460 for( String pair : tokenProperty.split( "," ) ) 461 { 462 String[] elements = pair.split( "=" ); 463 addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] ); 464 } 465 } 466 467 String serializationsString = getSerializations( getConf() ); 468 469 LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString ); 470 471 if( serializationsString == null ) 472 return; 473 474 String[] serializations = serializationsString.split( "," ); 475 476 for( String serializationName : serializations ) 477 { 478 try 479 { 480 Class type = getConf().getClassByName( serializationName ); 481 482 SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class ); 483 484 if( tokenAnnotation == null ) 485 continue; 486 487 if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length ) 488 throw new CascadingException( "serialization annotation tokens and classNames must be the same length" ); 489 490 int[] tokens = tokenAnnotation.tokens(); 491 492 for( int i = 0; i < tokens.length; i++ ) 493 addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] ); 494 } 495 catch( ClassNotFoundException exception ) 496 { 497 LOG.warn( "unable to load serialization class: {}", serializationName, exception ); 498 } 499 } 500 501 tokensSize = tokenClassesMap.size(); 502 } 503 504 private void addToken( Class type, int token, String className ) 505 { 506 if( type != null && !type.getName().startsWith( "cascading." ) && token < 128 ) 507 throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token ); 508 509 if( tokenClassesMap.containsKey( token ) ) 510 { 511 if( type == null ) 512 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" ); 513 514 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() ); 515 } 516 517 if( classesTokensMap.containsKey( className ) ) 518 { 519 if( type == null ) 520 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " ); 521 522 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() ); 523 } 524 525 LOG.debug( "adding serialization token: {}, for classname: {}", token, className ); 526 527 tokenClassesMap.put( token, className ); 528 classesTokensMap.put( className, token ); 529 } 530 531 /** 532 * Returns the className for the given token. 533 * 534 * @param token of type int 535 * @return a String 536 */ 537 final String getClassNameFor( int token ) 538 { 539 initTokenMaps(); 540 541 if( tokensSize == 0 ) 542 return null; 543 544 return tokenClassesMap.get( token ); 545 } 546 547 final long getTokensMapSize() 548 { 549 return tokensSize; 550 } 551 552 /** 553 * Returns the token for the given className. 554 * 555 * @param className of type String 556 * @return an Integer 557 */ 558 final Integer getTokenFor( String className ) 559 { 560 initTokenMaps(); 561 562 if( tokensSize == 0 ) 563 return null; 564 565 return classesTokensMap.get( className ); 566 } 567 568 public Comparator getDefaultComparator() 569 { 570 return defaultComparator; 571 } 572 573 public Comparator getComparator( Class type ) 574 { 575 Serialization serialization = getSerialization( type ); 576 577 Comparator comparator = null; 578 579 if( serialization instanceof Comparison ) 580 comparator = ( (Comparison) serialization ).getComparator( type ); 581 582 if( comparator != null ) 583 return comparator; 584 585 return defaultComparator; 586 } 587 588 Serialization getSerialization( String className ) 589 { 590 return getSerialization( getClass( className ) ); 591 } 592 593 Serialization getSerialization( Class type ) 594 { 595 return getSerializationFactory().getSerialization( type ); 596 } 597 598 Serializer getNewSerializer( Class type ) 599 { 600 try 601 { 602 Serializer serializer = getSerializationFactory().getSerializer( type ); 603 604 if( serializer == null ) 605 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 606 607 return serializer; 608 } 609 catch( NullPointerException exception ) 610 { 611 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 612 } 613 } 614 615 Deserializer getNewDeserializer( String className ) 616 { 617 try 618 { 619 Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) ); 620 621 if( deserializer == null ) 622 throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 623 624 return deserializer; 625 } 626 catch( NullPointerException exception ) 627 { 628 throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 629 } 630 } 631 632 KeyTupleDeserializer getKeyTupleDeserializer() 633 { 634 return new KeyTupleDeserializer( getElementReader() ); 635 } 636 637 ValueTupleDeserializer getValueTupleDeserializer() 638 { 639 return new ValueTupleDeserializer( getElementReader() ); 640 } 641 642 TuplePairDeserializer getTuplePairDeserializer() 643 { 644 return new TuplePairDeserializer( getElementReader() ); 645 } 646 647 /** 648 * Method getElementReader returns the elementReader of this TupleSerialization object. 649 * 650 * @return the elementReader (type SerializationElementReader) of this TupleSerialization object. 651 */ 652 public SerializationElementReader getElementReader() 653 { 654 return new SerializationElementReader( this ); 655 } 656 657 TupleDeserializer getTupleDeserializer() 658 { 659 return new TupleDeserializer( getElementReader() ); 660 } 661 662 private KeyTupleSerializer getKeyTupleSerializer() 663 { 664 return new KeyTupleSerializer( getElementWriter() ); 665 } 666 667 private ValueTupleSerializer getValueTupleSerializer() 668 { 669 return new ValueTupleSerializer( getElementWriter() ); 670 } 671 672 private TuplePairSerializer getTuplePairSerializer() 673 { 674 return new TuplePairSerializer( getElementWriter() ); 675 } 676 677 KeyIndexTupleDeserializer getKeyIndexTupleDeserializer() 678 { 679 return new KeyIndexTupleDeserializer( getElementReader() ); 680 } 681 682 ValueIndexTupleDeserializer getValueIndexTupleDeserializer() 683 { 684 return new ValueIndexTupleDeserializer( getElementReader() ); 685 } 686 687 IndexTupleDeserializer getIndexTupleDeserializer() 688 { 689 return new IndexTupleDeserializer( getElementReader() ); 690 } 691 692 /** 693 * Method getElementWriter returns the elementWriter of this TupleSerialization object. 694 * 695 * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object. 696 */ 697 public SerializationElementWriter getElementWriter() 698 { 699 return new SerializationElementWriter( this ); 700 } 701 702 private TupleSerializer getTupleSerializer() 703 { 704 return new TupleSerializer( getElementWriter() ); 705 } 706 707 private KeyIndexTupleSerializer getKeyIndexTupleSerializer() 708 { 709 return new KeyIndexTupleSerializer( getElementWriter() ); 710 } 711 712 private ValueIndexTupleSerializer getValueIndexTupleSerializer() 713 { 714 return new ValueIndexTupleSerializer( getElementWriter() ); 715 } 716 717 private IndexTupleSerializer getIndexTupleSerializer() 718 { 719 return new IndexTupleSerializer( getElementWriter() ); 720 } 721 722 /** 723 * Method accept implements {@link Serialization#accept(Class)}. 724 * 725 * @param c of type Class 726 * @return boolean 727 */ 728 public boolean accept( Class c ) 729 { 730 return Tuple.class == c || 731 KeyTuple.class == c || ValueTuple.class == c || 732 KeyIndexTuple.class == c || ValueIndexTuple.class == c || 733 TuplePair.class == c || IndexTuple.class == c; 734 } 735 736 /** 737 * Method getDeserializer implements {@link Serialization#getDeserializer(Class)}. 738 * 739 * @param c of type Class 740 * @return Deserializer 741 */ 742 public Deserializer getDeserializer( Class c ) 743 { 744 if( c == Tuple.class ) 745 return getTupleDeserializer(); 746 else if( c == KeyTuple.class ) 747 return getKeyTupleDeserializer(); 748 else if( c == ValueTuple.class ) 749 return getValueTupleDeserializer(); 750 else if( c == KeyIndexTuple.class ) 751 return getKeyIndexTupleDeserializer(); 752 else if( c == ValueIndexTuple.class ) 753 return getValueIndexTupleDeserializer(); 754 else if( c == TuplePair.class ) 755 return getTuplePairDeserializer(); 756 else if( c == IndexTuple.class ) 757 return getIndexTupleDeserializer(); 758 759 throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() ); 760 } 761 762 /** 763 * Method getSerializer implements {@link Serialization#getSerializer(Class)}. 764 * 765 * @param c of type Class 766 * @return Serializer 767 */ 768 public Serializer getSerializer( Class c ) 769 { 770 if( c == Tuple.class ) 771 return getTupleSerializer(); 772 else if( c == KeyTuple.class ) 773 return getKeyTupleSerializer(); 774 else if( c == ValueTuple.class ) 775 return getValueTupleSerializer(); 776 else if( c == KeyIndexTuple.class ) 777 return getKeyIndexTupleSerializer(); 778 else if( c == ValueIndexTuple.class ) 779 return getValueIndexTupleSerializer(); 780 else if( c == TuplePair.class ) 781 return getTuplePairSerializer(); 782 else if( c == IndexTuple.class ) 783 return getIndexTupleSerializer(); 784 785 throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() ); 786 } 787 788 public Class getClass( String className ) 789 { 790 Class type = classCache.get( className ); 791 792 if( type != null ) 793 return type; 794 795 try 796 { 797 if( className.charAt( 0 ) == '[' ) 798 type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() ); 799 else 800 type = Thread.currentThread().getContextClassLoader().loadClass( className ); 801 } 802 catch( ClassNotFoundException exception ) 803 { 804 throw new TupleException( "unable to load class named: " + className, exception ); 805 } 806 807 classCache.put( className, type ); 808 809 return type; 810 } 811 812 public static Map<Integer, Fields> getFields( Configuration conf, String property ) 813 { 814 try 815 { 816 return HadoopUtil.getFields( conf, property ); 817 } 818 catch( IOException exception ) 819 { 820 LOG.warn( "unable to get fields for: " + property ); 821 822 return Collections.emptyMap(); 823 } 824 } 825 826 private static Fields maskVoid( Fields fields, Fields mask ) 827 { 828 if( fields == null ) 829 return null; 830 831 if( mask == null || !fields.hasTypes() || !mask.hasTypes() ) 832 return fields; 833 834 Fields voidedKey = mask.applyTypes( Fields.size( mask.size(), Void.class ) ); 835 836 fields = fields.applyTypes( voidedKey ); 837 838 return fields; 839 } 840 841 public static class SerializationElementReader implements TupleInputStream.ElementReader 842 { 843 /** Field LOG */ 844 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class ); 845 846 /** Field tupleSerialization */ 847 private final TupleSerialization tupleSerialization; 848 849 /** Field deserializers */ 850 final Map<String, Deserializer> deserializers = new HashMap<String, Deserializer>(); 851 852 /** 853 * Constructor SerializationElementReader creates a new SerializationElementReader instance. 854 * 855 * @param tupleSerialization of type TupleSerialization 856 */ 857 public SerializationElementReader( TupleSerialization tupleSerialization ) 858 { 859 this.tupleSerialization = tupleSerialization; 860 } 861 862 public TupleSerialization getTupleSerialization() 863 { 864 return tupleSerialization; 865 } 866 867 public Object read( int token, DataInputStream inputStream ) throws IOException 868 { 869 String className = getClassNameFor( token, inputStream ); 870 Deserializer deserializer = getDeserializerFor( inputStream, className ); 871 872 Object foundObject = null; 873 Object object; 874 875 try 876 { 877 object = deserializer.deserialize( foundObject ); 878 } 879 catch( IOException exception ) 880 { 881 LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception ); 882 883 throw exception; 884 } 885 886 return object; 887 } 888 889 public Object read( Class type, DataInputStream inputStream ) throws IOException 890 { 891 String className = type.getName(); 892 Deserializer deserializer = getDeserializerFor( inputStream, className ); 893 894 Object foundObject = null; 895 Object object; 896 897 try 898 { 899 object = deserializer.deserialize( foundObject ); 900 } 901 catch( IOException exception ) 902 { 903 LOG.error( "failed deserializing: " + className, exception ); 904 905 throw exception; 906 } 907 908 return object; 909 } 910 911 @Override 912 public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException 913 { 914 Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) ); 915 916 return tupleSerialization.getComparator( type ); 917 } 918 919 private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException 920 { 921 Deserializer deserializer = deserializers.get( className ); 922 923 if( deserializer == null ) 924 { 925 deserializer = tupleSerialization.getNewDeserializer( className ); 926 deserializer.open( inputStream ); 927 deserializers.put( className, deserializer ); 928 } 929 930 return deserializer; 931 } 932 933 public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException 934 { 935 String className = tupleSerialization.getClassNameFor( token ); 936 937 try 938 { 939 if( className == null ) 940 className = WritableUtils.readString( inputStream ); 941 } 942 catch( IOException exception ) 943 { 944 LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() ); 945 throw exception; 946 } 947 948 return className; 949 } 950 951 public void close() 952 { 953 if( deserializers.size() == 0 ) 954 return; 955 956 Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() ); 957 958 deserializers.clear(); 959 960 for( Deserializer deserializer : clone ) 961 { 962 try 963 { 964 deserializer.close(); 965 } 966 catch( IOException exception ) 967 { 968 // do nothing 969 } 970 } 971 } 972 } 973 974 public static class SerializationElementWriter implements TupleOutputStream.ElementWriter 975 { 976 /** Field LOG */ 977 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class ); 978 979 /** Field tupleSerialization */ 980 private final TupleSerialization tupleSerialization; 981 982 /** Field serializers */ 983 final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>(); 984 985 public SerializationElementWriter( TupleSerialization tupleSerialization ) 986 { 987 this.tupleSerialization = tupleSerialization; 988 } 989 990 public TupleSerialization getTupleSerialization() 991 { 992 return tupleSerialization; 993 } 994 995 public void write( DataOutputStream outputStream, Object object ) throws IOException 996 { 997 Class<?> type = object.getClass(); 998 String className = type.getName(); 999 Integer token = tupleSerialization.getTokenFor( className ); 1000 1001 if( token == null ) 1002 { 1003 LOG.debug( "no serialization token found for classname: {}", className ); 1004 1005 WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization 1006 WritableUtils.writeString( outputStream, className ); 1007 } 1008 else 1009 { 1010 WritableUtils.writeVInt( outputStream, token ); 1011 } 1012 1013 Serializer serializer = getSerializer( outputStream, type ); 1014 1015 try 1016 { 1017 serializer.serialize( object ); 1018 } 1019 catch( IOException exception ) 1020 { 1021 LOG.error( "failed serializing token: " + token + " with classname: " + className, exception ); 1022 1023 throw exception; 1024 } 1025 } 1026 1027 private Serializer getSerializer( DataOutputStream outputStream, Class<?> type ) throws IOException 1028 { 1029 Serializer serializer = serializers.get( type ); 1030 1031 if( serializer == null ) 1032 { 1033 serializer = tupleSerialization.getNewSerializer( type ); 1034 serializer.open( outputStream ); 1035 serializers.put( type, serializer ); 1036 } 1037 1038 return serializer; 1039 } 1040 1041 public void write( DataOutputStream outputStream, Class<?> type, Object object ) throws IOException 1042 { 1043 Serializer serializer = getSerializer( outputStream, type ); 1044 1045 try 1046 { 1047 serializer.serialize( object ); 1048 } 1049 catch( IOException exception ) 1050 { 1051 LOG.error( "failed serializing type: " + type.getName(), exception ); 1052 1053 throw exception; 1054 } 1055 } 1056 1057 public void close() 1058 { 1059 if( serializers.size() == 0 ) 1060 return; 1061 1062 Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() ); 1063 1064 serializers.clear(); 1065 1066 for( Serializer serializer : clone ) 1067 { 1068 try 1069 { 1070 serializer.close(); 1071 } 1072 catch( IOException exception ) 1073 { 1074 // do nothing 1075 } 1076 } 1077 } 1078 } 1079 }