Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete write mem control parameters #29

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public RestorableTsFileIOWriter(File file) throws IOException {
public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
this(file, true);
this.maxMetadataSize = maxMetadataSize;
this.enableMemoryControl = true;
this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
this.checkMetadataSizeAndMayFlush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -111,7 +110,6 @@ public class TsFileIOWriter implements AutoCloseable {
protected volatile boolean hasChunkMetadataInDisk = false;
// record the total num of path in order to make bloom filter
protected int pathCount = 0;
protected boolean enableMemoryControl = false;
private Path lastSerializePath = null;
protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
private volatile int chunkMetadataCount = 0;
Expand Down Expand Up @@ -151,10 +149,8 @@ public TsFileIOWriter(TsFileOutput output, boolean test) {
}

/** for write with memory control */
public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
throws IOException {
public TsFileIOWriter(File file, long maxMetadataSize) throws IOException {
this(file);
this.enableMemoryControl = enableMemoryControl;
this.maxMetadataSize = maxMetadataSize;
chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
}
Expand Down Expand Up @@ -306,9 +302,7 @@ public void writeChunk(Chunk chunk) throws IOException {

/** end chunk and write some log. */
public void endCurrentChunk() {
if (enableMemoryControl) {
this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes();
}
this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes();
chunkMetadataCount++;
chunkMetadataList.add(currentChunkMetadata);
currentChunkMetadata = null;
Expand Down Expand Up @@ -508,40 +502,6 @@ public void setFile(File file) {
this.file = file;
}

/** Remove such ChunkMetadata that its startTime is not in chunkStartTimes */
public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
Map<Path, Integer> startTimeIdxes = new HashMap<>();
chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));

Iterator<ChunkGroupMetadata> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator();
while (chunkGroupMetaDataIterator.hasNext()) {
ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next();
String deviceId = chunkGroupMetaData.getDevice();
int chunkNum = chunkGroupMetaData.getChunkMetadataList().size();
Iterator<ChunkMetadata> chunkMetaDataIterator =
chunkGroupMetaData.getChunkMetadataList().iterator();
while (chunkMetaDataIterator.hasNext()) {
IChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
Path path = new Path(deviceId, chunkMetaData.getMeasurementUid(), true);
int startTimeIdx = startTimeIdxes.get(path);

List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
boolean chunkValid =
startTimeIdx < pathChunkStartTimes.size()
&& pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
if (!chunkValid) {
chunkMetaDataIterator.remove();
chunkNum--;
} else {
startTimeIdxes.put(path, startTimeIdx + 1);
}
}
if (chunkNum == 0) {
chunkGroupMetaDataIterator.remove();
}
}
}

public void writePlanIndices() throws IOException {
ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream());
ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());
Expand Down Expand Up @@ -630,7 +590,7 @@ public void setMaxPlanIndex(long maxPlanIndex) {
*/
public int checkMetadataSizeAndMayFlush() throws IOException {
// This function should be called after all data of an aligned device has been written
if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
if (currentChunkMetadataSize > maxMetadataSize) {
try {
if (logger.isDebugEnabled()) {
logger.debug(
Expand Down Expand Up @@ -700,7 +660,7 @@ private int writeChunkMetadataToTempFile(
// for each device, we only serialize it once, in order to save io
writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
}
if (isNewPath && iChunkMetadataList.size() > 0) {
if (isNewPath && !iChunkMetadataList.isEmpty()) {
// serialize the public info of this measurement
writtenSize +=
ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
Expand All @@ -719,10 +679,6 @@ private int writeChunkMetadataToTempFile(
return writtenSize;
}

public String getCurrentChunkGroupDeviceId() {
return currentChunkGroupDeviceId;
}

public List<ChunkGroupMetadata> getChunkGroupMetadataList() {
return chunkGroupMetadataList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void tearDown() throws IOException {
/** The following tests is for ChunkMetadata serialization and deserialization. */
@Test
public void testSerializeAndDeserializeChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
Expand Down Expand Up @@ -147,7 +147,7 @@ public void testSerializeAndDeserializeChunkMetadata() throws IOException {

@Test
public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
Expand Down Expand Up @@ -185,7 +185,7 @@ public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException

@Test
public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
List<String> seriesIds = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
Expand Down Expand Up @@ -258,7 +258,7 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
@Test
public void testWriteCompleteFileWithNormalChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -311,7 +311,7 @@ public void testWriteCompleteFileWithNormalChunk() throws IOException {
@Test
public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -398,7 +398,7 @@ public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
@Test
public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -490,7 +490,7 @@ public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -581,7 +581,7 @@ public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 1;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -672,7 +672,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1024; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -762,7 +762,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
@Test
public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -798,7 +798,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOExcept
public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
int chunkNum = 512, seriesNum = 6;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
Expand Down Expand Up @@ -840,7 +840,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IO
long originTestPointNum = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
Expand Down Expand Up @@ -881,7 +881,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
TEST_CHUNK_SIZE = 10;
int deviceNum = 1024;
try {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < deviceNum; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
Expand Down Expand Up @@ -918,7 +918,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -976,7 +976,7 @@ public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOExcep
@Test
public void testWritingCompleteMixedFiles() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < 10; ++k) {
Expand Down Expand Up @@ -1075,7 +1075,7 @@ public void testWritingCompleteMixedFiles() throws IOException {
@Test
public void testWritingAlignedSeriesByColumn() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -1129,7 +1129,7 @@ public void testWritingAlignedSeriesByColumn() throws IOException {
@Test
public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down
Loading