From 1cd762f50d17b8d5c1519a34f707e022cded6c52 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 20 Dec 2024 17:13:56 +0800 Subject: [PATCH] rename DefaultLogRecordBatch to MemoryLogRecordBatch --- .../alibaba/fluss/client/write/WriteRecord.java | 4 ++-- .../client/write/IndexedLogWriteBatchTest.java | 8 ++++---- .../fluss/client/write/RecordAccumulatorTest.java | 2 +- .../com/alibaba/fluss/record/DefaultLogRecord.java | 2 +- .../alibaba/fluss/record/FileLogInputStream.java | 14 +++++++------- .../alibaba/fluss/record/FileLogProjection.java | 10 +++++----- ...gRecordBatch.java => MemoryLogRecordBatch.java} | 7 +++---- .../com/alibaba/fluss/record/MemoryLogRecords.java | 4 ++-- .../fluss/record/MemoryLogRecordsArrowBuilder.java | 12 ++++++------ .../record/MemoryLogRecordsIndexedBuilder.java | 12 ++++++------ .../fluss/record/MemorySegmentLogInputStream.java | 6 +++--- ...atchTest.java => MemoryLogRecordBatchTest.java} | 4 ++-- .../record/MemorySegmentLogInputStreamTest.java | 2 +- .../fluss/row/arrow/ArrowReaderWriterTest.java | 2 +- .../com/alibaba/fluss/testutils/DataTestUtils.java | 6 +++--- .../com/alibaba/fluss/server/log/LogSegment.java | 2 +- .../com/alibaba/fluss/server/log/LogTablet.java | 10 +++++----- 17 files changed, 53 insertions(+), 54 deletions(-) rename fluss-common/src/main/java/com/alibaba/fluss/record/{DefaultLogRecordBatch.java => MemoryLogRecordBatch.java} (98%) rename fluss-common/src/test/java/com/alibaba/fluss/record/{DefaultLogRecordBatchTest.java => MemoryLogRecordBatchTest.java} (97%) diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteRecord.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteRecord.java index cd4dcd5a..db521c09 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteRecord.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteRecord.java @@ -23,7 +23,7 @@ import com.alibaba.fluss.record.DefaultKvRecord; import com.alibaba.fluss.record.DefaultKvRecordBatch; import com.alibaba.fluss.record.DefaultLogRecord; -import com.alibaba.fluss.record.DefaultLogRecordBatch; +import com.alibaba.fluss.record.MemoryLogRecordBatch; import com.alibaba.fluss.row.InternalRow; import javax.annotation.Nullable; @@ -79,7 +79,7 @@ public WriteRecord( // TODO: row maybe not IndexedRow, which can't be estimated size // and the size maybe not accurate when the format is arrow. : DefaultLogRecord.sizeOf(row) - + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; + + MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE; } public PhysicalTablePath getPhysicalTablePath() { diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/IndexedLogWriteBatchTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/IndexedLogWriteBatchTest.java index 482bda5b..3d7222a0 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/IndexedLogWriteBatchTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/IndexedLogWriteBatchTest.java @@ -20,7 +20,7 @@ import com.alibaba.fluss.memory.MemorySegmentOutputView; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.record.DefaultLogRecord; -import com.alibaba.fluss.record.DefaultLogRecordBatch; +import com.alibaba.fluss.record.MemoryLogRecordBatch; import com.alibaba.fluss.record.LogRecord; import com.alibaba.fluss.record.LogRecordReadContext; import com.alibaba.fluss.record.MemoryLogRecordsIndexedBuilder; @@ -65,7 +65,7 @@ void testTryAppendWithWriteLimit() throws Exception { for (int i = 0; i - < (writeLimit - DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) + < (writeLimit - MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE) / estimatedSizeInBytes; i++) { boolean appendResult = @@ -89,7 +89,7 @@ void testToBytes() throws Exception { logProducerBatch.close(); logProducerBatch.serialize(); BytesView bytesView = logProducerBatch.build(); - DefaultLogRecordBatch recordBatch = new DefaultLogRecordBatch(); + MemoryLogRecordBatch recordBatch = new MemoryLogRecordBatch(); MemorySegmentBytesView firstBytesView = (MemorySegmentBytesView) bytesView; recordBatch.pointTo(firstBytesView.getMemorySegment(), firstBytesView.getPosition()); assertDefaultLogRecordBatchEquals(recordBatch); @@ -164,7 +164,7 @@ private IndexedLogWriteBatch createLogWriteBatch( new MemorySegmentOutputView(memorySegment))); } - private void assertDefaultLogRecordBatchEquals(DefaultLogRecordBatch recordBatch) { + private void assertDefaultLogRecordBatchEquals(MemoryLogRecordBatch recordBatch) { assertThat(recordBatch.getRecordCount()).isEqualTo(1); assertThat(recordBatch.baseLogOffset()).isEqualTo(0L); assertThat(recordBatch.schemaId()).isEqualTo((short) DATA1_TABLE_INFO.getSchemaId()); diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java index 7aff708e..0f449493 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java @@ -60,7 +60,7 @@ import java.util.Map; import java.util.stream.Collectors; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE; import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecord.java b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecord.java index 4f34b452..55ed1296 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecord.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecord.java @@ -29,7 +29,7 @@ import java.io.IOException; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogInputStream.java b/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogInputStream.java index 926af5ee..d0e2f6f4 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogInputStream.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogInputStream.java @@ -27,12 +27,12 @@ import java.nio.channels.FileChannel; import java.util.Objects; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.MAGIC_LENGTH; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.MAGIC_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.MAGIC_LENGTH; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.MAGIC_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -182,7 +182,7 @@ public int sizeInBytes() { } private LogRecordBatch toMemoryRecordBatch(ByteBuffer buffer) { - DefaultLogRecordBatch records = new DefaultLogRecordBatch(); + MemoryLogRecordBatch records = new MemoryLogRecordBatch(); records.pointTo(MemorySegment.wrap(buffer.array()), 0); return records; } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java b/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java index f97c50f1..8e328bab 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java @@ -49,11 +49,11 @@ import java.util.List; import java.util.Map; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORDS_COUNT_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORDS_COUNT_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE; import static com.alibaba.fluss.utils.FileUtils.readFullyOrFail; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; import static com.alibaba.fluss.utils.Preconditions.checkState; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordBatch.java similarity index 98% rename from fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java rename to fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordBatch.java index 9be052b1..dc00debf 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordBatch.java @@ -72,9 +72,8 @@ * * @since 0.1 */ -// TODO rename to MemoryLogRecordBatch @PublicEvolving -public class DefaultLogRecordBatch implements LogRecordBatch { +public class MemoryLogRecordBatch implements LogRecordBatch { protected static final int BASE_OFFSET_LENGTH = 8; public static final int LENGTH_LENGTH = 4; static final int MAGIC_LENGTH = 1; @@ -248,7 +247,7 @@ public boolean equals(Object o) { return false; } - DefaultLogRecordBatch that = (DefaultLogRecordBatch) o; + MemoryLogRecordBatch that = (MemoryLogRecordBatch) o; int sizeInBytes = sizeInBytes(); return sizeInBytes == that.sizeInBytes() && segment.equalTo(that.segment, position, that.position, sizeInBytes); @@ -262,7 +261,7 @@ public int hashCode() { private CloseableIterator rowRecordIterator(RowType rowType, long timestamp) { DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]); return new LogRecordIterator() { - int position = DefaultLogRecordBatch.this.position + RECORD_BATCH_HEADER_SIZE; + int position = MemoryLogRecordBatch.this.position + RECORD_BATCH_HEADER_SIZE; int rowId = 0; @Override diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecords.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecords.java index b8484839..14462f42 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecords.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecords.java @@ -70,12 +70,12 @@ public void clear() { } public void ensureValid() { - if (sizeInBytes < DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) { + if (sizeInBytes < MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE) { throw new RuntimeException( "Record batch is corrupt (the size " + sizeInBytes + " is smaller than the minimum allowed overhead " - + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE + + MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE + ")"); } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java index 869eea6e..72059e2f 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java @@ -32,12 +32,12 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_LENGTH; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.CRC_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.SCHEMA_ID_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.WRITE_CLIENT_ID_OFFSET; import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java index 2a81f8ab..2195a9ea 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsIndexedBuilder.java @@ -25,12 +25,12 @@ import java.io.IOException; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_LENGTH; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.CRC_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.SCHEMA_ID_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.WRITE_CLIENT_ID_OFFSET; import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemorySegmentLogInputStream.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemorySegmentLogInputStream.java index 402a64f1..271895d2 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemorySegmentLogInputStream.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemorySegmentLogInputStream.java @@ -18,8 +18,8 @@ import com.alibaba.fluss.memory.MemorySegment; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD; /** * A byte buffer backed log input stream. This class avoids the need to copy records by returning @@ -43,7 +43,7 @@ public LogRecordBatch nextBatch() { return null; } - DefaultLogRecordBatch logRecords = new DefaultLogRecordBatch(); + MemoryLogRecordBatch logRecords = new MemoryLogRecordBatch(); logRecords.pointTo(memorySegment, currentPosition); currentPosition += batchSize; diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordBatchTest.java similarity index 97% rename from fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java rename to fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordBatchTest.java index af2a0e9c..1b89377e 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/record/DefaultLogRecordBatchTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordBatchTest.java @@ -30,8 +30,8 @@ import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link DefaultLogRecordBatch}. */ -public class DefaultLogRecordBatchTest extends LogTestBase { +/** Test for {@link MemoryLogRecordBatch}. */ +public class MemoryLogRecordBatchTest extends LogTestBase { @Test void testRecordBatchSize() throws Exception { diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/MemorySegmentLogInputStreamTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/MemorySegmentLogInputStreamTest.java index 05e3002a..dd7c7985 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/record/MemorySegmentLogInputStreamTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/MemorySegmentLogInputStreamTest.java @@ -22,7 +22,7 @@ import java.util.Iterator; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD; import static com.alibaba.fluss.record.TestData.DATA1; import static org.assertj.core.api.Assertions.assertThat; diff --git a/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java b/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java index 4b63b29f..b5963d35 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java @@ -41,7 +41,7 @@ import java.util.Arrays; import java.util.List; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET; import static com.alibaba.fluss.record.TestData.DATA1; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; import static com.alibaba.fluss.testutils.DataTestUtils.row; diff --git a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java index 8e8c67e6..a349b7f2 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java @@ -25,7 +25,7 @@ import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; -import com.alibaba.fluss.record.DefaultLogRecordBatch; +import com.alibaba.fluss.record.MemoryLogRecordBatch; import com.alibaba.fluss.record.FileLogRecords; import com.alibaba.fluss.record.KvRecord; import com.alibaba.fluss.record.KvRecordBatch; @@ -363,7 +363,7 @@ private static MemoryLogRecords createIndexedMemoryLogRecords( MemoryLogRecords memoryLogRecords = builder.build(); memoryLogRecords.ensureValid(); - ((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next()) + ((MemoryLogRecordBatch) memoryLogRecords.batches().iterator().next()) .setCommitTimestamp(maxTimestamp); return memoryLogRecords; } @@ -398,7 +398,7 @@ private static MemoryLogRecords createArrowMemoryLogRecords( MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToByteBuffer(bytesView.getByteBuf().nioBuffer()); - ((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next()) + ((MemoryLogRecordBatch) memoryLogRecords.batches().iterator().next()) .setCommitTimestamp(maxTimestamp); memoryLogRecords.ensureValid(); return memoryLogRecords; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegment.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegment.java index 659073a0..76edac43 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegment.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegment.java @@ -45,7 +45,7 @@ import java.io.IOException; import java.util.Optional; -import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE; import static com.alibaba.fluss.utils.IOUtils.closeQuietly; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java index 4e40a6e1..856a02c5 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java @@ -31,7 +31,7 @@ import com.alibaba.fluss.metrics.MeterView; import com.alibaba.fluss.metrics.MetricNames; import com.alibaba.fluss.metrics.groups.MetricGroup; -import com.alibaba.fluss.record.DefaultLogRecordBatch; +import com.alibaba.fluss.record.MemoryLogRecordBatch; import com.alibaba.fluss.record.FileLogProjection; import com.alibaba.fluss.record.FileLogRecords; import com.alibaba.fluss.record.LogRecordBatch; @@ -667,10 +667,10 @@ private AssignResult assignOffsetAndTimestamp( MemoryLogRecords records, long baseLogOffset, long commitTimestamp) { long initialOffset = baseLogOffset; for (LogRecordBatch batch : records.batches()) { - if (batch instanceof DefaultLogRecordBatch) { - DefaultLogRecordBatch defaultLogRecordBatch = (DefaultLogRecordBatch) batch; - defaultLogRecordBatch.setBaseLogOffset(initialOffset); - defaultLogRecordBatch.setCommitTimestamp(commitTimestamp); + if (batch instanceof MemoryLogRecordBatch) { + MemoryLogRecordBatch memoryLogRecordBatch = (MemoryLogRecordBatch) batch; + memoryLogRecordBatch.setBaseLogOffset(initialOffset); + memoryLogRecordBatch.setCommitTimestamp(commitTimestamp); } else { throw new FlussRuntimeException( "Currently, we only support DefaultLogRecordBatch.");