001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.util;
022    
023    import java.io.Serializable;
024    import java.util.Comparator;
025    import java.util.List;
026    
027    import cascading.flow.stream.MemorySpliceGate;
028    import cascading.tuple.Fields;
029    import cascading.tuple.Hasher;
030    import cascading.tuple.Tuple;
031    import cascading.util.Murmur3;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     *
037     */
038    public class TupleHasher implements Serializable
039      {
040      private static final Logger LOG = LoggerFactory.getLogger( MemorySpliceGate.class );
041    
042      private static HashFunction DEFAULT_HASH_FUNCTION = new HashFunction();
043      private static Hasher DEFAULT_HASHER = new ObjectHasher();
044    
045      protected HashFunction hashFunction = DEFAULT_HASH_FUNCTION;
046      private Hasher[] hashers;
047    
048      public TupleHasher()
049        {
050        }
051    
052      public TupleHasher( Comparator defaultComparator, Comparator[] comparators )
053        {
054        initialize( defaultComparator, comparators );
055        }
056    
057      public static Comparator[] merge( Fields[] keyFields )
058        {
059        Comparator[] comparators = new Comparator[ keyFields[ 0 ].size() ];
060    
061        for( Fields keyField : keyFields )
062          {
063          if( keyField == null )
064            continue;
065    
066          for( int i = 0; i < keyField.getComparators().length; i++ )
067            {
068            Comparator comparator = keyField.getComparators()[ i ];
069    
070            if( !( comparator instanceof Hasher ) )
071              continue;
072    
073            if( comparators[ i ] != null && !comparators[ i ].equals( comparator ) )
074              LOG.warn( "two unequal Hasher instances for the same key field position found: {}, and: {}", comparators[ i ], comparator );
075    
076            comparators[ i ] = comparator;
077            }
078          }
079    
080        return comparators;
081        }
082    
083      public static boolean isNull( Comparator[] comparators )
084        {
085        int count = 0;
086    
087        for( Comparator comparator : comparators )
088          {
089          if( comparator == null )
090            count++;
091          }
092    
093        if( count == comparators.length )
094          return true;
095    
096        return false;
097        }
098    
099      protected void initialize( Comparator defaultComparator, Comparator[] comparators )
100        {
101        Hasher defaultHasher = DEFAULT_HASHER;
102    
103        if( defaultComparator instanceof Hasher )
104          defaultHasher = (Hasher) defaultComparator;
105    
106        hashers = new Hasher[ comparators.length ];
107    
108        for( int i = 0; i < comparators.length; i++ )
109          {
110          Comparator comparator = comparators[ i ];
111    
112          if( comparator instanceof Hasher )
113            hashers[ i ] = (Hasher) comparator;
114          else
115            hashers[ i ] = defaultHasher;
116          }
117        }
118    
119      public final int hashCode( Tuple tuple )
120        {
121        return getHashFunction().hash( tuple, hashers );
122        }
123    
124      protected HashFunction getHashFunction()
125        {
126        return hashFunction;
127        }
128    
129      private static class ObjectHasher implements Hasher<Object>
130        {
131        @Override
132        public int hashCode( Object value )
133          {
134          return value.hashCode();
135          }
136        }
137    
138      static class WrappedTuple extends Tuple
139        {
140        private final TupleHasher tupleHasher;
141    
142        public WrappedTuple( TupleHasher tupleHasher, Tuple input )
143          {
144          super( Tuple.elements( input ) );
145          this.tupleHasher = tupleHasher;
146          }
147    
148        @Override
149        public int hashCode()
150          {
151          return tupleHasher.hashCode( this );
152          }
153        }
154    
155      /**
156       * Wraps the given Tuple in a subtype, that uses the provided Hasher for hashCode calculations. If the given Hasher
157       * is {@code null} the input Tuple will be returned.
158       *
159       * @param tupleHasher A TupleHasher instance.
160       * @param input       A Tuple instance.
161       * @return A tuple using the provided TupleHasher for hashCode calculations if the TupleHasher is not null.
162       */
163      public static Tuple wrapTuple( TupleHasher tupleHasher, Tuple input )
164        {
165        if( tupleHasher == null )
166          return input;
167    
168        return new WrappedTuple( tupleHasher, input );
169        }
170    
171      public static class HashFunction implements Serializable
172        {
173        public int hash( Tuple tuple, Hasher[] hashers )
174          {
175          int hash = Murmur3.SEED;
176    
177          List<Object> elements = Tuple.elements( tuple );
178    
179          for( int i = 0; i < elements.size(); i++ )
180            {
181            Object element = elements.get( i );
182            int hashCode = 0;
183    
184            if( element != null )
185              hashCode = hashers[ i % hashers.length ].hashCode( element );
186    
187            hash = Murmur3.mixH1( hash, Murmur3.mixK1( hashCode ) );
188    
189            }
190          return Murmur3.fmix( hash, elements.size() );
191          }
192        }
193      }