Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Кислов Данил ИТМО ФИТиП HW5 #272

Open
wants to merge 114 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
114 commits
Select commit Hold shift + click to select a range
e805d40
ConcurrentSkipList storage implementation
HKTRp Sep 24, 2023
aed959b
Codestyle corrected
HKTRp Sep 24, 2023
ec9ca78
Code climate corrected
HKTRp Sep 25, 2023
0d718a0
IteratorWrapper removed
HKTRp Sep 26, 2023
bdc32ac
IteratorWrapper removed
HKTRp Sep 26, 2023
d8df344
Merge branch 'main' into main
incubos Sep 27, 2023
05d748f
Merge branch 'main' into main
HKTRp Sep 28, 2023
5d1f6dc
Merge branch 'main' into main
HKTRp Sep 29, 2023
6651cc8
Merge branch 'polis-vk:main' into main
HKTRp Sep 30, 2023
9e1ab30
Merge branch 'main' into main
HKTRp Oct 1, 2023
3910507
Persistent realisation
HKTRp Oct 1, 2023
5e0be62
Merge branch 'main' into persistent
HKTRp Oct 1, 2023
144575f
Code climate fixed
HKTRp Oct 1, 2023
c4f9d47
Merge remote-tracking branch 'origin/persistent' into persistent
HKTRp Oct 1, 2023
04411ac
Code climate fixed
HKTRp Oct 1, 2023
3cc122d
Code climate fixed
HKTRp Oct 1, 2023
1560af0
Code climate fixed
HKTRp Oct 1, 2023
c6777b2
Merge branch 'main' into persistent
HKTRp Oct 3, 2023
643278a
Merge branch 'main' into persistent
HKTRp Oct 3, 2023
f5921e4
Index search with file mapping implemented
HKTRp Oct 3, 2023
e80341d
Merge remote-tracking branch 'origin/persistent' into persistent
HKTRp Oct 3, 2023
584edf7
BinSearch implemented, redundant disk reading removed
HKTRp Oct 3, 2023
b604d3f
Code climate fixes
HKTRp Oct 3, 2023
cb0ac90
Merge branch 'main' into persistent
HKTRp Oct 3, 2023
59812cf
Merge branch 'main' into persistent
HKTRp Oct 4, 2023
2682e50
Comments
HKTRp Oct 4, 2023
e098abf
Merge remote-tracking branch 'origin/persistent' into persistent
HKTRp Oct 4, 2023
0797809
Comments
HKTRp Oct 4, 2023
7d65b00
Comments
HKTRp Oct 4, 2023
1fa2478
Merge branch 'main' into persistent
HKTRp Oct 4, 2023
7084791
Merge branch 'main' into persistent
HKTRp Oct 5, 2023
6fb53f6
Merge branch 'main' into persistent
HKTRp Oct 5, 2023
fdceb53
Merge branch 'polis-vk:main' into persistent
HKTRp Oct 5, 2023
44a401f
Comments
HKTRp Oct 5, 2023
96aa886
Merge branch 'main' into persistent
HKTRp Oct 5, 2023
0a4a5ee
Merge branch 'polis-vk:main' into persistent
HKTRp Oct 6, 2023
5a497f7
MergeIterator implementation
HKTRp Oct 6, 2023
2605c5c
MergeIterator implementation
HKTRp Oct 6, 2023
1c9d2a4
.
HKTRp Oct 8, 2023
d3cce20
.
HKTRp Oct 8, 2023
5a05576
Merge branch 'polis-vk:main' into mergeIterator
HKTRp Oct 8, 2023
fa9cd7e
Merge branch 'polis-vk:main' into mergeIterator
HKTRp Oct 8, 2023
9aa119e
Fixed solution
HKTRp Oct 8, 2023
b889440
Merge remote-tracking branch 'origin/mergeIterator' into mergeIterator
HKTRp Oct 8, 2023
41dcc22
Merge branch 'polis-vk:main' into mergeIterator
HKTRp Oct 9, 2023
5478382
CodeClimate fixes
HKTRp Oct 10, 2023
fc4b7cf
CodeClimate fixes
HKTRp Oct 10, 2023
ca67c1f
CodeClimate fixes
HKTRp Oct 10, 2023
fb49a3a
CodeClimate fixes
HKTRp Oct 10, 2023
f2d1ad9
Merge branch 'main' into mergeIterator
HKTRp Oct 18, 2023
6a0e13f
Common arena removed
HKTRp Oct 18, 2023
2fa1d16
Code climate fixes
HKTRp Oct 18, 2023
7f9b328
Some comments
HKTRp Oct 18, 2023
bd62074
Compaction prepare
HKTRp Oct 20, 2023
fb6f538
Merge branch 'polis-vk:main' into compaction
HKTRp Oct 20, 2023
00fba02
Merge branch 'polis-vk:main' into compaction
HKTRp Oct 20, 2023
35d49a9
Compaction
HKTRp Oct 20, 2023
ea45a28
Merge remote-tracking branch 'origin/compaction' into compaction
HKTRp Oct 20, 2023
d3b94be
Merge branch 'polis-vk:main' into compaction
HKTRp Oct 20, 2023
9ead79d
Compaction empty fix
HKTRp Oct 20, 2023
36fa97a
Merge branch 'main' into compaction
incubos Oct 20, 2023
bf25d29
Merge branch 'main' into compaction
incubos Oct 20, 2023
45521fe
Code climate fixed
HKTRp Oct 20, 2023
c7ac79b
Merge remote-tracking branch 'origin/compaction' into compaction
HKTRp Oct 20, 2023
9258098
Merge branch 'main' into compaction
HKTRp Nov 6, 2023
154b9b4
thread safety
HKTRp Nov 6, 2023
475fc0b
codeclimate fix
HKTRp Nov 6, 2023
492e147
compaction paralleled
HKTRp Nov 7, 2023
9766fa6
Merge branch 'compaction' into threadSafe
HKTRp Nov 8, 2023
adc0fcf
Merge branch 'polis-vk:main' into threadSafe
HKTRp Nov 8, 2023
1ed0d76
One-thread-for-background-operations implementation
HKTRp Nov 8, 2023
82a35d8
Codeclimate fixes
HKTRp Nov 8, 2023
de3db1a
Codeclimate fixes
HKTRp Nov 8, 2023
f491258
Codeclimate fixes
HKTRp Nov 8, 2023
c12d153
Merge branch 'main' into threadSafe
HKTRp Nov 8, 2023
a1fb12e
Removed synchronized for upsert
HKTRp Nov 8, 2023
cb8e852
.
HKTRp Nov 8, 2023
9e541b6
.
HKTRp Nov 8, 2023
fda8d1b
codeclimate
HKTRp Nov 8, 2023
ce778e8
Changed order little bit
HKTRp Nov 8, 2023
cdc54ea
Merge branch 'main' into threadSafe
atimofeyev Nov 12, 2023
eb040ca
Fixed data loss while flushing bug + some refactor
HKTRp Nov 18, 2023
bd93f01
Busy waiting removed
HKTRp Nov 18, 2023
fddf72a
Code climate
HKTRp Nov 18, 2023
32e83b9
Merge branch 'main' into threadSafe
HKTRp Nov 18, 2023
c2be639
Initial data duplication removed
HKTRp Nov 18, 2023
eee529c
Merge remote-tracking branch 'origin/threadSafe' into threadSafe
HKTRp Nov 18, 2023
5023841
Return busy waiting because it works better
HKTRp Nov 18, 2023
b8f6db2
That climate thing again
HKTRp Nov 18, 2023
37cc6ea
Merge branch 'main' into threadSafe
incubos Nov 20, 2023
2efcf62
Merge branch 'main' into threadSafe
incubos Nov 21, 2023
4ea5333
Merge branch 'main' into threadSafe
incubos Nov 21, 2023
f3c5405
Attempt to speed up dao closing
HKTRp Nov 21, 2023
92995b4
Merge remote-tracking branch 'origin/threadSafe' into threadSafe
HKTRp Nov 21, 2023
3426e3f
4th hw corrections
HKTRp Nov 22, 2023
212c52b
Merge remote-tracking branch 'origin/threadSafe' into threadSafe
HKTRp Nov 22, 2023
458f5eb
cc
HKTRp Nov 22, 2023
b65a9fc
Replace atomics with lock to speed up algorithm for many threads
HKTRp Nov 22, 2023
ccfdadf
.
HKTRp Nov 22, 2023
ae5ce82
Revert "."
HKTRp Nov 22, 2023
260a42d
Revert "Replace atomics with lock to speed up algorithm for many thre…
HKTRp Nov 22, 2023
d6c0066
minor changes
HKTRp Nov 22, 2023
4c2c5d5
IDK why but this works faster
HKTRp Nov 22, 2023
96695a0
Just try
HKTRp Nov 22, 2023
bb2445e
Locks realisation (instead of beautiful lock-free :( )
HKTRp Nov 22, 2023
7f3e7f6
code. climete.
HKTRp Nov 22, 2023
6294314
Merge branch 'main' into threadSafe
HKTRp Nov 22, 2023
8131446
Merge branch 'main' into threadSafe
HKTRp Nov 22, 2023
29c129f
Revert "code. climete."
HKTRp Nov 22, 2023
ee9dad6
Revert "Locks realisation (instead of beautiful lock-free :( )"
HKTRp Nov 22, 2023
15ade91
Revert "Revert "Locks realisation (instead of beautiful lock-free :( )""
HKTRp Nov 22, 2023
d02337b
Revert "Revert "code. climete.""
HKTRp Nov 22, 2023
e1e6995
Common arena (desperate attempt)
HKTRp Nov 22, 2023
ff49e6c
.
HKTRp Nov 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 0 additions & 47 deletions src/main/java/ru/vk/itmo/kislovdanil/InMemoryDao.java

This file was deleted.

40 changes: 40 additions & 0 deletions src/main/java/ru/vk/itmo/kislovdanil/MemTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ru.vk.itmo.kislovdanil;

import ru.vk.itmo.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * In-memory table: a {@link ConcurrentSkipListMap} of entries together with a running
 * total of the byte sizes of every entry that has been offered to {@link #put(Entry)}.
 * Once that total has risen above the threshold, further puts are rejected.
 */
public class MemTable {
    private final ConcurrentSkipListMap<MemorySegment, Entry<MemorySegment>> storage;
    private final long threshold;
    private final AtomicLong size = new AtomicLong(0);

    /**
     * @param comparator ordering used for the keys of the underlying map
     * @param threshold  byte total above which {@link #put(Entry)} starts returning {@code false}
     */
    public MemTable(Comparator<MemorySegment> comparator, long threshold) {
        this.threshold = threshold;
        this.storage = new ConcurrentSkipListMap<>(comparator);
    }

    /** Size of an entry in bytes: key bytes plus value bytes (a null value counts as zero). */
    private static long getEntrySize(Entry<MemorySegment> entry) {
        MemorySegment value = entry.value();
        long bytes = 0;
        if (value != null) {
            bytes = value.byteSize();
        }
        return bytes + entry.key().byteSize();
    }

    /**
     * Tries to store the entry.
     *
     * <p>The entry size is added to the counter unconditionally, i.e. also when the put is
     * rejected, and overwriting an existing key does not subtract the replaced entry's size.
     *
     * @param entry entry to store under its own key
     * @return {@code true} if the entry was stored; {@code false} if the total accumulated
     *         before this call was already above the threshold
     */
    public boolean put(Entry<MemorySegment> entry) {
        // getAndAdd yields the total as it was before this entry was counted.
        long totalBefore = size.getAndAdd(getEntrySize(entry));
        if (totalBefore > threshold) {
            return false;
        }
        storage.put(entry.key(), entry);
        return true;
    }

    /** @return the live underlying map (not a copy) */
    public ConcurrentSkipListMap<MemorySegment, Entry<MemorySegment>> getStorage() {
        return storage;
    }
}
223 changes: 157 additions & 66 deletions src/main/java/ru/vk/itmo/kislovdanil/PersistentDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,86 @@
import ru.vk.itmo.Dao;
import ru.vk.itmo.Entry;
import ru.vk.itmo.kislovdanil.exceptions.DBException;
import ru.vk.itmo.kislovdanil.exceptions.OverloadException;
import ru.vk.itmo.kislovdanil.iterators.DatabaseIterator;
import ru.vk.itmo.kislovdanil.iterators.MemTableIterator;
import ru.vk.itmo.kislovdanil.iterators.MergeIterator;
import ru.vk.itmo.kislovdanil.sstable.SSTable;

import java.io.File;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PersistentDao implements Dao<MemorySegment, Entry<MemorySegment>>, Iterable<Entry<MemorySegment>> {

public static final MemorySegment DELETED_VALUE = null;
private final Config config;
private final List<SSTable> tables = new ArrayList<>();
private volatile List<SSTable> tables = new ArrayList<>();
private final Comparator<MemorySegment> comparator = new MemSegComparator();
private final ConcurrentNavigableMap<MemorySegment, Entry<MemorySegment>> storage =
new ConcurrentSkipListMap<>(comparator);
private volatile MemTable memTable;
// Temporary storage in case of main storage flushing (Read only)
private volatile MemTable additionalStorage;
// In case of additional table overload while main table is flushing
private final AtomicLong nextId = new AtomicLong();
private final ExecutorService commonExecutorService = Executors.newFixedThreadPool(2);
// To prevent parallel flushing
private volatile Future<?> compcatFuture;
// To make sure that flushing in close() will be started
private volatile Future<?> flushFuture;
// Have to take before any tables modification
private final Lock compactionLock = new ReentrantLock();
// Have to take read while upsert and write while flushing (to prevent data loss)
private final ReadWriteLock upsertLock = new ReentrantReadWriteLock();
private final Arena filesArena = Arena.ofShared();

private long lastTimestamp = System.currentTimeMillis();
// Returns the largest id among the given tables, or -1 when the iterable is empty.
private long getMaxTablesId(Iterable<SSTable> tableIterable) {
    long largestId = -1;
    Iterator<SSTable> cursor = tableIterable.iterator();
    while (cursor.hasNext()) {
        long candidateId = cursor.next().getTableId();
        if (candidateId > largestId) {
            largestId = candidateId;
        }
    }
    return largestId;
}

public PersistentDao(Config config) throws IOException {
this.config = config;
this.memTable = new MemTable(comparator, config.flushThresholdBytes());
File basePathDirectory = new File(config.basePath().toString());
String[] ssTablesIds = basePathDirectory.list();
if (ssTablesIds == null) return;
for (String tableID : ssTablesIds) {
// SSTable constructor with rewrite=false reads table data from disk if it exists
tables.add(new SSTable(config.basePath(), comparator, Long.parseLong(tableID),
storage.values(), false));
// SSTable constructor without entries iterator reads table data from disk if it exists
tables.add(new SSTable(config.basePath(), comparator, Long.parseLong(tableID), filesArena));
}
nextId.set(getMaxTablesId(tables) + 1);
tables.sort(SSTable::compareTo);
}

@Override
public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to) {
List<DatabaseIterator> iterators = new ArrayList<>(tables.size() + 1);
List<DatabaseIterator> iterators = new ArrayList<>(tables.size() + 2);
for (SSTable table : tables) {
iterators.add(table.getRange(from, to));
}
iterators.add(new MemTableIterator(from, to));
iterators.add(new MemTableIterator(from, to, memTable, Long.MAX_VALUE));
if (additionalStorage != null) {
iterators.add(new MemTableIterator(from, to, additionalStorage, Long.MAX_VALUE - 1));
}
return new MergeIterator(iterators, comparator);
}

Expand All @@ -57,19 +92,23 @@ private static Entry<MemorySegment> wrapEntryIfDeleted(Entry<MemorySegment> entr
return entry;
}

private void updateId() {
lastTimestamp = Math.max(lastTimestamp + 1, System.currentTimeMillis());
private long getNextId() {
return nextId.getAndIncrement();
}

@Override
public Entry<MemorySegment> get(MemorySegment key) {
Entry<MemorySegment> ans = storage.get(key);
Entry<MemorySegment> ans = memTable.getStorage().get(key);
if (ans != null) return wrapEntryIfDeleted(ans);
if (additionalStorage != null) {
ans = additionalStorage.getStorage().get(key);
if (ans != null) return wrapEntryIfDeleted(ans);
}
try {
for (SSTable table : tables) {
Entry<MemorySegment> data = table.find(key);
if (data != null) {
return wrapEntryIfDeleted(data);
for (SSTable table : tables.reversed()) {
ans = table.find(key);
if (ans != null) {
return wrapEntryIfDeleted(ans);
}
}
} catch (IOException e) {
Expand All @@ -80,39 +119,122 @@ public Entry<MemorySegment> get(MemorySegment key) {

@Override
public void upsert(Entry<MemorySegment> entry) {
storage.put(entry.key(), entry);
upsertLock.readLock().lock();
try {
if (memTable.put(entry)) {
return;
}
} finally {
upsertLock.readLock().unlock();
}
flush();
upsertLock.readLock().lock();
try {
if (!memTable.put(entry)) {
throw new OverloadException(entry);
}
} finally {
upsertLock.readLock().unlock();
}
}

@Override
public void flush() throws IOException {
if (!storage.isEmpty()) {
updateId();
// SSTable constructor with rewrite=true writes MemTable data on disk deleting old data if it exists
private void makeFlush() throws IOException {
compactionLock.lock();
try {
if (additionalStorage == null) return;
// SSTable constructor with entries iterator writes MemTable data on disk deleting old data if it exists
tables.add(new SSTable(config.basePath(), comparator,
lastTimestamp, storage.values(), true));
storage.clear();
getNextId(), additionalStorage.getStorage().values().iterator(), filesArena));
additionalStorage = null;
} finally {
compactionLock.unlock();
}
}

/**
 * Schedules a background flush of the current in-memory table.
 *
 * <p>Returns without doing anything when a previously swapped-out table is still
 * present in {@code additionalStorage}, or when the current table holds no entries.
 * Otherwise the current table is moved to {@code additionalStorage}, a fresh
 * {@link MemTable} takes its place, and {@code makeFlush()} is submitted to the
 * executor. The whole check-and-swap runs under the write side of {@code upsertLock}.
 */
@Override
public void flush() {
upsertLock.writeLock().lock();
try {
// Only one table may be awaiting flush at a time; an empty table is not worth writing.
if (additionalStorage != null || memTable.getStorage().isEmpty()) {
return;
}
// Swap: the full table stays reachable via additionalStorage while a new one accepts writes.
additionalStorage = memTable;
memTable = new MemTable(comparator, config.flushThresholdBytes());
// The future is kept so that close() can wait for this flush.
flushFuture = commonExecutorService.submit(
() -> {
try {
makeFlush();
} catch (IOException e) {
// Runnable cannot throw checked exceptions, so the failure is wrapped.
throw new DBException(e);
}
});
} finally {
upsertLock.writeLock().unlock();
}
}

// Stops accepting new tasks, waits up to 30 seconds for running/queued ones,
// and forces termination if they have not finished by then. If the waiting
// thread is interrupted, the interrupt flag is restored and the method returns.
private void closeExecutorService(ExecutorService executorService) {
    executorService.shutdown();
    boolean finishedInTime;
    try {
        finishedInTime = executorService.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
    }
    if (!finishedInTime) {
        executorService.shutdownNow();
    }
}

/**
 * Closes the DAO: waits for the last scheduled flush, schedules a final flush of the
 * current table, shuts the executor down and closes the files arena.
 *
 * <p>Calling it again after the arena has been closed is a no-op.
 *
 * @throws IOException if the previously scheduled background flush failed; the executor
 *                     and the arena are still released before the exception is thrown
 */
@Override
public void close() throws IOException {
    if (!filesArena.scope().isAlive()) {
        return;
    }
    IOException flushFailure = null;
    if (flushFuture != null) {
        try {
            flushFuture.get();
        } catch (InterruptedException e) {
            // Only a real interruption may set the interrupt flag.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // A failed flush is not an interruption: setting the flag here would make the
            // awaitTermination call below return immediately and hide the failure.
            flushFailure = new IOException("Background flush failed", e);
        }
    }
    flush();
    closeExecutorService(commonExecutorService);
    filesArena.close();
    if (flushFailure != null) {
        throw flushFailure;
    }
}

@Override
public void compact() throws IOException {
if (!tables.isEmpty()) {
updateId();
SSTable compactedTable = new SSTable(config.basePath(), comparator, lastTimestamp,
this, true);
storage.clear();
for (SSTable table : tables) {
private void makeCompaction() throws IOException {
compactionLock.lock();
try {
if (tables.size() <= 1) return;
long compactedTableId = getNextId();
SSTable compactedTable = new SSTable(config.basePath(), comparator, compactedTableId,
new MergeIterator(tables, comparator), filesArena);
List<SSTable> oldTables = tables;
List<SSTable> newTables = new ArrayList<>();
newTables.add(compactedTable);
tables = newTables;
for (SSTable table : oldTables) {
table.deleteFromDisk();
}
tables.add(compactedTable);
} finally {
compactionLock.unlock();
}
}

/**
 * Schedules a background compaction by submitting {@code makeCompaction()} to the executor.
 *
 * <p>If an earlier compaction task has not completed yet, {@code cancel(false)} is requested
 * on it first; per the {@code Future} contract that does not interrupt a task that is
 * already running.
 */
@Override
public void compact() {
if (compcatFuture != null && !compcatFuture.isDone()) {
compcatFuture.cancel(false);
}
compcatFuture = commonExecutorService.submit(
() -> {
try {
makeCompaction();
} catch (IOException e) {
// Runnable cannot throw checked exceptions, so the failure is wrapped.
throw new DBException(e);
}
});
}

@Override
public Iterator<Entry<MemorySegment>> iterator() {
return get(null, null);
Expand All @@ -131,35 +253,4 @@ public int compare(MemorySegment o1, MemorySegment o2) {
return Byte.compare(o1.get(ValueLayout.JAVA_BYTE, mismatch), o2.get(ValueLayout.JAVA_BYTE, mismatch));
}
}

private class MemTableIterator implements DatabaseIterator {
Iterator<Entry<MemorySegment>> innerIter;

public MemTableIterator(MemorySegment from, MemorySegment to) {
if (from == null && to == null) {
innerIter = storage.values().iterator();
} else if (from != null && to == null) {
innerIter = storage.tailMap(from).values().iterator();
} else if (from == null) {
innerIter = storage.headMap(to).values().iterator();
} else {
innerIter = storage.subMap(from, to).values().iterator();
}
}

@Override
public long getPriority() {
return Long.MAX_VALUE;
}

@Override
public boolean hasNext() {
return innerIter.hasNext();
}

@Override
public Entry<MemorySegment> next() {
return innerIter.next();
}
}
}
Loading
Loading