diff --git a/src/main/java/ru/vk/itmo/test/tyapuevdmitrij/DaoFactoryImplementation.java b/src/main/java/ru/vk/itmo/test/tyapuevdmitrij/DaoFactoryImplementation.java index b4f89905d..7de4b3f90 100644 --- a/src/main/java/ru/vk/itmo/test/tyapuevdmitrij/DaoFactoryImplementation.java +++ b/src/main/java/ru/vk/itmo/test/tyapuevdmitrij/DaoFactoryImplementation.java @@ -4,23 +4,23 @@ import ru.vk.itmo.Dao; import ru.vk.itmo.Entry; import ru.vk.itmo.test.DaoFactory; -import ru.vk.itmo.tyapuevdmitrij.InMemoryDao; +import ru.vk.itmo.tyapuevdmitrij.MemorySegmentDao; import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; import java.nio.charset.StandardCharsets; -@DaoFactory(stage = 2) +@DaoFactory(stage = 4) public class DaoFactoryImplementation implements DaoFactory.Factory> { @Override public Dao> createDao() { - return new InMemoryDao(); + return new MemorySegmentDao(); } @Override public Dao> createDao(Config config) { - return new InMemoryDao(config); + return new MemorySegmentDao(config); } @Override diff --git a/src/main/java/ru/vk/itmo/tyapuevdmitrij/FilesException.java b/src/main/java/ru/vk/itmo/tyapuevdmitrij/FilesException.java new file mode 100644 index 000000000..c84a4f84b --- /dev/null +++ b/src/main/java/ru/vk/itmo/tyapuevdmitrij/FilesException.java @@ -0,0 +1,7 @@ +package ru.vk.itmo.tyapuevdmitrij; + +public class FilesException extends RuntimeException { + public FilesException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/ru/vk/itmo/tyapuevdmitrij/InMemoryDao.java b/src/main/java/ru/vk/itmo/tyapuevdmitrij/InMemoryDao.java deleted file mode 100644 index 8fa8b1ae3..000000000 --- a/src/main/java/ru/vk/itmo/tyapuevdmitrij/InMemoryDao.java +++ /dev/null @@ -1,154 +0,0 @@ -package ru.vk.itmo.tyapuevdmitrij; - -import ru.vk.itmo.BaseEntry; -import ru.vk.itmo.Config; -import ru.vk.itmo.Dao; -import ru.vk.itmo.Entry; - -import java.io.IOException; -import java.lang.foreign.Arena; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; - -public class InMemoryDao implements Dao> { - private final Comparator memorySegmentComparator = (segment1, segment2) -> { - long offset = segment1.mismatch(segment2); - if (offset == -1) { - return 0; - } - if (offset == segment1.byteSize()) { - return -1; - } - if (offset == segment2.byteSize()) { - return 1; - } - return segment1.get(ValueLayout.JAVA_BYTE, offset) - segment2.get(ValueLayout.JAVA_BYTE, offset); - }; - private final ConcurrentNavigableMap> memTable - = new ConcurrentSkipListMap<>(memorySegmentComparator); - private final Path ssTablePath; - private final MemorySegment ssTable; - private static final String SS_TABLE_FILE_NAME = "ssTable"; - - public InMemoryDao() { - ssTablePath = null; - ssTable = null; - } - - public InMemoryDao(Config config) { - ssTablePath = config.basePath().resolve(SS_TABLE_FILE_NAME); - ssTable = getReadBufferFromSsTable(); - } - - @Override - public Iterator> get(MemorySegment from, MemorySegment to) { - if (from == null && to == null) { - return memTable.values().iterator(); - } else if (from == null) { - return memTable.headMap(to).values().iterator(); - } else if (to == null) { - return memTable.tailMap(from).values().iterator(); - } - return memTable.subMap(from, to).values().iterator(); - } - - @Override - public Entry get(MemorySegment key) { - Entry value = memTable.get(key); - if (value != null || ssTable == null) { - return value; - } - return getSsTableDataByKey(key); - } - - @Override - public void upsert(Entry entry) { - memTable.put(entry.key(), entry); - } - - @Override - public void flush() throws IOException { - throw new UnsupportedOperationException(""); - } - - @Override - public void close() throws IOException { - MemorySegment buffer = getWriteBufferToSsTable(); - writeMemTableDataToFile(buffer); - } - - public long getSsTableDataByteSize() { - long ssTableDataByteSize = 0; - for (Map.Entry> entry : memTable.entrySet()) { - ssTableDataByteSize += entry.getKey().byteSize(); - ssTableDataByteSize += entry.getValue().value().byteSize(); - } - return ssTableDataByteSize + memTable.size() * Long.BYTES * 2L; - } - - public MemorySegment getWriteBufferToSsTable() throws IOException { - MemorySegment buffer; - try (FileChannel channel = FileChannel.open(ssTablePath, EnumSet.of( - StandardOpenOption.READ, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING))) { - buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, getSsTableDataByteSize(), Arena.ofAuto()); - } - return buffer; - } - - public void writeMemTableDataToFile(MemorySegment buffer) { - long offset = 0; - for (Entry entry : memTable.values()) { - buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, offset, entry.key().byteSize()); - offset += Long.BYTES; - MemorySegment.copy(entry.key(), 0, buffer, offset, entry.key().byteSize()); - offset += entry.key().byteSize(); - buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, offset, entry.value().byteSize()); - offset += Long.BYTES; - MemorySegment.copy(entry.value(), 0, buffer, offset, entry.value().byteSize()); - offset += entry.value().byteSize(); - } - } - - public final MemorySegment getReadBufferFromSsTable() { - MemorySegment buffer; - try (FileChannel channel = FileChannel.open(ssTablePath, StandardOpenOption.READ)) { - buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(ssTablePath), Arena.ofAuto()); - } catch (IOException e) { - buffer = null; - } - return buffer; - } - - public Entry getSsTableDataByKey(MemorySegment key) { - long offset = 0; - MemorySegment valueFromSsTable; - while (offset < ssTable.byteSize()) { - long keyByteSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, offset); - offset += Long.BYTES; - long keysMismatch; - keysMismatch = MemorySegment.mismatch(ssTable, offset, offset + keyByteSize, key, 0, key.byteSize()); - offset += keyByteSize; - long valueByteSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, offset); - offset += Long.BYTES; - valueFromSsTable = ssTable.asSlice(offset, valueByteSize); - if (keysMismatch == -1) { - return new BaseEntry<>(key, valueFromSsTable); - } - offset += valueByteSize; - } - return null; - } -} diff --git a/src/main/java/ru/vk/itmo/tyapuevdmitrij/MemorySegmentDao.java b/src/main/java/ru/vk/itmo/tyapuevdmitrij/MemorySegmentDao.java new file mode 100644 index 000000000..12b7086b0 --- /dev/null +++ b/src/main/java/ru/vk/itmo/tyapuevdmitrij/MemorySegmentDao.java @@ -0,0 +1,170 @@ +package ru.vk.itmo.tyapuevdmitrij; + +import ru.vk.itmo.Config; +import ru.vk.itmo.Dao; +import ru.vk.itmo.Entry; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +public class MemorySegmentDao implements Dao> { + private static final Comparator MEMORY_SEGMENT_COMPARATOR = (segment1, segment2) -> { + long offset = segment1.mismatch(segment2); + if (offset == -1) { + return 0; + } + if (offset == segment1.byteSize()) { + return -1; + } + if (offset == segment2.byteSize()) { + return 1; + } + return segment1.get(ValueLayout.JAVA_BYTE, offset) - segment2.get(ValueLayout.JAVA_BYTE, offset); + }; + private final ConcurrentNavigableMap> memTable = + new ConcurrentSkipListMap<>(MEMORY_SEGMENT_COMPARATOR); + + private final Arena readArena; + private final Path ssTablePath; + private long ssTablesEntryQuantity; + private boolean compacted; + private final Storage storage; + + public MemorySegmentDao() { + ssTablePath = null; + readArena = null; + storage = null; + } + + public MemorySegmentDao(Config config) { + ssTablePath = config.basePath(); + readArena = Arena.ofShared(); + storage = new Storage(ssTablePath, readArena); + } + + @Override + public Iterator> get(MemorySegment from, MemorySegment to) { + return storage.range(getMemTableIterator(from, to), from, to, MEMORY_SEGMENT_COMPARATOR); + } + + private Iterator> getMemTableIterator(MemorySegment from, MemorySegment to) { + if (from == null && to == null) { + return memTable.values().iterator(); + } + if (from == null) { + return memTable.headMap(to).values().iterator(); + } + if (to == null) { + return memTable.tailMap(from).values().iterator(); + } + return memTable.subMap(from, to).values().iterator(); + } + + @Override + public Entry get(MemorySegment key) { + Entry value = memTable.get(key); + if (value != null && value.value() == null) { + return null; + } + if (value != null || storage.ssTables == null) { + return value; + } + Iterator> iterator = storage.range(Collections.emptyIterator(), + key, + null, + MEMORY_SEGMENT_COMPARATOR); + + if (!iterator.hasNext()) { + return null; + } + Entry next = iterator.next(); + if (MEMORY_SEGMENT_COMPARATOR.compare(next.key(), key) == 0) { + return next; + } + return null; + } + + @Override + public void upsert(Entry entry) { + memTable.put(entry.key(), entry); + } + + @Override + public void compact() throws IOException { + if (storage.ssTablesQuantity == 0 && memTable.isEmpty()) { + return; + } + Iterator> dataIterator = get(null, null); + Arena writeArena = Arena.ofConfined(); + MemorySegment buffer = NmapBuffer.getWriteBufferToSsTable(getCompactionTableByteSize(), + ssTablePath, + storage.ssTablesQuantity, + writeArena, + true); + long bufferByteSize = buffer.byteSize(); + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, bufferByteSize - Long.BYTES, ssTablesEntryQuantity); + long dataOffset = 0; + long indexOffset = bufferByteSize - Long.BYTES - ssTablesEntryQuantity * 2L * Long.BYTES; + while (dataIterator.hasNext()) { + Entry entry = dataIterator.next(); + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); + indexOffset += Long.BYTES; + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, dataOffset, entry.key().byteSize()); + dataOffset += Long.BYTES; + MemorySegment.copy(entry.key(), 0, buffer, dataOffset, entry.key().byteSize()); + dataOffset += entry.key().byteSize(); + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); + indexOffset += Long.BYTES; + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, dataOffset, entry.value().byteSize()); + dataOffset += Long.BYTES; + MemorySegment.copy(entry.value(), 0, buffer, dataOffset, entry.value().byteSize()); + dataOffset += entry.value().byteSize(); + } + if (writeArena.scope().isAlive()) { + writeArena.close(); + } + StorageHelper.deleteOldSsTables(ssTablePath); + StorageHelper.renameCompactedSsTable(ssTablePath); + compacted = true; + } + + @Override + public void close() throws IOException { + if (compacted) { + readArena.close(); + return; + } + if (memTable.isEmpty()) { + readArena.close(); + return; + } + if (!readArena.scope().isAlive()) { + return; + } + readArena.close(); + storage.save(memTable.values(), ssTablePath); + + } + + private long getCompactionTableByteSize() { + Iterator> dataIterator = get(null, null); + long compactionTableByteSize = 0; + long countEntry = 0; + while (dataIterator.hasNext()) { + Entry entry = dataIterator.next(); + compactionTableByteSize += entry.key().byteSize(); + compactionTableByteSize += entry.value().byteSize(); + countEntry++; + } + ssTablesEntryQuantity = countEntry; + return compactionTableByteSize + countEntry * 4L * Long.BYTES + Long.BYTES; + } +} diff --git a/src/main/java/ru/vk/itmo/tyapuevdmitrij/MergeIterator.java b/src/main/java/ru/vk/itmo/tyapuevdmitrij/MergeIterator.java new file mode 100644 index 000000000..a82adcc43 --- /dev/null +++ b/src/main/java/ru/vk/itmo/tyapuevdmitrij/MergeIterator.java @@ -0,0 +1,145 @@ +package ru.vk.itmo.tyapuevdmitrij; + +import ru.vk.itmo.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; + +public class MergeIterator implements Iterator> { + + private final PriorityQueue priorityQueue; + private final Comparator> comparator; + PeekIterator tableIterator; + + private static class PeekIterator implements Iterator> { + + public final int id; + private final Iterator> delegate; + private Entry memorySegmentsEntry; + + private PeekIterator(int id, Iterator> delegate) { + this.id = id; + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + if (memorySegmentsEntry == null) { + return delegate.hasNext(); + } + return true; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Entry peek = peek(); + this.memorySegmentsEntry = null; + return peek; + } + + private Entry peek() { + if (memorySegmentsEntry == null) { + if (!delegate.hasNext()) { + return null; + } + memorySegmentsEntry = delegate.next(); + } + return memorySegmentsEntry; + } + } + + public MergeIterator(Collection>> iterators, + Comparator> comparator) { + this.comparator = comparator; + Comparator peekComp = (o1, o2) -> comparator.compare(o1.peek(), o2.peek()); + priorityQueue = new PriorityQueue<>( + iterators.size(), + peekComp.thenComparing(o -> -o.id) + ); + + int id = 0; + for (Iterator> iterator : iterators) { + if (iterator.hasNext()) { + priorityQueue.add(new PeekIterator(id++, iterator)); + } + } + } + + private PeekIterator peek() { + while (tableIterator == null) { + tableIterator = priorityQueue.poll(); + if (tableIterator == null) { + return null; + } + peekFromPriorityQueue(); + if (tableIterator.peek() == null) { + tableIterator = null; + continue; + } + + if (skip(tableIterator.peek())) { + tableIterator.next(); + if (tableIterator.hasNext()) { + priorityQueue.add(tableIterator); + } + tableIterator = null; + } + } + + return tableIterator; + } + + private void peekFromPriorityQueue() { + while (true) { + PeekIterator next = priorityQueue.peek(); + if (next == null) { + break; + } + int compare = comparator.compare(tableIterator.peek(), next.peek()); + if (compare == 0) { + PeekIterator poll = priorityQueue.poll(); + if (poll != null) { + poll.next(); + if (poll.hasNext()) { + priorityQueue.add(poll); + } + } + } else { + break; + } + } + } + + protected boolean skip(Entry entry) { + if (entry != null) { + return entry.value() == null; + } + return false; + } + + @Override + public boolean hasNext() { + return peek() != null; + } + + @Override + public Entry next() { + PeekIterator entryIterator = peek(); + if (entryIterator == null) { + throw new NoSuchElementException(); + } + Entry next = entryIterator.next(); + this.tableIterator = null; + if (entryIterator.hasNext()) { + priorityQueue.add(entryIterator); + } + return next; + } +} diff --git a/src/main/java/ru/vk/itmo/tyapuevdmitrij/NmapBuffer.java b/src/main/java/ru/vk/itmo/tyapuevdmitrij/NmapBuffer.java new file mode 100644 index 000000000..e38322147 --- /dev/null +++ b/src/main/java/ru/vk/itmo/tyapuevdmitrij/NmapBuffer.java @@ -0,0 +1,57 @@ +package ru.vk.itmo.tyapuevdmitrij; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.EnumSet; + +public final class NmapBuffer { + + private NmapBuffer() { + throw new IllegalStateException("Utility class"); + } + + static MemorySegment getReadBufferFromSsTable(Path ssTablePath, Arena readArena) { + MemorySegment buffer; + boolean created = false; + try (FileChannel channel = FileChannel.open(ssTablePath, StandardOpenOption.READ)) { + buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(ssTablePath), readArena); + created = true; + } catch (IOException e) { + buffer = null; + } finally { + if (!created) { + readArena.close(); + } + } + return buffer; + } + + static MemorySegment getWriteBufferToSsTable(Long writeBytes, + Path ssTablePath, + int ssTablesQuantity, + Arena writeArena, + boolean compactionFlag) throws IOException { + MemorySegment buffer; + Path path; + if (compactionFlag) { + path = ssTablePath.resolve(StorageHelper.COMPACTED_FILE_NAME); + } else { + path = ssTablePath.resolve(StorageHelper.SS_TABLE_FILE_NAME + ssTablesQuantity); + } + try (FileChannel channel = FileChannel.open(path, EnumSet.of(StandardOpenOption.READ, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING))) { + buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, + writeBytes, writeArena); + } + return buffer; + } +} + + diff --git a/src/main/java/ru/vk/itmo/tyapuevdmitrij/Storage.java b/src/main/java/ru/vk/itmo/tyapuevdmitrij/Storage.java new file mode 100644 index 000000000..aa921cd8e --- /dev/null +++ b/src/main/java/ru/vk/itmo/tyapuevdmitrij/Storage.java @@ -0,0 +1,177 @@ +package ru.vk.itmo.tyapuevdmitrij; + +import ru.vk.itmo.BaseEntry; +import ru.vk.itmo.Entry; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public class Storage { + private final StorageHelper storageHelper; + protected final int ssTablesQuantity; + protected List ssTables; + + public Storage(Path ssTablePath, Arena readArena) { + storageHelper = new StorageHelper(); + ssTablesQuantity = StorageHelper.findSsTablesQuantity(ssTablePath); + ssTables = new ArrayList<>(ssTablesQuantity); + for (int i = 0; i < ssTablesQuantity; i++) { + Path path = ssTablePath.resolve(StorageHelper.SS_TABLE_FILE_NAME + i); + ssTables.add(NmapBuffer.getReadBufferFromSsTable(path, readArena)); + } + } + + public void save(Iterable> memTableEntries, Path ssTablePath) throws IOException { + Arena writeArena = Arena.ofConfined(); + MemorySegment buffer = NmapBuffer.getWriteBufferToSsTable(storageHelper.getSsTableDataByteSize(memTableEntries), + ssTablePath, + ssTablesQuantity, + writeArena, + false); + writeMemTableDataToFile(buffer, memTableEntries); + if (writeArena.scope().isAlive()) { + writeArena.close(); + } + } + + private void writeMemTableDataToFile(MemorySegment buffer, Iterable> memTableEntries) { + long offset = 0; + long bufferByteSize = buffer.byteSize(); + long writeIndexPosition = bufferByteSize - storageHelper.memTableEntriesCount * 2L * Long.BYTES - Long.BYTES; + //write to the end of file size of memTable + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, bufferByteSize - Long.BYTES, storageHelper.memTableEntriesCount); + for (Entry entry : memTableEntries) { + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, offset, entry.key().byteSize()); + // write keyByteSizeOffsetPosition to the end of buffer + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, writeIndexPosition, offset); + writeIndexPosition += Long.BYTES; + offset += Long.BYTES; + MemorySegment.copy(entry.key(), 0, buffer, offset, entry.key().byteSize()); + offset += entry.key().byteSize(); + // write valueByteSizeOffsetPosition to the end of buffer next to the keyByteSizeOffsetPosition + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, writeIndexPosition, offset); + writeIndexPosition += Long.BYTES; + if (entry.value() == null) { + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, offset, tombstone(offset)); + offset += Long.BYTES; + } else { + buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, offset, entry.value().byteSize()); + offset += Long.BYTES; + MemorySegment.copy(entry.value(), 0, buffer, offset, entry.value().byteSize()); + offset += entry.value().byteSize(); + } + } + } + + private long getSsTableIndexByKey(MemorySegment ssTable, MemorySegment key, + Comparator memorySegmentComparator) { + long memTableSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, ssTable.byteSize() - Long.BYTES); + long left = 0; + long right = memTableSize - 1L; + long lastKeyIndexOffset = ssTable.byteSize() - 3 * Long.BYTES; + while (left <= right) { + long mid = (left + right) >>> 1; + long midOffset = lastKeyIndexOffset - (memTableSize - 1L) * Long.BYTES * 2L + mid * 2L * Long.BYTES; + MemorySegment readKey = getKeyByOffset(ssTable, midOffset); + int res = memorySegmentComparator.compare(readKey, key); + if (res == 0) { + return mid; + } + if (res > 0) { + right = mid - 1; + } else { + left = mid + 1; + } + } + return left; + } + + private Entry getSsTableEntryByIndex(MemorySegment ssTable, long index) { + long memTableSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, ssTable.byteSize() - Long.BYTES); + if (index > memTableSize - 1 || index < 0) { + return null; + } + long keyIndexOffset = ssTable.byteSize() - 3 * Long.BYTES + - (memTableSize - 1L) * Long.BYTES * 2L + index * 2L * Long.BYTES; + MemorySegment readKey = getKeyByOffset(ssTable, keyIndexOffset); + MemorySegment readValue = getValueByOffset(ssTable, keyIndexOffset + Long.BYTES); + return new BaseEntry<>(readKey, readValue); + } + + private MemorySegment getKeyByOffset(MemorySegment ssTable, long offset) { + long keyByteSizeOffset = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, offset); + long keyByteSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, keyByteSizeOffset); + long keyOffset = keyByteSizeOffset + Long.BYTES; + return ssTable.asSlice(keyOffset, keyByteSize); + } + + private MemorySegment getValueByOffset(MemorySegment ssTable, long offset) { + long valueByteSizeOffset = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, offset); + long valueByteSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, valueByteSizeOffset); + if (valueByteSize < 0) { + return null; + } + long valueOffset = valueByteSizeOffset + Long.BYTES; + return ssTable.asSlice(valueOffset, valueByteSize); + } + + public Iterator> range( + Iterator> firstIterator, + MemorySegment from, + MemorySegment to, Comparator memorySegmentComparator) { + List>> iterators = new ArrayList<>(ssTablesQuantity + 1); + for (MemorySegment memorySegment : ssTables) { + iterators.add(iterator(memorySegment, from, to, memorySegmentComparator)); + } + iterators.add(firstIterator); + + return new MergeIterator(iterators, Comparator.comparing(Entry::key, memorySegmentComparator)); + } + + private Iterator> iterator(MemorySegment ssTable, MemorySegment from, MemorySegment to, + Comparator memorySegmentComparator) { + long recordIndexFrom = from == null ? 0 : normalize(getSsTableIndexByKey(ssTable, + from, + memorySegmentComparator)); + long memTableSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, ssTable.byteSize() - Long.BYTES); + long recordIndexTo = to == null ? memTableSize : normalize(getSsTableIndexByKey(ssTable, + to, + memorySegmentComparator)); + + return new Iterator<>() { + long index = recordIndexFrom; + + @Override + public boolean hasNext() { + return index < recordIndexTo; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Entry entry = getSsTableEntryByIndex(ssTable, index); + index++; + return entry; + } + }; + } + + private static long tombstone(long offset) { + return 1L << 63 | offset; + } + + private static long normalize(long value) { + return value & ~(1L << 63); + } + +} diff --git a/src/main/java/ru/vk/itmo/tyapuevdmitrij/StorageHelper.java b/src/main/java/ru/vk/itmo/tyapuevdmitrij/StorageHelper.java new file mode 100644 index 000000000..72ea38adf --- /dev/null +++ b/src/main/java/ru/vk/itmo/tyapuevdmitrij/StorageHelper.java @@ -0,0 +1,81 @@ +package ru.vk.itmo.tyapuevdmitrij; + +import ru.vk.itmo.Entry; + +import java.io.File; +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +public class StorageHelper { + protected static final String SS_TABLE_FILE_NAME = "ssTable"; + + protected static final String COMPACTED_FILE_NAME = "compact"; + protected long memTableEntriesCount; + + static int findSsTablesQuantity(Path ssTablePath) { + File[] files = getDirectoryFiles(ssTablePath); + if (files.length == 0) { + return 0; + } + long countSsTables = 0L; + for (File file : files) { + if (file.isFile() && file.getName().contains(SS_TABLE_FILE_NAME)) { + countSsTables++; + } + } + return (int) countSsTables; + } + + static void deleteOldSsTables(Path ssTablePath) { + File[] files = getDirectoryFiles(ssTablePath); + for (File file : files) { + if (file.getName().contains(SS_TABLE_FILE_NAME)) { + try { + Files.delete(file.toPath()); + } catch (IOException e) { + throw new FilesException("Can't delete file " + file.toPath(), e); + } + } + } + } + + private static File[] getDirectoryFiles(Path ssTablePath) { + File directory = new File(ssTablePath.toUri()); + if (!directory.exists() || !directory.isDirectory()) { + return new File[0]; + } + return directory.listFiles(); + } + + static void renameCompactedSsTable(Path ssTablePath) { + Path compactionFile = ssTablePath.resolve(COMPACTED_FILE_NAME); + Path newCompactionFile = ssTablePath.resolve(SS_TABLE_FILE_NAME + 0); + try { + Files.move( + compactionFile, + newCompactionFile, + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING + ); + } catch (IOException e) { + throw new FilesException("Can't rename file", e); + } + } + + protected long getSsTableDataByteSize(Iterable> memTableEntries) { + long ssTableDataByteSize = 0; + long entriesCount = 0; + for (Entry entry : memTableEntries) { + ssTableDataByteSize += entry.key().byteSize(); + if (entry.value() != null) { + ssTableDataByteSize += entry.value().byteSize(); + } + entriesCount++; + } + memTableEntriesCount = entriesCount; + return ssTableDataByteSize + entriesCount * Long.BYTES * 4L + Long.BYTES; + } +}