diff --git a/dinky-core/src/main/java/org/dinky/trans/pipeline/DinkyFlinkPipelineComposer.java b/dinky-core/src/main/java/org/dinky/trans/pipeline/DinkyFlinkPipelineComposer.java
deleted file mode 100644
index de18794e17..0000000000
--- a/dinky-core/src/main/java/org/dinky/trans/pipeline/DinkyFlinkPipelineComposer.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.dinky.trans.pipeline;
-
-import org.dinky.executor.Executor;
-
-import org.apache.flink.cdc.common.configuration.Configuration;
-import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.factories.DataSinkFactory;
-import org.apache.flink.cdc.common.factories.FactoryHelper;
-import org.apache.flink.cdc.common.pipeline.PipelineOptions;
-import org.apache.flink.cdc.common.sink.DataSink;
-import org.apache.flink.cdc.composer.PipelineComposer;
-import org.apache.flink.cdc.composer.PipelineExecution;
-import org.apache.flink.cdc.composer.definition.PipelineDef;
-import org.apache.flink.cdc.composer.definition.SinkDef;
-import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
-import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
-import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
-import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
-import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator;
-import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
-import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
-import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
-import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.net.URI;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * DinkyFlinkPipelineComposer
- *
- * @author wenmo
- * @since 2023/12/22 0:16
- */
-public class DinkyFlinkPipelineComposer implements PipelineComposer {
-
-    private final StreamExecutionEnvironment env;
-
-    public static DinkyFlinkPipelineComposer of(Executor executor) {
-
-        return new DinkyFlinkPipelineComposer(executor.getStreamExecutionEnvironment());
-    }
-
-    private DinkyFlinkPipelineComposer(StreamExecutionEnvironment env) {
-        this.env = env;
-    }
-
-    public PipelineExecution compose(PipelineDef pipelineDef) {
-        int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
-        env.getConfig().setParallelism(parallelism);
-
-        // Build Source Operator
-        DataSourceTranslator sourceTranslator = new DataSourceTranslator();
-        DataStream<Event> stream = sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
-
-        // Build TransformSchemaOperator for processing Schema Event
-        TransformTranslator transformTranslator = new TransformTranslator();
-        stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms());
-
-        // Schema operator
-        SchemaOperatorTranslator schemaOperatorTranslator = new SchemaOperatorTranslator(
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR),
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
-        OperatorIDGenerator schemaOperatorIDGenerator =
-                new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
-
-        // Build TransformDataOperator for processing Data Event
-        stream = transformTranslator.translateData(
-                stream,
-                pipelineDef.getTransforms(),
-                schemaOperatorIDGenerator.generate(),
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
-
-        // Build DataSink in advance as schema operator requires MetadataApplier
-        DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
-
-        stream = schemaOperatorTranslator.translate(
-                stream, parallelism, dataSink.getMetadataApplier(), pipelineDef.getRoute());
-
-        // Build Partitioner used to shuffle Event
-        PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
-        stream = partitioningTranslator.translate(
-                stream, parallelism, parallelism, schemaOperatorIDGenerator.generate());
-
-        // Build Sink Operator
-        DataSinkTranslator sinkTranslator = new DataSinkTranslator();
-        sinkTranslator.translate(pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
-
-        // Add framework JARs
-        addFrameworkJars();
-
-        return new DinkyFlinkPipelineExecution(env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME));
-    }
-
-    private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
-        // Search the data sink factory
-        DataSinkFactory sinkFactory =
-                FactoryDiscoveryUtils.getFactoryByIdentifier(sinkDef.getType(), DataSinkFactory.class);
-
-        // Include sink connector JAR
-        FactoryDiscoveryUtils.getJarPathByIdentifier(sinkDef.getType(), DataSinkFactory.class)
-                .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
-
-        // Create data sink
-        return sinkFactory.createDataSink(new FactoryHelper.DefaultContext(
-                sinkDef.getConfig(), pipelineConfig, Thread.currentThread().getContextClassLoader()));
-    }
-
-    private void addFrameworkJars() {
-        try {
-            Set<URI> frameworkJars = new HashSet<>();
-            // Common JAR
-            // We use the core interface (Event) to search the JAR
-            Optional<URL> commonJar = getContainingJar(Event.class);
-            if (commonJar.isPresent()) {
-                frameworkJars.add(commonJar.get().toURI());
-            }
-            // Runtime JAR
-            // We use the serializer of the core interface (EventSerializer) to search the JAR
-            Optional<URL> runtimeJar = getContainingJar(EventSerializer.class);
-            if (runtimeJar.isPresent()) {
-                frameworkJars.add(runtimeJar.get().toURI());
-            }
-            for (URI jar : frameworkJars) {
-                FlinkEnvironmentUtils.addJar(env, jar.toURL());
-            }
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to search and add Flink CDC framework JARs", e);
-        }
-    }
-
-    private Optional<URL> getContainingJar(Class<?> clazz) throws Exception {
-        URL container = clazz.getProtectionDomain().getCodeSource().getLocation();
-        if (Files.isDirectory(Paths.get(container.toURI()))) {
-            return Optional.empty();
-        }
-        return Optional.of(container);
-    }
-}
diff --git a/dinky-core/src/main/java/org/dinky/trans/pipeline/DinkyFlinkPipelineExecution.java b/dinky-core/src/main/java/org/dinky/trans/pipeline/DinkyFlinkPipelineExecution.java
deleted file mode 100644
index a0e0a19b4d..0000000000
--- a/dinky-core/src/main/java/org/dinky/trans/pipeline/DinkyFlinkPipelineExecution.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.dinky.trans.pipeline;
-
-import org.apache.flink.cdc.composer.PipelineExecution;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * A pipeline execution that run the defined pipeline via Flink's {@link
- * StreamExecutionEnvironment}.
- */
-public class DinkyFlinkPipelineExecution implements PipelineExecution {
-
-    private final StreamExecutionEnvironment env;
-    private final String jobName;
-
-    public DinkyFlinkPipelineExecution(StreamExecutionEnvironment env, String jobName) {
-        this.env = env;
-        this.jobName = jobName;
-    }
-
-    public StreamExecutionEnvironment getEnv() {
-        return env;
-    }
-
-    public String getJobName() {
-        return jobName;
-    }
-
-    @Override
-    public ExecutionInfo execute() throws Exception {
-
-        JobClient jobClient = env.executeAsync(jobName);
-        jobClient.getJobExecutionResult().get();
-        return new ExecutionInfo(jobClient.getJobID().toString(), jobName);
-    }
-}
diff --git a/dinky-core/src/main/java/org/dinky/trans/pipeline/FlinkCDCPipelineOperation.java b/dinky-core/src/main/java/org/dinky/trans/pipeline/FlinkCDCPipelineOperation.java
index 84c7c601f9..57f3324594 100644
--- a/dinky-core/src/main/java/org/dinky/trans/pipeline/FlinkCDCPipelineOperation.java
+++ b/dinky-core/src/main/java/org/dinky/trans/pipeline/FlinkCDCPipelineOperation.java
@@ -23,12 +23,16 @@
 import org.dinky.trans.AbstractOperation;
 import org.dinky.trans.Operation;
 
+import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.TableResultImpl;
 
+import java.lang.reflect.Constructor;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -86,11 +90,11 @@ public Operation create(String statement) {
     public TableResult execute(Executor executor) {
         String yamlText = getPipelineConfigure(statement);
         Configuration globalPipelineConfig = Configuration.fromMap(executor.getSetConfig());
-        // Parse pipeline definition file
-        YamlTextPipelineDefinitionParser pipelineDefinitionParser = new YamlTextPipelineDefinitionParser();
-        // Create composer
-        PipelineComposer composer = createComposer(executor);
         try {
+            // Parse pipeline definition file
+            YamlPipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
+            // Create composer
+            PipelineComposer composer = createComposer(executor);
             PipelineDef pipelineDef = pipelineDefinitionParser.parse(yamlText, globalPipelineConfig);
             // Compose pipeline
             composer.compose(pipelineDef);
@@ -110,8 +114,16 @@ public String getPipelineConfigure(String statement) {
         return "";
     }
 
-    public DinkyFlinkPipelineComposer createComposer(Executor executor) {
-
-        return DinkyFlinkPipelineComposer.of(executor);
+    public PipelineComposer createComposer(Executor executor) {
+        try {
+            Class<FlinkPipelineComposer> clazz = (Class<FlinkPipelineComposer>)
+                    Class.forName("org.apache.flink.cdc.composer.flink.FlinkPipelineComposer");
+            Constructor<FlinkPipelineComposer> constructor =
+                    clazz.getDeclaredConstructor(StreamExecutionEnvironment.class, boolean.class);
+            constructor.setAccessible(true);
+            return constructor.newInstance(executor.getStreamExecutionEnvironment(), false);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/dinky-core/src/main/java/org/dinky/trans/pipeline/YamlTextPipelineDefinitionParser.java b/dinky-core/src/main/java/org/dinky/trans/pipeline/YamlTextPipelineDefinitionParser.java
deleted file mode 100644
index 69a1a4443e..0000000000
--- a/dinky-core/src/main/java/org/dinky/trans/pipeline/YamlTextPipelineDefinitionParser.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.dinky.trans.pipeline;
-
-import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
-
-import org.apache.flink.cdc.common.configuration.Configuration;
-import org.apache.flink.cdc.common.utils.StringUtils;
-import org.apache.flink.cdc.composer.definition.PipelineDef;
-import org.apache.flink.cdc.composer.definition.RouteDef;
-import org.apache.flink.cdc.composer.definition.SinkDef;
-import org.apache.flink.cdc.composer.definition.SourceDef;
-import org.apache.flink.cdc.composer.definition.TransformDef;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-public class YamlTextPipelineDefinitionParser {
-
-    // Parent node keys
-    private static final String SOURCE_KEY = "source";
-    private static final String SINK_KEY = "sink";
-    private static final String ROUTE_KEY = "route";
-    private static final String TRANSFORM_KEY = "transform";
-    private static final String PIPELINE_KEY = "pipeline";
-
-    // Source / sink keys
-    private static final String TYPE_KEY = "type";
-    private static final String NAME_KEY = "name";
-
-    // Route keys
-    private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
-    private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
-    private static final String ROUTE_DESCRIPTION_KEY = "description";
-
-    // Transform keys
-    private static final String TRANSFORM_SOURCE_TABLE_KEY = "source-table";
-    private static final String TRANSFORM_PROJECTION_KEY = "projection";
-    private static final String TRANSFORM_FILTER_KEY = "filter";
-    private static final String TRANSFORM_DESCRIPTION_KEY = "description";
-
-    public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys";
-
-    public static final String TRANSFORM_PARTITION_KEY_KEY = "partition-keys";
-
-    public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";
-
-    private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-
-    /** Parse the specified pipeline definition file. */
-    public PipelineDef parse(String text, Configuration globalPipelineConfig) throws Exception {
-        JsonNode root = mapper.readTree(text);
-
-        // Source is required
-        SourceDef sourceDef = toSourceDef(
-                checkNotNull(root.get(SOURCE_KEY), "Missing required field \"%s\" in pipeline definition", SOURCE_KEY));
-
-        // Sink is required
-        SinkDef sinkDef = toSinkDef(
-                checkNotNull(root.get(SINK_KEY), "Missing required field \"%s\" in pipeline definition", SINK_KEY));
-
-        // Transforms are optional
-        List<TransformDef> transformDefs = new ArrayList<>();
-        Optional.ofNullable(root.get(TRANSFORM_KEY))
-                .ifPresent(node -> node.forEach(transform -> transformDefs.add(toTransformDef(transform))));
-
-        // Routes are optional
-        List<RouteDef> routeDefs = new ArrayList<>();
-        Optional.ofNullable(root.get(ROUTE_KEY))
-                .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
-
-        // Pipeline configs are optional
-        Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));
-
-        // Merge user config into global config
-        Configuration pipelineConfig = new Configuration();
-        pipelineConfig.addAll(globalPipelineConfig);
-        pipelineConfig.addAll(userPipelineConfig);
-
-        return new PipelineDef(sourceDef, sinkDef, routeDefs, transformDefs, pipelineConfig);
-    }
-
-    private SourceDef toSourceDef(JsonNode sourceNode) {
-        Map<String, String> sourceMap = mapper.convertValue(sourceNode, new TypeReference<Map<String, String>>() {});
-
-        // "type" field is required
-        String type = checkNotNull(
-                sourceMap.remove(TYPE_KEY), "Missing required field \"%s\" in source configuration", TYPE_KEY);
-
-        // "name" field is optional
-        String name = sourceMap.remove(NAME_KEY);
-
-        return new SourceDef(type, name, Configuration.fromMap(sourceMap));
-    }
-
-    private SinkDef toSinkDef(JsonNode sinkNode) {
-        Map<String, String> sinkMap = mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});
-
-        // "type" field is required
-        String type =
-                checkNotNull(sinkMap.remove(TYPE_KEY), "Missing required field \"%s\" in sink configuration", TYPE_KEY);
-
-        // "name" field is optional
-        String name = sinkMap.remove(NAME_KEY);
-
-        return new SinkDef(type, name, Configuration.fromMap(sinkMap));
-    }
-
-    private RouteDef toRouteDef(JsonNode routeNode) {
-        String sourceTable = checkNotNull(
-                        routeNode.get(ROUTE_SOURCE_TABLE_KEY),
-                        "Missing required field \"%s\" in route configuration",
-                        ROUTE_SOURCE_TABLE_KEY)
-                .asText();
-        String sinkTable = checkNotNull(
-                        routeNode.get(ROUTE_SINK_TABLE_KEY),
-                        "Missing required field \"%s\" in route configuration",
-                        ROUTE_SINK_TABLE_KEY)
-                .asText();
-        String description = Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY))
-                .map(JsonNode::asText)
-                .orElse(null);
-        return new RouteDef(sourceTable, sinkTable, description);
-    }
-
-    private TransformDef toTransformDef(JsonNode transformNode) {
-        String sourceTable = checkNotNull(
-                        transformNode.get(TRANSFORM_SOURCE_TABLE_KEY),
-                        "Missing required field \"%s\" in transform configuration",
-                        TRANSFORM_SOURCE_TABLE_KEY)
-                .asText();
-        String projection = Optional.ofNullable(transformNode.get(TRANSFORM_PROJECTION_KEY))
-                .map(JsonNode::asText)
-                .orElse(null);
-        // When the star is in the first place, a backslash needs to be added for escape.
-        if (!StringUtils.isNullOrWhitespaceOnly(projection) && projection.contains("\\*")) {
-            projection = projection.replace("\\*", "*");
-        }
-        String filter = Optional.ofNullable(transformNode.get(TRANSFORM_FILTER_KEY))
-                .map(JsonNode::asText)
-                .orElse(null);
-        String primaryKeys = Optional.ofNullable(transformNode.get(TRANSFORM_PRIMARY_KEY_KEY))
-                .map(JsonNode::asText)
-                .orElse(null);
-        String partitionKeys = Optional.ofNullable(transformNode.get(TRANSFORM_PARTITION_KEY_KEY))
-                .map(JsonNode::asText)
-                .orElse(null);
-        String tableOptions = Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_KEY))
-                .map(JsonNode::asText)
-                .orElse(null);
-        String description = Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
-                .map(JsonNode::asText)
-                .orElse(null);
-
-        return new TransformDef(sourceTable, projection, filter, primaryKeys, partitionKeys, tableOptions, description);
-    }
-
-    private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
-        if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
-            return new Configuration();
-        }
-        Map<String, String> pipelineConfigMap =
-                mapper.convertValue(pipelineConfigNode, new TypeReference<Map<String, String>>() {});
-        return Configuration.fromMap(pipelineConfigMap);
-    }
-}
diff --git a/dinky-flink/dinky-flink-1.14/pom.xml b/dinky-flink/dinky-flink-1.14/pom.xml
index 03febcafa4..c867b7f545 100644
--- a/dinky-flink/dinky-flink-1.14/pom.xml
+++ b/dinky-flink/dinky-flink-1.14/pom.xml
@@ -34,7 +34,7 @@
         1.3.1
         14.0
         1.14.6
-        3.1.1
+        3.2.0
         4.12
diff --git a/dinky-flink/dinky-flink-1.15/pom.xml b/dinky-flink/dinky-flink-1.15/pom.xml
index b649001ee2..03d8009b18 100644
--- a/dinky-flink/dinky-flink-1.15/pom.xml
+++ b/dinky-flink/dinky-flink-1.15/pom.xml
@@ -34,7 +34,7 @@
         1.3.1
         15.0
         1.15.4
-        3.1.1
+        3.2.0
         4.12
diff --git a/dinky-flink/dinky-flink-1.16/pom.xml b/dinky-flink/dinky-flink-1.16/pom.xml
index c550e2ce04..9553bc2e06 100644
--- a/dinky-flink/dinky-flink-1.16/pom.xml
+++ b/dinky-flink/dinky-flink-1.16/pom.xml
@@ -18,7 +18,7 @@
         1.3.1
         16.2
         1.16.3
-        3.1.1
+        3.2.0
diff --git a/dinky-flink/dinky-flink-1.17/pom.xml b/dinky-flink/dinky-flink-1.17/pom.xml
index 75baf20e2a..5710591f54 100644
--- a/dinky-flink/dinky-flink-1.17/pom.xml
+++ b/dinky-flink/dinky-flink-1.17/pom.xml
@@ -18,7 +18,7 @@
         1.3.1
         17.0
         1.17.2
-        3.1.1
+        3.2.0
diff --git a/dinky-flink/dinky-flink-1.18/pom.xml b/dinky-flink/dinky-flink-1.18/pom.xml
index 1d224bd4e3..15bdce4ef7 100644
--- a/dinky-flink/dinky-flink-1.18/pom.xml
+++ b/dinky-flink/dinky-flink-1.18/pom.xml
@@ -18,7 +18,7 @@
         1.3.1
         18.0
         1.18.1
-        3.1.1
+        3.2.0
diff --git a/dinky-flink/dinky-flink-1.19/pom.xml b/dinky-flink/dinky-flink-1.19/pom.xml
index e7d460224f..78f6c26762 100644
--- a/dinky-flink/dinky-flink-1.19/pom.xml
+++ b/dinky-flink/dinky-flink-1.19/pom.xml
@@ -18,7 +18,7 @@
         1.5.0
         19.0
         1.19.1
-        3.1.1
+        3.2.0
diff --git a/dinky-flink/dinky-flink-1.20/pom.xml b/dinky-flink/dinky-flink-1.20/pom.xml
index 24a263c5b1..e10f28ea54 100644
--- a/dinky-flink/dinky-flink-1.20/pom.xml
+++ b/dinky-flink/dinky-flink-1.20/pom.xml
@@ -18,7 +18,7 @@
         1.5.0
         19.0
         1.20.0
-        3.1.1
+        3.2.0
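
Note for reviewers: the deletions above work because Flink CDC 3.2.0 now ships both pieces Dinky previously had to copy — YamlPipelineDefinitionParser accepts a pipeline definition as a String (the parse(yamlText, globalPipelineConfig) call in the diff), and FlinkPipelineComposer can be driven from an existing StreamExecutionEnvironment, reached via reflection since its (StreamExecutionEnvironment, boolean) constructor is not public. The sketch below is illustrative only and is not part of this change; the class name, YAML body, and connector types are made up, and it assumes the flink-cdc 3.2.0 cli/common/composer modules are on the classpath.

// Illustrative sketch (not in this PR): the flink-cdc 3.2.0 code path that
// FlinkCDCPipelineOperation.execute() now follows.
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.lang.reflect.Constructor;

public class PipelineComposeSketch {

    public static void main(String[] args) throws Exception {
        // A minimal pipeline definition; connector types are placeholders.
        String yamlText = "source:\n"
                + "  type: mysql\n"
                + "sink:\n"
                + "  type: doris\n"
                + "pipeline:\n"
                + "  name: demo\n"
                + "  parallelism: 1\n";

        // parse(String, Configuration) is what makes the bundled
        // YamlTextPipelineDefinitionParser copy unnecessary.
        PipelineDef pipelineDef = new YamlPipelineDefinitionParser().parse(yamlText, new Configuration());

        // FlinkPipelineComposer exposes no public (StreamExecutionEnvironment, boolean)
        // constructor, hence the reflective construction mirrored from createComposer().
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Class<?> clazz = Class.forName("org.apache.flink.cdc.composer.flink.FlinkPipelineComposer");
        Constructor<?> constructor = clazz.getDeclaredConstructor(StreamExecutionEnvironment.class, boolean.class);
        constructor.setAccessible(true);
        PipelineComposer composer = (PipelineComposer) constructor.newInstance(env, false);

        // composer.compose(pipelineDef) would build and submit the job; it additionally
        // requires the matching source/sink connector JARs on the classpath, so stop here.
        System.out.println(composer + " ready to compose: " + pipelineDef);
    }
}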