001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tuple.hadoop; 023 024import java.io.DataInputStream; 025import java.io.DataOutputStream; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.Comparator; 031import java.util.HashMap; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Map; 035 036import cascading.CascadingException; 037import cascading.flow.FlowProcess; 038import cascading.flow.FlowProps; 039import cascading.flow.hadoop.util.HadoopUtil; 040import cascading.tuple.Comparison; 041import cascading.tuple.Fields; 042import cascading.tuple.Tuple; 043import cascading.tuple.TupleException; 044import cascading.tuple.hadoop.io.HadoopTupleOutputStream; 045import cascading.tuple.hadoop.io.IndexTupleDeserializer; 046import cascading.tuple.hadoop.io.IndexTupleSerializer; 047import cascading.tuple.hadoop.io.KeyIndexTupleDeserializer; 048import cascading.tuple.hadoop.io.KeyIndexTupleSerializer; 049import cascading.tuple.hadoop.io.KeyTupleDeserializer; 050import cascading.tuple.hadoop.io.KeyTupleSerializer; 051import cascading.tuple.hadoop.io.TupleDeserializer; 052import cascading.tuple.hadoop.io.TuplePairDeserializer; 053import cascading.tuple.hadoop.io.TuplePairSerializer; 054import cascading.tuple.hadoop.io.TupleSerializer; 055import cascading.tuple.hadoop.io.ValueIndexTupleDeserializer; 056import cascading.tuple.hadoop.io.ValueIndexTupleSerializer; 057import cascading.tuple.hadoop.io.ValueTupleDeserializer; 058import cascading.tuple.hadoop.io.ValueTupleSerializer; 059import cascading.tuple.io.IndexTuple; 060import cascading.tuple.io.KeyIndexTuple; 061import cascading.tuple.io.KeyTuple; 062import cascading.tuple.io.TupleInputStream; 063import cascading.tuple.io.TupleOutputStream; 064import cascading.tuple.io.TuplePair; 065import cascading.tuple.io.ValueIndexTuple; 066import cascading.tuple.io.ValueTuple; 067import cascading.util.Util; 068import org.apache.hadoop.conf.Configuration; 069import org.apache.hadoop.conf.Configured; 070import org.apache.hadoop.io.WritableUtils; 071import org.apache.hadoop.io.serializer.Deserializer; 072import org.apache.hadoop.io.serializer.Serialization; 073import org.apache.hadoop.io.serializer.SerializationFactory; 074import org.apache.hadoop.io.serializer.Serializer; 075import org.apache.hadoop.io.serializer.WritableSerialization; 076import org.apache.hadoop.util.ReflectionUtils; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import static cascading.tuple.hadoop.TupleSerializationProps.HADOOP_IO_SERIALIZATIONS; 081 082/** 083 * Class TupleSerialization is an implementation of Hadoop's {@link Serialization} interface. 084 * <p> 085 * Typically developers will not use this implementation directly as it is automatically added 086 * to any relevant MapReduce jobs via the {@link org.apache.hadoop.conf.Configuration}. 087 * <p> 088 * By default, all primitive types are natively handled, and {@link org.apache.hadoop.io.BytesWritable} 089 * has a pre-configured serialization token since byte arrays are not handled natively by {@link Tuple}. 090 * <p> 091 * To add or manipulate Hadoop serializations or Cascading serializations tokens, see 092 * {@link TupleSerializationProps} for a fluent property builder class. 093 * <p> 094 * By default this Serialization interface registers the class {@link org.apache.hadoop.io.ByteWritable} as 095 * token 127. 096 */ 097@SerializationToken( 098 tokens = {127}, 099 classNames = {"org.apache.hadoop.io.BytesWritable"}) 100public class TupleSerialization extends Configured implements Serialization 101 { 102 103 /** Field LOG */ 104 private static final Logger LOG = LoggerFactory.getLogger( TupleSerialization.class ); 105 106 /** Field defaultComparator * */ 107 private Comparator defaultComparator; 108 /** Field classCache */ 109 private final Map<String, Class> classCache = new HashMap<String, Class>(); 110 /** Field serializationFactory */ 111 private SerializationFactory serializationFactory; 112 113 /** Field tokenClassesMap */ 114 private HashMap<Integer, String> tokenClassesMap; 115 /** Field classesTokensMap */ 116 private HashMap<String, Integer> classesTokensMap; 117 /** Field tokenMapSize */ 118 private long tokensSize = 0; 119 120 List<Integer> ordinals; 121 122 Map<Integer, Fields> keyFieldsMap; 123 Map<Integer, Fields> sortFieldsMap; 124 Map<Integer, Fields> valueFieldsMap; 125 126 Fields keyFields; 127 Fields sortFields; 128 Fields valueFields; 129 130 Boolean typesRequired; // for testing purposes 131 Boolean typesIgnored; // for testing purposes 132 133 static String getSerializationTokens( Configuration jobConf ) 134 { 135 return jobConf.get( TupleSerializationProps.SERIALIZATION_TOKENS ); 136 } 137 138 /** 139 * Adds this class as a Hadoop Serialization class. This method is safe to call redundantly. 140 * <p> 141 * This method will guarantee and {@link WritableSerialization} are 142 * first in the list, as both are required. 143 * 144 * @param jobConf of type JobConf 145 */ 146 public static void setSerializations( Configuration jobConf ) 147 { 148 setSerializations( jobConf, Collections.emptySet() ); 149 } 150 151 public static void setSerializations( Configuration jobConf, Collection<String> provided ) 152 { 153 String serializations = getSerializations( jobConf ); 154 155 LinkedList<String> list = new LinkedList<String>(); 156 157 list.addAll( provided ); 158 159 if( serializations != null && !serializations.isEmpty() ) 160 Collections.addAll( list, serializations.split( "," ) ); 161 162 // required by MultiInputSplit 163 String writable = WritableSerialization.class.getName(); 164 String tuple = TupleSerialization.class.getName(); 165 166 list.remove( writable ); 167 list.remove( tuple ); 168 169 list.addFirst( writable ); 170 list.addFirst( tuple ); 171 172 // make writable last 173 jobConf.set( HADOOP_IO_SERIALIZATIONS, Util.join( list, "," ) ); 174 } 175 176 static String getSerializations( Configuration jobConf ) 177 { 178 return jobConf.get( HADOOP_IO_SERIALIZATIONS, null ); 179 } 180 181 public static Comparator getDefaultComparator( Comparator comparator, Configuration jobConf ) 182 { 183 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 184 185 if( Util.isEmpty( typeName ) ) 186 return null; 187 188 if( comparator == null ) 189 return createComparator( jobConf, typeName ); 190 191 if( comparator.getClass().getName().equals( typeName ) && !( comparator instanceof Configured ) ) 192 return comparator; 193 194 return createComparator( jobConf, typeName ); 195 } 196 197 public static Comparator getDefaultComparator( Configuration jobConf ) 198 { 199 String typeName = jobConf.get( FlowProps.DEFAULT_ELEMENT_COMPARATOR ); 200 201 if( Util.isEmpty( typeName ) ) 202 return null; 203 204 return createComparator( jobConf, typeName ); 205 } 206 207 private static Comparator createComparator( Configuration jobConf, String typeName ) 208 { 209 LOG.debug( "using default comparator: {}", typeName ); 210 211 try 212 { 213 Class<Comparator> type = (Class<Comparator>) TupleSerialization.class.getClassLoader().loadClass( typeName ); 214 215 return ReflectionUtils.newInstance( type, jobConf ); 216 } 217 catch( ClassNotFoundException exception ) 218 { 219 throw new CascadingException( "unable to load class: " + typeName, exception ); 220 } 221 } 222 223 /** Constructor TupleSerialization creates a new TupleSerialization instance. */ 224 public TupleSerialization() 225 { 226 } 227 228 public TupleSerialization( final FlowProcess<? extends Configuration> flowProcess ) 229 { 230 super( new Configuration() 231 { 232 @Override 233 public String get( String name ) 234 { 235 return get( name, null ); 236 } 237 238 @Override 239 public String get( String name, String defaultValue ) 240 { 241 Object value = flowProcess.getProperty( name ); 242 return value == null ? defaultValue : String.valueOf( value ); 243 } 244 } ); 245 } 246 247 /** 248 * Constructor TupleSerialization creates a new TupleSerialization instance. 249 * 250 * @param conf of type Configuration 251 */ 252 public TupleSerialization( Configuration conf ) 253 { 254 super( conf ); 255 } 256 257 @Override 258 public void setConf( Configuration conf ) 259 { 260 super.setConf( conf ); 261 262 if( conf != null ) 263 defaultComparator = getDefaultComparator( conf ); 264 } 265 266 @Override 267 public Configuration getConf() 268 { 269 if( super.getConf() == null ) 270 setConf( new Configuration() ); 271 272 return super.getConf(); 273 } 274 275 public boolean areTypesIgnored() 276 { 277 if( typesIgnored == null ) 278 { 279 typesIgnored = getConf().getBoolean( TupleSerializationProps.IGNORE_TYPES, false ); 280 281 if( typesIgnored ) 282 LOG.info( "types are being ignored during serialization" ); 283 } 284 285 return typesIgnored; 286 } 287 288 public boolean areTypesRequired() 289 { 290 if( typesRequired == null ) 291 { 292 typesRequired = getConf().getBoolean( TupleSerializationProps.REQUIRE_TYPES, false ); 293 294 if( typesRequired ) 295 LOG.info( "types are being enforced during serialization" ); 296 } 297 298 return typesRequired; 299 } 300 301 SerializationFactory getSerializationFactory() 302 { 303 if( serializationFactory == null ) 304 serializationFactory = new SerializationFactory( getConf() ); 305 306 return serializationFactory; 307 } 308 309 public Fields getKeyFields() 310 { 311 if( keyFields == null && getFirstOrdinal() != null ) 312 keyFields = getKeyFieldsMap().get( getFirstOrdinal() ); 313 314 return keyFields; 315 } 316 317 public Class[] getKeyTypes() 318 { 319 Fields fields = getKeyFields(); 320 321 return getTypesFor( fields ); 322 } 323 324 public Class[] getTypesFor( Fields fields ) 325 { 326 if( areTypesIgnored() || fields == null ) 327 return null; 328 329 return fields.getTypesClasses(); 330 } 331 332 public Fields getSortFields() 333 { 334 if( sortFields == null && getFirstOrdinal() != null ) 335 sortFields = getSortFieldsMap().get( getFirstOrdinal() ); 336 337 return sortFields; 338 } 339 340 public Class[] getSortTypes() 341 { 342 return getTypesFor( getSortFields() ); 343 } 344 345 public Fields getValueFields() 346 { 347 if( valueFields == null && getFirstOrdinal() != null ) 348 valueFields = getValueFieldsMap().get( getFirstOrdinal() ); 349 350 return valueFields; 351 } 352 353 public Fields getMaskedValueFields() 354 { 355 return maskVoid( getValueFields(), getKeyFields() ); 356 } 357 358 public Class[] getValueTypes() 359 { 360 return getTypesFor( getValueFields() ); 361 } 362 363 public Map<Integer, Class[]> getKeyTypeMap() 364 { 365 if( areTypesIgnored() || getKeyFieldsMap() == null ) 366 return Collections.emptyMap(); 367 368 Map<Integer, Class[]> map = new HashMap<>(); 369 370 for( Map.Entry<Integer, Fields> entry : getKeyFieldsMap().entrySet() ) 371 map.put( entry.getKey(), entry.getValue().getTypesClasses() ); 372 373 return map; 374 } 375 376 public Map<Integer, Class[]> getValueTypeMap() 377 { 378 if( areTypesIgnored() || getValueFieldsMap() == null ) 379 return Collections.emptyMap(); 380 381 Map<Integer, Class[]> map = new HashMap<>(); 382 383 for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() ) 384 map.put( entry.getKey(), entry.getValue().getTypesClasses() ); 385 386 return map; 387 } 388 389 public Map<Integer, Class[]> getMaskedValueTypeMap() 390 { 391 if( areTypesIgnored() || getValueFieldsMap() == null ) 392 return Collections.emptyMap(); 393 394 Map<Integer, Fields> keyFieldsMap = getKeyFieldsMap(); 395 396 if( keyFieldsMap == null || keyFieldsMap.isEmpty() ) 397 return getValueTypeMap(); 398 399 Map<Integer, Class[]> map = new HashMap<>(); 400 401 for( Map.Entry<Integer, Fields> entry : getValueFieldsMap().entrySet() ) 402 { 403 Integer ordinal = entry.getKey(); 404 Fields valueFields = entry.getValue(); 405 Fields keyFields = keyFieldsMap.get( ordinal ); 406 407 map.put( ordinal, maskVoid( valueFields, keyFields ).getTypesClasses() ); 408 } 409 410 return map; 411 } 412 413 public List<Integer> getOrdinals() 414 { 415 if( ordinals == null ) 416 ordinals = Util.split( Integer.class, ",", getConf().get( "cascading.node.ordinals" ) ); 417 418 return ordinals; 419 } 420 421 public Integer getFirstOrdinal() 422 { 423 if( getOrdinals().isEmpty() ) 424 return null; 425 426 return Util.getFirst( getOrdinals() ); 427 } 428 429 public Map<Integer, Fields> getKeyFieldsMap() 430 { 431 if( keyFieldsMap == null ) 432 keyFieldsMap = getFields( getConf(), "cascading.node.key.fields" ); 433 434 return keyFieldsMap; 435 } 436 437 public Map<Integer, Fields> getSortFieldsMap() 438 { 439 if( sortFields == null ) 440 sortFieldsMap = getFields( getConf(), "cascading.node.sort.fields" ); 441 442 return sortFieldsMap; 443 } 444 445 public Map<Integer, Fields> getValueFieldsMap() 446 { 447 if( valueFieldsMap == null ) 448 valueFieldsMap = getFields( getConf(), "cascading.node.value.fields" ); 449 450 return valueFieldsMap; 451 } 452 453 /** Must be called before {@link #getClassNameFor(int)} and {@link #getTokenFor(String)} methods. */ 454 void initTokenMaps() 455 { 456 if( tokenClassesMap != null ) 457 return; 458 459 tokenClassesMap = new HashMap<>(); 460 classesTokensMap = new HashMap<>(); 461 462 String tokenProperty = getSerializationTokens( getConf() ); 463 464 if( tokenProperty != null ) 465 { 466 tokenProperty = tokenProperty.replaceAll( "\\s", "" ); // allow for whitespace in token set 467 468 for( String pair : tokenProperty.split( "," ) ) 469 { 470 String[] elements = pair.split( "=" ); 471 addToken( null, Integer.parseInt( elements[ 0 ] ), elements[ 1 ] ); 472 } 473 } 474 475 String serializationsString = getSerializations( getConf() ); 476 477 LOG.debug( "using hadoop serializations from the job conf: {} ", serializationsString ); 478 479 if( serializationsString == null ) 480 return; 481 482 String[] serializations = serializationsString.split( "," ); 483 484 for( String serializationName : serializations ) 485 { 486 try 487 { 488 Class type = getConf().getClassByName( serializationName ); 489 490 SerializationToken tokenAnnotation = (SerializationToken) type.getAnnotation( SerializationToken.class ); 491 492 if( tokenAnnotation == null ) 493 continue; 494 495 if( tokenAnnotation.tokens().length != tokenAnnotation.classNames().length ) 496 throw new CascadingException( "serialization annotation tokens and classNames must be the same length" ); 497 498 int[] tokens = tokenAnnotation.tokens(); 499 500 for( int i = 0; i < tokens.length; i++ ) 501 addToken( type, tokens[ i ], tokenAnnotation.classNames()[ i ] ); 502 } 503 catch( ClassNotFoundException exception ) 504 { 505 LOG.warn( "unable to load serialization class: {}", serializationName, exception ); 506 } 507 } 508 509 tokensSize = tokenClassesMap.size(); 510 } 511 512 private void addToken( Class type, int token, String className ) 513 { 514 if( type != null && !type.getName().startsWith( "cascading." ) && token < 128 ) 515 throw new CascadingException( "serialization annotation tokens may not be less than 128, was: " + token ); 516 517 if( tokenClassesMap.containsKey( token ) ) 518 { 519 if( type == null ) 520 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " found in properties" ); 521 522 throw new IllegalStateException( "duplicate serialization token: " + token + " for class: " + className + " on serialization: " + type.getName() ); 523 } 524 525 if( classesTokensMap.containsKey( className ) ) 526 { 527 if( type == null ) 528 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " found in properties " ); 529 530 throw new IllegalStateException( "duplicate serialization classname: " + className + " for token: " + token + " on serialization: " + type.getName() ); 531 } 532 533 LOG.debug( "adding serialization token: {}, for classname: {}", token, className ); 534 535 tokenClassesMap.put( token, className ); 536 classesTokensMap.put( className, token ); 537 } 538 539 /** 540 * Returns the className for the given token. 541 * 542 * @param token of type int 543 * @return a String 544 */ 545 final String getClassNameFor( int token ) 546 { 547 initTokenMaps(); 548 549 if( tokensSize == 0 ) 550 return null; 551 552 return tokenClassesMap.get( token ); 553 } 554 555 final long getTokensMapSize() 556 { 557 return tokensSize; 558 } 559 560 /** 561 * Returns the token for the given className. 562 * 563 * @param className of type String 564 * @return an Integer 565 */ 566 final Integer getTokenFor( String className ) 567 { 568 initTokenMaps(); 569 570 if( tokensSize == 0 ) 571 return null; 572 573 return classesTokensMap.get( className ); 574 } 575 576 public Comparator getDefaultComparator() 577 { 578 return defaultComparator; 579 } 580 581 public Comparator getComparator( Class type ) 582 { 583 Serialization serialization = getSerialization( type ); 584 585 Comparator comparator = null; 586 587 if( serialization instanceof Comparison ) 588 comparator = ( (Comparison) serialization ).getComparator( type ); 589 590 if( comparator != null ) 591 return comparator; 592 593 return defaultComparator; 594 } 595 596 Serialization getSerialization( String className ) 597 { 598 return getSerialization( getClass( className ) ); 599 } 600 601 Serialization getSerialization( Class type ) 602 { 603 return getSerializationFactory().getSerialization( type ); 604 } 605 606 Serializer getNewSerializer( Class type ) 607 { 608 try 609 { 610 Serializer serializer = getSerializationFactory().getSerializer( type ); 611 612 if( serializer == null ) 613 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 614 615 return serializer; 616 } 617 catch( NullPointerException exception ) 618 { 619 throw new CascadingException( "unable to load serializer for: " + type.getName() + " from: " + getSerializationFactory().getClass().getName() ); 620 } 621 } 622 623 Deserializer getNewDeserializer( String className ) 624 { 625 try 626 { 627 Deserializer deserializer = getSerializationFactory().getDeserializer( getClass( className ) ); 628 629 if( deserializer == null ) 630 throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 631 632 return deserializer; 633 } 634 catch( NullPointerException exception ) 635 { 636 throw new CascadingException( "unable to load deserializer for: " + className + " from: " + getSerializationFactory().getClass().getName() ); 637 } 638 } 639 640 KeyTupleDeserializer getKeyTupleDeserializer() 641 { 642 return new KeyTupleDeserializer( getElementReader() ); 643 } 644 645 ValueTupleDeserializer getValueTupleDeserializer() 646 { 647 return new ValueTupleDeserializer( getElementReader() ); 648 } 649 650 TuplePairDeserializer getTuplePairDeserializer() 651 { 652 return new TuplePairDeserializer( getElementReader() ); 653 } 654 655 /** 656 * Method getElementReader returns the elementReader of this TupleSerialization object. 657 * 658 * @return the elementReader (type SerializationElementReader) of this TupleSerialization object. 659 */ 660 public SerializationElementReader getElementReader() 661 { 662 return new SerializationElementReader( this ); 663 } 664 665 TupleDeserializer getTupleDeserializer() 666 { 667 return new TupleDeserializer( getElementReader() ); 668 } 669 670 private KeyTupleSerializer getKeyTupleSerializer() 671 { 672 return new KeyTupleSerializer( getElementWriter() ); 673 } 674 675 private ValueTupleSerializer getValueTupleSerializer() 676 { 677 return new ValueTupleSerializer( getElementWriter() ); 678 } 679 680 private TuplePairSerializer getTuplePairSerializer() 681 { 682 return new TuplePairSerializer( getElementWriter() ); 683 } 684 685 KeyIndexTupleDeserializer getKeyIndexTupleDeserializer() 686 { 687 return new KeyIndexTupleDeserializer( getElementReader() ); 688 } 689 690 ValueIndexTupleDeserializer getValueIndexTupleDeserializer() 691 { 692 return new ValueIndexTupleDeserializer( getElementReader() ); 693 } 694 695 IndexTupleDeserializer getIndexTupleDeserializer() 696 { 697 return new IndexTupleDeserializer( getElementReader() ); 698 } 699 700 /** 701 * Method getElementWriter returns the elementWriter of this TupleSerialization object. 702 * 703 * @return the elementWriter (type SerializationElementWriter) of this TupleSerialization object. 704 */ 705 public SerializationElementWriter getElementWriter() 706 { 707 return new SerializationElementWriter( this ); 708 } 709 710 private TupleSerializer getTupleSerializer() 711 { 712 return new TupleSerializer( getElementWriter() ); 713 } 714 715 private KeyIndexTupleSerializer getKeyIndexTupleSerializer() 716 { 717 return new KeyIndexTupleSerializer( getElementWriter() ); 718 } 719 720 private ValueIndexTupleSerializer getValueIndexTupleSerializer() 721 { 722 return new ValueIndexTupleSerializer( getElementWriter() ); 723 } 724 725 private IndexTupleSerializer getIndexTupleSerializer() 726 { 727 return new IndexTupleSerializer( getElementWriter() ); 728 } 729 730 public boolean accept( Class c ) 731 { 732 return Tuple.class == c || 733 KeyTuple.class == c || ValueTuple.class == c || 734 KeyIndexTuple.class == c || ValueIndexTuple.class == c || 735 TuplePair.class == c || IndexTuple.class == c; 736 } 737 738 public Deserializer getDeserializer( Class c ) 739 { 740 if( c == Tuple.class ) 741 return getTupleDeserializer(); 742 else if( c == KeyTuple.class ) 743 return getKeyTupleDeserializer(); 744 else if( c == ValueTuple.class ) 745 return getValueTupleDeserializer(); 746 else if( c == KeyIndexTuple.class ) 747 return getKeyIndexTupleDeserializer(); 748 else if( c == ValueIndexTuple.class ) 749 return getValueIndexTupleDeserializer(); 750 else if( c == TuplePair.class ) 751 return getTuplePairDeserializer(); 752 else if( c == IndexTuple.class ) 753 return getIndexTupleDeserializer(); 754 755 throw new IllegalArgumentException( "unknown class, cannot deserialize: " + c.getName() ); 756 } 757 758 public Serializer getSerializer( Class c ) 759 { 760 if( c == Tuple.class ) 761 return getTupleSerializer(); 762 else if( c == KeyTuple.class ) 763 return getKeyTupleSerializer(); 764 else if( c == ValueTuple.class ) 765 return getValueTupleSerializer(); 766 else if( c == KeyIndexTuple.class ) 767 return getKeyIndexTupleSerializer(); 768 else if( c == ValueIndexTuple.class ) 769 return getValueIndexTupleSerializer(); 770 else if( c == TuplePair.class ) 771 return getTuplePairSerializer(); 772 else if( c == IndexTuple.class ) 773 return getIndexTupleSerializer(); 774 775 throw new IllegalArgumentException( "unknown class, cannot serialize: " + c.getName() ); 776 } 777 778 public Class getClass( String className ) 779 { 780 Class type = classCache.get( className ); 781 782 if( type != null ) 783 return type; 784 785 try 786 { 787 if( className.charAt( 0 ) == '[' ) 788 type = Class.forName( className, true, Thread.currentThread().getContextClassLoader() ); 789 else 790 type = Thread.currentThread().getContextClassLoader().loadClass( className ); 791 } 792 catch( ClassNotFoundException exception ) 793 { 794 throw new TupleException( "unable to load class named: " + className, exception ); 795 } 796 797 classCache.put( className, type ); 798 799 return type; 800 } 801 802 public static Map<Integer, Fields> getFields( Configuration conf, String property ) 803 { 804 try 805 { 806 return HadoopUtil.getFields( conf, property ); 807 } 808 catch( IOException exception ) 809 { 810 LOG.warn( "unable to get fields for: " + property ); 811 812 return Collections.emptyMap(); 813 } 814 } 815 816 private static Fields maskVoid( Fields fields, Fields mask ) 817 { 818 if( fields == null ) 819 return null; 820 821 if( mask == null || !fields.hasTypes() || !mask.hasTypes() ) 822 return fields; 823 824 Fields voidedKey = mask.applyTypes( Fields.size( mask.size(), Void.class ) ); 825 826 fields = fields.applyTypes( voidedKey ); 827 828 return fields; 829 } 830 831 public static class SerializationElementReader implements TupleInputStream.ElementReader 832 { 833 /** Field LOG */ 834 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementReader.class ); 835 836 /** Field tupleSerialization */ 837 private final TupleSerialization tupleSerialization; 838 839 /** Field deserializers */ 840 final Map<String, Deserializer> deserializers = new HashMap<String, Deserializer>(); 841 842 /** 843 * Constructor SerializationElementReader creates a new SerializationElementReader instance. 844 * 845 * @param tupleSerialization of type TupleSerialization 846 */ 847 public SerializationElementReader( TupleSerialization tupleSerialization ) 848 { 849 this.tupleSerialization = tupleSerialization; 850 } 851 852 public TupleSerialization getTupleSerialization() 853 { 854 return tupleSerialization; 855 } 856 857 public Object read( int token, DataInputStream inputStream ) throws IOException 858 { 859 String className = getClassNameFor( token, inputStream ); 860 Deserializer deserializer = getDeserializerFor( inputStream, className ); 861 862 Object foundObject = null; 863 Object object; 864 865 try 866 { 867 object = deserializer.deserialize( foundObject ); 868 } 869 catch( IOException exception ) 870 { 871 LOG.error( "failed deserializing token: " + token + " with classname: " + className, exception ); 872 873 throw exception; 874 } 875 876 return object; 877 } 878 879 public Object read( Class type, DataInputStream inputStream ) throws IOException 880 { 881 String className = type.getName(); 882 Deserializer deserializer = getDeserializerFor( inputStream, className ); 883 884 Object foundObject = null; 885 Object object; 886 887 try 888 { 889 object = deserializer.deserialize( foundObject ); 890 } 891 catch( IOException exception ) 892 { 893 LOG.error( "failed deserializing: " + className, exception ); 894 895 throw exception; 896 } 897 898 return object; 899 } 900 901 @Override 902 public Comparator getComparatorFor( int token, DataInputStream inputStream ) throws IOException 903 { 904 Class type = tupleSerialization.getClass( getClassNameFor( token, inputStream ) ); 905 906 return tupleSerialization.getComparator( type ); 907 } 908 909 private Deserializer getDeserializerFor( DataInputStream inputStream, String className ) throws IOException 910 { 911 Deserializer deserializer = deserializers.get( className ); 912 913 if( deserializer == null ) 914 { 915 deserializer = tupleSerialization.getNewDeserializer( className ); 916 deserializer.open( inputStream ); 917 deserializers.put( className, deserializer ); 918 } 919 920 return deserializer; 921 } 922 923 public String getClassNameFor( int token, DataInputStream inputStream ) throws IOException 924 { 925 String className = tupleSerialization.getClassNameFor( token ); 926 927 try 928 { 929 if( className == null ) 930 className = WritableUtils.readString( inputStream ); 931 } 932 catch( IOException exception ) 933 { 934 LOG.error( "unable to resolve token: {}, to a valid classname, with token map of size: {}, rethrowing IOException", token, tupleSerialization.getTokensMapSize() ); 935 throw exception; 936 } 937 938 return className; 939 } 940 941 public void close() 942 { 943 if( deserializers.size() == 0 ) 944 return; 945 946 Collection<Deserializer> clone = new ArrayList<Deserializer>( deserializers.values() ); 947 948 deserializers.clear(); 949 950 for( Deserializer deserializer : clone ) 951 { 952 try 953 { 954 deserializer.close(); 955 } 956 catch( IOException exception ) 957 { 958 // do nothing 959 } 960 } 961 } 962 } 963 964 public static class SerializationElementWriter implements TupleOutputStream.ElementWriter 965 { 966 /** Field LOG */ 967 private static final Logger LOG = LoggerFactory.getLogger( SerializationElementWriter.class ); 968 969 /** Field tupleSerialization */ 970 private final TupleSerialization tupleSerialization; 971 972 /** Field serializers */ 973 final Map<Class, Serializer> serializers = new HashMap<Class, Serializer>(); 974 975 public SerializationElementWriter( TupleSerialization tupleSerialization ) 976 { 977 this.tupleSerialization = tupleSerialization; 978 } 979 980 public TupleSerialization getTupleSerialization() 981 { 982 return tupleSerialization; 983 } 984 985 public void write( DataOutputStream outputStream, Object object ) throws IOException 986 { 987 Class<?> type = object.getClass(); 988 String className = type.getName(); 989 Integer token = tupleSerialization.getTokenFor( className ); 990 991 if( token == null ) 992 { 993 LOG.debug( "no serialization token found for classname: {}", className ); 994 995 WritableUtils.writeVInt( outputStream, HadoopTupleOutputStream.WRITABLE_TOKEN ); // denotes to punt to hadoop serialization 996 WritableUtils.writeString( outputStream, className ); 997 } 998 else 999 { 1000 WritableUtils.writeVInt( outputStream, token ); 1001 } 1002 1003 Serializer serializer = getSerializer( outputStream, type ); 1004 1005 try 1006 { 1007 serializer.serialize( object ); 1008 } 1009 catch( IOException exception ) 1010 { 1011 LOG.error( "failed serializing token: " + token + " with classname: " + className, exception ); 1012 1013 throw exception; 1014 } 1015 } 1016 1017 private Serializer getSerializer( DataOutputStream outputStream, Class<?> type ) throws IOException 1018 { 1019 Serializer serializer = serializers.get( type ); 1020 1021 if( serializer == null ) 1022 { 1023 serializer = tupleSerialization.getNewSerializer( type ); 1024 serializer.open( outputStream ); 1025 serializers.put( type, serializer ); 1026 } 1027 1028 return serializer; 1029 } 1030 1031 public void write( DataOutputStream outputStream, Class<?> type, Object object ) throws IOException 1032 { 1033 Serializer serializer = getSerializer( outputStream, type ); 1034 1035 try 1036 { 1037 serializer.serialize( object ); 1038 } 1039 catch( IOException exception ) 1040 { 1041 LOG.error( "failed serializing type: " + type.getName(), exception ); 1042 1043 throw exception; 1044 } 1045 } 1046 1047 public void close() 1048 { 1049 if( serializers.size() == 0 ) 1050 return; 1051 1052 Collection<Serializer> clone = new ArrayList<Serializer>( serializers.values() ); 1053 1054 serializers.clear(); 1055 1056 for( Serializer serializer : clone ) 1057 { 1058 try 1059 { 1060 serializer.close(); 1061 } 1062 catch( IOException exception ) 1063 { 1064 // do nothing 1065 } 1066 } 1067 } 1068 } 1069 }