From 3da8e50d26ae8eb588c13a29aebb9b7723d2912d Mon Sep 17 00:00:00 2001 From: shachar Date: Fri, 5 Feb 2016 15:03:57 -0800 Subject: [PATCH 1/8] Initial draft of SharedMem Cache Algorithm - the concept works !!! - need to do: 1. handle crash of JVM in middle of lock 2. make the cache fixed size 3. fail the cache on number of keys too big and do expand() 4. add LRU Lots of work ! --- src/CacheIPCTest.java | 100 ++++++++++++++++++ src/CacheTest.java | 11 +- src/cache/Ehcache.java | 37 ------- src/cache/JCSCache.java | 32 ------ ...emoryCache.java => SharedMemoryCache.java} | 31 +++--- src/util/AtomicLock.java | 62 +++++++++++ 6 files changed, 186 insertions(+), 87 deletions(-) create mode 100644 src/CacheIPCTest.java delete mode 100755 src/cache/Ehcache.java delete mode 100755 src/cache/JCSCache.java rename src/cache/{UnsafeMemoryCache.java => SharedMemoryCache.java} (92%) create mode 100644 src/util/AtomicLock.java diff --git a/src/CacheIPCTest.java b/src/CacheIPCTest.java new file mode 100644 index 0000000..eea8fac --- /dev/null +++ b/src/CacheIPCTest.java @@ -0,0 +1,100 @@ +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Date; +import java.util.Random; + +public class CacheIPCTest { + + private static String output(InputStream inputStream) throws IOException { + StringBuilder sb = new StringBuilder(); + BufferedReader br = null; + try { + br = new BufferedReader(new InputStreamReader(inputStream)); + String line; + while ((line = br.readLine()) != null) { + sb.append(line + System.getProperty("line.separator")); + } + } finally { + br.close(); + } + return sb.toString(); + } + + + public static void startAnotherJVM() throws Exception { + String separator = System.getProperty("file.separator"); + String classpath = System.getProperty("java.class.path"); + String path = System.getProperty("java.home") + + separator + "bin" + separator + "java"; + ProcessBuilder processBuilder = + new ProcessBuilder(path, "-cp", + classpath, + CacheTest.class.getName()); + Process process = processBuilder.start(); + System.out.println("Output:\n" + output(process.getInputStream())); + process.waitFor(); + } + + public static void startAndKillAnotherJVM() throws Exception { + String separator = System.getProperty("file.separator"); + String classpath = System.getProperty("java.class.path"); + String path = System.getProperty("java.home") + + separator + "bin" + separator + "java"; + ProcessBuilder processBuilder = + new ProcessBuilder(path, "-cp", + classpath, + CacheTest.class.getName()); + Process process = processBuilder.start(); + Random rnd = new Random(new Date().getTime() / 1000); + Thread.sleep(rnd.nextInt() % 1000, rnd.nextInt()); + process.destroy(); + System.out.println("Distroyed Output:\n" + output(process.getInputStream())); + process.waitFor(); + } + + + public static void main(String[] args) throws Exception { + int numOfJVMs = 5; + + Runnable r1 = new Runnable() { + + public void run() { + try { + startAnotherJVM(); + } catch (Exception e) { + throw (new Error("JVM is flying ....")); + } + } + }; + + + Runnable r2 = new Runnable() { + + public void run() { + try { + startAndKillAnotherJVM(); + } catch (Exception e) { + throw (new Error("JVM is flying ....")); + } + } + }; + + + Thread[] tArray = new Thread[numOfJVMs]; + + Runnable r = r1; + + for (int i = 0; i < numOfJVMs; i++) { + if (i == numOfJVMs - 2) + r = r2; + tArray[i] = new Thread(r); + tArray[i].start(); + } + + for (int i = 0; i < numOfJVMs; 
i++) { + tArray[i].join(); + } + } +} diff --git a/src/CacheTest.java b/src/CacheTest.java index 15fe9a4..321a968 100755 --- a/src/CacheTest.java +++ b/src/CacheTest.java @@ -65,17 +65,16 @@ public static void testAll(ICache cache) { System.out.println(cacheClass + " read-write: " + (end - start)); } + + + public static void main(String[] args) throws Exception { String type = args.length == 0 ? null : args[0]; ICache cache; - if ("ehcache".equals(type)) { - cache = new Ehcache(2*G); - } else if ("jcs".equals(type)) { - cache = new JCSCache("sampleCache"); - } else if ("chm".equals(type)) { + if ("chm".equals(type)) { cache = new ConcurrentHashMapCache(3000000, 256); } else { - cache = new UnsafeMemoryCache(new MemoryCacheConfiguration(2*G, 200*K, "/dev/shm/cache-test")); + cache = new SharedMemoryCache(new MemoryCacheConfiguration(200*M, 200*K, "/tmp/cache-shm-test")); } testAll(cache); cache.close(); diff --git a/src/cache/Ehcache.java b/src/cache/Ehcache.java deleted file mode 100755 index 80ed55d..0000000 --- a/src/cache/Ehcache.java +++ /dev/null @@ -1,37 +0,0 @@ -package cache; - -import net.sf.ehcache.Cache; -import net.sf.ehcache.CacheManager; -import net.sf.ehcache.Element; -import net.sf.ehcache.config.CacheConfiguration; -import net.sf.ehcache.config.MemoryUnit; - -public class Ehcache implements ICache { - private Cache cache; - - public Ehcache(long offHeap) { - CacheConfiguration config = new CacheConfiguration("sample-offheap-cache", 0). - overflowToOffHeap(true).maxBytesLocalOffHeap(offHeap, MemoryUnit.BYTES); - CacheManager manager = CacheManager.create(); - manager.addCache(new Cache(config)); - this.cache = manager.getCache("sample-offheap-cache"); - } - - @Override - public byte[] get(long key) { - Element element = cache.get(key); - return element != null ? 
(byte[]) element.getValue() : null; - } - - @Override - public boolean put(long key, byte[] value) { - cache.put(new Element(key, value)); - return true; - } - - @Override - public void close() { - cache.dispose(); - CacheManager.create().removeCache("sample-offheap-cache"); - } -} diff --git a/src/cache/JCSCache.java b/src/cache/JCSCache.java deleted file mode 100755 index bb5ecee..0000000 --- a/src/cache/JCSCache.java +++ /dev/null @@ -1,32 +0,0 @@ -package cache; - -import org.apache.jcs.JCS; -import org.apache.jcs.access.exception.CacheException; - -public class JCSCache implements ICache { - private JCS cache; - - public JCSCache(String name) throws CacheException { - this.cache = JCS.getInstance(name); - } - - @Override - public byte[] get(long key) { - return (byte[]) cache.get(key); - } - - @Override - public boolean put(long key, byte[] value) { - try { - cache.put(key, value); - return true; - } catch (CacheException e) { - return false; - } - } - - @Override - public void close() { - cache.dispose(); - } -} diff --git a/src/cache/UnsafeMemoryCache.java b/src/cache/SharedMemoryCache.java similarity index 92% rename from src/cache/UnsafeMemoryCache.java rename to src/cache/SharedMemoryCache.java index a30373b..2f7e785 100755 --- a/src/cache/UnsafeMemoryCache.java +++ b/src/cache/SharedMemoryCache.java @@ -1,16 +1,16 @@ package cache; +import sun.misc.Unsafe; +import util.AtomicLock; import util.MappedFile; import util.JavaInternals; -import sun.misc.Unsafe; -import java.util.concurrent.Semaphore; -public class UnsafeMemoryCache implements ICache { +public class SharedMemoryCache implements ICache { private static final Unsafe unsafe = JavaInternals.getUnsafe(); private static final int BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class); - private static final int WRITE_PERMITS = 1024; + private static final int MAX_KEY_COUNT = 256; private static final int KEY_SIZE = 8; @@ -24,17 +24,24 @@ public class UnsafeMemoryCache implements ICache { private int segmentMask; private Segment[] segments; - static final class Segment extends Semaphore { + static final class Segment { + public AtomicLock lock; final long start; + final long lockAddress; int tail; int count; Segment(long start, int size) { - super(WRITE_PERMITS, true); - this.start = start; - verify(start, size); + this.lockAddress = start; + this.lock = new AtomicLock(lockAddress); + this.start = start+4; + verify(start, size-4); } + public void lock() {lock.lock();} + public void release() {lock.unlock();} + + private void verify(long start, int size) { int maxTail = DATA_START; long prevKey = 0; @@ -64,7 +71,7 @@ private void verify(long start, int size) { } } - public UnsafeMemoryCache(MemoryCacheConfiguration configuration) throws Exception { + public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exception { long requestedCapacity = configuration.getCapacity(); long desiredSegmentSize = configuration.getSegmentSize(); int segmentCount = calculateSegmentCount(requestedCapacity, desiredSegmentSize); @@ -89,7 +96,7 @@ public void close() { @Override public byte[] get(long key) { Segment segment = segmentFor(key); - segment.acquireUninterruptibly(); + segment.lock(); try { long segmentStart = segment.start; long keysEnd = segmentStart + (segment.count << 3); @@ -117,7 +124,7 @@ public boolean put(long key, byte[] value) { } Segment segment = segmentFor(key); - segment.acquireUninterruptibly(WRITE_PERMITS); + segment.lock(); try { long segmentStart = segment.start; int tail = segment.tail; @@ -152,7 
+159,7 @@ public boolean put(long key, byte[] value) { segment.tail = newTail; return true; } finally { - segment.release(WRITE_PERMITS); + segment.release(); } } diff --git a/src/util/AtomicLock.java b/src/util/AtomicLock.java new file mode 100644 index 0000000..07333f9 --- /dev/null +++ b/src/util/AtomicLock.java @@ -0,0 +1,62 @@ +package util; + + +/** + * Created by shachar on 2/4/16. + */ + + +import sun.misc.Unsafe; + + +/** + * Spin Lock + *

This is a lock designed to protect VERY short sections of + * critical code. Threads attempting to take the lock will spin + * forever until the lock is available, thus it is important that + * the code protected by this lock is extremely simple and + * non-blocking. The reason for this lock is that it prevents a thread + * from giving up a CPU core when contending for the lock.
+ *
+ * AtomicLock lock = new AtomicLock(address);
+ * lock.lock();
+ * try {
+ *     // something very quick and non blocking
+ * } finally {
+ *     lock.unlock();
+ * }
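+ *
+ * The lock word is a single 4-byte int at {@code address_} in the shared
+ * mapping: 0 means unlocked, 1 means locked. The lock is acquired with a
+ * CAS from 0 to 1 and released with an ordered store of 0.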
+ */ + +public class AtomicLock { + private static final Unsafe unsafe = JavaInternals.getUnsafe(); + long address_; + + public AtomicLock(long address) { + address_ = address; + } + + public void dump(int offset) { + System.out.println(String.format("%02X", unsafe.getInt(address_ + offset))); + } + + public void lock() { + + while (true) { + if (!isLocked()) { + if (unsafe.compareAndSwapInt(null,address_, 0, 1)) { + break; + } + } + assert true; + } + } + + + public boolean isLocked() { + return unsafe.getInt(address_) != 0; + } + + + public void unlock() { + unsafe.putOrderedInt(null,address_, 0); + } +} + From b3e787fd3774c1177710381dc162aab7c8f448c5 Mon Sep 17 00:00:00 2001 From: shachar Date: Fri, 5 Feb 2016 16:12:27 -0800 Subject: [PATCH 2/8] - fix tests to fail on killing JVM - add trylock --- src/CacheIPCTest.java | 4 ++-- src/CacheTest.java | 7 ++++--- src/util/AtomicLock.java | 21 +++++++++++++++++++-- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/CacheIPCTest.java b/src/CacheIPCTest.java index eea8fac..9a86a4c 100644 --- a/src/CacheIPCTest.java +++ b/src/CacheIPCTest.java @@ -47,8 +47,8 @@ public static void startAndKillAnotherJVM() throws Exception { classpath, CacheTest.class.getName()); Process process = processBuilder.start(); - Random rnd = new Random(new Date().getTime() / 1000); - Thread.sleep(rnd.nextInt() % 1000, rnd.nextInt()); + Random rnd = new Random(new Date().getTime()); + Thread.sleep(rnd.nextInt(3000) + 300 ); process.destroy(); System.out.println("Distroyed Output:\n" + output(process.getInputStream())); process.waitFor(); diff --git a/src/CacheTest.java b/src/CacheTest.java index 321a968..89e6b89 100755 --- a/src/CacheTest.java +++ b/src/CacheTest.java @@ -11,12 +11,13 @@ public class CacheTest { private static final int WARMUP_COUNT = 100000; private static final int RUN_COUNT = 1000000; + private static final int DATA_SIZE = 256; public static void testWrite(ICache cache, int count) { Random random = new Random(0); for (int i = 0; i < count; i++) { long key = random.nextInt(1 << 20) * MAGIC; - cache.put(key, new byte[random.nextInt(8192)]); + cache.put(key, new byte[random.nextInt(DATA_SIZE)]); } } @@ -33,7 +34,7 @@ public static void testRead9Write1(ICache cache, int count) { for (int i = 0; i < count; i++) { long key = random.nextInt(1 << 20) * MAGIC; if (random.nextInt(10) == 0) { - cache.put(key, new byte[random.nextInt(8192)]); + cache.put(key, new byte[random.nextInt(DATA_SIZE)]); } else { cache.get(key); } @@ -74,7 +75,7 @@ public static void main(String[] args) throws Exception { if ("chm".equals(type)) { cache = new ConcurrentHashMapCache(3000000, 256); } else { - cache = new SharedMemoryCache(new MemoryCacheConfiguration(200*M, 200*K, "/tmp/cache-shm-test")); + cache = new SharedMemoryCache(new MemoryCacheConfiguration(200*M, 50*K, "/tmp/cache-shm-test")); } testAll(cache); cache.close(); diff --git a/src/util/AtomicLock.java b/src/util/AtomicLock.java index 07333f9..da16d66 100644 --- a/src/util/AtomicLock.java +++ b/src/util/AtomicLock.java @@ -26,6 +26,7 @@ */ public class AtomicLock { + private static final int MAX_TRY_LOOPS = 1000000; private static final Unsafe unsafe = JavaInternals.getUnsafe(); long address_; @@ -41,7 +42,7 @@ public void lock() { while (true) { if (!isLocked()) { - if (unsafe.compareAndSwapInt(null,address_, 0, 1)) { + if (unsafe.compareAndSwapInt(null, address_, 0, 1)) { break; } } @@ -49,6 +50,22 @@ public void lock() { } } + public boolean tryLock() { + int loops = 0; + boolean locked = false; + 
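+        // Spin, but attempt the CAS only when the lock is observed free;
+        // loops counts those attempts, so at most MAX_TRY_LOOPS acquisition attempts are made.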
while (loops< MAX_TRY_LOOPS) { + if (!isLocked()) { + loops++; + if (unsafe.compareAndSwapInt(null, address_, 0, 1)) { + locked = true; + break; + } + } + assert true; + } + return locked; + } + public boolean isLocked() { return unsafe.getInt(address_) != 0; @@ -56,7 +73,7 @@ public boolean isLocked() { public void unlock() { - unsafe.putOrderedInt(null,address_, 0); + unsafe.putOrderedInt(null, address_, 0); } } From 9b8897ffdad5fb42e3f5e1c77929556e94cb77a6 Mon Sep 17 00:00:00 2001 From: shachar Date: Thu, 11 Feb 2016 13:09:34 -0800 Subject: [PATCH 3/8] - changed key to be byte[] instead of long - replaced unsafe operations with MemoryMappedBuffer operation --- src/CacheTest.java | 22 +-- src/cache/ConcurrentHashMapCache.java | 9 +- src/cache/ICache.java | 4 +- src/cache/SharedMemoryCache.java | 244 +++++++++++++------------- src/util/AtomicLock.java | 5 +- src/util/MappedFile.java | 109 +++++++++--- 6 files changed, 220 insertions(+), 173 deletions(-) diff --git a/src/CacheTest.java b/src/CacheTest.java index 89e6b89..642e894 100755 --- a/src/CacheTest.java +++ b/src/CacheTest.java @@ -1,6 +1,8 @@ import cache.*; import java.util.Random; +import java.util.UUID; +import java.util.Arrays; public class CacheTest { private static final long K = 1024; @@ -9,22 +11,22 @@ public class CacheTest { private static final long MAGIC = 54331; - private static final int WARMUP_COUNT = 100000; - private static final int RUN_COUNT = 1000000; - private static final int DATA_SIZE = 256; + private static final int WARMUP_COUNT = 100; + private static final int RUN_COUNT = 1000; + + public static void testWrite(ICache cache, int count) { - Random random = new Random(0); for (int i = 0; i < count; i++) { - long key = random.nextInt(1 << 20) * MAGIC; - cache.put(key, new byte[random.nextInt(DATA_SIZE)]); + byte[] key = Arrays.copyOfRange(UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + cache.put(key, new byte[SharedMemoryCache.DATA_SIZE]); } } public static void testRead(ICache cache, int count) { Random random = new Random(1); for (int i = 0; i < count; i++) { - long key = random.nextInt(1 << 20) * MAGIC; + byte[] key = Arrays.copyOfRange( UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); cache.get(key); } } @@ -32,9 +34,9 @@ public static void testRead(ICache cache, int count) { public static void testRead9Write1(ICache cache, int count) { Random random = new Random(2); for (int i = 0; i < count; i++) { - long key = random.nextInt(1 << 20) * MAGIC; + byte[] key = Arrays.copyOfRange( UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); if (random.nextInt(10) == 0) { - cache.put(key, new byte[random.nextInt(DATA_SIZE)]); + cache.put(key, new byte[random.nextInt(SharedMemoryCache.DATA_SIZE)]); } else { cache.get(key); } @@ -75,7 +77,7 @@ public static void main(String[] args) throws Exception { if ("chm".equals(type)) { cache = new ConcurrentHashMapCache(3000000, 256); } else { - cache = new SharedMemoryCache(new MemoryCacheConfiguration(200*M, 50*K, "/tmp/cache-shm-test")); + cache = new SharedMemoryCache(new MemoryCacheConfiguration(100*K, 10*K, "/tmp/cache-shm-test")); } testAll(cache); cache.close(); diff --git a/src/cache/ConcurrentHashMapCache.java b/src/cache/ConcurrentHashMapCache.java index ee77d7b..875ba83 100755 --- a/src/cache/ConcurrentHashMapCache.java +++ b/src/cache/ConcurrentHashMapCache.java @@ -2,21 +2,20 @@ import java.util.concurrent.ConcurrentHashMap; -public class ConcurrentHashMapCache 
extends ConcurrentHashMap implements ICache { +public class ConcurrentHashMapCache extends ConcurrentHashMap implements ICache { public ConcurrentHashMapCache(int capacity, int concurrencyLevel) { super(capacity, 0.75f, concurrencyLevel); } @Override - public byte[] get(long key) { + public byte[] get(byte[]key) { return super.get(key); } @Override - public boolean put(long key, byte[] value) { - super.put(key, value); - return true; + public byte[] put(byte[] key, byte[] value) { + return super.put(key, value) ; } @Override diff --git a/src/cache/ICache.java b/src/cache/ICache.java index 7221fe2..ea02c10 100755 --- a/src/cache/ICache.java +++ b/src/cache/ICache.java @@ -1,7 +1,7 @@ package cache; public interface ICache { - byte[] get(long key); - boolean put(long key, byte[] value); + byte[] get(byte[] key); + byte[] put(byte[]key, byte[] value); void close(); } diff --git a/src/cache/SharedMemoryCache.java b/src/cache/SharedMemoryCache.java index 2f7e785..a1bf9d1 100755 --- a/src/cache/SharedMemoryCache.java +++ b/src/cache/SharedMemoryCache.java @@ -1,81 +1,94 @@ package cache; -import sun.misc.Unsafe; -import util.AtomicLock; import util.MappedFile; -import util.JavaInternals; +import java.nio.ByteBuffer; +import java.util.Arrays; public class SharedMemoryCache implements ICache { - private static final Unsafe unsafe = JavaInternals.getUnsafe(); - private static final int BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class); + public static final int KEY_SIZE = 8; + public static final int DATA_SIZE = 16; + private static final int ELEMENT_SIZE = KEY_SIZE + DATA_SIZE; + public final static int MAX_KEY_COUNT(int size) { + return size / ELEMENT_SIZE; + } - private static final int MAX_KEY_COUNT = 256; - private static final int KEY_SIZE = 8; - private static final int KEY_SPACE = MAX_KEY_COUNT * KEY_SIZE; - private static final int DATA_START = KEY_SPACE * 2; - private static final int OFFSET = KEY_SPACE + 0; - private static final int LENGTH = KEY_SPACE + 4; + private static int dateOffset(int keyAddress) { + return keyAddress + KEY_SIZE; + } private MappedFile mmap; private int segmentSize; private int segmentMask; private Segment[] segments; - static final class Segment { - public AtomicLock lock; - final long start; - final long lockAddress; - int tail; - int count; - - Segment(long start, int size) { - this.lockAddress = start; - this.lock = new AtomicLock(lockAddress); - this.start = start+4; - verify(start, size-4); + static final class Segment { + int start =0; + int size; + MappedFile mmap=null; + + + Segment(int start, int size, MappedFile mmap) { + this.size = size ; + this.start = start; + this.mmap = mmap; + verify(); } - public void lock() {lock.lock();} - public void release() {lock.unlock();} + public int elementsSpaceSize() { + return size -4; + } + public int elementsSpaceOffset() { + return start + 4; + } + public int countOffset(){ + return start; + } + public int getCount() { + return mmap.get(countOffset()); + } - private void verify(long start, int size) { - int maxTail = DATA_START; - long prevKey = 0; - long pos = start; + public void incCount() { + mmap.put(getCount() + 1, countOffset()); + } - for (long keysEnd = start + KEY_SPACE; pos < keysEnd; pos += KEY_SIZE) { - long key = unsafe.getLong(pos); - if (key <= prevKey) { - break; - } - int offset = unsafe.getInt(pos + OFFSET); - int length = unsafe.getInt(pos + LENGTH); - int newTail = (offset + length + 7) & ~7; - if (offset < DATA_START || length < 0 || newTail > size) { - break; - } + private void 
verify() { + int start = elementsSpaceOffset(); + int size = elementsSpaceSize(); + int pos = start; + byte[] prevKey = new byte[KEY_SIZE]; + byte[] key = new byte[KEY_SIZE]; + + if (getCount() > MAX_KEY_COUNT(size)) { + throw new Error("shared cache element in position count is invalid"); + } - if (newTail > maxTail) { - maxTail = newTail; + for (int c = 0; c < getCount(); c++, pos += ELEMENT_SIZE) { + + if ( mmap.compare(pos, ByteBuffer.wrap(prevKey) ) <= 0 ) { + throw new Error("shared cache element in position is invalid"); } - prevKey = key; + mmap.get(prevKey,pos,KEY_SIZE); + } + + if ( ((pos - start)/ ELEMENT_SIZE) < getCount() ) { + throw new Error("shared cache element in position count is invalid"); } - this.tail = maxTail; - this.count = (int) (pos - start) >>> 3; } + + } public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exception { long requestedCapacity = configuration.getCapacity(); long desiredSegmentSize = configuration.getSegmentSize(); - int segmentCount = calculateSegmentCount(requestedCapacity, desiredSegmentSize); - long segmentSize = (requestedCapacity / segmentCount + 31) & ~31L; + int segmentCount = calculateSegmentCount(requestedCapacity, desiredSegmentSize); + int segmentSize = (int)((requestedCapacity / segmentCount + 31) & ~31L); this.mmap = new MappedFile(configuration.getImageFile(), segmentSize * segmentCount); this.segmentSize = (int) segmentSize; @@ -83,7 +96,7 @@ public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exceptio this.segments = new Segment[segmentCount]; for (int i = 0; i < segmentCount; i++) { - segments[i] = new Segment(mmap.getAddr() + segmentSize * i, this.segmentSize); + segments[i] = new Segment(segmentSize * i, this.segmentSize,mmap); } } @@ -94,79 +107,71 @@ public void close() { } @Override - public byte[] get(long key) { + public byte[] get(byte[] key) { + + if ( key.length != KEY_SIZE) { + return null; + } + Segment segment = segmentFor(key); - segment.lock(); + mmap.lock(); try { - long segmentStart = segment.start; - long keysEnd = segmentStart + (segment.count << 3); - long keyAddr = binarySearch(key, segmentStart, keysEnd); - - if (keyAddr > 0) { - int offset = unsafe.getInt(keyAddr + OFFSET); - int length = unsafe.getInt(keyAddr + LENGTH); - byte[] result = new byte[length]; - unsafe.copyMemory(null, segmentStart + offset, result, BYTE_ARRAY_OFFSET, length); + int segmentStart = segment.elementsSpaceOffset(); + int keysEnd = segmentStart + segment.getCount() * ELEMENT_SIZE; + int keyOffset = binarySearch(key, segmentStart, keysEnd); + + if (keyOffset > 0) { + int offset = dateOffset(keyOffset); + byte[] result = new byte[DATA_SIZE]; + mmap.get(result, offset, DATA_SIZE); return result; } return null; } finally { - segment.release(); + mmap.release(); } } @Override - public boolean put(long key, byte[] value) { - int length = value.length; - if (length >= segmentSize >> 1) { - return false; + public byte[] put(byte[] key, byte[] value) { + int keyOffset; + int keysEnd; + if (value.length != DATA_SIZE || key.length != KEY_SIZE) { + return null; } Segment segment = segmentFor(key); - segment.lock(); + mmap.lock(); try { - long segmentStart = segment.start; - int tail = segment.tail; - int newTail = (tail + length + 7) & ~7; - - if (newTail > segmentSize) { - tail = DATA_START; - newTail = (tail + length + 7) & ~7; + int segmentStart = segment.elementsSpaceOffset(); + int count = segment.getCount(); + if (count >= MAX_KEY_COUNT(segment.elementsSpaceSize())) { + return null; } - - 
purgeOverlappingRegion(segment, tail, newTail); - - int count = segment.count; - if (count == MAX_KEY_COUNT) { - return false; - } - - long keysEnd = segmentStart + (count << 3); - long keyAddr = binarySearch(key, segmentStart, keysEnd); - if (keyAddr < 0) { - keyAddr = ~keyAddr; - unsafe.copyMemory(null, keyAddr, null, keyAddr + KEY_SIZE, keysEnd - keyAddr); - unsafe.copyMemory(null, keyAddr + KEY_SPACE, null, keyAddr + (KEY_SPACE + KEY_SIZE), keysEnd - keyAddr); - segment.count = count + 1; + //mmap.dump(segment.start,100); + keysEnd = segmentStart + (count * ELEMENT_SIZE); + keyOffset = binarySearch(key, segmentStart, keysEnd); + if (keyOffset < 0) { + keyOffset = ~keyOffset; + mmap.copy(keyOffset, keyOffset + ELEMENT_SIZE, keysEnd - keyOffset); + mmap.put(key, keyOffset, KEY_SIZE); + segment.incCount(); } - unsafe.putLong(keyAddr, key); - unsafe.putInt(keyAddr + OFFSET, tail); - unsafe.putInt(keyAddr + LENGTH, length); - unsafe.copyMemory(value, BYTE_ARRAY_OFFSET, null, segmentStart + tail, length); - - segment.tail = newTail; - return true; + mmap.put(value, keyOffset + KEY_SIZE, DATA_SIZE); + //mmap.dump(segment.start,100); + return key; } finally { - segment.release(); + segment.verify(); + mmap.release(); } } public int count() { int count = 0; for (Segment segment : segments) { - count += segment.count; + count += segment.getCount(); } return count; } @@ -179,45 +184,32 @@ private int calculateSegmentCount(long requestedCapacity, long segmentSize) { return segmentCount; } - private Segment segmentFor(long key) { - return segments[((int) (key ^ (key >>> 16))) & segmentMask]; + private Segment segmentFor(byte[] key) { + return segments[Arrays.hashCode(key) & segmentMask]; } - private static long binarySearch(long key, long low, long high) { - for (high -= KEY_SIZE; low <= high; ) { - long mid = ((low + high) >>> 1) & ~7L; - long midVal = unsafe.getLong(mid); + private int binarySearch(byte[] key, int low, int high) { + byte[] midval = new byte[KEY_SIZE]; + ByteBuffer kbuf = ByteBuffer.wrap(key); + + for (high -= ELEMENT_SIZE; low <= high; ) { + int midOffset = low + (((high -low)/ ELEMENT_SIZE) >>> 1) * ELEMENT_SIZE; + + mmap.get(midval,midOffset,KEY_SIZE); + int compare = mmap.compare(midOffset, kbuf); - if (midVal < key) { - low = mid + KEY_SIZE; - } else if (midVal > key) { - high = mid - KEY_SIZE; + if (compare < 0) { + low = midOffset + ELEMENT_SIZE; + } else if (compare > 0) { + high = midOffset - ELEMENT_SIZE; } else { - return mid; + return midOffset; } } return ~low; } - private static void purgeOverlappingRegion(Segment segment, int from, int to) { - long pos = segment.start + OFFSET; - int count = segment.count; - long end = pos + (count << 3); - for (long newPos = pos; pos < end; pos += KEY_SIZE) { - int offset = unsafe.getInt(pos); - if (offset >= from && offset < to) { - count--; - } else { - if (newPos != pos) { - unsafe.putInt(newPos, offset); - unsafe.putInt(newPos + 4, unsafe.getInt(pos + 4)); - unsafe.putLong(newPos - KEY_SPACE, unsafe.getLong(pos - KEY_SPACE)); - } - newPos += KEY_SIZE; - } - } - segment.count = count; - } + } diff --git a/src/util/AtomicLock.java b/src/util/AtomicLock.java index da16d66..83937d2 100644 --- a/src/util/AtomicLock.java +++ b/src/util/AtomicLock.java @@ -34,10 +34,7 @@ public AtomicLock(long address) { address_ = address; } - public void dump(int offset) { - System.out.println(String.format("%02X", unsafe.getInt(address_ + offset))); - } - + public void lock() { while (true) { diff --git a/src/util/MappedFile.java 
b/src/util/MappedFile.java index c464111..a04dc7f 100755 --- a/src/util/MappedFile.java +++ b/src/util/MappedFile.java @@ -3,51 +3,108 @@ import sun.nio.ch.FileChannelImpl; import java.io.RandomAccessFile; -import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; -public class MappedFile { - private static Method map0 = JavaInternals.getMethod(FileChannelImpl.class, "map0", int.class, long.class, long.class); - private static Method unmap0 = JavaInternals.getMethod(FileChannelImpl.class, "unmap0", long.class, long.class); - private long addr; - private long size; +public class MappedFile { + private FileChannel ch; + private FileLock lock; + private MappedByteBuffer map; public MappedFile(String name, long size) throws Exception { + RandomAccessFile f = null; size = (size + 0xfffL) & ~0xfffL; + f = new RandomAccessFile(name, "rw"); + f.setLength(size); + ch = f.getChannel(); + map = ch.map(FileChannel.MapMode.READ_WRITE, 0, size); + } + + public void put(byte[] buffer, int index, int length) { + map.position(index); + map.put(buffer, 0, length); + } + + public void get(byte[] buffer, int index, int length) { + map.position(index); + map.mark(); + map.get(buffer, 0, length); + map.reset(); + } + - RandomAccessFile f = new RandomAccessFile(name, "rw"); - FileChannel ch = null; + public void copy(int srcIndex, int targetIndex, int length) { + for (int i = length-1; i >= 0; i--) { + map.position(0); + map.put(targetIndex + i, map.get(srcIndex + i)); + } + } + + public void put(int val, int index) { + map.position(index); + map.putInt(val); + } + public int get(int index) { + map.position(index); + return map.getInt(); + } + + + public void close() { try { - f.setLength(size); - ch = f.getChannel(); - this.addr = (Long) map0.invoke(ch, 1, 0L, size); - this.size = size; - } finally { if (ch != null) { ch.close(); } - f.close(); + } catch (Exception e) { + } + } - public void close() { - if (addr != 0) { - try { - unmap0.invoke(null, addr, size); - } catch (Exception e) { - // ignore - } - addr = 0; + public void lock() { + try { + lock = ch.lock(); + } catch (Exception e) { + System.out.println("lock is flying"); + } + } + + public void release() { + try { + lock.release(); + } catch (Exception e) { + System.out.println("release is flying"); } + } - public final long getAddr() { - return addr; + public int compare(int index, ByteBuffer buffer) { + map.position(index); + map.mark(); + int result = map.compareTo(buffer); + map.reset(); + return result; } - public final long getSize() { - return size; + + + + public void dump(int index, int size) { + + final char[] hexArray = "0123456789ABCDEF".toCharArray(); + byte[] bytes = new byte[size]; + get(bytes, index, size); + + char[] hexChars = new char[bytes.length * 2]; + for (int j = 0; j < bytes.length; j++) { + int v = bytes[j] & 0xFF; + hexChars[j * 2] = hexArray[v >>> 4]; + hexChars[j * 2 + 1] = hexArray[v & 0x0F]; + } + System.out.println(new String(hexChars)); } } From 85d95d62048ffad8d75e337c9d1a537e97485d7b Mon Sep 17 00:00:00 2001 From: shachar Date: Thu, 11 Feb 2016 21:47:04 -0800 Subject: [PATCH 4/8] - sort key by reference - this way we can add linked list as offsets in the data --- src/cache/SharedMemoryCache.java | 210 ++++++++++++++++++++++--------- src/util/MappedFile.java | 21 +++- 2 files changed, 171 insertions(+), 60 deletions(-) diff --git a/src/cache/SharedMemoryCache.java b/src/cache/SharedMemoryCache.java index 
a1bf9d1..e8417fd 100755 --- a/src/cache/SharedMemoryCache.java +++ b/src/cache/SharedMemoryCache.java @@ -1,94 +1,179 @@ package cache; +import sun.jvm.hotspot.runtime.Bytes; import util.MappedFile; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Locale; public class SharedMemoryCache implements ICache { + + // Attributes + private MappedFile mmap; + private int segmentSize; + private int segmentMask; + private Segment[] segments; + + // FILE HEADER STRUCTURE + public static final int TOTAL_SIZE_OFFSET = 0; + public static final int LRU_HEAD_OFFSET = TOTAL_SIZE_OFFSET + 8; + public static final int HEADER_SIZE = LRU_HEAD_OFFSET + 4; + + private long getTotal() { + return mmap.getLong(TOTAL_SIZE_OFFSET); + } + + private void updateTotal(long size) { + mmap.putLong(getTotal() + size, TOTAL_SIZE_OFFSET); + } + + private void updateLRU(int offsetLatest) { + + } + + // HASH ELEMENT STRUCTURE + public static final int KEY_REF_SIZE = Short.SIZE/8; public static final int KEY_SIZE = 8; - public static final int DATA_SIZE = 16; + public static final int DATA_SIZE = CacheMetaInfo.DataSize ; private static final int ELEMENT_SIZE = KEY_SIZE + DATA_SIZE; - public final static int MAX_KEY_COUNT(int size) { - return size / ELEMENT_SIZE; - } - private static int dateOffset(int keyAddress) { - return keyAddress + KEY_SIZE; + + + + + static final class CacheMetaInfo { + + ByteBuffer bytes_; + + public static final int activateOffset = 0; + public static final int sizeOffset = 1; + public static final int timeOffset = Long.SIZE/8 + 1; + public static final int DataSize = Long.SIZE/8 + Long.SIZE/8 + 1; + + CacheMetaInfo(ByteBuffer bytes) { + bytes_ = bytes; + } + + CacheMetaInfo(byte activate, long size, long timestamp) { + bytes_ = ByteBuffer.allocate(DataSize); + bytes_.put(activate); + bytes_.putLong(timestamp); + bytes_.putLong(size); + + } + + public byte[] bytes() { return bytes_.array(); } + + public boolean activated() { + return bytes_.get(activateOffset) > 0; + } + + public long timestamp() { + bytes_.position(timeOffset); + bytes_.mark(); + long val = bytes_.getLong(); + bytes_.reset(); + return val; + } + + public long size() { + bytes_.position(sizeOffset); + bytes_.mark(); + long val = bytes_.getLong(); + bytes_.reset(); + return val; + } + + + } - private MappedFile mmap; - private int segmentSize; - private int segmentMask; - private Segment[] segments; static final class Segment { - int start =0; + int start = 0; int size; - MappedFile mmap=null; + MappedFile mmap = null; Segment(int start, int size, MappedFile mmap) { - this.size = size ; + this.size = size; this.start = start; this.mmap = mmap; verify(); } - public int elementsSpaceSize() { - return size -4; + public int elementsSpaceSize() { + return size - 2; } + public int elementsSpaceOffset() { - return start + 4; + return start + 2; } - public int countOffset(){ + + public int countOffset() { return start; } - public int getCount() { - return mmap.get(countOffset()); + public short getCount() { + return mmap.getShort(countOffset()); } public void incCount() { - mmap.put(getCount() + 1, countOffset()); + mmap.putShort((short)(getCount()+1), countOffset()); + } + + + public final int maxKeyCount() { + int numOfElements = elementsSpaceSize() / (ELEMENT_SIZE + KEY_REF_SIZE); + if ( numOfElements > Short.MAX_VALUE) { + numOfElements = Short.MAX_VALUE; + System.out.printf("LOG OF ERROR"); + } + return numOfElements; + } + + private int keysOffset() { + return elementsSpaceOffset()+ maxKeyCount()*KEY_REF_SIZE; + } + + 
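+        // A ref is a short index into the fixed-size element array that follows
+        // the ref table, so keyOffset() maps a ref to the absolute offset of that
+        // element's key.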
private int keyOffset(short ref) { + return keysOffset() + ref * ELEMENT_SIZE; } + private int dateOffset(short ref) { + return keyOffset(ref) + DATA_SIZE ; + } private void verify() { int start = elementsSpaceOffset(); int size = elementsSpaceSize(); int pos = start; + int keyOffset; byte[] prevKey = new byte[KEY_SIZE]; byte[] key = new byte[KEY_SIZE]; - if (getCount() > MAX_KEY_COUNT(size)) { + if (getCount() > maxKeyCount()) { throw new Error("shared cache element in position count is invalid"); } - for (int c = 0; c < getCount(); c++, pos += ELEMENT_SIZE) { - - if ( mmap.compare(pos, ByteBuffer.wrap(prevKey) ) <= 0 ) { + for (int c = 0; c < getCount(); c++, pos += KEY_REF_SIZE) { + keyOffset = this.keyOffset(mmap.getShort(pos)); + if (mmap.compare(keyOffset, ByteBuffer.wrap(prevKey)) <= 0) { throw new Error("shared cache element in position is invalid"); } - mmap.get(prevKey,pos,KEY_SIZE); - } - - if ( ((pos - start)/ ELEMENT_SIZE) < getCount() ) { - throw new Error("shared cache element in position count is invalid"); + mmap.get(prevKey, keyOffset, KEY_SIZE); } - } - - } public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exception { - long requestedCapacity = configuration.getCapacity(); + long requestedCapacity = configuration.getCapacity() - HEADER_SIZE; long desiredSegmentSize = configuration.getSegmentSize(); - int segmentCount = calculateSegmentCount(requestedCapacity, desiredSegmentSize); - int segmentSize = (int)((requestedCapacity / segmentCount + 31) & ~31L); + int segmentCount = calculateSegmentCount(requestedCapacity, desiredSegmentSize); + int segmentSize = (int) ((requestedCapacity / segmentCount + 31) & ~31L); this.mmap = new MappedFile(configuration.getImageFile(), segmentSize * segmentCount); this.segmentSize = (int) segmentSize; @@ -96,7 +181,7 @@ public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exceptio this.segments = new Segment[segmentCount]; for (int i = 0; i < segmentCount; i++) { - segments[i] = new Segment(segmentSize * i, this.segmentSize,mmap); + segments[i] = new Segment(HEADER_SIZE + segmentSize * i, this.segmentSize, mmap); } } @@ -109,7 +194,7 @@ public void close() { @Override public byte[] get(byte[] key) { - if ( key.length != KEY_SIZE) { + if (key.length != KEY_SIZE) { return null; } @@ -117,11 +202,11 @@ public byte[] get(byte[] key) { mmap.lock(); try { int segmentStart = segment.elementsSpaceOffset(); - int keysEnd = segmentStart + segment.getCount() * ELEMENT_SIZE; - int keyOffset = binarySearch(key, segmentStart, keysEnd); + int keysEnd = segmentStart + segment.getCount() * KEY_REF_SIZE; + int keyRefOffset = binarySearch(key, segmentStart, keysEnd,segment); - if (keyOffset > 0) { - int offset = dateOffset(keyOffset); + if (keyRefOffset > 0) { + int offset = segment.dateOffset(mmap.getShort(keyRefOffset)); byte[] result = new byte[DATA_SIZE]; mmap.get(result, offset, DATA_SIZE); return result; @@ -133,34 +218,41 @@ public byte[] get(byte[] key) { } } + @Override public byte[] put(byte[] key, byte[] value) { int keyOffset; + int keyRefOffset; int keysEnd; if (value.length != DATA_SIZE || key.length != KEY_SIZE) { return null; } + CacheMetaInfo v = new CacheMetaInfo(ByteBuffer.wrap(value)); + Segment segment = segmentFor(key); mmap.lock(); try { int segmentStart = segment.elementsSpaceOffset(); int count = segment.getCount(); - if (count >= MAX_KEY_COUNT(segment.elementsSpaceSize())) { + if (count >= segment.maxKeyCount()) { return null; } - //mmap.dump(segment.start,100); - keysEnd = segmentStart + 
(count * ELEMENT_SIZE); - keyOffset = binarySearch(key, segmentStart, keysEnd); - if (keyOffset < 0) { - keyOffset = ~keyOffset; - mmap.copy(keyOffset, keyOffset + ELEMENT_SIZE, keysEnd - keyOffset); + mmap.dump(segment.start,100); + keysEnd = segmentStart + (count * KEY_REF_SIZE); + keyRefOffset = binarySearch(key, segmentStart, keysEnd,segment); + if (keyRefOffset < 0) { + keyRefOffset = ~keyRefOffset; + mmap.copy(keyRefOffset, keyRefOffset + KEY_REF_SIZE, keysEnd - keyRefOffset); + mmap.putShort(segment.getCount(),keyRefOffset); + keyOffset = segment.keyOffset(segment.getCount()); mmap.put(key, keyOffset, KEY_SIZE); + updateTotal(v.size()); segment.incCount(); } - + keyOffset = segment.keyOffset(mmap.getShort(keyRefOffset)); mmap.put(value, keyOffset + KEY_SIZE, DATA_SIZE); - //mmap.dump(segment.start,100); + mmap.dump(segment.start,100); return key; } finally { segment.verify(); @@ -168,6 +260,7 @@ public byte[] put(byte[] key, byte[] value) { } } + public int count() { int count = 0; for (Segment segment : segments) { @@ -188,20 +281,21 @@ private Segment segmentFor(byte[] key) { return segments[Arrays.hashCode(key) & segmentMask]; } - private int binarySearch(byte[] key, int low, int high) { + private int binarySearch(byte[] key, int low, int high, Segment seg) { byte[] midval = new byte[KEY_SIZE]; ByteBuffer kbuf = ByteBuffer.wrap(key); - for (high -= ELEMENT_SIZE; low <= high; ) { - int midOffset = low + (((high -low)/ ELEMENT_SIZE) >>> 1) * ELEMENT_SIZE; - - mmap.get(midval,midOffset,KEY_SIZE); + for (high -= KEY_REF_SIZE; low <= high; ) { + int midRefOffset = low + (((high - low) / KEY_REF_SIZE) >>> 1) * KEY_REF_SIZE; + short midRef = mmap.getShort(midRefOffset); + int midOffset = seg.keyOffset(midRef); + mmap.get(midval, midOffset, KEY_SIZE); int compare = mmap.compare(midOffset, kbuf); if (compare < 0) { - low = midOffset + ELEMENT_SIZE; + low = midRefOffset + KEY_REF_SIZE; } else if (compare > 0) { - high = midOffset - ELEMENT_SIZE; + high = midRefOffset - KEY_REF_SIZE; } else { return midOffset; } @@ -210,6 +304,4 @@ private int binarySearch(byte[] key, int low, int high) { } - - } diff --git a/src/util/MappedFile.java b/src/util/MappedFile.java index a04dc7f..68d4ea9 100755 --- a/src/util/MappedFile.java +++ b/src/util/MappedFile.java @@ -37,8 +37,8 @@ public void get(byte[] buffer, int index, int length) { public void copy(int srcIndex, int targetIndex, int length) { + map.position(0); for (int i = length-1; i >= 0; i--) { - map.position(0); map.put(targetIndex + i, map.get(srcIndex + i)); } } @@ -53,6 +53,25 @@ public int get(int index) { return map.getInt(); } + public void putLong(long val, int index) { + map.position(index); + map.putLong(val); + } + + public long getLong(int index) { + map.position(index); + return map.getLong(); + } + + public void putShort(short val, int index) { + map.position(index); + map.putShort(val); + } + + public short getShort(int index) { + map.position(index); + return map.getShort(); + } public void close() { try { From a924f18bccbc66e90df5718302d00f0082d4dc2c Mon Sep 17 00:00:00 2001 From: shachar Date: Sun, 14 Feb 2016 19:08:23 -0800 Subject: [PATCH 5/8] fixed linked list of LRU --- src/CacheTest.java | 13 ++-- src/cache/SharedMemoryCache.java | 115 +++++++++++++++++++++++++------ src/util/MappedFile.java | 5 +- 3 files changed, 106 insertions(+), 27 deletions(-) diff --git a/src/CacheTest.java b/src/CacheTest.java index 642e894..e8cc74c 100755 --- a/src/CacheTest.java +++ b/src/CacheTest.java @@ -17,8 +17,11 @@ public class CacheTest { 
public static void testWrite(ICache cache, int count) { + Random random = new Random(1); for (int i = 0; i < count; i++) { - byte[] key = Arrays.copyOfRange(UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + //byte[] key = Arrays.copyOfRange(UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + byte[] key = Arrays.copyOfRange(Long.toHexString(((random.nextInt() % RUN_COUNT) << 20) * MAGIC).intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + //byte[] key = { 1,1,1,1,1,1,1,1 }; cache.put(key, new byte[SharedMemoryCache.DATA_SIZE]); } } @@ -26,15 +29,17 @@ public static void testWrite(ICache cache, int count) { public static void testRead(ICache cache, int count) { Random random = new Random(1); for (int i = 0; i < count; i++) { - byte[] key = Arrays.copyOfRange( UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + byte[] key = Arrays.copyOfRange(Long.toHexString(((random.nextInt() % RUN_COUNT) << 20) * MAGIC).intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + // byte[] key = Arrays.copyOfRange( UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); cache.get(key); } } public static void testRead9Write1(ICache cache, int count) { - Random random = new Random(2); + Random random = new Random(1); for (int i = 0; i < count; i++) { - byte[] key = Arrays.copyOfRange( UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + byte[] key = Arrays.copyOfRange(Long.toHexString(((random.nextInt() % RUN_COUNT) << 20) * MAGIC).intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); + // byte[] key = Arrays.copyOfRange( UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); if (random.nextInt(10) == 0) { cache.put(key, new byte[random.nextInt(SharedMemoryCache.DATA_SIZE)]); } else { diff --git a/src/cache/SharedMemoryCache.java b/src/cache/SharedMemoryCache.java index e8417fd..87b5051 100755 --- a/src/cache/SharedMemoryCache.java +++ b/src/cache/SharedMemoryCache.java @@ -1,11 +1,9 @@ package cache; -import sun.jvm.hotspot.runtime.Bytes; import util.MappedFile; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Locale; public class SharedMemoryCache implements ICache { @@ -29,18 +27,14 @@ private void updateTotal(long size) { mmap.putLong(getTotal() + size, TOTAL_SIZE_OFFSET); } - private void updateLRU(int offsetLatest) { - } // HASH ELEMENT STRUCTURE public static final int KEY_REF_SIZE = Short.SIZE/8; public static final int KEY_SIZE = 8; + public static final int LINK_SIZE = Integer.SIZE/8; public static final int DATA_SIZE = CacheMetaInfo.DataSize ; - private static final int ELEMENT_SIZE = KEY_SIZE + DATA_SIZE; - - - + private static final int ELEMENT_SIZE = LINK_SIZE*2 + KEY_SIZE + DATA_SIZE; @@ -134,7 +128,6 @@ public final int maxKeyCount() { } return numOfElements; } - private int keysOffset() { return elementsSpaceOffset()+ maxKeyCount()*KEY_REF_SIZE; } @@ -143,10 +136,25 @@ private int keyOffset(short ref) { return keysOffset() + ref * ELEMENT_SIZE; } - private int dateOffset(short ref) { - return keyOffset(ref) + DATA_SIZE ; + private static int oldLinkOffset(int keyOffset) { + return keyOffset + KEY_SIZE ; + } + + private static int newLinkOffset(int keyOffset) { + return keyOffset + KEY_SIZE + LINK_SIZE ; + } + + private static int dataOffset(int keyOffset) { + return keyOffset + KEY_SIZE + 2*LINK_SIZE ; + } + + private int dataOffset(short ref) { + return dataOffset(keyOffset(ref)) ; } + + + 
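+        // verify() checks the segment invariants: the stored count must not exceed
+        // maxKeyCount() and the keys referenced by the sorted ref table must be in
+        // strictly ascending order.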
private void verify() { int start = elementsSpaceOffset(); int size = elementsSpaceSize(); @@ -161,7 +169,7 @@ private void verify() { for (int c = 0; c < getCount(); c++, pos += KEY_REF_SIZE) { keyOffset = this.keyOffset(mmap.getShort(pos)); - if (mmap.compare(keyOffset, ByteBuffer.wrap(prevKey)) <= 0) { + if (mmap.compare(keyOffset, KEY_SIZE, ByteBuffer.wrap(prevKey)) <= 0) { throw new Error("shared cache element in position is invalid"); } mmap.get(prevKey, keyOffset, KEY_SIZE); @@ -206,7 +214,7 @@ public byte[] get(byte[] key) { int keyRefOffset = binarySearch(key, segmentStart, keysEnd,segment); if (keyRefOffset > 0) { - int offset = segment.dateOffset(mmap.getShort(keyRefOffset)); + int offset = segment.dataOffset(mmap.getShort(keyRefOffset)); byte[] result = new byte[DATA_SIZE]; mmap.get(result, offset, DATA_SIZE); return result; @@ -234,14 +242,15 @@ public byte[] put(byte[] key, byte[] value) { mmap.lock(); try { int segmentStart = segment.elementsSpaceOffset(); - int count = segment.getCount(); - if (count >= segment.maxKeyCount()) { - return null; - } + + mmap.dump(segment.start,100); - keysEnd = segmentStart + (count * KEY_REF_SIZE); + keysEnd = segmentStart + (segment.getCount() * KEY_REF_SIZE); keyRefOffset = binarySearch(key, segmentStart, keysEnd,segment); if (keyRefOffset < 0) { + if (segment.getCount() >= segment.maxKeyCount()) { + return null; + } keyRefOffset = ~keyRefOffset; mmap.copy(keyRefOffset, keyRefOffset + KEY_REF_SIZE, keysEnd - keyRefOffset); mmap.putShort(segment.getCount(),keyRefOffset); @@ -250,12 +259,14 @@ public byte[] put(byte[] key, byte[] value) { updateTotal(v.size()); segment.incCount(); } - keyOffset = segment.keyOffset(mmap.getShort(keyRefOffset)); - mmap.put(value, keyOffset + KEY_SIZE, DATA_SIZE); + short ref = mmap.getShort(keyRefOffset); + mmap.put(value, segment.dataOffset(ref),DATA_SIZE); + moveToHead(LRU_HEAD_OFFSET, ref, segment); mmap.dump(segment.start,100); return key; } finally { segment.verify(); + verifyLRU(); mmap.release(); } } @@ -290,18 +301,78 @@ private int binarySearch(byte[] key, int low, int high, Segment seg) { short midRef = mmap.getShort(midRefOffset); int midOffset = seg.keyOffset(midRef); mmap.get(midval, midOffset, KEY_SIZE); - int compare = mmap.compare(midOffset, kbuf); + int compare = mmap.compare(midOffset,KEY_SIZE, kbuf); if (compare < 0) { low = midRefOffset + KEY_REF_SIZE; } else if (compare > 0) { high = midRefOffset - KEY_REF_SIZE; } else { - return midOffset; + return midRefOffset; } } return ~low; } + private int prev(int offset ){ + if (offset == LRU_HEAD_OFFSET ) { + return mmap.get(offset); + } + else { + return mmap.get(Segment.oldLinkOffset(offset)); + } + } + + private void removeLink(int headLinkOffest, int keyOffset) { + int oldKeyOffset = mmap.get(Segment.oldLinkOffset(keyOffset)); + if (oldKeyOffset > 0) { + int newKeyOffset = mmap.get(Segment.newLinkOffset(keyOffset)); + int headKeyOffset = mmap.get(headLinkOffest); + if (keyOffset == headKeyOffset) { + mmap.put(headKeyOffset, Segment.newLinkOffset(oldKeyOffset)); + mmap.put(oldKeyOffset, headLinkOffest ); + } else { + mmap.put(newKeyOffset, Segment.newLinkOffset(oldKeyOffset)); + mmap.put(oldKeyOffset, Segment.oldLinkOffset(newKeyOffset) ); + } + } + mmap.put(0, Segment.oldLinkOffset(keyOffset)); + mmap.put(0, Segment.newLinkOffset(keyOffset)); + } + + + private void updateHead(int headLinkOffest, int keyOffset) { + int headKeyOffset = mmap.get(headLinkOffest); + if (keyOffset != headKeyOffset) { + if (headKeyOffset > 0) { + mmap.put(keyOffset, 
Segment.newLinkOffset(headKeyOffset)); + mmap.put(headKeyOffset, Segment.oldLinkOffset(keyOffset)); + } + mmap.put(keyOffset, headLinkOffest); + } + } + + private void moveToHead(int headLinkOffest, short ref, Segment seg) { + int keyOffset = seg.keyOffset(ref); + if (keyOffset != mmap.get(headLinkOffest)) { + removeLink(headLinkOffest, keyOffset); + updateHead(headLinkOffest, keyOffset); + } + } + + + private void verifyLRU() { + int keyOffset = LRU_HEAD_OFFSET; + int count = 0; + while ( (keyOffset = prev(keyOffset)) > 0) { + count++; + } + if (count != this.count()) { + throw new Error("shared cache - not all keys are linked"); + } + } + + + } diff --git a/src/util/MappedFile.java b/src/util/MappedFile.java index 68d4ea9..55535ba 100755 --- a/src/util/MappedFile.java +++ b/src/util/MappedFile.java @@ -101,11 +101,14 @@ public void release() { } - public int compare(int index, ByteBuffer buffer) { + public int compare(int index, int length, ByteBuffer buffer) { map.position(index); + int oldLimit = map.limit(); + map.limit(index+length); map.mark(); int result = map.compareTo(buffer); map.reset(); + map.limit(oldLimit); return result; } From ec9a9c95d4bacf9f5018db319f72aac41b80c756 Mon Sep 17 00:00:00 2001 From: shachar Date: Mon, 15 Feb 2016 18:39:39 -0800 Subject: [PATCH 6/8] add timestamp + total count checks --- src/cache/SharedMemoryCache.java | 75 ++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/src/cache/SharedMemoryCache.java b/src/cache/SharedMemoryCache.java index 87b5051..a3e341c 100755 --- a/src/cache/SharedMemoryCache.java +++ b/src/cache/SharedMemoryCache.java @@ -4,6 +4,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Date; public class SharedMemoryCache implements ICache { @@ -15,9 +16,10 @@ public class SharedMemoryCache implements ICache { private Segment[] segments; // FILE HEADER STRUCTURE + public static final int LINK_SIZE = Integer.SIZE/8; public static final int TOTAL_SIZE_OFFSET = 0; - public static final int LRU_HEAD_OFFSET = TOTAL_SIZE_OFFSET + 8; - public static final int HEADER_SIZE = LRU_HEAD_OFFSET + 4; + public static final int LRU_HEAD_OFFSET = TOTAL_SIZE_OFFSET + Long.SIZE/8; + public static final int HEADER_SIZE = LRU_HEAD_OFFSET + LINK_SIZE; private long getTotal() { return mmap.getLong(TOTAL_SIZE_OFFSET); @@ -32,7 +34,6 @@ private void updateTotal(long size) { // HASH ELEMENT STRUCTURE public static final int KEY_REF_SIZE = Short.SIZE/8; public static final int KEY_SIZE = 8; - public static final int LINK_SIZE = Integer.SIZE/8; public static final int DATA_SIZE = CacheMetaInfo.DataSize ; private static final int ELEMENT_SIZE = LINK_SIZE*2 + KEY_SIZE + DATA_SIZE; @@ -41,22 +42,25 @@ private void updateTotal(long size) { static final class CacheMetaInfo { ByteBuffer bytes_; + Date d_; public static final int activateOffset = 0; public static final int sizeOffset = 1; public static final int timeOffset = Long.SIZE/8 + 1; public static final int DataSize = Long.SIZE/8 + Long.SIZE/8 + 1; + CacheMetaInfo(ByteBuffer bytes) { bytes_ = bytes; + d_ = new Date(); } - CacheMetaInfo(byte activate, long size, long timestamp) { + CacheMetaInfo(byte activate, long size) { bytes_ = ByteBuffer.allocate(DataSize); bytes_.put(activate); - bytes_.putLong(timestamp); + bytes_.putLong(0); bytes_.putLong(size); - + d_ = new Date(); } public byte[] bytes() { return bytes_.array(); } @@ -73,6 +77,11 @@ public long timestamp() { return val; } + public void setTimestamp() { + bytes_.putLong(timeOffset, 
d_.getTime()); + } + + public long size() { bytes_.position(sizeOffset); bytes_.mark(); @@ -147,7 +156,11 @@ private static int newLinkOffset(int keyOffset) { private static int dataOffset(int keyOffset) { return keyOffset + KEY_SIZE + 2*LINK_SIZE ; } - + private CacheMetaInfo data(int keyOffset) { + byte[] data = new byte[DATA_SIZE]; + mmap.get(data, dataOffset(keyOffset),DATA_SIZE) ; + return new CacheMetaInfo(ByteBuffer.wrap(data)); + } private int dataOffset(short ref) { return dataOffset(keyOffset(ref)) ; } @@ -155,6 +168,7 @@ private int dataOffset(short ref) { + private void verify() { int start = elementsSpaceOffset(); int size = elementsSpaceSize(); @@ -237,9 +251,9 @@ public byte[] put(byte[] key, byte[] value) { } CacheMetaInfo v = new CacheMetaInfo(ByteBuffer.wrap(value)); - Segment segment = segmentFor(key); mmap.lock(); + v.setTimestamp(); try { int segmentStart = segment.elementsSpaceOffset(); @@ -292,6 +306,11 @@ private Segment segmentFor(byte[] key) { return segments[Arrays.hashCode(key) & segmentMask]; } + private Segment segmentFor(int keyOffset) { + int index = (keyOffset -HEADER_SIZE)/segmentSize; + return segments[index]; + } + private int binarySearch(byte[] key, int low, int high, Segment seg) { byte[] midval = new byte[KEY_SIZE]; ByteBuffer kbuf = ByteBuffer.wrap(key); @@ -325,15 +344,19 @@ private int prev(int offset ){ private void removeLink(int headLinkOffest, int keyOffset) { int oldKeyOffset = mmap.get(Segment.oldLinkOffset(keyOffset)); - if (oldKeyOffset > 0) { - int newKeyOffset = mmap.get(Segment.newLinkOffset(keyOffset)); - int headKeyOffset = mmap.get(headLinkOffest); - if (keyOffset == headKeyOffset) { + int newKeyOffset = mmap.get(Segment.newLinkOffset(keyOffset)); + int headKeyOffset = mmap.get(headLinkOffest); + if (keyOffset == headKeyOffset) { + if (oldKeyOffset > 0) { mmap.put(headKeyOffset, Segment.newLinkOffset(oldKeyOffset)); - mmap.put(oldKeyOffset, headLinkOffest ); - } else { + } + mmap.put(oldKeyOffset, headLinkOffest); + } else { + if (oldKeyOffset > 0) { mmap.put(newKeyOffset, Segment.newLinkOffset(oldKeyOffset)); - mmap.put(oldKeyOffset, Segment.oldLinkOffset(newKeyOffset) ); + } + if (newKeyOffset > 0) { + mmap.put(oldKeyOffset, Segment.oldLinkOffset(newKeyOffset)); } } mmap.put(0, Segment.oldLinkOffset(keyOffset)); @@ -363,13 +386,31 @@ private void moveToHead(int headLinkOffest, short ref, Segment seg) { private void verifyLRU() { int keyOffset = LRU_HEAD_OFFSET; + int prevKeyOffset; int count = 0; - while ( (keyOffset = prev(keyOffset)) > 0) { - count++; + long totalSize = 0; + Segment seg; + CacheMetaInfo info = new CacheMetaInfo((byte)0,0L); info.setTimestamp(); + CacheMetaInfo prevInfo; + while ((prevKeyOffset = prev(keyOffset)) > 0) { + seg = segmentFor(prevKeyOffset); + prevInfo = seg.data(prevKeyOffset); + if (info.timestamp() < prevInfo.timestamp()) { + throw new Error("shared cache - keys timestamp do not match"); + } + count++; + totalSize += info.size(); + info = prevInfo; + keyOffset = prevKeyOffset; + } if (count != this.count()) { throw new Error("shared cache - not all keys are linked"); } + + if (totalSize != this.getTotal()) { + throw new Error("shared cache - total size do not match"); + } } From 73467a3c133a708fa1fb85ad959414bedded6aff Mon Sep 17 00:00:00 2001 From: shachar Date: Wed, 17 Feb 2016 03:34:03 -0800 Subject: [PATCH 7/8] add eviction policy + testing --- src/CacheTest.java | 13 +- src/cache/ConcurrentHashMapCache.java | 6 +- src/cache/ICache.java | 4 +- src/cache/MemoryCacheConfiguration.java | 10 +- 
src/cache/SharedMemoryCache.java | 303 +++++++++++++++++------- src/util/MappedFile.java | 29 ++- 6 files changed, 265 insertions(+), 100 deletions(-) diff --git a/src/CacheTest.java b/src/CacheTest.java index e8cc74c..3050c2c 100755 --- a/src/CacheTest.java +++ b/src/CacheTest.java @@ -11,8 +11,10 @@ public class CacheTest { private static final long MAGIC = 54331; - private static final int WARMUP_COUNT = 100; - private static final int RUN_COUNT = 1000; + private static final int WARMUP_COUNT = 100; + private static final int RUN_COUNT = 1000; + private static final long CACHE_LIMIT = 10*M ; + private static final long DATA_VALUE_SIZE = (CACHE_LIMIT / RUN_COUNT)*4; @@ -21,8 +23,7 @@ public static void testWrite(ICache cache, int count) { for (int i = 0; i < count; i++) { //byte[] key = Arrays.copyOfRange(UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); byte[] key = Arrays.copyOfRange(Long.toHexString(((random.nextInt() % RUN_COUNT) << 20) * MAGIC).intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); - //byte[] key = { 1,1,1,1,1,1,1,1 }; - cache.put(key, new byte[SharedMemoryCache.DATA_SIZE]); + cache.put(key, new SharedMemoryCache.CacheMetaInfo((byte)0,DATA_VALUE_SIZE)); } } @@ -41,7 +42,7 @@ public static void testRead9Write1(ICache cache, int count) { byte[] key = Arrays.copyOfRange(Long.toHexString(((random.nextInt() % RUN_COUNT) << 20) * MAGIC).intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); // byte[] key = Arrays.copyOfRange( UUID.randomUUID().toString().intern().getBytes(), 0, SharedMemoryCache.KEY_SIZE); if (random.nextInt(10) == 0) { - cache.put(key, new byte[random.nextInt(SharedMemoryCache.DATA_SIZE)]); + cache.put(key,new SharedMemoryCache.CacheMetaInfo((byte)0,DATA_VALUE_SIZE)); } else { cache.get(key); } @@ -82,7 +83,7 @@ public static void main(String[] args) throws Exception { if ("chm".equals(type)) { cache = new ConcurrentHashMapCache(3000000, 256); } else { - cache = new SharedMemoryCache(new MemoryCacheConfiguration(100*K, 10*K, "/tmp/cache-shm-test")); + cache = new SharedMemoryCache(new MemoryCacheConfiguration(100*K, 10*K, CACHE_LIMIT, "/tmp/cache-shm-test")); } testAll(cache); cache.close(); diff --git a/src/cache/ConcurrentHashMapCache.java b/src/cache/ConcurrentHashMapCache.java index 875ba83..1a85823 100755 --- a/src/cache/ConcurrentHashMapCache.java +++ b/src/cache/ConcurrentHashMapCache.java @@ -2,19 +2,19 @@ import java.util.concurrent.ConcurrentHashMap; -public class ConcurrentHashMapCache extends ConcurrentHashMap implements ICache { +public class ConcurrentHashMapCache extends ConcurrentHashMap implements ICache { public ConcurrentHashMapCache(int capacity, int concurrencyLevel) { super(capacity, 0.75f, concurrencyLevel); } @Override - public byte[] get(byte[]key) { + public SharedMemoryCache.CacheMetaInfo get(byte[]key) { return super.get(key); } @Override - public byte[] put(byte[] key, byte[] value) { + public SharedMemoryCache.CacheMetaInfo put(byte[] key, SharedMemoryCache.CacheMetaInfo value) { return super.put(key, value) ; } diff --git a/src/cache/ICache.java b/src/cache/ICache.java index ea02c10..44e19bf 100755 --- a/src/cache/ICache.java +++ b/src/cache/ICache.java @@ -1,7 +1,7 @@ package cache; public interface ICache { - byte[] get(byte[] key); - byte[] put(byte[]key, byte[] value); + SharedMemoryCache.CacheMetaInfo get(byte[] key); + SharedMemoryCache.CacheMetaInfo put(byte[]key, SharedMemoryCache.CacheMetaInfo value); void close(); } diff --git a/src/cache/MemoryCacheConfiguration.java 
b/src/cache/MemoryCacheConfiguration.java index d89c1b6..e749c84 100755 --- a/src/cache/MemoryCacheConfiguration.java +++ b/src/cache/MemoryCacheConfiguration.java @@ -4,11 +4,13 @@ public class MemoryCacheConfiguration { private long capacity; private long segmentSize; private String imageFile; + private long limit; - public MemoryCacheConfiguration(long capacity, long segmentSize, String imageFile) { + public MemoryCacheConfiguration(long capacity, long segmentSize, long limit, String imageFile) { this.capacity = capacity; this.segmentSize = segmentSize; this.imageFile = imageFile; + this.limit = limit; } public long getCapacity() { @@ -19,7 +21,13 @@ public long getSegmentSize() { return segmentSize; } + public long getLimit() { + return limit; + } + public String getImageFile() { return imageFile; } + + } diff --git a/src/cache/SharedMemoryCache.java b/src/cache/SharedMemoryCache.java index a3e341c..bdbf128 100755 --- a/src/cache/SharedMemoryCache.java +++ b/src/cache/SharedMemoryCache.java @@ -14,12 +14,15 @@ public class SharedMemoryCache implements ICache { private int segmentSize; private int segmentMask; private Segment[] segments; + private byte aKey[]; + private byte aData[]; // FILE HEADER STRUCTURE public static final int LINK_SIZE = Integer.SIZE/8; public static final int TOTAL_SIZE_OFFSET = 0; - public static final int LRU_HEAD_OFFSET = TOTAL_SIZE_OFFSET + Long.SIZE/8; - public static final int HEADER_SIZE = LRU_HEAD_OFFSET + LINK_SIZE; + public static final int LIMIT_SIZE_OFFSET = TOTAL_SIZE_OFFSET + Long.SIZE/8; + public static final int LRU_HEAD_OFFSET = LIMIT_SIZE_OFFSET + Long.SIZE/8; + public static final int HEADER_SIZE = LRU_HEAD_OFFSET + LINK_SIZE; private long getTotal() { return mmap.getLong(TOTAL_SIZE_OFFSET); @@ -29,17 +32,25 @@ private void updateTotal(long size) { mmap.putLong(getTotal() + size, TOTAL_SIZE_OFFSET); } + private long getLimit() { + return mmap.getLong(LIMIT_SIZE_OFFSET); + } + + private void setLimit(long limit) { + mmap.putLong(limit, LIMIT_SIZE_OFFSET); + } + // HASH ELEMENT STRUCTURE public static final int KEY_REF_SIZE = Short.SIZE/8; public static final int KEY_SIZE = 8; public static final int DATA_SIZE = CacheMetaInfo.DataSize ; - private static final int ELEMENT_SIZE = LINK_SIZE*2 + KEY_SIZE + DATA_SIZE; + private static final int ELEMENT_SIZE = KEY_SIZE+ LINK_SIZE*2 + KEY_REF_SIZE + DATA_SIZE; - static final class CacheMetaInfo { + public static final class CacheMetaInfo { ByteBuffer bytes_; Date d_; @@ -50,16 +61,16 @@ static final class CacheMetaInfo { public static final int DataSize = Long.SIZE/8 + Long.SIZE/8 + 1; - CacheMetaInfo(ByteBuffer bytes) { + public CacheMetaInfo(ByteBuffer bytes) { bytes_ = bytes; d_ = new Date(); } - CacheMetaInfo(byte activate, long size) { + public CacheMetaInfo(byte activate, long size) { bytes_ = ByteBuffer.allocate(DataSize); bytes_.put(activate); - bytes_.putLong(0); bytes_.putLong(size); + bytes_.putLong(0); d_ = new Date(); } @@ -90,8 +101,6 @@ public long size() { return val; } - - } @@ -105,7 +114,13 @@ static final class Segment { this.size = size; this.start = start; this.mmap = mmap; - verify(); + + // init refs for future usage + if (getCount() == 0 ) { + for (int c = 0,pos = start; c <= maxKeyCount(); c++, pos += KEY_REF_SIZE) { + mmap.putShort((short)c,pos); + } + } } public int elementsSpaceSize() { @@ -128,6 +143,10 @@ public void incCount() { mmap.putShort((short)(getCount()+1), countOffset()); } + public void decCount() { + mmap.putShort((short)(getCount()-1), countOffset()); + } + 
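With the LIMIT slot added to the file header and a key-ref slot reserved inside each element, the fixed offsets are pure arithmetic over a handful of constants, and maxKeyCount() follows from charging each key for its element plus one entry in the sorted ref table. A short worked sketch of that arithmetic as of this patch (KEY_SIZE 8, LINK_SIZE 4, KEY_REF_SIZE 2, DATA_SIZE 17); the 10K segment in main() is only an example figure:

    // Offset arithmetic for the shared-memory image as laid out in patch 7 (bytes).
    final class LayoutSketch {
        static final int LINK_SIZE    = Integer.BYTES;                 // 4
        static final int KEY_REF_SIZE = Short.BYTES;                   // 2
        static final int KEY_SIZE     = 8;
        static final int DATA_SIZE    = 1 + Long.BYTES + Long.BYTES;   // 17

        // File header: running total, size limit, LRU head pointer.
        static final int TOTAL_SIZE_OFFSET = 0;
        static final int LIMIT_SIZE_OFFSET = TOTAL_SIZE_OFFSET + Long.BYTES;  // 8
        static final int LRU_HEAD_OFFSET   = LIMIT_SIZE_OFFSET + Long.BYTES;  // 16
        static final int HEADER_SIZE       = LRU_HEAD_OFFSET + LINK_SIZE;     // 20

        // One element: key, old/new LRU links, its own ref slot, then the metadata.
        static final int ELEMENT_SIZE = KEY_SIZE + 2 * LINK_SIZE + KEY_REF_SIZE + DATA_SIZE; // 35

        // Each stored key costs one element plus one slot in the sorted ref table.
        static int maxKeyCount(int elementsSpaceSize) {
            return elementsSpaceSize / (ELEMENT_SIZE + KEY_REF_SIZE);
        }

        public static void main(String[] args) {
            System.out.println("header size  = " + HEADER_SIZE);
            System.out.println("element size = " + ELEMENT_SIZE);
            System.out.println("keys per 10K = " + maxKeyCount(10 * 1024));   // 276
        }
    }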
public final int maxKeyCount() { int numOfElements = elementsSpaceSize() / (ELEMENT_SIZE + KEY_REF_SIZE); @@ -153,41 +172,80 @@ private static int newLinkOffset(int keyOffset) { return keyOffset + KEY_SIZE + LINK_SIZE ; } - private static int dataOffset(int keyOffset) { + private static int ref(int keyOffset) { return keyOffset + KEY_SIZE + 2*LINK_SIZE ; } + + private static int dataOffset(int keyOffset) { + return keyOffset + KEY_SIZE + 2*LINK_SIZE + KEY_REF_SIZE; + } + + private boolean active(int keyOffset) { + return mmap.getByte(dataOffset(keyOffset)) > 0 ; + } + + private int keysRefEnd() { + return elementsSpaceOffset() + (getCount() * KEY_REF_SIZE); + } + private CacheMetaInfo data(int keyOffset) { byte[] data = new byte[DATA_SIZE]; mmap.get(data, dataOffset(keyOffset),DATA_SIZE) ; return new CacheMetaInfo(ByteBuffer.wrap(data)); } + private int dataOffset(short ref) { return dataOffset(keyOffset(ref)) ; } + public int binarySearch(byte[] key) { + int low = elementsSpaceOffset(); + int high = low + (getCount() * KEY_REF_SIZE); + ByteBuffer kbuf = ByteBuffer.wrap(key); + for (high -= KEY_REF_SIZE; low <= high; ) { + int midRefOffset = low + (((high - low) / KEY_REF_SIZE) >>> 1) * KEY_REF_SIZE; + short midRef = mmap.getShort(midRefOffset); + int midOffset = keyOffset(midRef); - private void verify() { + int compare = mmap.compare(midOffset,KEY_SIZE, kbuf); + + if (compare < 0) { + low = midRefOffset + KEY_REF_SIZE; + } else if (compare > 0) { + high = midRefOffset - KEY_REF_SIZE; + } else { + return midRefOffset; + } + } + return ~low; + } + + private boolean verify() { int start = elementsSpaceOffset(); - int size = elementsSpaceSize(); - int pos = start; + int keyOffset; byte[] prevKey = new byte[KEY_SIZE]; - byte[] key = new byte[KEY_SIZE]; if (getCount() > maxKeyCount()) { - throw new Error("shared cache element in position count is invalid"); + //throw new Error("shared cache element in position count is invalid"); + return false; } - for (int c = 0; c < getCount(); c++, pos += KEY_REF_SIZE) { + for (int c = 0,pos = start; c < getCount(); c++, pos += KEY_REF_SIZE) { keyOffset = this.keyOffset(mmap.getShort(pos)); if (mmap.compare(keyOffset, KEY_SIZE, ByteBuffer.wrap(prevKey)) <= 0) { - throw new Error("shared cache element in position is invalid"); + //throw new Error("shared cache element in position is invalid"); + return false; } mmap.get(prevKey, keyOffset, KEY_SIZE); } + + + + return true; } } @@ -196,8 +254,11 @@ public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exceptio long desiredSegmentSize = configuration.getSegmentSize(); int segmentCount = calculateSegmentCount(requestedCapacity, desiredSegmentSize); int segmentSize = (int) ((requestedCapacity / segmentCount + 31) & ~31L); - + this.aKey = new byte[KEY_SIZE]; + this.aData = new byte[DATA_SIZE]; this.mmap = new MappedFile(configuration.getImageFile(), segmentSize * segmentCount); + mmap.lock(); + this.segmentSize = (int) segmentSize; this.segmentMask = segmentCount - 1; this.segments = new Segment[segmentCount]; @@ -205,6 +266,15 @@ public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exceptio for (int i = 0; i < segmentCount; i++) { segments[i] = new Segment(HEADER_SIZE + segmentSize * i, this.segmentSize, mmap); } + if ( this.getLimit() == 0 ) { + this.setLimit(configuration.getLimit()); + } + + if (!verify()) { + throw new Error("need to be scanned all over"); + } + + mmap.release(); } @Override @@ -214,8 +284,8 @@ public void close() { } @Override - public byte[] get(byte[] 
key) { - + public CacheMetaInfo get(byte[] key) { + int keyOffset; if (key.length != KEY_SIZE) { return null; } @@ -223,17 +293,12 @@ public byte[] get(byte[] key) { Segment segment = segmentFor(key); mmap.lock(); try { - int segmentStart = segment.elementsSpaceOffset(); - int keysEnd = segmentStart + segment.getCount() * KEY_REF_SIZE; - int keyRefOffset = binarySearch(key, segmentStart, keysEnd,segment); - - if (keyRefOffset > 0) { - int offset = segment.dataOffset(mmap.getShort(keyRefOffset)); + if ((keyOffset = getKeyOffset(key)) > 0) { + int offset = segment.dataOffset(keyOffset); byte[] result = new byte[DATA_SIZE]; mmap.get(result, offset, DATA_SIZE); - return result; + return new CacheMetaInfo(ByteBuffer.wrap(result)); } - return null; } finally { mmap.release(); @@ -241,46 +306,120 @@ public byte[] get(byte[] key) { } + public int getKeyOffset(byte[] key) { + int keyOffset = 0; + Segment segment = segmentFor(key); + int keyRefOffset = segment.binarySearch(key); + + if (keyRefOffset > 0) { + keyOffset = segment.keyOffset(mmap.getShort(keyRefOffset)); + } + return keyOffset; + } + + public boolean deactivate(byte[] key) { + int keyOffset; + if (key.length != KEY_SIZE) { + return false; + } + + Segment segment = segmentFor(key); + mmap.lock(); + try { + if ((keyOffset = getKeyOffset(key)) > 0) { + int offset = segment.dataOffset(keyOffset) + CacheMetaInfo.activateOffset; + mmap.putByte((byte) 0, offset); + return true; + } + return false; + } finally { + mmap.release(); + } + } + + + public void delete(int keyOffset) { + Segment segment = segmentFor(keyOffset); + mmap.get(aKey,keyOffset,KEY_SIZE); + removeLink(LRU_HEAD_OFFSET, keyOffset); + updateTotal(-1*mmap.getLong(segment.dataOffset(keyOffset)+CacheMetaInfo.sizeOffset)); + int keyRefOffset = segment.binarySearch(aKey); + if ( keyOffset < 0 ) { + throw new Error ("must have the deleted key"); + } + short ref = mmap.getShort(keyRefOffset); + int keyRefsEnd = segment.keysRefEnd(); + mmap.copy(keyRefOffset + KEY_REF_SIZE, keyRefOffset,keyRefsEnd - keyRefOffset - KEY_REF_SIZE); + mmap.putShort(ref,keyRefsEnd-KEY_REF_SIZE); + mmap.clear(keyOffset,ELEMENT_SIZE); + segment.decCount(); + } + + public boolean needEviction(long newSize) { return (this.getTotal() + newSize - this.getLimit()) > 0; } + + public boolean evict(long newSize) { + int keyOffset = LRU_HEAD_OFFSET; + Segment seg; + + while (needEviction(newSize) && (keyOffset = prev(keyOffset)) > 0) { + seg = segmentFor(keyOffset); + if (!seg.active(keyOffset)) { + delete(keyOffset); + } + } + return !needEviction(newSize); + } + + @Override - public byte[] put(byte[] key, byte[] value) { + public CacheMetaInfo put(byte[] key, CacheMetaInfo value) { int keyOffset; int keyRefOffset; - int keysEnd; - if (value.length != DATA_SIZE || key.length != KEY_SIZE) { + int keyRefsEnd; + short newRef; + + if (value.bytes().length != DATA_SIZE || key.length != KEY_SIZE) { return null; } - CacheMetaInfo v = new CacheMetaInfo(ByteBuffer.wrap(value)); + Segment segment = segmentFor(key); mmap.lock(); - v.setTimestamp(); + value.setTimestamp(); try { - int segmentStart = segment.elementsSpaceOffset(); - - - mmap.dump(segment.start,100); - keysEnd = segmentStart + (segment.getCount() * KEY_REF_SIZE); - keyRefOffset = binarySearch(key, segmentStart, keysEnd,segment); + // mmap.dump(segment.start,100); + keyRefOffset = segment.binarySearch(key); if (keyRefOffset < 0) { if (segment.getCount() >= segment.maxKeyCount()) { return null; } + + if (needEviction(value.size())) { + if (!evict(value.size())) { + 
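getKeyOffset(), delete(), and the insertion path of put() all revolve around the same small structure: a table of short refs at the front of the segment, kept sorted by the key bytes each ref points to, with the unused refs parked after the live ones as a free pool. Lookup is a binary search over the refs; delete() shifts the tail of the live region left and recycles the freed ref; the insertion path of put(), which continues just below, consumes the next free ref and shifts right to open a gap. A plain-array sketch of all three operations (names are illustrative; a byte[][] stands in for the element area of the mapped file):

    import java.nio.ByteBuffer;

    // Sorted indirection table over an unsorted element area:
    // refs[0..count-1] are live and ordered by key content, refs[count..] are free slots.
    final class RefTableSketch {
        final short[] refs;
        final byte[][] keys;      // element-area stand-in, indexed by ref
        int count;

        RefTableSketch(int capacity) {
            refs = new short[capacity];
            keys = new byte[capacity][];
            for (short r = 0; r < capacity; r++) refs[r] = r;   // initial free pool
        }

        // Segment.binarySearch(): position in the ref table, or ~insertionPoint on a miss.
        int search(byte[] key) {
            int low = 0, high = count - 1;
            while (low <= high) {
                int mid = (low + high) >>> 1;
                int cmp = ByteBuffer.wrap(keys[refs[mid]]).compareTo(ByteBuffer.wrap(key));
                if (cmp < 0)      low = mid + 1;
                else if (cmp > 0) high = mid - 1;
                else              return mid;
            }
            return ~low;
        }

        // put() insertion path: take the next free ref, open a gap, store the key.
        void insert(byte[] key, int insertionPoint) {
            short freeRef = refs[count];
            System.arraycopy(refs, insertionPoint, refs, insertionPoint + 1, count - insertionPoint);
            refs[insertionPoint] = freeRef;
            keys[freeRef] = key;
            count++;
        }

        // delete(): close the gap and park the freed ref just past the live region.
        void delete(int pos) {
            short freed = refs[pos];
            System.arraycopy(refs, pos + 1, refs, pos, count - pos - 1);
            keys[freed] = null;
            count--;
            refs[count] = freed;
        }

        public static void main(String[] args) {
            RefTableSketch t = new RefTableSketch(4);
            for (String k : new String[] { "kkkkkkkk", "aaaaaaaa", "zzzzzzzz" }) {
                t.insert(k.getBytes(), ~t.search(k.getBytes()));
            }
            System.out.println(t.search("kkkkkkkk".getBytes()));   // 1  (hit)
            t.delete(t.search("kkkkkkkk".getBytes()));
            System.out.println(t.search("kkkkkkkk".getBytes()));   // -2 (miss, ~1)
        }
    }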
return null; + } + // in case evicted from the same segment + keyRefOffset = segment.binarySearch(key); + } + keyRefOffset = ~keyRefOffset; - mmap.copy(keyRefOffset, keyRefOffset + KEY_REF_SIZE, keysEnd - keyRefOffset); - mmap.putShort(segment.getCount(),keyRefOffset); - keyOffset = segment.keyOffset(segment.getCount()); + keyRefsEnd = segment.keysRefEnd(); + newRef = mmap.getShort(keyRefsEnd); + mmap.copy(keyRefOffset, keyRefOffset + KEY_REF_SIZE, keyRefsEnd - keyRefOffset); + mmap.putShort(newRef,keyRefOffset); + keyOffset = segment.keyOffset(newRef); mmap.put(key, keyOffset, KEY_SIZE); - updateTotal(v.size()); + mmap.putShort(newRef,segment.ref(keyOffset)); + updateTotal(value.size()); segment.incCount(); } short ref = mmap.getShort(keyRefOffset); - mmap.put(value, segment.dataOffset(ref),DATA_SIZE); + mmap.put(value.bytes(), segment.dataOffset(ref),DATA_SIZE); moveToHead(LRU_HEAD_OFFSET, ref, segment); - mmap.dump(segment.start,100); - return key; + // mmap.dump(segment.start,100); + return value; } finally { - segment.verify(); - verifyLRU(); + verify(); mmap.release(); } } @@ -311,27 +450,7 @@ private Segment segmentFor(int keyOffset) { return segments[index]; } - private int binarySearch(byte[] key, int low, int high, Segment seg) { - byte[] midval = new byte[KEY_SIZE]; - ByteBuffer kbuf = ByteBuffer.wrap(key); - - for (high -= KEY_REF_SIZE; low <= high; ) { - int midRefOffset = low + (((high - low) / KEY_REF_SIZE) >>> 1) * KEY_REF_SIZE; - short midRef = mmap.getShort(midRefOffset); - int midOffset = seg.keyOffset(midRef); - mmap.get(midval, midOffset, KEY_SIZE); - int compare = mmap.compare(midOffset,KEY_SIZE, kbuf); - - if (compare < 0) { - low = midRefOffset + KEY_REF_SIZE; - } else if (compare > 0) { - high = midRefOffset - KEY_REF_SIZE; - } else { - return midRefOffset; - } - } - return ~low; - } + private int prev(int offset ){ if (offset == LRU_HEAD_OFFSET ) { @@ -384,7 +503,7 @@ private void moveToHead(int headLinkOffest, short ref, Segment seg) { } - private void verifyLRU() { + private boolean verify() { int keyOffset = LRU_HEAD_OFFSET; int prevKeyOffset; int count = 0; @@ -392,25 +511,37 @@ private void verifyLRU() { Segment seg; CacheMetaInfo info = new CacheMetaInfo((byte)0,0L); info.setTimestamp(); CacheMetaInfo prevInfo; - while ((prevKeyOffset = prev(keyOffset)) > 0) { - seg = segmentFor(prevKeyOffset); - prevInfo = seg.data(prevKeyOffset); - if (info.timestamp() < prevInfo.timestamp()) { - throw new Error("shared cache - keys timestamp do not match"); + + try { + while ((prevKeyOffset = prev(keyOffset)) > 0) { + seg = segmentFor(prevKeyOffset); + prevInfo = seg.data(prevKeyOffset); + if (info.timestamp() < prevInfo.timestamp()) { + return false; + } + count++; + info = prevInfo; + totalSize += info.size(); + keyOffset = prevKeyOffset; + + } + if (count != this.count()) { + return false; } - count++; - totalSize += info.size(); - info = prevInfo; - keyOffset = prevKeyOffset; - } - if (count != this.count()) { - throw new Error("shared cache - not all keys are linked"); - } + if (totalSize != this.getTotal()) { + return false; + } - if (totalSize != this.getTotal()) { - throw new Error("shared cache - total size do not match"); + for (Segment segment : segments) { + if (!segment.verify()) { + return false; + } + } + } catch (Exception e) { + return false; } + return true; } diff --git a/src/util/MappedFile.java b/src/util/MappedFile.java index 55535ba..733a62c 100755 --- a/src/util/MappedFile.java +++ b/src/util/MappedFile.java @@ -38,8 +38,22 @@ public void 
get(byte[] buffer, int index, int length) { public void copy(int srcIndex, int targetIndex, int length) { map.position(0); - for (int i = length-1; i >= 0; i--) { - map.put(targetIndex + i, map.get(srcIndex + i)); + if (srcIndex < targetIndex ) { + for (int i = length - 1; i >= 0; i--) { + map.put(targetIndex + i, map.get(srcIndex + i)); + } + } else { + for (int i = 0; i < length; i++) { + map.put(targetIndex + i, map.get(srcIndex + i)); + } + } + } + + + public void clear(int index, int length) { + map.position(0); + for (int i = 0; i Date: Wed, 17 Feb 2016 16:32:12 -0800 Subject: [PATCH 8/8] fix MRU to LRU add clear and verify fix timestamp for IPC --- src/cache/SharedMemoryCache.java | 132 +++++++++++++++++++++---------- 1 file changed, 90 insertions(+), 42 deletions(-) diff --git a/src/cache/SharedMemoryCache.java b/src/cache/SharedMemoryCache.java index bdbf128..6538efa 100755 --- a/src/cache/SharedMemoryCache.java +++ b/src/cache/SharedMemoryCache.java @@ -22,7 +22,8 @@ public class SharedMemoryCache implements ICache { public static final int TOTAL_SIZE_OFFSET = 0; public static final int LIMIT_SIZE_OFFSET = TOTAL_SIZE_OFFSET + Long.SIZE/8; public static final int LRU_HEAD_OFFSET = LIMIT_SIZE_OFFSET + Long.SIZE/8; - public static final int HEADER_SIZE = LRU_HEAD_OFFSET + LINK_SIZE; + public static final int LRU_TAIL_OFFSET = LRU_HEAD_OFFSET + Long.SIZE/8; + public static final int HEADER_SIZE = LRU_TAIL_OFFSET + LINK_SIZE; private long getTotal() { return mmap.getLong(TOTAL_SIZE_OFFSET); @@ -46,14 +47,13 @@ private void setLimit(long limit) { public static final int KEY_REF_SIZE = Short.SIZE/8; public static final int KEY_SIZE = 8; public static final int DATA_SIZE = CacheMetaInfo.DataSize ; - private static final int ELEMENT_SIZE = KEY_SIZE+ LINK_SIZE*2 + KEY_REF_SIZE + DATA_SIZE; + private static final int ELEMENT_SIZE = KEY_SIZE+ LINK_SIZE*2 + DATA_SIZE; public static final class CacheMetaInfo { ByteBuffer bytes_; - Date d_; public static final int activateOffset = 0; public static final int sizeOffset = 1; @@ -63,7 +63,6 @@ public static final class CacheMetaInfo { public CacheMetaInfo(ByteBuffer bytes) { bytes_ = bytes; - d_ = new Date(); } public CacheMetaInfo(byte activate, long size) { @@ -71,7 +70,6 @@ public CacheMetaInfo(byte activate, long size) { bytes_.put(activate); bytes_.putLong(size); bytes_.putLong(0); - d_ = new Date(); } public byte[] bytes() { return bytes_.array(); } @@ -89,7 +87,7 @@ public long timestamp() { } public void setTimestamp() { - bytes_.putLong(timeOffset, d_.getTime()); + bytes_.putLong(timeOffset, System.nanoTime()); } @@ -147,6 +145,10 @@ public void decCount() { mmap.putShort((short)(getCount()-1), countOffset()); } + public void clear() { + mmap.putShort((short)0, countOffset()); + } + public final int maxKeyCount() { int numOfElements = elementsSpaceSize() / (ELEMENT_SIZE + KEY_REF_SIZE); @@ -172,12 +174,9 @@ private static int newLinkOffset(int keyOffset) { return keyOffset + KEY_SIZE + LINK_SIZE ; } - private static int ref(int keyOffset) { - return keyOffset + KEY_SIZE + 2*LINK_SIZE ; - } private static int dataOffset(int keyOffset) { - return keyOffset + KEY_SIZE + 2*LINK_SIZE + KEY_REF_SIZE; + return keyOffset + KEY_SIZE + 2*LINK_SIZE ; } private boolean active(int keyOffset) { @@ -247,6 +246,7 @@ private boolean verify() { return true; } + } public SharedMemoryCache(MemoryCacheConfiguration configuration) throws Exception { @@ -306,7 +306,7 @@ public CacheMetaInfo get(byte[] key) { } - public int getKeyOffset(byte[] key) { + private 
int getKeyOffset(byte[] key) { int keyOffset = 0; Segment segment = segmentFor(key); int keyRefOffset = segment.binarySearch(key); @@ -338,10 +338,25 @@ public boolean deactivate(byte[] key) { } + public boolean verifyOrClear() { + boolean verified = true; + mmap.lock(); + try { + if (!verify()) { + clear(); + verified = false; + } + } finally { + mmap.release(); + } + return verified; + } + + public void delete(int keyOffset) { Segment segment = segmentFor(keyOffset); mmap.get(aKey,keyOffset,KEY_SIZE); - removeLink(LRU_HEAD_OFFSET, keyOffset); + removeLink(keyOffset); updateTotal(-1*mmap.getLong(segment.dataOffset(keyOffset)+CacheMetaInfo.sizeOffset)); int keyRefOffset = segment.binarySearch(aKey); if ( keyOffset < 0 ) { @@ -355,13 +370,23 @@ public void delete(int keyOffset) { segment.decCount(); } + public void clear() { + mmap.put(0, LRU_HEAD_OFFSET); + mmap.put(0, LRU_HEAD_OFFSET); + mmap.putLong(0L, TOTAL_SIZE_OFFSET); + for (Segment segment : segments) { + segment.clear(); + } + } + + public boolean needEviction(long newSize) { return (this.getTotal() + newSize - this.getLimit()) > 0; } public boolean evict(long newSize) { - int keyOffset = LRU_HEAD_OFFSET; + int keyOffset = LRU_TAIL_OFFSET; Segment seg; - while (needEviction(newSize) && (keyOffset = prev(keyOffset)) > 0) { + while (needEviction(newSize) && (keyOffset = next(keyOffset)) > 0) { seg = segmentFor(keyOffset); if (!seg.active(keyOffset)) { delete(keyOffset); @@ -409,19 +434,24 @@ public CacheMetaInfo put(byte[] key, CacheMetaInfo value) { mmap.putShort(newRef,keyRefOffset); keyOffset = segment.keyOffset(newRef); mmap.put(key, keyOffset, KEY_SIZE); - mmap.putShort(newRef,segment.ref(keyOffset)); + mmap.put(0, Segment.oldLinkOffset(keyOffset)); + mmap.put(0, Segment.newLinkOffset(keyOffset)); updateTotal(value.size()); segment.incCount(); } short ref = mmap.getShort(keyRefOffset); mmap.put(value.bytes(), segment.dataOffset(ref),DATA_SIZE); - moveToHead(LRU_HEAD_OFFSET, ref, segment); + moveToHead(ref, segment); // mmap.dump(segment.start,100); - return value; } finally { - verify(); + if (!verify()) { + mmap.release(); + return null; + } mmap.release(); } + + return value; } @@ -452,25 +482,35 @@ private Segment segmentFor(int keyOffset) { - private int prev(int offset ){ - if (offset == LRU_HEAD_OFFSET ) { + private int next(int offset){ + if (offset == LRU_TAIL_OFFSET) { return mmap.get(offset); } else { - return mmap.get(Segment.oldLinkOffset(offset)); + return mmap.get(Segment.newLinkOffset(offset)); } } - private void removeLink(int headLinkOffest, int keyOffset) { + private void removeLink(int keyOffset) { int oldKeyOffset = mmap.get(Segment.oldLinkOffset(keyOffset)); int newKeyOffset = mmap.get(Segment.newLinkOffset(keyOffset)); - int headKeyOffset = mmap.get(headLinkOffest); + int tailKeyOffset = mmap.get(LRU_TAIL_OFFSET); + int headKeyOffset = mmap.get(LRU_HEAD_OFFSET); if (keyOffset == headKeyOffset) { if (oldKeyOffset > 0) { mmap.put(headKeyOffset, Segment.newLinkOffset(oldKeyOffset)); } - mmap.put(oldKeyOffset, headLinkOffest); - } else { + mmap.put(oldKeyOffset, LRU_HEAD_OFFSET); + } + + if (keyOffset == tailKeyOffset) { + if (newKeyOffset > 0) { + mmap.put(tailKeyOffset, Segment.oldLinkOffset(newKeyOffset)); + } + mmap.put(newKeyOffset, LRU_TAIL_OFFSET); + } + + if ( !(keyOffset == tailKeyOffset || keyOffset == headKeyOffset) ) { if (oldKeyOffset > 0) { mmap.put(newKeyOffset, Segment.newLinkOffset(oldKeyOffset)); } @@ -483,46 +523,54 @@ private void removeLink(int headLinkOffest, int keyOffset) { } - private 
void updateHead(int headLinkOffest, int keyOffset) { - int headKeyOffset = mmap.get(headLinkOffest); + private void updateHead(int keyOffset) { + int headKeyOffset = mmap.get(LRU_HEAD_OFFSET); if (keyOffset != headKeyOffset) { if (headKeyOffset > 0) { mmap.put(keyOffset, Segment.newLinkOffset(headKeyOffset)); mmap.put(headKeyOffset, Segment.oldLinkOffset(keyOffset)); } - mmap.put(keyOffset, headLinkOffest); + mmap.put(keyOffset, LRU_HEAD_OFFSET); + } + } + + private void updateEmptyTail(int keyOffset) { + int tailKeyOffset = mmap.get(LRU_TAIL_OFFSET); + if (tailKeyOffset == 0) { + mmap.put(keyOffset, LRU_TAIL_OFFSET); } } - private void moveToHead(int headLinkOffest, short ref, Segment seg) { + private void moveToHead(short ref, Segment seg) { int keyOffset = seg.keyOffset(ref); - if (keyOffset != mmap.get(headLinkOffest)) { - removeLink(headLinkOffest, keyOffset); - updateHead(headLinkOffest, keyOffset); + if (keyOffset != mmap.get(LRU_HEAD_OFFSET)) { + removeLink(keyOffset); + updateHead(keyOffset); } + updateEmptyTail(keyOffset); } private boolean verify() { - int keyOffset = LRU_HEAD_OFFSET; - int prevKeyOffset; + int keyOffset = LRU_TAIL_OFFSET; + int nextKeyOffset; int count = 0; long totalSize = 0; Segment seg; - CacheMetaInfo info = new CacheMetaInfo((byte)0,0L); info.setTimestamp(); - CacheMetaInfo prevInfo; + CacheMetaInfo info = new CacheMetaInfo((byte)0,0L); + CacheMetaInfo nextInfo; try { - while ((prevKeyOffset = prev(keyOffset)) > 0) { - seg = segmentFor(prevKeyOffset); - prevInfo = seg.data(prevKeyOffset); - if (info.timestamp() < prevInfo.timestamp()) { + while ((nextKeyOffset = next(keyOffset)) > 0) { + seg = segmentFor(nextKeyOffset); + nextInfo = seg.data(nextKeyOffset); + if (info.timestamp() > nextInfo.timestamp()) { return false; } count++; - info = prevInfo; + info = nextInfo; totalSize += info.size(); - keyOffset = prevKeyOffset; + keyOffset = nextKeyOffset; } if (count != this.count()) {
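moveToHead(), updateHead(), updateEmptyTail(), and the reworked removeLink() together maintain a single doubly linked chain threaded through the elements, with LRU_HEAD_OFFSET pointing at the hottest entry and LRU_TAIL_OFFSET at the coldest, so eviction can now walk forward from the tail instead of scanning back from the head. A compact in-memory sketch of the same chain discipline, using index 0 as the null sentinel the way the patch relies on element offsets always being greater than zero (class and method names are illustrative):

    // Intrusive doubly linked LRU over flat arrays, mirroring the head/tail pointers
    // in the file header and the old/new links stored beside each element.
    final class LruChainSketch {
        final int[] older;   // toward the cold end (the "old" link)
        final int[] newer;   // toward the hot end  (the "new" link)
        int head;            // most recently used, 0 = empty
        int tail;            // least recently used, 0 = empty

        LruChainSketch(int capacity) {
            older = new int[capacity + 1];
            newer = new int[capacity + 1];
        }

        // removeLink(): detach an element, patching head/tail when it sits at either end.
        void unlink(int e) {
            int o = older[e], n = newer[e];
            if (e == head) head = o;
            if (e == tail) tail = n;
            if (n > 0) older[n] = o;
            if (o > 0) newer[o] = n;
            older[e] = newer[e] = 0;
        }

        // updateHead() + updateEmptyTail(): push an element to the hot end.
        void touch(int e) {
            if (e == head) return;
            unlink(e);
            if (head > 0) { newer[head] = e; older[e] = head; }
            head = e;
            if (tail == 0) tail = e;    // the first element is both head and tail
        }

        public static void main(String[] args) {
            LruChainSketch lru = new LruChainSketch(8);
            lru.touch(1); lru.touch(2); lru.touch(3);   // hot -> cold: 3, 2, 1
            lru.touch(1);                               // hot -> cold: 1, 3, 2
            System.out.println("head=" + lru.head + " tail=" + lru.tail);  // head=1 tail=2
            lru.unlink(lru.tail);                        // evict the coldest entry
            System.out.println("head=" + lru.head + " tail=" + lru.tail);  // head=1 tail=3
        }
    }

Two smaller points in the surrounding hunks deserve a second look. First, clear() appears to write LRU_HEAD_OFFSET twice; the second put was presumably meant to reset LRU_TAIL_OFFSET as well. Second, verify() requires timestamps to be non-decreasing from tail to head, but System.nanoTime() has an arbitrary origin that is only guaranteed to be consistent within a single JVM; values written by different processes mapping the same file are not required to be comparable, so a wall-clock source or a logical counter kept in the shared header (bumped under the file lock put() already holds) would make the cross-process ordering check more robust.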