From 7419b06856596328248d4ea1c3a2f30d5183f6ee Mon Sep 17 00:00:00 2001 From: zengbao Date: Thu, 5 Sep 2024 14:31:18 +0800 Subject: [PATCH] feat: close catalog --- .../cdc/StpCdcDynamicTableParsingProcessFunction.java | 8 ++++++++ .../StpCdcRecordStoreDynamicBucketMultiWriteOperator.java | 4 ++++ .../StpCdcRecordStoreUnawareBucketMultiWriteOperator.java | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcDynamicTableParsingProcessFunction.java index 1b1bdf4e9953..803a1da845a8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcDynamicTableParsingProcessFunction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcDynamicTableParsingProcessFunction.java @@ -154,4 +154,12 @@ record -> private CdcMultiplexRecord wrapRecord(String databaseName, String tableName, CdcRecord record) { return CdcMultiplexRecord.fromCdcRecord(databaseName, tableName, record); } + + @Override + public void close() throws Exception { + if (catalog != null) { + catalog.close(); + catalog = null; + } + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreDynamicBucketMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreDynamicBucketMultiWriteOperator.java index 838964e9addd..dd75af292ddc 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreDynamicBucketMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreDynamicBucketMultiWriteOperator.java @@ -198,6 +198,10 @@ public void close() throws Exception { if (compactExecutor != null) { compactExecutor.shutdownNow(); } + if (catalog != null) { + catalog.close(); + catalog = null; + } } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreUnawareBucketMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreUnawareBucketMultiWriteOperator.java index b00012c9ca1b..79cbe7f5d5ef 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreUnawareBucketMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/StpCdcRecordStoreUnawareBucketMultiWriteOperator.java @@ -199,6 +199,10 @@ public void close() throws Exception { if (compactExecutor != null) { compactExecutor.shutdownNow(); } + if (catalog != null) { + catalog.close(); + catalog = null; + } } @Override