Skip to content

Commit

Permalink
Merge branch 'master' into flink1-20
Browse files Browse the repository at this point in the history
# Conflicts:
#	flink-doris-connector/pom.xml
  • Loading branch information
JNSimba committed Aug 13, 2024
2 parents 853f07c + eb90905 commit f1fc2f1
Show file tree
Hide file tree
Showing 51 changed files with 1,588 additions and 322 deletions.
21 changes: 12 additions & 9 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ under the License.
<flink.sql.cdc.version>3.1.1</flink.sql.cdc.version>
<flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>13.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
Expand All @@ -96,6 +95,8 @@ under the License.
<mysql.driver.version>8.0.26</mysql.driver.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
<log4j.version>2.17.1</log4j.version>
<arrow.version>15.0.2</arrow.version>
<adbc.version>0.12.0</adbc.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -180,13 +181,16 @@ under the License.
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>

<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
Expand All @@ -207,7 +211,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -429,13 +432,13 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<version>3.4.1</version>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.arrow</pattern>
<shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>
</relocation>
<!-- <relocation>-->
<!-- <pattern>org.apache.arrow</pattern>-->
<!-- <shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>-->
<!-- </relocation>-->
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.doris.shaded.io.netty</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.doris.flink.catalog.doris;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;

import java.util.ArrayList;
Expand All @@ -30,6 +33,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Factory that creates doris schema.
Expand Down Expand Up @@ -103,4 +107,132 @@ public static Integer parseTableSchemaBuckets(
}
return null;
}

public static String generateCreateTableDDL(TableSchema schema) {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
sb.append(identifier(schema.getDatabase()))
.append(".")
.append(identifier(schema.getTable()))
.append("(");

Map<String, FieldSchema> fields = schema.getFields();
List<String> keys = schema.getKeys();
// append keys
for (String key : keys) {
if (!fields.containsKey(key)) {
throw new CreateTableException("key " + key + " not found in column list");
}
FieldSchema field = fields.get(key);
buildColumn(sb, field, true);
}

// append values
for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
if (keys.contains(entry.getKey())) {
continue;
}
FieldSchema field = entry.getValue();
buildColumn(sb, field, false);
}
sb = sb.deleteCharAt(sb.length() - 1);
sb.append(" ) ");
// append uniq model
if (DataModel.UNIQUE.equals(schema.getModel())) {
sb.append(schema.getModel().name())
.append(" KEY(")
.append(String.join(",", identifier(schema.getKeys())))
.append(")");
}

// append table comment
if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
}

// append distribute key
sb.append(" DISTRIBUTED BY HASH(")
.append(String.join(",", identifier(schema.getDistributeKeys())))
.append(")");

Map<String, String> properties = schema.getProperties();
if (schema.getTableBuckets() != null) {

int bucketsNum = schema.getTableBuckets();
if (bucketsNum <= 0) {
throw new CreateTableException("The number of buckets must be positive.");
}
sb.append(" BUCKETS ").append(bucketsNum);
} else {
sb.append(" BUCKETS AUTO ");
}
// append properties
int index = 0;
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (index == 0) {
sb.append(" PROPERTIES (");
}
if (index > 0) {
sb.append(",");
}
sb.append(quoteProperties(entry.getKey()))
.append("=")
.append(quoteProperties(entry.getValue()));
index++;

if (index == (schema.getProperties().size())) {
sb.append(")");
}
}
return sb.toString();
}

private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
String fieldType = field.getTypeString();
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
}
sql.append(identifier(field.getName())).append(" ").append(fieldType);

if (field.getDefaultValue() != null) {
sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
}
sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
}

private static String quoteProperties(String name) {
return "'" + name + "'";
}

private static List<String> identifier(List<String> names) {
return names.stream().map(DorisSchemaFactory::identifier).collect(Collectors.toList());
}

public static String identifier(String name) {
if (name.startsWith("`") && name.endsWith("`")) {
return name;
}
return "`" + name + "`";
}

public static String quoteDefaultValue(String defaultValue) {
// DEFAULT current_timestamp not need quote
if (defaultValue.equalsIgnoreCase("current_timestamp")) {
return defaultValue;
}
return "'" + defaultValue + "'";
}

public static String quoteComment(String comment) {
if (comment == null) {
return "";
} else {
return comment.replaceAll("'", "\\\\'");
}
}

public static String quoteTableIdentifier(String tableIdentifier) {
String[] dbTable = tableIdentifier.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);
return identifier(dbTable[0]) + "." + identifier(dbTable[1]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package org.apache.doris.flink.catalog.doris;

import org.apache.flink.annotation.Public;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.connection.JdbcConnectionProvider;
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.DorisSystemException;
import org.slf4j.Logger;
Expand All @@ -41,7 +39,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;

Expand Down Expand Up @@ -141,81 +138,7 @@ public List<String> extractColumnValuesBySQL(
}

public static String buildCreateTableDDL(TableSchema schema) {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
sb.append(identifier(schema.getDatabase()))
.append(".")
.append(identifier(schema.getTable()))
.append("(");

Map<String, FieldSchema> fields = schema.getFields();
List<String> keys = schema.getKeys();
// append keys
for (String key : keys) {
if (!fields.containsKey(key)) {
throw new CreateTableException("key " + key + " not found in column list");
}
FieldSchema field = fields.get(key);
buildColumn(sb, field, true);
}

// append values
for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
if (keys.contains(entry.getKey())) {
continue;
}
FieldSchema field = entry.getValue();
buildColumn(sb, field, false);
}
sb = sb.deleteCharAt(sb.length() - 1);
sb.append(" ) ");
// append uniq model
if (DataModel.UNIQUE.equals(schema.getModel())) {
sb.append(schema.getModel().name())
.append(" KEY(")
.append(String.join(",", identifier(schema.getKeys())))
.append(")");
}

// append table comment
if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
}

// append distribute key
sb.append(" DISTRIBUTED BY HASH(")
.append(String.join(",", identifier(schema.getDistributeKeys())))
.append(")");

Map<String, String> properties = schema.getProperties();
if (schema.getTableBuckets() != null) {

int bucketsNum = schema.getTableBuckets();
if (bucketsNum <= 0) {
throw new CreateTableException("The number of buckets must be positive.");
}
sb.append(" BUCKETS ").append(bucketsNum);
} else {
sb.append(" BUCKETS AUTO ");
}
// append properties
int index = 0;
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (index == 0) {
sb.append(" PROPERTIES (");
}
if (index > 0) {
sb.append(",");
}
sb.append(quoteProperties(entry.getKey()))
.append("=")
.append(quoteProperties(entry.getValue()));
index++;

if (index == (schema.getProperties().size())) {
sb.append(")");
}
}
return sb.toString();
return DorisSchemaFactory.generateCreateTableDDL(schema);
}

public Map<String, String> getTableFieldNames(String databaseName, String tableName) {
Expand Down Expand Up @@ -244,53 +167,23 @@ public Map<String, String> getTableFieldNames(String databaseName, String tableN
}
}

private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
String fieldType = field.getTypeString();
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
}
sql.append(identifier(field.getName())).append(" ").append(fieldType);

if (field.getDefaultValue() != null) {
sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
}
sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
}

@Deprecated
public static String quoteDefaultValue(String defaultValue) {
// DEFAULT current_timestamp not need quote
if (defaultValue.equalsIgnoreCase("current_timestamp")) {
return defaultValue;
}
return "'" + defaultValue + "'";
return DorisSchemaFactory.quoteDefaultValue(defaultValue);
}

@Deprecated
public static String quoteComment(String comment) {
if (comment == null) {
return "";
} else {
return comment.replaceAll("'", "\\\\'");
}
}

private static List<String> identifier(List<String> name) {
return name.stream().map(DorisSystem::identifier).collect(Collectors.toList());
return DorisSchemaFactory.quoteComment(comment);
}

@Deprecated
public static String identifier(String name) {
if (name.startsWith("`") && name.endsWith("`")) {
return name;
}
return "`" + name + "`";
return DorisSchemaFactory.identifier(name);
}

@Deprecated
public static String quoteTableIdentifier(String tableIdentifier) {
String[] dbTable = tableIdentifier.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);
return identifier(dbTable[0]) + "." + identifier(dbTable[1]);
}

private static String quoteProperties(String name) {
return "'" + name + "'";
return DorisSchemaFactory.quoteTableIdentifier(tableIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ public interface ConfigurationOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;

String USE_FLIGHT_SQL = "source.use-flight-sql";
Boolean USE_FLIGHT_SQL_DEFAULT = false;

String FLIGHT_SQL_PORT = "source.flight-sql-port";
Integer FLIGHT_SQL_PORT_DEFAULT = 9040;
}
Loading

0 comments on commit f1fc2f1

Please sign in to comment.