Skip to content

Commit

Permalink
update: deleteMap file serialization design
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Feb 20, 2024
1 parent 95faa78 commit 8a6bc12
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,38 +156,34 @@ public void deleteManifest(String indexManifest) {
indexManifestFile.delete(indexManifest);
}

public IndexFileMeta writeDeleteMapIndex(Map<String, DeleteIndex> deleteMap) {
String file;
try {
file = deleteMapIndex.write(deleteMap);
} catch (IOException e) {
throw new UncheckedIOException(e);
public Map<String, long[]> readDeleteIndexBytesOffsets(IndexFileMeta deleteMapFile) {
if (!deleteMapFile.indexType().equals(DELETE_MAP_INDEX)) {
throw new IllegalArgumentException(
"Input file is not delete map index: " + deleteMapFile.indexType());
}
return new IndexFileMeta(
DELETE_MAP_INDEX, file, deleteMapIndex.fileSize(file), deleteMap.size());
return deleteMapIndex.readDeleteIndexBytesOffsets(deleteMapFile.fileName());
}

public Map<String, DeleteIndex> readDeleteMapIndex(IndexFileMeta deleteMapFile) {
public Map<String, DeleteIndex> readAllDeleteIndex(
IndexFileMeta deleteMapFile, Map<String, long[]> deleteIndexBytesOffsets) {
if (!deleteMapFile.indexType().equals(DELETE_MAP_INDEX)) {
throw new IllegalArgumentException(
"Input file is not delete map index: " + deleteMapFile.indexType());
}
try {
return deleteMapIndex.read(deleteMapFile.fileName());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return deleteMapIndex.readAllDeleteIndex(deleteMapFile.fileName(), deleteIndexBytesOffsets);
}

public Optional<DeleteIndex> readDeleteIndex(IndexFileMeta deleteMapFile, String fileName) {
public DeleteIndex readDeleteIndex(IndexFileMeta deleteMapFile, long[] deleteIndexBytesOffset) {
if (!deleteMapFile.indexType().equals(DELETE_MAP_INDEX)) {
throw new IllegalArgumentException(
"Input file is not delete map index" + deleteMapFile.indexType());
}
try {
return deleteMapIndex.read(deleteMapFile.fileName(), fileName);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return deleteMapIndex.readDeleteIndex(deleteMapFile.fileName(), deleteIndexBytesOffset);
}

public IndexFileMeta writeDeleteMapIndex(Map<String, DeleteIndex> deleteMap) {
String file = deleteMapIndex.write(deleteMap);
return new IndexFileMeta(
DELETE_MAP_INDEX, file, deleteMapIndex.fileSize(file), deleteMap.size());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.utils.PathFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** todo: add an abstract file. */
/** x. */
public class DeleteMapIndexFile {

public static final String DELETE_MAP_INDEX = "DELETE_MAP";
Expand All @@ -51,30 +51,99 @@ public long fileSize(String fileName) {
}
}

public Map<String, DeleteIndex> read(String fileName) throws IOException {
String json = fileIO.readFileUtf8(pathFactory.toPath(fileName));
LinkedHashMap<String, String> map =
(LinkedHashMap<String, String>) JsonSerdeUtil.fromJson(json, Map.class);
public Map<String, long[]> readDeleteIndexBytesOffsets(String fileName) {
try (DataInputStream dataInputStream =
new DataInputStream(fileIO.newInputStream(pathFactory.toPath(fileName)))) {
int size = dataInputStream.readInt();
int maxKeyLength = dataInputStream.readInt();
Map<String, long[]> map = new HashMap<>();
for (int i = 0; i < size; i++) {
byte[] bytes = new byte[maxKeyLength];
dataInputStream.read(bytes);
String key = new String(bytes).trim();
long start = dataInputStream.readLong();
long length = dataInputStream.readLong();
map.put(key, new long[] {start, length});
}
return map;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public Map<String, DeleteIndex> readAllDeleteIndex(
String fileName, Map<String, long[]> deleteIndexBytesOffsets) {
Map<String, DeleteIndex> deleteIndexMap = new HashMap<>();
for (Map.Entry<String, String> entry : map.entrySet()) {
byte[] bytes = java.util.Base64.getDecoder().decode(entry.getValue());
deleteIndexMap.put(entry.getKey(), new BitmapDeleteIndex().deserializeFromBytes(bytes));
try (SeekableInputStream inputStream =
fileIO.newInputStream(pathFactory.toPath(fileName))) {
for (Map.Entry<String, long[]> entry : deleteIndexBytesOffsets.entrySet()) {
long[] offset = entry.getValue();
inputStream.seek(offset[0]);
byte[] bytes = new byte[(int) offset[1]];
inputStream.read(bytes);
deleteIndexMap.put(
entry.getKey(), new BitmapDeleteIndex().deserializeFromBytes(bytes));
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return deleteIndexMap;
}

public Optional<DeleteIndex> read(String fileName, String dataFileName) throws IOException {
// todo: support only read specific file's bitmap
return Optional.ofNullable(read(fileName).get(dataFileName));
public DeleteIndex readDeleteIndex(String fileName, long[] deleteIndexBytesOffset) {
try (SeekableInputStream inputStream =
fileIO.newInputStream(pathFactory.toPath(fileName))) {
inputStream.seek(deleteIndexBytesOffset[0]);
byte[] bytes = new byte[(int) deleteIndexBytesOffset[1]];
inputStream.read(bytes);
return new BitmapDeleteIndex().deserializeFromBytes(bytes);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public String write(Map<String, DeleteIndex> input) throws IOException {
public String write(Map<String, DeleteIndex> input) {
int size = input.size();
int maxKeyLength = input.keySet().stream().mapToInt(String::length).max().orElse(0);

byte[][] keyBytesArray = new byte[size][];
long[] startArray = new long[size];
long[] lengthArray = new long[size];
byte[][] valueBytesArray = new byte[size][];
int i = 0;
long start = 8 + (long) size * (maxKeyLength + 16);
for (Map.Entry<String, DeleteIndex> entry : input.entrySet()) {
String key = entry.getKey();
byte[] valueBytes = entry.getValue().serializeToBytes();
keyBytesArray[i] = String.format("%" + maxKeyLength + "s", key).getBytes();
startArray[i] = start;
lengthArray[i] = valueBytes.length;
valueBytesArray[i] = valueBytes;
start += valueBytes.length;
i++;
}

Path path = pathFactory.newPath();
try {
String json = JsonSerdeUtil.toJson(input);
fileIO.writeFileUtf8(path, json);
// File structure:
// mapSize (int), maxKeyLength (int)
// Array of <padded keyBytes (maxKeyLength), start (long), length (long)>
// Array of <valueBytes>
try (DataOutputStream dataOutputStream =
new DataOutputStream(fileIO.newOutputStream(path, true))) {
dataOutputStream.writeInt(size);
dataOutputStream.writeInt(maxKeyLength);

for (int j = 0; j < size; j++) {
dataOutputStream.write(keyBytesArray[j]);
dataOutputStream.writeLong(startArray[j]);
dataOutputStream.writeLong(lengthArray[j]);
}

for (int j = 0; j < size; j++) {
dataOutputStream.write(valueBytesArray[j]);
}
} catch (IOException e) {
throw new RuntimeException("Failed to xxx: " + path, e);
throw new RuntimeException(e);
}
return path.getName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -39,10 +38,10 @@ public class DeleteMapIndexMaintainer implements IndexMaintainer<KeyValue, Delet

private final IndexFileHandler indexFileHandler;
private final IndexFileMeta indexFile;
private Map<String, DeleteIndex> deleteMap;
private Map<String, long[]> deleteIndexBytesOffsets;
private final Map<String, DeleteIndex> deleteMap;
private boolean modified;
private boolean restored;
private final HashSet<String> restoredFile;

public DeleteMapIndexMaintainer(
IndexFileHandler fileHandler,
Expand All @@ -63,12 +62,11 @@ public DeleteMapIndexMaintainer(
this.deleteMap = new HashMap<>();
this.modified = false;
this.restored = false;
this.restoredFile = new HashSet<>();
}

@Override
public void notifyNewRecord(KeyValue record) {
restoreDeleteMap();
restoreAllDeleteIndex();
DeleteIndex deleteIndex =
deleteMap.computeIfAbsent(record.fileName(), k -> new BitmapDeleteIndex());
if (!deleteIndex.isDeleted(record.position())) {
Expand All @@ -78,7 +76,7 @@ public void notifyNewRecord(KeyValue record) {
}

public void delete(String fileName) {
restoreDeleteMap();
restoreAllDeleteIndex();
if (deleteMap.containsKey(fileName)) {
deleteMap.remove(fileName);
modified = true;
Expand All @@ -95,34 +93,53 @@ public List<IndexFileMeta> prepareCommit() {
return Collections.emptyList();
}

// This method is only used by writer, which restore the whole delete map
private void restoreDeleteMap() {
if (indexFile != null && !restored) {
this.deleteMap = indexFileHandler.readDeleteMapIndex(indexFile);
restored = true;
}
}

// This method is only used by the reader, which just lazy load the specified delete index
public Optional<DeleteIndex> indexOf(String fileName) {
if (indexFile != null
&& !restored
&& !restoredFile.contains(fileName)
&& !deleteMap.containsKey(fileName)) {
restoredFile.add(fileName);
indexFileHandler
.readDeleteIndex(indexFile, fileName)
.ifPresent(deleteIndex -> deleteMap.put(fileName, deleteIndex));
}
restoreDeleteIndex(fileName);
return Optional.ofNullable(deleteMap.get(fileName));
}

@VisibleForTesting
public Map<String, DeleteIndex> deleteMap() {
restoreDeleteMap();
restoreAllDeleteIndex();
return deleteMap;
}

// -------------------------------------------------------------------------
// Internal methods
// -------------------------------------------------------------------------

private void restoreDeleteIndexBytesOffsets() {
if (indexFile != null && deleteIndexBytesOffsets == null) {
deleteIndexBytesOffsets =
new HashMap<>(indexFileHandler.readDeleteIndexBytesOffsets(indexFile));
}
}

// Restore the whole delete map
private void restoreAllDeleteIndex() {
restoreDeleteIndexBytesOffsets();
if (indexFile != null && !restored) {
deleteMap.putAll(
indexFileHandler.readAllDeleteIndex(indexFile, deleteIndexBytesOffsets));
restored = true;
}
}

// Restore the specified delete index
private void restoreDeleteIndex(String fileName) {
restoreDeleteIndexBytesOffsets();
if (indexFile != null
&& !restored
&& !deleteMap.containsKey(fileName)
&& deleteIndexBytesOffsets.containsKey(fileName)) {
deleteMap.put(
fileName,
indexFileHandler.readDeleteIndex(
indexFile, deleteIndexBytesOffsets.get(fileName)));
}
}

/** Factory to restore {@link DeleteMapIndexMaintainer}. */
public static class Factory implements IndexMaintainer.Factory<KeyValue, DeleteIndex> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.utils;

import org.apache.paimon.index.delete.DeleteIndex;
import org.apache.paimon.index.delete.DeleteIndexSerializer;
import org.apache.paimon.schema.SchemaSerializer;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
Expand Down Expand Up @@ -157,11 +155,6 @@ private static Module createPaimonJacksonModule() {
DataTypeJsonParser::parseDataField);
registerJsonObjects(
module, DataType.class, DataType::serializeJson, DataTypeJsonParser::parseDataType);
registerJsonObjects(
module,
DeleteIndex.class,
DeleteIndexSerializer.INSTANCE,
DeleteIndexSerializer.INSTANCE);
return module;
}

Expand Down
Loading

0 comments on commit 8a6bc12

Please sign in to comment.