Skip to content

Commit

Permalink
[core] fix serialize problem when using jdbc catalog with lock enbale…
Browse files Browse the repository at this point in the history
… in flink.
  • Loading branch information
wenchao.wu committed Jun 19, 2024
1 parent f2c8728 commit 73584e1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public Optional<CatalogLockFactory> defaultLockFactory() {

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options));
return Optional.of(new JdbcCatalogLockContext(catalogKey, options));
}

private Lock lock(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

/** Jdbc lock context. */
public class JdbcCatalogLockContext implements CatalogLockContext {

private final JdbcClientPool connections;
private transient JdbcClientPool connections;
private final String catalogKey;
private final Options options;

public JdbcCatalogLockContext(JdbcClientPool connections, String catalogKey, Options options) {
this.connections = connections;
public JdbcCatalogLockContext(String catalogKey, Options options) {
this.catalogKey = catalogKey;
this.options = options;
}
Expand All @@ -40,6 +40,13 @@ public Options options() {
}

public JdbcClientPool connections() {
if (connections == null) {
connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
}
return connections;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -36,6 +39,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

/** Tests for {@link JdbcCatalog}. */
public class JdbcCatalogTest extends CatalogTestBase {
Expand Down Expand Up @@ -112,4 +116,19 @@ public void testCheckIdentifierUpperCase() throws Exception {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog.");
}

@Test
public void testSerializeTable() throws Exception {
catalog.createDatabase("test_db", false);
catalog.createTable(Identifier.create("test_db", "table"), DEFAULT_TABLE_SCHEMA, false);
Table table = catalog.getTable(new Identifier("test_db", "table"));
assertDoesNotThrow(
() -> {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(table);
oos.flush();
}
});
}
}

0 comments on commit 73584e1

Please sign in to comment.