From c41806a8b72bcc3af4dcd64ec71e20b74248868e Mon Sep 17 00:00:00 2001 From: wudi <> Date: Thu, 30 Nov 2023 14:00:40 +0800 Subject: [PATCH] update --- .../doris/flink/sink/writer/EventType.java | 23 +++++++++++++++++++ .../JsonDebeziumSchemaSerializer.java | 10 +------- 2 files changed, 24 insertions(+), 9 deletions(-) create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java new file mode 100644 index 000000000..d26bf278d --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +public enum EventType { + ALTER, + CREATE +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 97e4aebd3..81ef64c2f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.schema.SchemaChangeHelper; import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema; import org.apache.doris.flink.sink.schema.SchemaChangeManager; +import org.apache.doris.flink.sink.writer.EventType; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.apache.doris.flink.tools.cdc.mysql.MysqlType; @@ -279,9 +280,6 @@ public List extractDDLList(JsonNode record) throws IOException{ if (Objects.isNull(tableChange) || Objects.isNull(ddl)) { return null; } - if(!EventType.ALTER.equals(extractEventType(record))){ - return null; - } JsonNode columns = tableChange.get("table").get("columns"); if (firstSchemaChange) { @@ -714,10 +712,4 @@ private String handleType(String type) { return type; } - - enum EventType{ - ALTER, - CREATE - } - } \ No newline at end of file