From 73584e116c3df21018ddd565b313fcbfdecb6c13 Mon Sep 17 00:00:00 2001 From: "wenchao.wu" Date: Tue, 18 Jun 2024 20:32:42 +0800 Subject: [PATCH] [core] fix serialize problem when using jdbc catalog with lock enbale in flink. --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 2 +- .../paimon/jdbc/JdbcCatalogLockContext.java | 13 ++++++++++--- .../apache/paimon/jdbc/JdbcCatalogTest.java | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 7e7718b5bee9..45600715b44d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -350,7 +350,7 @@ public Optional defaultLockFactory() { @Override public Optional lockContext() { - return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options)); + return Optional.of(new JdbcCatalogLockContext(catalogKey, options)); } private Lock lock(Identifier identifier) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java index e56b3474ccb3..b109f271d1a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLockContext.java @@ -19,17 +19,17 @@ package org.apache.paimon.jdbc; import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; /** Jdbc lock context. */ public class JdbcCatalogLockContext implements CatalogLockContext { - private final JdbcClientPool connections; + private transient JdbcClientPool connections; private final String catalogKey; private final Options options; - public JdbcCatalogLockContext(JdbcClientPool connections, String catalogKey, Options options) { - this.connections = connections; + public JdbcCatalogLockContext(String catalogKey, Options options) { this.catalogKey = catalogKey; this.options = options; } @@ -40,6 +40,13 @@ public Options options() { } public JdbcClientPool connections() { + if (connections == null) { + connections = + new JdbcClientPool( + options.get(CatalogOptions.CLIENT_POOL_SIZE), + options.get(CatalogOptions.URI.key()), + options.toMap()); + } return connections; } diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index cc1febeab023..f0c84eb2c4d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -22,12 +22,15 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; @@ -36,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; /** Tests for {@link JdbcCatalog}. */ public class JdbcCatalogTest extends CatalogTestBase { @@ -112,4 +116,19 @@ public void testCheckIdentifierUpperCase() throws Exception { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog."); } + + @Test + public void testSerializeTable() throws Exception { + catalog.createDatabase("test_db", false); + catalog.createTable(Identifier.create("test_db", "table"), DEFAULT_TABLE_SCHEMA, false); + Table table = catalog.getTable(new Identifier("test_db", "table")); + assertDoesNotThrow( + () -> { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(table); + oos.flush(); + } + }); + } }