001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.util.cache;
022    
023    import java.util.Collection;
024    import java.util.LinkedHashMap;
025    import java.util.Map;
026    import java.util.Set;
027    
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * Implementation of the {@link CascadingCache} interface backed by a {@link java.util.LinkedHashMap} configured to
033     * evict the least recently used key.
034     * <p/>
035     * That is, if duplicate keys are clustered near each other in the incoming tuple stream, this cache will provide the
036     * most benefit as keys that begin to occur less frequently or not at all will be evicted as the key capacity is reached.
037     * <p/>
038     * If the keys are very random, if not uniformly distributed in the stream, consider using the
039     * {@link cascading.util.cache.DirectMappedCache} to reduce the amount of hash and equality comparisons.
040     * <p/>
041     * This implementation is used by default by {@link cascading.pipe.assembly.Unique} and
042     * {@link cascading.pipe.assembly.AggregateBy} and their subclasses.
043     *
044     * @see cascading.pipe.assembly.Unique
045     * @see cascading.pipe.assembly.AggregateBy
046     * @see LRUHashMapCacheFactory
047     * @see DirectMappedCacheFactory
048     * @see DirectMappedCache
049     */
050    public class LRUHashMapCache<Key, Value> implements CascadingCache<Key, Value>
051      {
052      /** logger */
053      private static final Logger LOG = LoggerFactory.getLogger( LRUHashMapCache.class );
054    
055      /** capacity of the map. */
056      private int capacity;
057    
058      /** call-back used, when entries are removed from the cache. */
059      private CacheEvictionCallback callback = CacheEvictionCallback.NULL;
060    
061      /** counts flushes. */
062      private long flushes = 0;
063    
064      private LinkedHashMap<Key, Value> backingMap;
065    
066      @Override
067      public int getCapacity()
068        {
069        return capacity;
070        }
071    
072      @Override
073      public void setCacheEvictionCallback( CacheEvictionCallback cacheEvictionCallback )
074        {
075        this.callback = cacheEvictionCallback;
076        }
077    
078      @Override
079      public void setCapacity( int capacity )
080        {
081        this.capacity = capacity;
082        }
083    
084      @Override
085      public void initialize()
086        {
087        this.backingMap = new LinkedHashMap<Key, Value>( capacity, 0.75f, true )
088        {
089        @Override
090        protected boolean removeEldestEntry( Map.Entry<Key, Value> eldest )
091          {
092          boolean doRemove = size() > capacity;
093    
094          if( doRemove )
095            {
096            callback.evict( eldest );
097    
098            if( flushes % getCapacity() == 0 ) // every multiple, write out data
099              {
100              Runtime runtime = Runtime.getRuntime();
101              long freeMem = runtime.freeMemory() / 1024 / 1024;
102              long maxMem = runtime.maxMemory() / 1024 / 1024;
103              long totalMem = runtime.totalMemory() / 1024 / 1024;
104    
105              LOG.info( "flushed keys num times: {}, with capacity: {}", flushes + 1, capacity );
106              LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
107    
108              float percent = (float) totalMem / (float) maxMem;
109    
110              if( percent < 0.80F )
111                LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing the cache size", (int) ( percent * 100.0F ) );
112              }
113            flushes++;
114            }
115    
116          return doRemove;
117          }
118        };
119        }
120    
121      @Override
122      public int size()
123        {
124        return backingMap.size();
125        }
126    
127      @Override
128      public boolean isEmpty()
129        {
130        return backingMap.isEmpty();
131        }
132    
133      @Override
134      public boolean containsKey( Object key )
135        {
136        return backingMap.containsKey( key );
137        }
138    
139      @Override
140      public boolean containsValue( Object value )
141        {
142        return backingMap.containsValue( value );
143        }
144    
145      @Override
146      public Value get( Object key )
147        {
148        return backingMap.get( key );
149        }
150    
151      @Override
152      public Value put( Key key, Value value )
153        {
154        return backingMap.put( key, value );
155        }
156    
157      @Override
158      public Value remove( Object key )
159        {
160        return backingMap.remove( key );
161        }
162    
163      @Override
164      public void putAll( Map<? extends Key, ? extends Value> m )
165        {
166        backingMap.putAll( m );
167        }
168    
169      @Override
170      public void clear()
171        {
172        backingMap.clear();
173        }
174    
175      @Override
176      public Set<Key> keySet()
177        {
178        return backingMap.keySet();
179        }
180    
181      @Override
182      public Collection<Value> values()
183        {
184        return backingMap.values();
185        }
186    
187      @Override
188      public Set<Entry<Key, Value>> entrySet()
189        {
190        return backingMap.entrySet();
191        }
192      }