rename DefaultLogRecordBatch to MemoryLogRecordBatch
JNSimba committed Dec 20, 2024
1 parent 547da03 · commit 1cd762f
Showing 17 changed files with 53 additions and 54 deletions.
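
For context, here is a minimal usage sketch of the renamed class, assembled only from calls that appear in this diff (the no-arg constructor, pointTo, getRecordCount, baseLogOffset, and the RECORD_BATCH_HEADER_SIZE constant). The buffer setup, the wrapper class, and the obtainSerializedBatch helper are assumptions for illustration, not part of the commit.

import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.record.MemoryLogRecordBatch;

import java.nio.ByteBuffer;

public class MemoryLogRecordBatchUsageSketch {
    public static void main(String[] args) {
        // Assumed: this buffer was filled elsewhere with a valid serialized
        // record batch (e.g. by one of the builders touched in this commit).
        ByteBuffer buffer = obtainSerializedBatch();

        // After the rename, callers construct MemoryLogRecordBatch instead of
        // DefaultLogRecordBatch and point it at the backing memory segment;
        // the accessors themselves are unchanged by this commit.
        MemoryLogRecordBatch batch = new MemoryLogRecordBatch();
        batch.pointTo(MemorySegment.wrap(buffer.array()), 0);

        System.out.println("records=" + batch.getRecordCount()
                + ", baseOffset=" + batch.baseLogOffset()
                + ", headerSize=" + MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE);
    }

    // Hypothetical helper standing in for whatever produced the batch bytes.
    private static ByteBuffer obtainSerializedBatch() {
        throw new UnsupportedOperationException("supply a serialized record batch here");
    }
}
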
@@ -23,7 +23,7 @@
import com.alibaba.fluss.record.DefaultKvRecord;
import com.alibaba.fluss.record.DefaultKvRecordBatch;
import com.alibaba.fluss.record.DefaultLogRecord;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.row.InternalRow;

import javax.annotation.Nullable;
@@ -79,7 +79,7 @@ public WriteRecord(
// TODO: row maybe not IndexedRow, which can't be estimated size
// and the size maybe not accurate when the format is arrow.
: DefaultLogRecord.sizeOf(row)
+ DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+ MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
}

public PhysicalTablePath getPhysicalTablePath() {

@@ -20,7 +20,7 @@
import com.alibaba.fluss.memory.MemorySegmentOutputView;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.DefaultLogRecord;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.record.MemoryLogRecordsIndexedBuilder;
@@ -65,7 +65,7 @@ void testTryAppendWithWriteLimit() throws Exception {

for (int i = 0;
i
< (writeLimit - DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
< (writeLimit - MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
/ estimatedSizeInBytes;
i++) {
boolean appendResult =
@@ -89,7 +89,7 @@ void testToBytes() throws Exception {
logProducerBatch.close();
logProducerBatch.serialize();
BytesView bytesView = logProducerBatch.build();
DefaultLogRecordBatch recordBatch = new DefaultLogRecordBatch();
MemoryLogRecordBatch recordBatch = new MemoryLogRecordBatch();
MemorySegmentBytesView firstBytesView = (MemorySegmentBytesView) bytesView;
recordBatch.pointTo(firstBytesView.getMemorySegment(), firstBytesView.getPosition());
assertDefaultLogRecordBatchEquals(recordBatch);
@@ -164,7 +164,7 @@ private IndexedLogWriteBatch createLogWriteBatch(
new MemorySegmentOutputView(memorySegment)));
}

private void assertDefaultLogRecordBatchEquals(DefaultLogRecordBatch recordBatch) {
private void assertDefaultLogRecordBatchEquals(MemoryLogRecordBatch recordBatch) {
assertThat(recordBatch.getRecordCount()).isEqualTo(1);
assertThat(recordBatch.baseLogOffset()).isEqualTo(0L);
assertThat(recordBatch.schemaId()).isEqualTo((short) DATA1_TABLE_INFO.getSchemaId());

@@ -60,7 +60,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;

@@ -29,7 +29,7 @@

import java.io.IOException;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for

@@ -27,12 +27,12 @@
import java.nio.channels.FileChannel;
import java.util.Objects;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.MAGIC_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.MAGIC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.MAGIC_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.MAGIC_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -182,7 +182,7 @@ public int sizeInBytes() {
}

private LogRecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
DefaultLogRecordBatch records = new DefaultLogRecordBatch();
MemoryLogRecordBatch records = new MemoryLogRecordBatch();
records.pointTo(MemorySegment.wrap(buffer.array()), 0);
return records;
}

@@ -49,11 +49,11 @@
import java.util.List;
import java.util.Map;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORDS_COUNT_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORDS_COUNT_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.utils.FileUtils.readFullyOrFail;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
import static com.alibaba.fluss.utils.Preconditions.checkState;

@@ -72,9 +72,8 @@
*
* @since 0.1
*/
// TODO rename to MemoryLogRecordBatch
@PublicEvolving
public class DefaultLogRecordBatch implements LogRecordBatch {
public class MemoryLogRecordBatch implements LogRecordBatch {
protected static final int BASE_OFFSET_LENGTH = 8;
public static final int LENGTH_LENGTH = 4;
static final int MAGIC_LENGTH = 1;
@@ -248,7 +247,7 @@ public boolean equals(Object o) {
return false;
}

DefaultLogRecordBatch that = (DefaultLogRecordBatch) o;
MemoryLogRecordBatch that = (MemoryLogRecordBatch) o;
int sizeInBytes = sizeInBytes();
return sizeInBytes == that.sizeInBytes()
&& segment.equalTo(that.segment, position, that.position, sizeInBytes);
@@ -262,7 +261,7 @@ public int hashCode() {
private CloseableIterator<LogRecord> rowRecordIterator(RowType rowType, long timestamp) {
DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
return new LogRecordIterator() {
int position = DefaultLogRecordBatch.this.position + RECORD_BATCH_HEADER_SIZE;
int position = MemoryLogRecordBatch.this.position + RECORD_BATCH_HEADER_SIZE;
int rowId = 0;

@Override

@@ -70,12 +70,12 @@ public void clear() {
}

public void ensureValid() {
if (sizeInBytes < DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) {
if (sizeInBytes < MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE) {
throw new RuntimeException(
"Record batch is corrupt (the size "
+ sizeInBytes
+ " is smaller than the minimum allowed overhead "
+ DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE
+ MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE
+ ")");
}
}

@@ -32,12 +32,12 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;

@@ -25,12 +25,12 @@

import java.io.IOException;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;

@@ -18,8 +18,8 @@

import com.alibaba.fluss.memory.MemorySegment;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;

/**
* A byte buffer backed log input stream. This class avoids the need to copy records by returning
@@ -43,7 +43,7 @@ public LogRecordBatch nextBatch() {
return null;
}

DefaultLogRecordBatch logRecords = new DefaultLogRecordBatch();
MemoryLogRecordBatch logRecords = new MemoryLogRecordBatch();
logRecords.pointTo(memorySegment, currentPosition);

currentPosition += batchSize;

@@ -30,8 +30,8 @@

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link DefaultLogRecordBatch}. */
public class DefaultLogRecordBatchTest extends LogTestBase {
/** Test for {@link MemoryLogRecordBatch}. */
public class MemoryLogRecordBatchTest extends LogTestBase {

@Test
void testRecordBatchSize() throws Exception {

@@ -22,7 +22,7 @@

import java.util.Iterator;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.TestData.DATA1;
import static org.assertj.core.api.Assertions.assertThat;


@@ -41,7 +41,7 @@
import java.util.Arrays;
import java.util.List;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.TestData.DATA1;
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
import static com.alibaba.fluss.testutils.DataTestUtils.row;

@@ -25,7 +25,7 @@
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.record.FileLogRecords;
import com.alibaba.fluss.record.KvRecord;
import com.alibaba.fluss.record.KvRecordBatch;
@@ -363,7 +363,7 @@ private static MemoryLogRecords createIndexedMemoryLogRecords(
MemoryLogRecords memoryLogRecords = builder.build();
memoryLogRecords.ensureValid();

((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next())
((MemoryLogRecordBatch) memoryLogRecords.batches().iterator().next())
.setCommitTimestamp(maxTimestamp);
return memoryLogRecords;
}
@@ -398,7 +398,7 @@ private static MemoryLogRecords createArrowMemoryLogRecords(
MemoryLogRecords memoryLogRecords =
MemoryLogRecords.pointToByteBuffer(bytesView.getByteBuf().nioBuffer());

((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next())
((MemoryLogRecordBatch) memoryLogRecords.batches().iterator().next())
.setCommitTimestamp(maxTimestamp);
memoryLogRecords.ensureValid();
return memoryLogRecords;

@@ -45,7 +45,7 @@
import java.io.IOException;
import java.util.Optional;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.utils.IOUtils.closeQuietly;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache

@@ -31,7 +31,7 @@
import com.alibaba.fluss.metrics.MeterView;
import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.metrics.groups.MetricGroup;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.record.FileLogProjection;
import com.alibaba.fluss.record.FileLogRecords;
import com.alibaba.fluss.record.LogRecordBatch;
@@ -667,10 +667,10 @@ private AssignResult assignOffsetAndTimestamp(
MemoryLogRecords records, long baseLogOffset, long commitTimestamp) {
long initialOffset = baseLogOffset;
for (LogRecordBatch batch : records.batches()) {
if (batch instanceof DefaultLogRecordBatch) {
DefaultLogRecordBatch defaultLogRecordBatch = (DefaultLogRecordBatch) batch;
defaultLogRecordBatch.setBaseLogOffset(initialOffset);
defaultLogRecordBatch.setCommitTimestamp(commitTimestamp);
if (batch instanceof MemoryLogRecordBatch) {
MemoryLogRecordBatch memoryLogRecordBatch = (MemoryLogRecordBatch) batch;
memoryLogRecordBatch.setBaseLogOffset(initialOffset);
memoryLogRecordBatch.setCommitTimestamp(commitTimestamp);
} else {
throw new FlussRuntimeException(
"Currently, we only support DefaultLogRecordBatch.");
