001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.util.cache;
022    
023    import java.util.AbstractCollection;
024    import java.util.AbstractSet;
025    import java.util.Collection;
026    import java.util.Iterator;
027    import java.util.Map;
028    import java.util.NoSuchElementException;
029    import java.util.Set;
030    
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * DirectMappedCache is an implementation of the {@link cascading.util.cache.CascadingCache} interface following the semantics of
036     * http://en.wikipedia.org/wiki/CPU_cache#Direct-mapped_cache. The Cache is backed by an array that stays constant in size.
037     * <p/>
038     * Unlike other implementation of a Map a hash collision will lead to the entry being overwritten.
039     * The {@link CacheEvictionCallback} is called with the entry that will be overwritten.
040     * <p/>
041     * Use this cache if the keys are arriving in a random if not uniformly distributed order in order to reduce the number
042     * of hash and equality comparisons.
043     * <p/>
044     * If duplicate keys are clustered near each other in the incoming tuple stream, consider the {@link cascading.util.cache.LRUHashMapCache} cache
045     * instead.
046     * <p/>
047     * DirectMappedCache does not permit <code>null</code> keys nor <code>null</code> values
048     *
049     * @see cascading.pipe.assembly.Unique
050     * @see cascading.pipe.assembly.AggregateBy
051     * @see DirectMappedCacheFactory
052     * @see LRUHashMapCacheFactory
053     * @see LRUHashMapCache
054     */
055    public final class DirectMappedCache<Key, Value> implements CascadingCache<Key, Value>
056      {
057      /** logger */
058      private static final Logger LOG = LoggerFactory.getLogger( DirectMappedCache.class );
059    
060      /** size of the backing array. */
061      private int capacity;
062    
063      /** number of actual key in the map. */
064      private int actualSize = 0;
065    
066      /** array used for storing the entries */
067      private Entry<Key, Value>[] elements;
068    
069      private long collisions = 0;
070    
071      private long putCalls = 0;
072    
073      public boolean initialized = false;
074    
075      /** callback that is called whenever an entry is overwritten or removed from the cache. */
076      private CacheEvictionCallback evictionCallBack = CacheEvictionCallback.NULL;
077    
078      @Override
079      public int size()
080        {
081        return actualSize;
082        }
083    
084      @Override
085      public boolean isEmpty()
086        {
087        return actualSize < 1;
088        }
089    
090      @Override
091      public boolean containsKey( Object key )
092        {
093        if( key == null )
094          throw new IllegalArgumentException( "null keys are not permitted" );
095    
096        return get( key ) != null;
097        }
098    
099      @Override
100      public boolean containsValue( Object value )
101        {
102        if( value == null )
103          throw new IllegalArgumentException( "value cannot be null" );
104    
105        Iter<Entry<Key, Value>> iter = new Iter<Entry<Key, Value>>( elements );
106    
107        while( iter.hasNext() )
108          {
109          Value current = iter.next().getValue();
110          if( current.equals( value ) )
111            return true;
112          }
113    
114        return false;
115        }
116    
117      @Override
118      public Value get( Object key )
119        {
120        int index = index( key );
121        Entry<Key, Value> existing = elements[ index ];
122    
123        if( existing == null || !key.equals( existing.getKey() ) )
124          return null;
125    
126        return existing.getValue();
127        }
128    
129      @Override
130      public Value put( Key key, Value value )
131        {
132        if( key == null )
133          throw new IllegalArgumentException( "key cannot be null" );
134    
135        if( value == null )
136          throw new IllegalArgumentException( "value cannot be null" );
137    
138        putCalls++;
139    
140        Value previous = null;
141        int index = index( key );
142        Entry<Key, Value> existing = elements[ index ];
143    
144        if( existing != null )
145          {
146          previous = existing.getValue();
147          collisions++;
148          evictionCallBack.evict( existing );
149          }
150        else
151          actualSize++;
152    
153        elements[ index ] = new CacheEntry<Key, Value>( key, value );
154    
155        if( putCalls % getCapacity() == 0 )
156          {
157          Runtime runtime = Runtime.getRuntime();
158          long freeMem = runtime.freeMemory() / 1024 / 1024;
159          long maxMem = runtime.maxMemory() / 1024 / 1024;
160          long totalMem = runtime.totalMemory() / 1024 / 1024;
161          LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
162          LOG.info( "capacity={}, puts={}, collisions={}, fill factor={}%", getCapacity(), putCalls, collisions,
163            ( (double) getCapacity() / actualSize ) * 100 );
164          float percent = (float) totalMem / (float) maxMem;
165          if( percent < 0.80F )
166            LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing the cache size", (int) ( percent * 100.0F ) );
167          }
168    
169        return previous;
170        }
171    
172      private int index( Object key )
173        {
174        return Math.abs( key.hashCode() % capacity );
175        }
176    
177      @Override
178      public Value remove( Object key )
179        {
180        if( key == null )
181          throw new IllegalArgumentException( "key cannot be null" );
182    
183        int index = index( key );
184        Entry<Key, Value> existing = elements[ index ];
185    
186        if( existing == null || !existing.getKey().equals( key ) )
187          return null;
188    
189        elements[ index ] = null;
190        actualSize--;
191        evictionCallBack.evict( existing );
192    
193        return existing.getValue();
194        }
195    
196      @Override
197      public void putAll( Map<? extends Key, ? extends Value> m )
198        {
199        for( Entry<? extends Key, ? extends Value> entry : m.entrySet() )
200          put( entry.getKey(), entry.getValue() );
201        }
202    
203      @Override
204      public void clear()
205        {
206        elements = new CacheEntry[ capacity ];
207        actualSize = 0;
208        }
209    
210      @Override
211      public Set<Key> keySet()
212        {
213        return new KeySetView<Key>( new SetView<Map.Entry<Key, ?>>( elements, actualSize ) );
214        }
215    
216      @Override
217      public Collection<Value> values()
218        {
219        return new ValueCollection( new SetView<Map.Entry<?, Value>>( elements, actualSize ) );
220        }
221    
222      @Override
223      public Set<Map.Entry<Key, Value>> entrySet()
224        {
225        return new SetView<Entry<Key, Value>>( elements, actualSize );
226        }
227    
228      @Override
229      public int getCapacity()
230        {
231        return capacity;
232        }
233    
234      @Override
235      public void setCacheEvictionCallback( CacheEvictionCallback cacheEvictionCallback )
236        {
237        if( initialized )
238          throw new IllegalStateException( "cannot set callback after initialization" );
239    
240        this.evictionCallBack = cacheEvictionCallback;
241        }
242    
243      @Override
244      public void setCapacity( int capacity )
245        {
246        if( initialized )
247          throw new IllegalArgumentException( "cannot set size after initialization" );
248    
249        this.capacity = capacity;
250        }
251    
252      @Override
253      public void initialize()
254        {
255        if( capacity < 1 )
256          throw new IllegalStateException( "capacity must be larger than 0" );
257    
258        if( evictionCallBack == null )
259          throw new IllegalStateException( "evictionCallback cannot be null" );
260    
261        elements = new CacheEntry[ capacity ];
262        initialized = true;
263        }
264    
265      static class CacheEntry<Key, Value> implements Entry<Key, Value>
266        {
267        private final Key key;
268        private Value value;
269    
270        CacheEntry( Key key, Value value )
271          {
272          this.key = key;
273          this.value = value;
274          }
275    
276        @Override
277        public Key getKey()
278          {
279          return key;
280          }
281    
282        @Override
283        public Value getValue()
284          {
285          return value;
286          }
287    
288        @Override
289        public Value setValue( Value value )
290          {
291          Value oldValue = this.value;
292          this.value = value;
293          return oldValue;
294          }
295    
296        @Override
297        public boolean equals( Object object )
298          {
299          if( this == object )
300            return true;
301          if( object == null || getClass() != object.getClass() )
302            return false;
303    
304          Entry that = (Entry) object;
305    
306          if( key != null ? !key.equals( that.getKey() ) : that.getKey() != null )
307            return false;
308    
309          if( value != null ? !value.equals( that.getValue() ) : that.getValue() != null )
310            return false;
311    
312          return true;
313          }
314    
315        @Override
316        public int hashCode()
317          {
318          int result = key != null ? key.hashCode() : 0;
319          result = 31 * result + ( value != null ? value.hashCode() : 0 );
320          return result;
321          }
322    
323        @Override
324        public String toString()
325          {
326          return "CacheEntry{" +
327            "key=" + key +
328            ", value=" + value +
329            '}';
330          }
331        }
332    
333      class SetView<T> extends AbstractSet<T>
334        {
335        private final T[] elements;
336        private final int size;
337        private final Iter<T> iter;
338    
339        SetView( T[] elements, int size )
340          {
341          this.elements = elements;
342          this.size = size;
343          this.iter = new Iter<T>( elements );
344          }
345    
346        @Override
347        public int size()
348          {
349          return size;
350          }
351    
352        @Override
353        public Iterator<T> iterator()
354          {
355          return new Iter<T>( elements );
356          }
357    
358        @Override
359        public boolean containsAll( Collection<?> c )
360          {
361          for( Object object : c )
362            {
363            if( !contains( object ) )
364              return false;
365            }
366    
367          return true;
368          }
369    
370        @Override
371        public boolean addAll( Collection<? extends T> c )
372          {
373          throw new UnsupportedOperationException( "SetView is read only." );
374          }
375    
376        @Override
377        public boolean retainAll( Collection<?> c )
378          {
379          throw new UnsupportedOperationException( "SetView is read only." );
380          }
381    
382        @Override
383        public boolean removeAll( Collection<?> c )
384          {
385          throw new UnsupportedOperationException( "SetView is read only." );
386          }
387    
388        @Override
389        public void clear()
390          {
391          throw new UnsupportedOperationException( "SetView is read only." );
392          }
393        }
394    
395      static class Iter<T> implements Iterator<T>
396        {
397        private final T[] elements;
398        private T nextElement;
399        int currentIndex = -1;
400    
401        public Iter( T[] elements )
402          {
403          this.elements = elements;
404          this.nextElement = findNext();
405          }
406    
407        private T findNext()
408          {
409          while( currentIndex < elements.length - 1 )
410            {
411            currentIndex++;
412    
413            if( elements[ currentIndex ] != null )
414              return elements[ currentIndex ];
415            }
416    
417          return null;
418          }
419    
420        @Override
421        public boolean hasNext()
422          {
423          return nextElement != null;
424          }
425    
426        @Override
427        public T next()
428          {
429          if( !hasNext() )
430            throw new NoSuchElementException();
431    
432          T current = nextElement;
433    
434          nextElement = findNext();
435    
436          return current;
437          }
438    
439        @Override
440        public void remove()
441          {
442          throw new UnsupportedOperationException( "Not supported" );
443          }
444    
445        public void reset()
446          {
447          currentIndex = -1;
448          nextElement = findNext();
449          }
450        }
451    
452      static class KeySetView<Key> extends AbstractSet<Key>
453        {
454        private final Set<Map.Entry<Key, ?>> parent;
455    
456        public KeySetView( Set<Map.Entry<Key, ?>> parent )
457          {
458          this.parent = parent;
459          }
460    
461        @Override
462        public Iterator<Key> iterator()
463          {
464          return new KeyIterator<Key>( parent.iterator() );
465          }
466    
467        @Override
468        public int size()
469          {
470          return parent.size();
471          }
472        }
473    
474      static class ValueCollection<Value> extends AbstractCollection<Value>
475        {
476        private Set<Entry<?, Value>> parent;
477    
478        ValueCollection( Set<Entry<?, Value>> parent )
479          {
480          this.parent = parent;
481          }
482    
483        @Override
484        public Iterator<Value> iterator()
485          {
486          return new ValueIterator<Value>( parent.iterator() );
487          }
488    
489        @Override
490        public int size()
491          {
492          return parent.size();
493          }
494        }
495    
496      static class KeyIterator<Key> implements Iterator<Key>
497        {
498        private final Iterator<Map.Entry<Key, ?>> parent;
499    
500        public KeyIterator( Iterator<Map.Entry<Key, ?>> parent )
501          {
502          this.parent = parent;
503          }
504    
505        @Override
506        public Key next()
507          {
508          return parent.next().getKey();
509          }
510    
511        @Override
512        public boolean hasNext()
513          {
514          return parent.hasNext();
515          }
516    
517        @Override
518        public void remove()
519          {
520          parent.remove();
521          }
522        }
523    
524      static class ValueIterator<Value> implements Iterator<Value>
525        {
526        private final Iterator<Map.Entry<?, Value>> parent;
527    
528        public ValueIterator( Iterator<Map.Entry<?, Value>> parent )
529          {
530          this.parent = parent;
531          }
532    
533        @Override
534        public Value next()
535          {
536          return parent.next().getValue();
537          }
538    
539        @Override
540        public boolean hasNext()
541          {
542          return parent.hasNext();
543          }
544    
545        @Override
546        public void remove()
547          {
548          parent.remove();
549          }
550        }
551      }