Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/logical_delete' into logical_delete
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 committed Nov 27, 2024
2 parents 87be271 + a6d8ed2 commit 6daad7c
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.dinky.data.model;

import lombok.EqualsAndHashCode;
import org.dinky.mybatis.model.DateBaseEntity;

import java.io.Serializable;
Expand All @@ -37,6 +36,7 @@
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,4 @@ public class Tenant extends DateBaseEntity<Tenant> implements Serializable {
/** note */
@ApiModelProperty(value = "Tenant Note", required = true, dataType = "String", example = "Default")
private String note;

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

import lombok.Getter;
import org.dinky.flink.catalog.factory.DinkyMysqlCatalogFactoryOptions;

import org.apache.flink.table.api.Schema;
Expand Down Expand Up @@ -74,6 +73,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.Getter;

/**
* DinkyMysqlCatalog is a catalog implementation for MySQL.
*/
Expand Down Expand Up @@ -260,7 +261,8 @@ public List<String> listDatabases() throws CatalogException {

@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=? and is_delete = 0";
String querySql =
"SELECT id, database_name,description FROM metadata_database where database_name=? and is_delete = 0";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
Expand All @@ -272,7 +274,8 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE

Map<String, String> map = new HashMap<>();

String sql = "select `key`,`value` from metadata_database_property where database_id=? and is_delete = 0";
String sql =
"select `key`,`value` from metadata_database_property where database_id=? and is_delete = 0";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
Expand Down Expand Up @@ -482,7 +485,8 @@ protected List<String> listTablesViews(String databaseName, String tableType)

// get all schemas
// 要给出table 或 view
String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ? and is_delete = 0";
String querySql =
"SELECT table_name FROM metadata_table where table_type=? and database_id = ? and is_delete = 0";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, tableType);
Expand Down Expand Up @@ -528,7 +532,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
}
if (tableType.equals(ObjectType.TABLE)) {
// 这个是 table
String propSql = "SELECT `key`, `value` from metadata_table_property WHERE table_id=? and is_delete = 0";
String propSql =
"SELECT `key`, `value` from metadata_table_property WHERE table_id=? and is_delete = 0";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
Expand Down Expand Up @@ -643,7 +648,7 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
// String deleteDbSql = "delete from metadata_table " + " where id=?";
// String deleteDbSql = "delete from metadata_table " + " where id=?";
String deleteDbSql = "update metadata_table set is_delete=1 where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
Expand Down Expand Up @@ -694,12 +699,14 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
return;
}
//Insert table
//Insert into the table table. Here, it could be a table or a view
//If it is a table, we think it is a resolved table, so we can use the properties method to serialize and save it.
//If it is a view, we think it can only have physical fields
// Insert table
// Insert into the table table. Here, it could be a table or a view
// If it is a table, we think it is a resolved table, so we can use the properties method to serialize and save
// it.
// If it is a view, we think it can only have physical fields
if (!(table instanceof ResolvedCatalogBaseTable)) {
throw new UnsupportedOperationException("Entering tables of non-ResolvedCatalogBaseTable types is temporarily not supported");
throw new UnsupportedOperationException(
"Entering tables of non-ResolvedCatalogBaseTable types is temporarily not supported");
}
Connection conn = getConnection();
try {
Expand Down Expand Up @@ -730,8 +737,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
if (table instanceof ResolvedCatalogTable) {
// table 就可以直接拿properties了。
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql =
"insert into metadata_table_property(table_id, `key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id, `key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
Expand Down Expand Up @@ -778,7 +784,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
colIStat.setObject(7, null); // view没有主键
colIStat.addBatch();
} else {
throw new UnsupportedOperationException("Temporarily, it is believed that non-physical fields will not appear in view");
throw new UnsupportedOperationException(
"Temporarily, it is believed that non-physical fields will not appear in view");
}
}
colIStat.executeBatch();
Expand Down Expand Up @@ -1032,7 +1039,8 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction,
}

Connection conn = getConnection();
String insertSql = "update metadata_function set (class_name =?, function_language=?) " + " where id=? and is_delete = 0";
String insertSql =
"update metadata_function set (class_name =?, function_language=?) " + " where id=? and is_delete = 0";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
Expand All @@ -1056,7 +1064,7 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
}

Connection conn = getConnection();
// String insertSql = "delete from metadata_function " + " where id=?";
// String insertSql = "delete from metadata_function " + " where id=?";
String insertSql = "update metadata_function set is_delete = 1 where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

import lombok.Getter;
import org.dinky.flink.catalog.factory.DinkyMysqlCatalogFactoryOptions;

import org.apache.flink.table.api.Schema;
Expand Down Expand Up @@ -74,6 +73,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.Getter;

/**
* DinkyMysqlCatalog is a catalog implementation for MySQL.
*/
Expand Down Expand Up @@ -260,7 +261,8 @@ public List<String> listDatabases() throws CatalogException {

@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=? and is_delete = 0";
String querySql =
"SELECT id, database_name,description FROM metadata_database where database_name=? and is_delete = 0";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
Expand All @@ -272,7 +274,8 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE

Map<String, String> map = new HashMap<>();

String sql = "select `key`,`value` from metadata_database_property where database_id=? and is_delete = 0";
String sql =
"select `key`,`value` from metadata_database_property where database_id=? and is_delete = 0";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
Expand Down Expand Up @@ -482,7 +485,8 @@ protected List<String> listTablesViews(String databaseName, String tableType)

// get all schemas
// 要给出table 或 view
String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ? and is_delete = 0";
String querySql =
"SELECT table_name FROM metadata_table where table_type=? and database_id = ? and is_delete = 0";
Connection conn = getConnection();
try (PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, tableType);
Expand Down Expand Up @@ -528,7 +532,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
}
if (tableType.equals(ObjectType.TABLE)) {
// 这个是 table
String propSql = "SELECT `key`, `value` from metadata_table_property WHERE table_id=? and is_delete = 0";
String propSql =
"SELECT `key`, `value` from metadata_table_property WHERE table_id=? and is_delete = 0";
PreparedStatement pState = conn.prepareStatement(propSql);
pState.setInt(1, id);
ResultSet prs = pState.executeQuery();
Expand Down Expand Up @@ -643,7 +648,7 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
dStat.setInt(1, id);
dStat.executeUpdate();
dStat.close();
// String deleteDbSql = "delete from metadata_table " + " where id=?";
// String deleteDbSql = "delete from metadata_table " + " where id=?";
String deleteDbSql = "update metadata_table set is_delete=1 where id=?";
dStat = conn.prepareStatement(deleteDbSql);
dStat.setInt(1, id);
Expand Down Expand Up @@ -694,12 +699,14 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
return;
}
//Insert table
//Insert into the table table. Here, it could be a table or a view
//If it is a table, we think it is a resolved table, so we can use the properties method to serialize and save it.
//If it is a view, we think it can only have physical fields
// Insert table
// Insert into the table table. Here, it could be a table or a view
// If it is a table, we think it is a resolved table, so we can use the properties method to serialize and save
// it.
// If it is a view, we think it can only have physical fields
if (!(table instanceof ResolvedCatalogBaseTable)) {
throw new UnsupportedOperationException("Entering tables of non-ResolvedCatalogBaseTable types is temporarily not supported");
throw new UnsupportedOperationException(
"Entering tables of non-ResolvedCatalogBaseTable types is temporarily not supported");
}
Connection conn = getConnection();
try {
Expand Down Expand Up @@ -730,8 +737,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
if (table instanceof ResolvedCatalogTable) {
// table 就可以直接拿properties了。
Map<String, String> props = ((ResolvedCatalogTable) table).toProperties();
String propInsertSql =
"insert into metadata_table_property(table_id, `key`,`value`) values (?,?,?)";
String propInsertSql = "insert into metadata_table_property(table_id, `key`,`value`) values (?,?,?)";
PreparedStatement pStat = conn.prepareStatement(propInsertSql);
for (Map.Entry<String, String> entry : props.entrySet()) {
pStat.setInt(1, id);
Expand Down Expand Up @@ -778,7 +784,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
colIStat.setObject(7, null); // view没有主键
colIStat.addBatch();
} else {
throw new UnsupportedOperationException("Temporarily, it is believed that non-physical fields will not appear in view");
throw new UnsupportedOperationException(
"Temporarily, it is believed that non-physical fields will not appear in view");
}
}
colIStat.executeBatch();
Expand Down Expand Up @@ -1032,7 +1039,8 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction,
}

Connection conn = getConnection();
String insertSql = "update metadata_function set (class_name =?, function_language=?) " + " where id=? and is_delete = 0";
String insertSql =
"update metadata_function set (class_name =?, function_language=?) " + " where id=? and is_delete = 0";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setString(1, newFunction.getClassName());
ps.setString(2, newFunction.getFunctionLanguage().toString());
Expand All @@ -1056,7 +1064,7 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
}

Connection conn = getConnection();
// String insertSql = "delete from metadata_function " + " where id=?";
// String insertSql = "delete from metadata_function " + " where id=?";
String insertSql = "update metadata_function set is_delete = 1 where id=?";
try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
ps.setInt(1, id);
Expand Down
Loading

0 comments on commit 6daad7c

Please sign in to comment.