001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.util.cache;
023
024import java.util.Collection;
025import java.util.LinkedHashMap;
026import java.util.Map;
027import java.util.Set;
028
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Implementation of the {@link CascadingCache} interface backed by a {@link java.util.LinkedHashMap} configured to
034 * evict the least recently used key.
035 * <p>
036 * That is, if duplicate keys are clustered near each other in the incoming tuple stream, this cache will provide the
037 * most benefit as keys that begin to occur less frequently or not at all will be evicted as the key capacity is reached.
038 * <p>
039 * If the keys are very random, if not uniformly distributed in the stream, consider using the
040 * {@link cascading.util.cache.DirectMappedCache} to reduce the amount of hash and equality comparisons.
041 * <p>
042 * This implementation is used by default by {@link cascading.pipe.assembly.Unique} and
043 * {@link cascading.pipe.assembly.AggregateBy} and their subclasses.
044 *
045 * @see cascading.pipe.assembly.Unique
046 * @see cascading.pipe.assembly.AggregateBy
047 * @see LRUHashMapCacheFactory
048 * @see DirectMappedCacheFactory
049 * @see DirectMappedCache
050 */
051public class LRUHashMapCache<Key, Value> implements CascadingCache<Key, Value>
052  {
053  /** logger */
054  private static final Logger LOG = LoggerFactory.getLogger( LRUHashMapCache.class );
055
056  /** capacity of the map. */
057  private int capacity;
058
059  /** call-back used, when entries are removed from the cache. */
060  private CacheEvictionCallback callback = CacheEvictionCallback.NULL;
061
062  /** counts flushes. */
063  private long flushes = 0;
064
065  private LinkedHashMap<Key, Value> backingMap;
066
067  @Override
068  public int getCapacity()
069    {
070    return capacity;
071    }
072
073  @Override
074  public void setCacheEvictionCallback( CacheEvictionCallback cacheEvictionCallback )
075    {
076    this.callback = cacheEvictionCallback;
077    }
078
079  @Override
080  public void setCapacity( int capacity )
081    {
082    this.capacity = capacity;
083    }
084
085  @Override
086  public void initialize()
087    {
088    this.backingMap = new LinkedHashMap<Key, Value>( capacity, 0.75f, true )
089      {
090      @Override
091      protected boolean removeEldestEntry( Map.Entry<Key, Value> eldest )
092        {
093        boolean doRemove = size() > capacity;
094
095        if( doRemove )
096          {
097          callback.evict( eldest );
098
099          if( flushes % getCapacity() == 0 ) // every multiple, write out data
100            {
101            Runtime runtime = Runtime.getRuntime();
102            long freeMem = runtime.freeMemory() / 1024 / 1024;
103            long maxMem = runtime.maxMemory() / 1024 / 1024;
104            long totalMem = runtime.totalMemory() / 1024 / 1024;
105
106            LOG.info( "flushed keys num times: {}, with capacity: {}", flushes + 1, capacity );
107            LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
108
109            float percent = (float) totalMem / (float) maxMem;
110
111            if( percent < 0.80F )
112              LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing the cache size", (int) ( percent * 100.0F ) );
113            }
114          flushes++;
115          }
116
117        return doRemove;
118        }
119      };
120    }
121
122  @Override
123  public int size()
124    {
125    return backingMap.size();
126    }
127
128  @Override
129  public boolean isEmpty()
130    {
131    return backingMap.isEmpty();
132    }
133
134  @Override
135  public boolean containsKey( Object key )
136    {
137    return backingMap.containsKey( key );
138    }
139
140  @Override
141  public boolean containsValue( Object value )
142    {
143    return backingMap.containsValue( value );
144    }
145
146  @Override
147  public Value get( Object key )
148    {
149    return backingMap.get( key );
150    }
151
152  @Override
153  public Value put( Key key, Value value )
154    {
155    return backingMap.put( key, value );
156    }
157
158  @Override
159  public Value remove( Object key )
160    {
161    return backingMap.remove( key );
162    }
163
164  @Override
165  public void putAll( Map<? extends Key, ? extends Value> m )
166    {
167    backingMap.putAll( m );
168    }
169
170  @Override
171  public void clear()
172    {
173    backingMap.clear();
174    }
175
176  @Override
177  public Set<Key> keySet()
178    {
179    return backingMap.keySet();
180    }
181
182  @Override
183  public Collection<Value> values()
184    {
185    return backingMap.values();
186    }
187
188  @Override
189  public Set<Entry<Key, Value>> entrySet()
190    {
191    return backingMap.entrySet();
192    }
193  }