001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple.hadoop.util;
023
024import java.util.Comparator;
025import java.util.List;
026
027import cascading.tuple.Hasher;
028import cascading.tuple.Tuple;
029import cascading.tuple.hadoop.TupleSerialization;
030import cascading.tuple.util.TupleHasher;
031import org.apache.hadoop.conf.Configurable;
032import org.apache.hadoop.conf.Configuration;
033
034/**
035 * Super class of all Hadoop partitioners.
036 * <p>
037 * As of Cascading 2.7 the hashing used to calculate partitions has been changed to use Murmur3. Users that rely on the
038 * old behaviour should set {@link cascading.tuple.hadoop.util.HasherPartitioner#HASHER_PARTITIONER_USE_LEGACY_HASH} to
039 * {@code true}.
040 */
041public class HasherPartitioner extends TupleHasher implements Configurable
042  {
043  public final static String HASHER_PARTITIONER_USE_LEGACY_HASH = "cascading.tuple.hadoop.util.hasherpartitioner.uselegacyhash";
044
045  private static Comparator defaultComparator;
046
047  private Comparator[] comparators;
048  private Configuration conf;
049
050  @Override
051  public void setConf( Configuration conf )
052    {
053    if( this.conf != null )
054      return;
055
056    this.conf = conf;
057
058    defaultComparator = TupleSerialization.getDefaultComparator( defaultComparator, conf );
059
060    comparators = DeserializerComparator.getFieldComparatorsFrom( conf, "cascading.group.comparator" );
061
062    if( conf.getBoolean( HASHER_PARTITIONER_USE_LEGACY_HASH, false ) )
063      this.hashFunction = new LegacyHashFunction();
064
065    initialize( defaultComparator, comparators );
066    }
067
068  @Override
069  public Configuration getConf()
070    {
071    return conf;
072    }
073
074  static class LegacyHashFunction extends TupleHasher.HashFunction
075    {
076    @Override
077    public int hash( Tuple tuple, Hasher[] hashers )
078      {
079      int hash = 1;
080      List<Object> elements = Tuple.elements( tuple );
081      for( int i = 0; i < elements.size(); i++ )
082        {
083        Object element = elements.get( i );
084        hash = 31 * hash + ( element != null ? hashers[ i % hashers.length ].hashCode( element ) : 0 );
085        }
086      return hash;
087      }
088    }
089  }