diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java index b17d61d44e77..08eea468ea70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java @@ -72,13 +72,8 @@ public RowType rowType() { List fields = new ArrayList<>(); fields.add(SpecialFields.ROW_KIND); for (DataField field : wrapped.rowType().getFields()) { - DataField newField = - new DataField( - field.id(), - field.name(), - new ArrayType(field.type().nullable()), // convert to nullable - field.description()); - fields.add(newField); + // convert to nullable + fields.add(field.newType(new ArrayType(field.type().nullable()))); } return new RowType(fields); } @@ -99,6 +94,19 @@ private BinlogRead(InnerTableRead dataRead) { super(dataRead); } + @Override + public InnerTableRead withReadType(RowType readType) { + List fields = new ArrayList<>(); + for (DataField field : readType.getFields()) { + if (field.name().equals(SpecialFields.ROW_KIND.name())) { + fields.add(field); + } else { + fields.add(field.newType(((ArrayType) field.type()).getElementType())); + } + } + return super.withReadType(readType.copy(fields)); + } + @Override public RecordReader createReader(Split split) throws IOException { DataSplit dataSplit = (DataSplit) split; diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala index 64baf6232fd8..7baa57a54d90 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonSystemTableTest.scala @@ -81,4 +81,20 @@ class PaimonSystemTableTest extends PaimonSparkTestBase { spark.sql("select partition,bucket from `T$buckets`"), Row("[2024-10-10, 01]", 0) :: Row("[2024-10-10, 01]", 1) :: Row("[2024-10-10, 01]", 2) :: Nil) } + + test("system table: binlog table") { + sql(""" + |CREATE TABLE T (a INT, b INT) + |TBLPROPERTIES ('primary-key'='a', 'changelog-producer' = 'lookup', 'bucket' = '2') + |""".stripMargin) + + sql("INSERT INTO T VALUES (1, 2)") + sql("INSERT INTO T VALUES (1, 3)") + sql("INSERT INTO T VALUES (2, 2)") + + checkAnswer( + sql("SELECT * FROM `T$binlog`"), + Seq(Row("+I", Array(1), Array(3)), Row("+I", Array(2), Array(2))) + ) + } }