From 3fc0cc66a83dfdaaac5fe986c74f356722ca413c Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 19 Dec 2024 15:54:26 +0800 Subject: [PATCH 1/2] debug --- .../java/com/alibaba/fluss/server/kv/KvTablet.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 7dc740ce..9a636202 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -329,7 +329,16 @@ public LogAppendInfo putAsLeader( // should not append. if (appendedRecordCount > 0) { // now, we can build the full log. - return logTablet.appendAsLeader(walBuilder.build()); + LogAppendInfo logAppendInfo = + logTablet.appendAsLeader(walBuilder.build()); + long logEndOffset = logAppendInfo.lastOffset(); + if (logEndOffset != logOffset) { + LOG.warn( + "The log end offset {} is not equal to the expected log offset {}.", + logEndOffset, + logOffset); + } + return logAppendInfo; } else { return new LogAppendInfo( logEndOffsetOfPrevBatch - 1, @@ -341,6 +350,7 @@ public LogAppendInfo putAsLeader( false); } } catch (Throwable t) { + LOG.warn("logEndOffsetOfPrevBatch: {}", logEndOffsetOfPrevBatch); // While encounter error here, the CDC logs may fail writing to disk, // and the client probably will resend the batch. If we do not remove the // values generated by the erroneous batch from the kvPreWriteBuffer, the From 5efa1b9ffde9a0153c695f73b1c279bb2700c870 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Fri, 20 Dec 2024 15:15:45 +0800 Subject: [PATCH 2/2] [server/kv] Fix out of order exception after delete a not exist row --- .../fluss/client/table/FlussTableITCase.java | 33 +++++++++++++++++++ .../com/alibaba/fluss/server/kv/KvTablet.java | 32 +----------------- .../alibaba/fluss/server/log/LogTablet.java | 20 ++++++----- 3 files changed, 46 insertions(+), 39 deletions(-) diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 9d26a9b3..4811b1b5 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -215,6 +215,39 @@ void testPutAndLookup() throws Exception { verifyPutAndLookup(table2, schema, new Object[] {"a", 1}); } + @Test + void testDeleteNotExistRow() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_delete_not_exist_row"); + TableDescriptor tableDescriptor = DATA1_TABLE_INFO_PK.getTableDescriptor(); + Schema schema = tableDescriptor.getSchema(); + createTable(tablePath, tableDescriptor, false); + + try (Table table = conn.getTable(tablePath)) { + UpsertWriter upsertWriter = table.getUpsertWriter(); + // delete a non-exist row + InternalRow row1 = + compactedRow(tableDescriptor.getSchema().toRowType(), new Object[] {1, "2"}); + upsertWriter.delete(row1).get(); + // then insert the row + upsertWriter.upsert(row1).get(); + + // delete a non-exist row again + InternalRow row2 = + compactedRow(tableDescriptor.getSchema().toRowType(), new Object[] {2, "2"}); + upsertWriter.delete(row2).get(); + upsertWriter.upsert(row2).get(); + + // look up the rows + RowType rowType = DATA1_SCHEMA_PK.toRowType(); + assertThatRow(table.lookup(keyRow(schema, new Object[] {1, "2"})).get().getRow()) + .withSchema(rowType) + .isEqualTo(row1); + assertThatRow(table.lookup(keyRow(schema, new Object[] {2, "2"})).get().getRow()) + .withSchema(rowType) + .isEqualTo(row2); + } + } + @Test void testLookupForNotReadyTable() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1"); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 9a636202..15c52060 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -260,7 +260,6 @@ public LogAppendInfo putAsLeader( ValueDecoder valueDecoder = new ValueDecoder(readContext.getRowDecoder(schemaId)); - int appendedRecordCount = 0; for (KvRecord kvRecord : kvRecords.records(readContext)) { byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); @@ -278,13 +277,11 @@ public LogAppendInfo putAsLeader( // if newRow is null, it means the row should be deleted if (newRow == null) { walBuilder.append(RowKind.DELETE, oldRow); - appendedRecordCount += 1; kvPreWriteBuffer.delete(key, logOffset++); } else { // otherwise, it's a partial update, should produce -U,+U walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); walBuilder.append(RowKind.UPDATE_AFTER, newRow); - appendedRecordCount += 2; kvPreWriteBuffer.put( key, ValueEncoder.encodeValue(schemaId, newRow), @@ -302,7 +299,6 @@ public LogAppendInfo putAsLeader( updateRow(oldRow, kvRecord.getRow(), partialUpdater); walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); walBuilder.append(RowKind.UPDATE_AFTER, newRow); - appendedRecordCount += 2; // logOffset is for -U, logOffset + 1 is for +U, we need to use // the log offset for +U kvPreWriteBuffer.put( @@ -316,7 +312,6 @@ public LogAppendInfo putAsLeader( // of the input row are set to null. BinaryRow newRow = kvRecord.getRow(); walBuilder.append(RowKind.INSERT, newRow); - appendedRecordCount += 1; kvPreWriteBuffer.put( key, ValueEncoder.encodeValue(schemaId, newRow), @@ -324,33 +319,8 @@ public LogAppendInfo putAsLeader( } } } - - // if appendedRecordCount is 0, it means there is no record to append, we - // should not append. - if (appendedRecordCount > 0) { - // now, we can build the full log. - LogAppendInfo logAppendInfo = - logTablet.appendAsLeader(walBuilder.build()); - long logEndOffset = logAppendInfo.lastOffset(); - if (logEndOffset != logOffset) { - LOG.warn( - "The log end offset {} is not equal to the expected log offset {}.", - logEndOffset, - logOffset); - } - return logAppendInfo; - } else { - return new LogAppendInfo( - logEndOffsetOfPrevBatch - 1, - logEndOffsetOfPrevBatch - 1, - 0L, - 0L, - 0, - 0, - false); - } + return logTablet.appendAsLeader(walBuilder.build()); } catch (Throwable t) { - LOG.warn("logEndOffsetOfPrevBatch: {}", logEndOffsetOfPrevBatch); // While encounter error here, the CDC logs may fail writing to disk, // and the client probably will resend the batch. If we do not remove the // values generated by the erroneous batch from the kvPreWriteBuffer, the diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java index 4e40a6e1..4ab4913e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogTablet.java @@ -619,14 +619,18 @@ private LogAppendInfo append(MemoryLogRecords records, boolean needAssignOffsetA appendInfo.setMaxTimestamp(duplicatedBatch.timestamp); appendInfo.setStartOffsetOfMaxTimestamp(startOffset); } else { - // Append the records, and increment the local log end offset immediately after - // append because write to the transaction index below may fail, and we want to - // ensure that the offsets of future appends still grow monotonically. - localLog.append( - appendInfo.lastOffset(), - appendInfo.maxTimestamp(), - appendInfo.startOffsetOfMaxTimestamp(), - validRecords); + // if there are records to append + if (appendInfo.lastOffset() >= appendInfo.firstOffset()) { + // Append the records, and increment the local log end offset immediately after + // append because write to the transaction index below may fail, and we want to + // ensure that the offsets of future appends still grow monotonically. + localLog.append( + appendInfo.lastOffset(), + appendInfo.maxTimestamp(), + appendInfo.startOffsetOfMaxTimestamp(), + validRecords); + } + updateHighWatermarkWithLogEndOffset(); // update the writer state.