Skip to content

Commit

Permalink
add doris source e2ecase
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Mar 6, 2024
1 parent 263588f commit 5ad9838
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@

public abstract class DorisTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image");
private static final String DEFAULT_DOCKER_IMAGE = "adamlee489/doris:2.0.3";
protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image") == null ? DEFAULT_DOCKER_IMAGE : System.getProperty("image");
private static final String DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.tools.cdc;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.function.SupplierWithException;

import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.api.common.JobStatus.RUNNING;

/**
* DorisDorisE2ECase.
*/
public class DorisDorisE2ECase extends DorisTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(DorisDorisE2ECase.class);
private static final String DATABASE_SOURCE = "test_source";
private static final String DATABASE_SINK = "test_sink";
private static final String TABLE = "tbl1";

@Test
public void testDoris2Doris() throws Exception {
initializeDorisTable(TABLE);
printClusterStatus();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
String.format(
"CREATE TABLE doris_source ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
getFenodes(), DATABASE_SOURCE + "." + TABLE, USERNAME, PASSWORD);
tEnv.executeSql(sourceDDL);

String sinkDDL =
String.format(
"CREATE TABLE doris_sink ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
getFenodes(), DATABASE_SINK + "." + TABLE, USERNAME, PASSWORD);
tEnv.executeSql(sinkDDL);

tEnv.executeSql("INSERT INTO doris_sink SELECT * FROM doris_source").await();

TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_sink");
List<Object> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
}
String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
Assertions.assertIterableEquals(Arrays.asList(expected), actual);
}

private void initializeDorisTable(String table) throws Exception {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SOURCE));
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE_SINK));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SOURCE, table));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE_SINK, table));
statement.execute(
String.format(
"CREATE TABLE %s.%s ( \n"
+ "`name` varchar(256),\n"
+ "`age` int\n"
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
DATABASE_SOURCE, table));
statement.execute(
String.format(
"CREATE TABLE %s.%s ( \n"
+ "`name` varchar(256),\n"
+ "`age` int\n"
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
DATABASE_SINK, table));
statement.execute(
String.format("insert into %s.%s values ('doris',18)", DATABASE_SOURCE, table));
statement.execute(
String.format("insert into %s.%s values ('flink',10)", DATABASE_SOURCE, table));
}
}

}

0 comments on commit 5ad9838

Please sign in to comment.