001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.util.cache;
022
023import java.util.Collection;
024import java.util.LinkedHashMap;
025import java.util.Map;
026import java.util.Set;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * Implementation of the {@link CascadingCache} interface backed by a {@link java.util.LinkedHashMap} configured to
033 * evict the least recently used key.
034 * <p/>
035 * That is, if duplicate keys are clustered near each other in the incoming tuple stream, this cache will provide the
036 * most benefit as keys that begin to occur less frequently or not at all will be evicted as the key capacity is reached.
037 * <p/>
038 * If the keys are very random, if not uniformly distributed in the stream, consider using the
039 * {@link cascading.util.cache.DirectMappedCache} to reduce the amount of hash and equality comparisons.
040 * <p/>
041 * This implementation is used by default by {@link cascading.pipe.assembly.Unique} and
042 * {@link cascading.pipe.assembly.AggregateBy} and their subclasses.
043 *
044 * @see cascading.pipe.assembly.Unique
045 * @see cascading.pipe.assembly.AggregateBy
046 * @see LRUHashMapCacheFactory
047 * @see DirectMappedCacheFactory
048 * @see DirectMappedCache
049 */
050public class LRUHashMapCache<Key, Value> implements CascadingCache<Key, Value>
051  {
052  /** logger */
053  private static final Logger LOG = LoggerFactory.getLogger( LRUHashMapCache.class );
054
055  /** capacity of the map. */
056  private int capacity;
057
058  /** call-back used, when entries are removed from the cache. */
059  private CacheEvictionCallback callback = CacheEvictionCallback.NULL;
060
061  /** counts flushes. */
062  private long flushes = 0;
063
064  private LinkedHashMap<Key, Value> backingMap;
065
066  @Override
067  public int getCapacity()
068    {
069    return capacity;
070    }
071
072  @Override
073  public void setCacheEvictionCallback( CacheEvictionCallback cacheEvictionCallback )
074    {
075    this.callback = cacheEvictionCallback;
076    }
077
078  @Override
079  public void setCapacity( int capacity )
080    {
081    this.capacity = capacity;
082    }
083
084  @Override
085  public void initialize()
086    {
087    this.backingMap = new LinkedHashMap<Key, Value>( capacity, 0.75f, true )
088    {
089    @Override
090    protected boolean removeEldestEntry( Map.Entry<Key, Value> eldest )
091      {
092      boolean doRemove = size() > capacity;
093
094      if( doRemove )
095        {
096        callback.evict( eldest );
097
098        if( flushes % getCapacity() == 0 ) // every multiple, write out data
099          {
100          Runtime runtime = Runtime.getRuntime();
101          long freeMem = runtime.freeMemory() / 1024 / 1024;
102          long maxMem = runtime.maxMemory() / 1024 / 1024;
103          long totalMem = runtime.totalMemory() / 1024 / 1024;
104
105          LOG.info( "flushed keys num times: {}, with capacity: {}", flushes + 1, capacity );
106          LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
107
108          float percent = (float) totalMem / (float) maxMem;
109
110          if( percent < 0.80F )
111            LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing the cache size", (int) ( percent * 100.0F ) );
112          }
113        flushes++;
114        }
115
116      return doRemove;
117      }
118    };
119    }
120
121  @Override
122  public int size()
123    {
124    return backingMap.size();
125    }
126
127  @Override
128  public boolean isEmpty()
129    {
130    return backingMap.isEmpty();
131    }
132
133  @Override
134  public boolean containsKey( Object key )
135    {
136    return backingMap.containsKey( key );
137    }
138
139  @Override
140  public boolean containsValue( Object value )
141    {
142    return backingMap.containsValue( value );
143    }
144
145  @Override
146  public Value get( Object key )
147    {
148    return backingMap.get( key );
149    }
150
151  @Override
152  public Value put( Key key, Value value )
153    {
154    return backingMap.put( key, value );
155    }
156
157  @Override
158  public Value remove( Object key )
159    {
160    return backingMap.remove( key );
161    }
162
163  @Override
164  public void putAll( Map<? extends Key, ? extends Value> m )
165    {
166    backingMap.putAll( m );
167    }
168
169  @Override
170  public void clear()
171    {
172    backingMap.clear();
173    }
174
175  @Override
176  public Set<Key> keySet()
177    {
178    return backingMap.keySet();
179    }
180
181  @Override
182  public Collection<Value> values()
183    {
184    return backingMap.values();
185    }
186
187  @Override
188  public Set<Entry<Key, Value>> entrySet()
189    {
190    return backingMap.entrySet();
191    }
192  }