001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tuple.hadoop.util; 023 024import java.util.Comparator; 025import java.util.List; 026 027import cascading.tuple.Hasher; 028import cascading.tuple.Tuple; 029import cascading.tuple.hadoop.TupleSerialization; 030import cascading.tuple.util.TupleHasher; 031import org.apache.hadoop.conf.Configurable; 032import org.apache.hadoop.conf.Configuration; 033 034/** 035 * Super class of all Hadoop partitioners. 036 * <p> 037 * As of Cascading 2.7 the hashing used to calculate partitions has been changed to use Murmur3. Users that rely on the 038 * old behaviour should set {@link cascading.tuple.hadoop.util.HasherPartitioner#HASHER_PARTITIONER_USE_LEGACY_HASH} to 039 * {@code true}. 040 */ 041public class HasherPartitioner extends TupleHasher implements Configurable 042 { 043 public final static String HASHER_PARTITIONER_USE_LEGACY_HASH = "cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash"; 044 045 private static Comparator defaultComparator; 046 047 private Comparator[] comparators; 048 private Configuration conf; 049 050 @Override 051 public void setConf( Configuration conf ) 052 { 053 if( this.conf != null ) 054 return; 055 056 this.conf = conf; 057 058 defaultComparator = TupleSerialization.getDefaultComparator( defaultComparator, conf ); 059 060 comparators = DeserializerComparator.getFieldComparatorsFrom( conf, "cascading.group.comparator" ); 061 062 if( conf.getBoolean( HASHER_PARTITIONER_USE_LEGACY_HASH, false ) ) 063 this.hashFunction = new LegacyHashFunction(); 064 065 initialize( defaultComparator, comparators ); 066 } 067 068 @Override 069 public Configuration getConf() 070 { 071 return conf; 072 } 073 074 static class LegacyHashFunction extends TupleHasher.HashFunction 075 { 076 @Override 077 public int hash( Tuple tuple, Hasher[] hashers ) 078 { 079 int hash = 1; 080 List<Object> elements = Tuple.elements( tuple ); 081 for( int i = 0; i < elements.size(); i++ ) 082 { 083 Object element = elements.get( i ); 084 hash = 31 * hash + ( element != null ? hashers[ i % hashers.length ].hashCode( element ) : 0 ); 085 } 086 return hash; 087 } 088 } 089 }