Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

ДЗ№4. Тяпуев Дмитрий. Магистратура. Политех #202

Open
wants to merge 45 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
e176c7d
all done
typuichik123 Sep 27, 2023
26f5bb7
fixed codeclimate
typuichik123 Sep 27, 2023
5a89728
fixes
typuichik123 Sep 27, 2023
b6e47b3
Merge branch 'main' into main
incubos Sep 27, 2023
269b43f
fix new test
typuichik123 Sep 27, 2023
6ecdff5
Merge branch 'polis-vk:main' into main
typuichik123 Oct 2, 2023
c162794
Homework 2 done
typuichik123 Oct 4, 2023
ca1356b
fixes
typuichik123 Oct 4, 2023
6280634
fix codeclimate
typuichik123 Oct 4, 2023
8244af7
Merge branch 'main' into main
lamtev Oct 4, 2023
e23dcff
Merge branch 'main' into homework2
incubos Oct 5, 2023
d4d384a
fix all
typuichik123 Oct 8, 2023
43569ac
fix codeclimate
typuichik123 Oct 8, 2023
89533ca
last fix codeclimate
typuichik123 Oct 8, 2023
0fca460
last fix codeclimate
typuichik123 Oct 8, 2023
ef2c3b0
чужой тест повалился по таймауту, загрузил код без изменений
typuichik123 Oct 8, 2023
57e1652
Merge remote-tracking branch 'origin/main' into homework3
typuichik123 Oct 9, 2023
9957684
Merge branch 'polis-vk:main' into main
typuichik123 Oct 11, 2023
9e8f1b3
Merge remote-tracking branch 'origin/main' into homework3
typuichik123 Oct 11, 2023
5f758a6
Merge branch 'polis-vk:main' into main
typuichik123 Oct 17, 2023
aa7dfa7
Merge remote-tracking branch 'origin/main' into homework3
typuichik123 Oct 17, 2023
30d534a
Merge branch 'polis-vk:main' into main
typuichik123 Oct 20, 2023
b750479
Merge branch 'polis-vk:main' into main
typuichik123 Oct 23, 2023
0dec53e
Merge remote-tracking branch 'origin/main' into homework3
typuichik123 Oct 26, 2023
2e5a3d4
Merge branch 'polis-vk:main' into main
typuichik123 Oct 26, 2023
d614b11
Merge remote-tracking branch 'origin/main' into homework3
typuichik123 Oct 26, 2023
3899bc4
Homework 4 done
typuichik123 Oct 27, 2023
fd682b5
fixes
typuichik123 Oct 27, 2023
e7e25ca
Merge branch 'polis-vk:main' into main
typuichik123 Oct 27, 2023
52230fd
several fixes for codeclimate
typuichik123 Oct 27, 2023
1767600
several fixes for codeclimate
typuichik123 Oct 27, 2023
06bb1b9
Merge branch 'main' into homework4
typuichik123 Oct 27, 2023
028150c
several fixes for codeclimate
typuichik123 Oct 28, 2023
273a787
several fixes for codeclimate
typuichik123 Oct 28, 2023
6b80d94
fix codeclimate
typuichik123 Oct 28, 2023
5603493
fix codeclimate
typuichik123 Oct 28, 2023
c8b8aad
Merge branch 'polis-vk:main' into main
typuichik123 Nov 8, 2023
d7da6f4
Merge branch 'polis-vk:main' into main
typuichik123 Nov 9, 2023
d759686
Merge branch 'main' into homework4
typuichik123 Nov 12, 2023
e7570ab
Merge branch 'main' into homework4
typuichik123 Nov 13, 2023
e6de16c
Some fixes
typuichik123 Nov 13, 2023
35c51db
Merge remote-tracking branch 'origin/homework4' into homework4
typuichik123 Nov 13, 2023
71ede3a
Last fixes
typuichik123 Nov 13, 2023
25a2985
Last fixes
typuichik123 Nov 13, 2023
b5487db
Merge branch 'main' into homework4
Marashov-Alexander Nov 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;

@DaoFactory(stage = 2)
@DaoFactory(stage = 4)
public class DaoFactoryImplementation implements DaoFactory.Factory<MemorySegment, Entry<MemorySegment>> {

@Override
Expand Down
145 changes: 65 additions & 80 deletions src/main/java/ru/vk/itmo/tyapuevdmitrij/InMemoryDao.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ru.vk.itmo.tyapuevdmitrij;

import ru.vk.itmo.BaseEntry;
import ru.vk.itmo.Config;
import ru.vk.itmo.Dao;
import ru.vk.itmo.Entry;
Expand All @@ -9,19 +8,15 @@
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class InMemoryDao implements Dao<MemorySegment, Entry<MemorySegment>> {
private final Comparator<MemorySegment> memorySegmentComparator = (segment1, segment2) -> {

private static final Comparator<MemorySegment> memorySegmentComparator = (segment1, segment2) -> {
typuichik123 marked this conversation as resolved.
Show resolved Hide resolved
long offset = segment1.mismatch(segment2);
if (offset == -1) {
return 0;
Expand All @@ -34,29 +29,40 @@ public class InMemoryDao implements Dao<MemorySegment, Entry<MemorySegment>> {
}
return segment1.get(ValueLayout.JAVA_BYTE, offset) - segment2.get(ValueLayout.JAVA_BYTE, offset);
};
private final ConcurrentNavigableMap<MemorySegment, Entry<MemorySegment>> memTable
= new ConcurrentSkipListMap<>(memorySegmentComparator);
private final ConcurrentNavigableMap<MemorySegment, Entry<MemorySegment>> memTable =
new ConcurrentSkipListMap<>(memorySegmentComparator);

private final Arena readArena;
private final Path ssTablePath;
private final MemorySegment ssTable;
private static final String SS_TABLE_FILE_NAME = "ssTable";
private long ssTablesEntryQuantity;
private boolean compacted;
private final Storage storage;
typuichik123 marked this conversation as resolved.
Show resolved Hide resolved

public InMemoryDao() {
ssTablePath = null;
ssTable = null;
readArena = null;
storage = null;
}

public InMemoryDao(Config config) {
ssTablePath = config.basePath().resolve(SS_TABLE_FILE_NAME);
ssTable = getReadBufferFromSsTable();
ssTablePath = config.basePath();
readArena = Arena.ofShared();
storage = new Storage(ssTablePath, readArena);
}

@Override
public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to) {
return storage.range(getMemTableIterator(from, to), from, to, memorySegmentComparator);
}

private Iterator<Entry<MemorySegment>> getMemTableIterator(MemorySegment from, MemorySegment to) {
if (from == null && to == null) {
return memTable.values().iterator();
} else if (from == null) {
}
if (from == null) {
return memTable.headMap(to).values().iterator();
} else if (to == null) {
}
if (to == null) {
return memTable.tailMap(from).values().iterator();
}
return memTable.subMap(from, to).values().iterator();
Expand All @@ -65,10 +71,13 @@ public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to)
@Override
public Entry<MemorySegment> get(MemorySegment key) {
Entry<MemorySegment> value = memTable.get(key);
if (value != null || ssTable == null) {
if (memTable.containsKey(key) && value.value() == null) {
typuichik123 marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
if (value != null || storage.ssTables == null) {
return value;
}
return getSsTableDataByKey(key);
return storage.getSsTableDataByKey(key, memorySegmentComparator);
typuichik123 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -77,78 +86,54 @@ public void upsert(Entry<MemorySegment> entry) {
}

@Override
public void flush() throws IOException {
throw new UnsupportedOperationException("");
public void compact() throws IOException {
if (storage.ssTablesQuantity == 0 && memTable.isEmpty()) {
return;
}
Iterator<Entry<MemorySegment>> dataIterator = get(null, null);
MemorySegment buffer = NmapBuffer.getWriteBufferToSsTable(getCompactionTableByteSize(),
ssTablePath,
storage.ssTablesQuantity);
long bufferByteSize = buffer.byteSize();
buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, bufferByteSize - Long.BYTES, ssTablesEntryQuantity);
long[] offsets = new long[2];
offsets[1] = bufferByteSize - Long.BYTES - ssTablesEntryQuantity * 2L * Long.BYTES;
while (dataIterator.hasNext()) {
storage.writeEntryAndIndexesToCompactionTable(buffer, dataIterator.next(), offsets);
}
StorageHelper.deleteOldSsTables(ssTablePath, storage.ssTablesQuantity);
StorageHelper.renameCompactedSsTable(ssTablePath);
compacted = true;
}

@Override
public void close() throws IOException {
MemorySegment buffer = getWriteBufferToSsTable();
writeMemTableDataToFile(buffer);
}

public long getSsTableDataByteSize() {
long ssTableDataByteSize = 0;
for (Map.Entry<MemorySegment, Entry<MemorySegment>> entry : memTable.entrySet()) {
ssTableDataByteSize += entry.getKey().byteSize();
ssTableDataByteSize += entry.getValue().value().byteSize();
if (compacted) {
typuichik123 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
return ssTableDataByteSize + memTable.size() * Long.BYTES * 2L;
}

public MemorySegment getWriteBufferToSsTable() throws IOException {
MemorySegment buffer;
try (FileChannel channel = FileChannel.open(ssTablePath, EnumSet.of(
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING))) {
buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, getSsTableDataByteSize(), Arena.ofAuto());
if (memTable.isEmpty()) {
typuichik123 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
return buffer;
}

public void writeMemTableDataToFile(MemorySegment buffer) {
long offset = 0;
for (Entry<MemorySegment> entry : memTable.values()) {
buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, offset, entry.key().byteSize());
offset += Long.BYTES;
MemorySegment.copy(entry.key(), 0, buffer, offset, entry.key().byteSize());
offset += entry.key().byteSize();
buffer.set(ValueLayout.JAVA_LONG_UNALIGNED, offset, entry.value().byteSize());
offset += Long.BYTES;
MemorySegment.copy(entry.value(), 0, buffer, offset, entry.value().byteSize());
offset += entry.value().byteSize();
if (!readArena.scope().isAlive()) {
return;
}
}
readArena.close();
storage.save(memTable.values(), ssTablePath);

public final MemorySegment getReadBufferFromSsTable() {
MemorySegment buffer;
try (FileChannel channel = FileChannel.open(ssTablePath, StandardOpenOption.READ)) {
buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(ssTablePath), Arena.ofAuto());
} catch (IOException e) {
buffer = null;
}
return buffer;
}

public Entry<MemorySegment> getSsTableDataByKey(MemorySegment key) {
long offset = 0;
MemorySegment valueFromSsTable;
while (offset < ssTable.byteSize()) {
long keyByteSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, offset);
offset += Long.BYTES;
long keysMismatch;
keysMismatch = MemorySegment.mismatch(ssTable, offset, offset + keyByteSize, key, 0, key.byteSize());
offset += keyByteSize;
long valueByteSize = ssTable.get(ValueLayout.JAVA_LONG_UNALIGNED, offset);
offset += Long.BYTES;
valueFromSsTable = ssTable.asSlice(offset, valueByteSize);
if (keysMismatch == -1) {
return new BaseEntry<>(key, valueFromSsTable);
}
offset += valueByteSize;
private long getCompactionTableByteSize() {
Iterator<Entry<MemorySegment>> dataIterator = get(null, null);
long compactionTableByteSize = 0;
long countEntry = 0;
while (dataIterator.hasNext()) {
Entry<MemorySegment> entry = dataIterator.next();
compactionTableByteSize += entry.key().byteSize();
compactionTableByteSize += entry.value().byteSize();
countEntry++;
}
return null;
ssTablesEntryQuantity = countEntry;
return compactionTableByteSize + countEntry * 4L * Long.BYTES + Long.BYTES;
}

}
145 changes: 145 additions & 0 deletions src/main/java/ru/vk/itmo/tyapuevdmitrij/MergeIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package ru.vk.itmo.tyapuevdmitrij;

import ru.vk.itmo.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

public class MergeIterator implements Iterator<Entry<MemorySegment>> {

private final PriorityQueue<PeekIterator> priorityQueue;
private final Comparator<Entry<MemorySegment>> comparator;
PeekIterator tableIterator;

private static class PeekIterator implements Iterator<Entry<MemorySegment>> {

public final int id;
private final Iterator<Entry<MemorySegment>> delegate;
private Entry<MemorySegment> memorySegmentsEntry;

private PeekIterator(int id, Iterator<Entry<MemorySegment>> delegate) {
this.id = id;
this.delegate = delegate;
}

@Override
public boolean hasNext() {
if (memorySegmentsEntry == null) {
return delegate.hasNext();
}
return true;
}

@Override
public Entry<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Entry<MemorySegment> peek = peek();
this.memorySegmentsEntry = null;
return peek;
}

private Entry<MemorySegment> peek() {
if (memorySegmentsEntry == null) {
if (!delegate.hasNext()) {
return null;
}
memorySegmentsEntry = delegate.next();
}
return memorySegmentsEntry;
}
}

public MergeIterator(Collection<Iterator<Entry<MemorySegment>>> iterators,
Comparator<Entry<MemorySegment>> comparator) {
this.comparator = comparator;
Comparator<PeekIterator> peekComp = (o1, o2) -> comparator.compare(o1.peek(), o2.peek());
priorityQueue = new PriorityQueue<>(
iterators.size(),
peekComp.thenComparing(o -> -o.id)
);

int id = 0;
for (Iterator<Entry<MemorySegment>> iterator : iterators) {
if (iterator.hasNext()) {
priorityQueue.add(new PeekIterator(id++, iterator));
}
}
}

private PeekIterator peek() {
while (tableIterator == null) {
tableIterator = priorityQueue.poll();
if (tableIterator == null) {
return null;
}
peekFromPriorityQueue();
if (tableIterator.peek() == null) {
tableIterator = null;
continue;
}

if (skip(tableIterator.peek())) {
tableIterator.next();
if (tableIterator.hasNext()) {
priorityQueue.add(tableIterator);
}
tableIterator = null;
}
}

return tableIterator;
}

private void peekFromPriorityQueue() {
while (true) {
PeekIterator next = priorityQueue.peek();
if (next == null) {
break;
}
int compare = comparator.compare(tableIterator.peek(), next.peek());
if (compare == 0) {
PeekIterator poll = priorityQueue.poll();
if (poll != null) {
poll.next();
if (poll.hasNext()) {
priorityQueue.add(poll);
}
}
} else {
break;
}
}
}

protected boolean skip(Entry<MemorySegment> entry) {
if (entry != null) {
return entry.value() == null;
}
return false;
}

@Override
public boolean hasNext() {
return peek() != null;
}

@Override
public Entry<MemorySegment> next() {
PeekIterator entryIterator = peek();
if (entryIterator == null) {
throw new NoSuchElementException();
}
Entry<MemorySegment> next = entryIterator.next();
this.tableIterator = null;
if (entryIterator.hasNext()) {
priorityQueue.add(entryIterator);
}
return next;
}
}
Loading
Loading