diff --git a/dinky-admin/src/main/resources/db/migration/h2/V20241127.1.3.0__release.sql b/dinky-admin/src/main/resources/db/migration/h2/V20241230.1.3.0__release.sql similarity index 100% rename from dinky-admin/src/main/resources/db/migration/h2/V20241127.1.3.0__release.sql rename to dinky-admin/src/main/resources/db/migration/h2/V20241230.1.3.0__release.sql diff --git a/dinky-admin/src/main/resources/db/migration/mysql/V20241127.1.3.0__release.sql b/dinky-admin/src/main/resources/db/migration/mysql/V20241230.1.3.0__release.sql similarity index 100% rename from dinky-admin/src/main/resources/db/migration/mysql/V20241127.1.3.0__release.sql rename to dinky-admin/src/main/resources/db/migration/mysql/V20241230.1.3.0__release.sql diff --git a/dinky-admin/src/main/resources/db/migration/postgresql/V20241127.1.3.0__release.sql b/dinky-admin/src/main/resources/db/migration/postgresql/V20241127.1.3.0__release.sql deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/dinky-admin/src/main/resources/db/migration/postgresql/V20241230.1.3.0__release.sql b/dinky-admin/src/main/resources/db/migration/postgresql/V20241230.1.3.0__release.sql new file mode 100644 index 0000000000..97364f8e87 --- /dev/null +++ b/dinky-admin/src/main/resources/db/migration/postgresql/V20241230.1.3.0__release.sql @@ -0,0 +1,96 @@ + +SELECT add_column_if_not_exists('public','dinky_alert_history', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_job_instance', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_job_history', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_job_instance', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_sys_menu', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_sys_role_menu', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_row_permissions', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_user_role', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_user_tenant', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_udf_manage', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_udf_template', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_catalogue', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','dinky_cluster_configuration', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_cluster', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_dashboard', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_database', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_flink_document', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_fragment', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_git_project', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_metrics', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_sys_operate_log', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_resources', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_savepoints', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_sys_token', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_task', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_task_version', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_alert_group', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_alert_instance', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_alert_rules', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +select add_column_if_not_exists('public','dinky_alert_template', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +ALTER TABLE public.dinky_sys_login_log drop column is_deleted; +select add_column_if_not_exists('public','dinky_sys_login_log', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + + +SELECT add_column_if_not_exists('public','metadata_column', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','metadata_database', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','metadata_database_property', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','metadata_function', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','metadata_table', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + +SELECT add_column_if_not_exists('public','metadata_table_property', 'is_delete', 'boolean', 'false', 'is delete 0: false, 1: true'); + + +CREATE UNIQUE INDEX unx1_metadata_column + ON metadata_column USING BTREE (table_id, column_name, is_delete); + +CREATE UNIQUE INDEX unx1_metadata_database + ON metadata_database USING BTREE (database_name, is_delete); + +CREATE UNIQUE INDEX unx1_metadata_database_property + ON metadata_database_property USING BTREE (database_id, "key", is_delete); + +CREATE UNIQUE INDEX unx1_metadata_function + ON metadata_function USING BTREE ("function_name", database_id, is_delete); + +CREATE UNIQUE INDEX unx1_metadata_table + ON metadata_table USING BTREE ("table_name", database_id, is_delete); + +CREATE UNIQUE INDEX unx1_metadata_table_property + ON metadata_table_property USING BTREE (table_id, "key", is_delete); + diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index 5357b49a70..28eae05ecf 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -276,7 +276,7 @@ protected Connection getConnection() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; + String querySql = "SELECT database_name FROM metadata_database WHERE is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { @@ -302,7 +302,7 @@ public List listDatabases() throws CatalogException { */ @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?"; + String querySql = "SELECT id, database_name, description FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -314,7 +314,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE Map map = new HashMap<>(); - String sql = "select \"key\", \"value\" from metadata_database_property where database_id=? "; + String sql = "SELECT \"key\", \"value\" FROM metadata_database_property WHERE database_id=? AND is_delete=false"; try (PreparedStatement pStat = conn.prepareStatement(sql)) { pStat.setInt(1, id); ResultSet prs = pStat.executeQuery(); @@ -357,7 +357,7 @@ public boolean databaseExists(String databaseName) throws CatalogException { * @throws CatalogException This exception is thrown if an error occurs while getting the database ID */ private Integer getDatabaseId(String databaseName) throws CatalogException { - String querySql = "select id from metadata_database where database_name=?"; + String querySql = "SELECT id FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -479,13 +479,13 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade } } // todo: Now it is actually deleted, whether records will be retained for subsequent designs. - String deletePropSql = "delete from metadata_database_property where database_id=?"; - PreparedStatement dStat = conn.prepareStatement(deletePropSql); + String updatePropSql = "UPDATE metadata_database_property SET is_delete=true WHERE database_id=? AND is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(updatePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_database where id=?"; - dStat = conn.prepareStatement(deleteDbSql); + String updateDbSql = "UPDATE metadata_database SET is_delete=true WHERE id=? AND is_delete=false"; + dStat = conn.prepareStatement(updateDbSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); @@ -523,16 +523,17 @@ public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNo try { conn.setAutoCommit(false); // 1. The name cannot be changed and the type cannot be changed. Only notes can be changed - String updateCommentSql = "update metadata_database set description=? where id=?"; + String updateCommentSql = "UPDATE metadata_database SET description=? WHERE id=? AND is_delete=false"; PreparedStatement uState = conn.prepareStatement(updateCommentSql); uState.setString(1, newDb.getComment()); uState.setInt(2, id); uState.executeUpdate(); uState.close(); if (newDb.getProperties() != null && newDb.getProperties().size() > 0) { - String upsertSql = "insert into metadata_database_property (database_id, key, value) " - + "values (?,?,?) " - + "on CONFLICT (database_id, \"key\") do update set value = excluded.value, update_time = now()"; + String upsertSql = "INSERT INTO metadata_database_property (database_id, key, value) " + + "VALUES (?, ?, ?) " + + "ON CONFLICT (database_id, \"key\") DO UPDATE SET value = excluded.value, update_time = now() " + + "WHERE is_delete=false"; PreparedStatement pstat = conn.prepareStatement(upsertSql); for (Map.Entry entry : newDb.getProperties().entrySet()) { pstat.setInt(1, id); @@ -593,7 +594,7 @@ protected List listTablesViews(String databaseName, String tableType) // get all schemas // To give table or view - String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?"; + String querySql = "SELECT table_name FROM metadata_table WHERE database_name=? AND object_type=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, tableType); @@ -634,7 +635,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep Connection conn = getConnection(); try { - String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=?"; + String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=? AND is_delete=false"; PreparedStatement ps = conn.prepareStatement(queryTable); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); @@ -650,7 +651,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } if (tableType.equals(ObjectType.TABLE)) { // This is table - String propSql = "SELECT \"key\", \"value\" from metadata_table_property " + "WHERE table_id=?"; + String propSql = "SELECT \"key\", \"value\" from metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement pState = conn.prepareStatement(propSql); pState.setInt(1, id); ResultSet prs = pState.executeQuery(); @@ -667,7 +668,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep // 1. Get table information from the library. (Already done before) // 2. Take out the field. String colSql = - "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=?"; + "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=? AND is_delete=false"; PreparedStatement cStat = conn.prepareStatement(colSql); cStat.setInt(1, id); ResultSet crs = cStat.executeQuery(); @@ -685,7 +686,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } cStat.close(); // 3、Take out the query - String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? "; + String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement qStat = conn.prepareStatement(qSql); qStat.setInt(1, id); ResultSet qrs = qStat.executeQuery(); @@ -740,7 +741,7 @@ private Integer getTableId(ObjectPath tablePath) { return null; } // 获取id - String getIdSql = "select id from metadata_table where table_name=? and database_id=?"; + String getIdSql = "select id from metadata_table where table_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, tablePath.getObjectName()); @@ -777,17 +778,21 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { // todo: Now it is actually deleted, whether records will be retained for subsequent designs. conn.setAutoCommit(false); - String deletePropSql = "delete from metadata_table_property where table_id=?"; +// String deletePropSql = "delete from metadata_table_property where table_id=?"; + String deletePropSql = "UPDATE metadata_table_property SET is_delete=true WHERE table_id=? and is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(deletePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteColSql = "delete from metadata_column where table_id=?"; +// String deleteColSql = "delete from metadata_column where table_id=?"; + String deleteColSql = "UPDATE metadata_column SET is_delete=true WHERE table_id=? and is_delete=false"; dStat = conn.prepareStatement(deleteColSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_table where id=?"; +// String deleteDbSql = "delete from metadata_table where id=?"; + String deleteDbSql = "UPDATE metadata_table SET is_delete=true WHERE id=? and is_delete=false"; dStat = conn.prepareStatement(deleteDbSql); dStat.setInt(1, id); dStat.executeUpdate(); @@ -822,7 +827,7 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor if (tableExists(newPath)) { throw new TableAlreadyExistException(getName(), newPath); } - String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?"; + String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { ps.setString(1, newTableName); @@ -999,7 +1004,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean String updateSql = "INSERT INTO metadata_table_property(table_id," + "key, value) values (?,?,?) " - + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()"; + + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()" + + "WHERE is_delete=false"; + Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { for (Map.Entry entry : opts.entrySet()) { @@ -1178,7 +1185,7 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio if (null == dbId) { throw new DatabaseNotExistException(getName(), dbName); } - String querySql = "SELECT function_name from metadata_function WHERE database_id=?"; + String querySql = "SELECT function_name from metadata_function WHERE database_id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { @@ -1211,7 +1218,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx throw new FunctionNotExistException(getName(), functionPath); } - String querySql = "SELECT class_name,function_language from metadata_function WHERE id=?"; + String querySql = "SELECT class_name,function_language from metadata_function WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { gStat.setInt(1, id); @@ -1256,7 +1263,7 @@ private Integer getFunctionId(ObjectPath functionPath) { return null; } // Get id - String getIdSql = "select id from metadata_function where function_name=? and database_id=?"; + String getIdSql = "select id from metadata_function where function_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, functionPath.getObjectName()); @@ -1334,7 +1341,7 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, } Connection conn = getConnection(); - String insertSql = "update metadata_function set class_name =?, function_language=? where id=?"; + String insertSql = "update metadata_function set class_name =?, function_language=? where id=? and is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setString(1, newFunction.getClassName()); ps.setString(2, newFunction.getFunctionLanguage().toString()); @@ -1366,7 +1373,8 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } Connection conn = getConnection(); - String insertSql = "delete from metadata_function where id=?"; +// String insertSql = "delete from metadata_function where id=?"; + String insertSql = "UPDATE metadata_function SET is_delete=true WHERE id=? AND is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setInt(1, id); ps.executeUpdate(); diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.15/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.15/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index 5357b49a70..28eae05ecf 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.15/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.15/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -276,7 +276,7 @@ protected Connection getConnection() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; + String querySql = "SELECT database_name FROM metadata_database WHERE is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { @@ -302,7 +302,7 @@ public List listDatabases() throws CatalogException { */ @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?"; + String querySql = "SELECT id, database_name, description FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -314,7 +314,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE Map map = new HashMap<>(); - String sql = "select \"key\", \"value\" from metadata_database_property where database_id=? "; + String sql = "SELECT \"key\", \"value\" FROM metadata_database_property WHERE database_id=? AND is_delete=false"; try (PreparedStatement pStat = conn.prepareStatement(sql)) { pStat.setInt(1, id); ResultSet prs = pStat.executeQuery(); @@ -357,7 +357,7 @@ public boolean databaseExists(String databaseName) throws CatalogException { * @throws CatalogException This exception is thrown if an error occurs while getting the database ID */ private Integer getDatabaseId(String databaseName) throws CatalogException { - String querySql = "select id from metadata_database where database_name=?"; + String querySql = "SELECT id FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -479,13 +479,13 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade } } // todo: Now it is actually deleted, whether records will be retained for subsequent designs. - String deletePropSql = "delete from metadata_database_property where database_id=?"; - PreparedStatement dStat = conn.prepareStatement(deletePropSql); + String updatePropSql = "UPDATE metadata_database_property SET is_delete=true WHERE database_id=? AND is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(updatePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_database where id=?"; - dStat = conn.prepareStatement(deleteDbSql); + String updateDbSql = "UPDATE metadata_database SET is_delete=true WHERE id=? AND is_delete=false"; + dStat = conn.prepareStatement(updateDbSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); @@ -523,16 +523,17 @@ public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNo try { conn.setAutoCommit(false); // 1. The name cannot be changed and the type cannot be changed. Only notes can be changed - String updateCommentSql = "update metadata_database set description=? where id=?"; + String updateCommentSql = "UPDATE metadata_database SET description=? WHERE id=? AND is_delete=false"; PreparedStatement uState = conn.prepareStatement(updateCommentSql); uState.setString(1, newDb.getComment()); uState.setInt(2, id); uState.executeUpdate(); uState.close(); if (newDb.getProperties() != null && newDb.getProperties().size() > 0) { - String upsertSql = "insert into metadata_database_property (database_id, key, value) " - + "values (?,?,?) " - + "on CONFLICT (database_id, \"key\") do update set value = excluded.value, update_time = now()"; + String upsertSql = "INSERT INTO metadata_database_property (database_id, key, value) " + + "VALUES (?, ?, ?) " + + "ON CONFLICT (database_id, \"key\") DO UPDATE SET value = excluded.value, update_time = now() " + + "WHERE is_delete=false"; PreparedStatement pstat = conn.prepareStatement(upsertSql); for (Map.Entry entry : newDb.getProperties().entrySet()) { pstat.setInt(1, id); @@ -593,7 +594,7 @@ protected List listTablesViews(String databaseName, String tableType) // get all schemas // To give table or view - String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?"; + String querySql = "SELECT table_name FROM metadata_table WHERE database_name=? AND object_type=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, tableType); @@ -634,7 +635,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep Connection conn = getConnection(); try { - String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=?"; + String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=? AND is_delete=false"; PreparedStatement ps = conn.prepareStatement(queryTable); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); @@ -650,7 +651,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } if (tableType.equals(ObjectType.TABLE)) { // This is table - String propSql = "SELECT \"key\", \"value\" from metadata_table_property " + "WHERE table_id=?"; + String propSql = "SELECT \"key\", \"value\" from metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement pState = conn.prepareStatement(propSql); pState.setInt(1, id); ResultSet prs = pState.executeQuery(); @@ -667,7 +668,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep // 1. Get table information from the library. (Already done before) // 2. Take out the field. String colSql = - "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=?"; + "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=? AND is_delete=false"; PreparedStatement cStat = conn.prepareStatement(colSql); cStat.setInt(1, id); ResultSet crs = cStat.executeQuery(); @@ -685,7 +686,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } cStat.close(); // 3、Take out the query - String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? "; + String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement qStat = conn.prepareStatement(qSql); qStat.setInt(1, id); ResultSet qrs = qStat.executeQuery(); @@ -740,7 +741,7 @@ private Integer getTableId(ObjectPath tablePath) { return null; } // 获取id - String getIdSql = "select id from metadata_table where table_name=? and database_id=?"; + String getIdSql = "select id from metadata_table where table_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, tablePath.getObjectName()); @@ -777,17 +778,21 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { // todo: Now it is actually deleted, whether records will be retained for subsequent designs. conn.setAutoCommit(false); - String deletePropSql = "delete from metadata_table_property where table_id=?"; +// String deletePropSql = "delete from metadata_table_property where table_id=?"; + String deletePropSql = "UPDATE metadata_table_property SET is_delete=true WHERE table_id=? and is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(deletePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteColSql = "delete from metadata_column where table_id=?"; +// String deleteColSql = "delete from metadata_column where table_id=?"; + String deleteColSql = "UPDATE metadata_column SET is_delete=true WHERE table_id=? and is_delete=false"; dStat = conn.prepareStatement(deleteColSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_table where id=?"; +// String deleteDbSql = "delete from metadata_table where id=?"; + String deleteDbSql = "UPDATE metadata_table SET is_delete=true WHERE id=? and is_delete=false"; dStat = conn.prepareStatement(deleteDbSql); dStat.setInt(1, id); dStat.executeUpdate(); @@ -822,7 +827,7 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor if (tableExists(newPath)) { throw new TableAlreadyExistException(getName(), newPath); } - String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?"; + String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { ps.setString(1, newTableName); @@ -999,7 +1004,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean String updateSql = "INSERT INTO metadata_table_property(table_id," + "key, value) values (?,?,?) " - + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()"; + + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()" + + "WHERE is_delete=false"; + Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { for (Map.Entry entry : opts.entrySet()) { @@ -1178,7 +1185,7 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio if (null == dbId) { throw new DatabaseNotExistException(getName(), dbName); } - String querySql = "SELECT function_name from metadata_function WHERE database_id=?"; + String querySql = "SELECT function_name from metadata_function WHERE database_id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { @@ -1211,7 +1218,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx throw new FunctionNotExistException(getName(), functionPath); } - String querySql = "SELECT class_name,function_language from metadata_function WHERE id=?"; + String querySql = "SELECT class_name,function_language from metadata_function WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { gStat.setInt(1, id); @@ -1256,7 +1263,7 @@ private Integer getFunctionId(ObjectPath functionPath) { return null; } // Get id - String getIdSql = "select id from metadata_function where function_name=? and database_id=?"; + String getIdSql = "select id from metadata_function where function_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, functionPath.getObjectName()); @@ -1334,7 +1341,7 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, } Connection conn = getConnection(); - String insertSql = "update metadata_function set class_name =?, function_language=? where id=?"; + String insertSql = "update metadata_function set class_name =?, function_language=? where id=? and is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setString(1, newFunction.getClassName()); ps.setString(2, newFunction.getFunctionLanguage().toString()); @@ -1366,7 +1373,8 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } Connection conn = getConnection(); - String insertSql = "delete from metadata_function where id=?"; +// String insertSql = "delete from metadata_function where id=?"; + String insertSql = "UPDATE metadata_function SET is_delete=true WHERE id=? AND is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setInt(1, id); ps.executeUpdate(); diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.16/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.16/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index 5357b49a70..28eae05ecf 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.16/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.16/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -276,7 +276,7 @@ protected Connection getConnection() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; + String querySql = "SELECT database_name FROM metadata_database WHERE is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { @@ -302,7 +302,7 @@ public List listDatabases() throws CatalogException { */ @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?"; + String querySql = "SELECT id, database_name, description FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -314,7 +314,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE Map map = new HashMap<>(); - String sql = "select \"key\", \"value\" from metadata_database_property where database_id=? "; + String sql = "SELECT \"key\", \"value\" FROM metadata_database_property WHERE database_id=? AND is_delete=false"; try (PreparedStatement pStat = conn.prepareStatement(sql)) { pStat.setInt(1, id); ResultSet prs = pStat.executeQuery(); @@ -357,7 +357,7 @@ public boolean databaseExists(String databaseName) throws CatalogException { * @throws CatalogException This exception is thrown if an error occurs while getting the database ID */ private Integer getDatabaseId(String databaseName) throws CatalogException { - String querySql = "select id from metadata_database where database_name=?"; + String querySql = "SELECT id FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -479,13 +479,13 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade } } // todo: Now it is actually deleted, whether records will be retained for subsequent designs. - String deletePropSql = "delete from metadata_database_property where database_id=?"; - PreparedStatement dStat = conn.prepareStatement(deletePropSql); + String updatePropSql = "UPDATE metadata_database_property SET is_delete=true WHERE database_id=? AND is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(updatePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_database where id=?"; - dStat = conn.prepareStatement(deleteDbSql); + String updateDbSql = "UPDATE metadata_database SET is_delete=true WHERE id=? AND is_delete=false"; + dStat = conn.prepareStatement(updateDbSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); @@ -523,16 +523,17 @@ public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNo try { conn.setAutoCommit(false); // 1. The name cannot be changed and the type cannot be changed. Only notes can be changed - String updateCommentSql = "update metadata_database set description=? where id=?"; + String updateCommentSql = "UPDATE metadata_database SET description=? WHERE id=? AND is_delete=false"; PreparedStatement uState = conn.prepareStatement(updateCommentSql); uState.setString(1, newDb.getComment()); uState.setInt(2, id); uState.executeUpdate(); uState.close(); if (newDb.getProperties() != null && newDb.getProperties().size() > 0) { - String upsertSql = "insert into metadata_database_property (database_id, key, value) " - + "values (?,?,?) " - + "on CONFLICT (database_id, \"key\") do update set value = excluded.value, update_time = now()"; + String upsertSql = "INSERT INTO metadata_database_property (database_id, key, value) " + + "VALUES (?, ?, ?) " + + "ON CONFLICT (database_id, \"key\") DO UPDATE SET value = excluded.value, update_time = now() " + + "WHERE is_delete=false"; PreparedStatement pstat = conn.prepareStatement(upsertSql); for (Map.Entry entry : newDb.getProperties().entrySet()) { pstat.setInt(1, id); @@ -593,7 +594,7 @@ protected List listTablesViews(String databaseName, String tableType) // get all schemas // To give table or view - String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?"; + String querySql = "SELECT table_name FROM metadata_table WHERE database_name=? AND object_type=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, tableType); @@ -634,7 +635,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep Connection conn = getConnection(); try { - String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=?"; + String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=? AND is_delete=false"; PreparedStatement ps = conn.prepareStatement(queryTable); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); @@ -650,7 +651,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } if (tableType.equals(ObjectType.TABLE)) { // This is table - String propSql = "SELECT \"key\", \"value\" from metadata_table_property " + "WHERE table_id=?"; + String propSql = "SELECT \"key\", \"value\" from metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement pState = conn.prepareStatement(propSql); pState.setInt(1, id); ResultSet prs = pState.executeQuery(); @@ -667,7 +668,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep // 1. Get table information from the library. (Already done before) // 2. Take out the field. String colSql = - "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=?"; + "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=? AND is_delete=false"; PreparedStatement cStat = conn.prepareStatement(colSql); cStat.setInt(1, id); ResultSet crs = cStat.executeQuery(); @@ -685,7 +686,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } cStat.close(); // 3、Take out the query - String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? "; + String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement qStat = conn.prepareStatement(qSql); qStat.setInt(1, id); ResultSet qrs = qStat.executeQuery(); @@ -740,7 +741,7 @@ private Integer getTableId(ObjectPath tablePath) { return null; } // 获取id - String getIdSql = "select id from metadata_table where table_name=? and database_id=?"; + String getIdSql = "select id from metadata_table where table_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, tablePath.getObjectName()); @@ -777,17 +778,21 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { // todo: Now it is actually deleted, whether records will be retained for subsequent designs. conn.setAutoCommit(false); - String deletePropSql = "delete from metadata_table_property where table_id=?"; +// String deletePropSql = "delete from metadata_table_property where table_id=?"; + String deletePropSql = "UPDATE metadata_table_property SET is_delete=true WHERE table_id=? and is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(deletePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteColSql = "delete from metadata_column where table_id=?"; +// String deleteColSql = "delete from metadata_column where table_id=?"; + String deleteColSql = "UPDATE metadata_column SET is_delete=true WHERE table_id=? and is_delete=false"; dStat = conn.prepareStatement(deleteColSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_table where id=?"; +// String deleteDbSql = "delete from metadata_table where id=?"; + String deleteDbSql = "UPDATE metadata_table SET is_delete=true WHERE id=? and is_delete=false"; dStat = conn.prepareStatement(deleteDbSql); dStat.setInt(1, id); dStat.executeUpdate(); @@ -822,7 +827,7 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor if (tableExists(newPath)) { throw new TableAlreadyExistException(getName(), newPath); } - String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?"; + String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { ps.setString(1, newTableName); @@ -999,7 +1004,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean String updateSql = "INSERT INTO metadata_table_property(table_id," + "key, value) values (?,?,?) " - + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()"; + + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()" + + "WHERE is_delete=false"; + Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { for (Map.Entry entry : opts.entrySet()) { @@ -1178,7 +1185,7 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio if (null == dbId) { throw new DatabaseNotExistException(getName(), dbName); } - String querySql = "SELECT function_name from metadata_function WHERE database_id=?"; + String querySql = "SELECT function_name from metadata_function WHERE database_id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { @@ -1211,7 +1218,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx throw new FunctionNotExistException(getName(), functionPath); } - String querySql = "SELECT class_name,function_language from metadata_function WHERE id=?"; + String querySql = "SELECT class_name,function_language from metadata_function WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { gStat.setInt(1, id); @@ -1256,7 +1263,7 @@ private Integer getFunctionId(ObjectPath functionPath) { return null; } // Get id - String getIdSql = "select id from metadata_function where function_name=? and database_id=?"; + String getIdSql = "select id from metadata_function where function_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, functionPath.getObjectName()); @@ -1334,7 +1341,7 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, } Connection conn = getConnection(); - String insertSql = "update metadata_function set class_name =?, function_language=? where id=?"; + String insertSql = "update metadata_function set class_name =?, function_language=? where id=? and is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setString(1, newFunction.getClassName()); ps.setString(2, newFunction.getFunctionLanguage().toString()); @@ -1366,7 +1373,8 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } Connection conn = getConnection(); - String insertSql = "delete from metadata_function where id=?"; +// String insertSql = "delete from metadata_function where id=?"; + String insertSql = "UPDATE metadata_function SET is_delete=true WHERE id=? AND is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setInt(1, id); ps.executeUpdate(); diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.17/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.17/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index 5357b49a70..28eae05ecf 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.17/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.17/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -276,7 +276,7 @@ protected Connection getConnection() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; + String querySql = "SELECT database_name FROM metadata_database WHERE is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { @@ -302,7 +302,7 @@ public List listDatabases() throws CatalogException { */ @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?"; + String querySql = "SELECT id, database_name, description FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -314,7 +314,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE Map map = new HashMap<>(); - String sql = "select \"key\", \"value\" from metadata_database_property where database_id=? "; + String sql = "SELECT \"key\", \"value\" FROM metadata_database_property WHERE database_id=? AND is_delete=false"; try (PreparedStatement pStat = conn.prepareStatement(sql)) { pStat.setInt(1, id); ResultSet prs = pStat.executeQuery(); @@ -357,7 +357,7 @@ public boolean databaseExists(String databaseName) throws CatalogException { * @throws CatalogException This exception is thrown if an error occurs while getting the database ID */ private Integer getDatabaseId(String databaseName) throws CatalogException { - String querySql = "select id from metadata_database where database_name=?"; + String querySql = "SELECT id FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -479,13 +479,13 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade } } // todo: Now it is actually deleted, whether records will be retained for subsequent designs. - String deletePropSql = "delete from metadata_database_property where database_id=?"; - PreparedStatement dStat = conn.prepareStatement(deletePropSql); + String updatePropSql = "UPDATE metadata_database_property SET is_delete=true WHERE database_id=? AND is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(updatePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_database where id=?"; - dStat = conn.prepareStatement(deleteDbSql); + String updateDbSql = "UPDATE metadata_database SET is_delete=true WHERE id=? AND is_delete=false"; + dStat = conn.prepareStatement(updateDbSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); @@ -523,16 +523,17 @@ public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNo try { conn.setAutoCommit(false); // 1. The name cannot be changed and the type cannot be changed. Only notes can be changed - String updateCommentSql = "update metadata_database set description=? where id=?"; + String updateCommentSql = "UPDATE metadata_database SET description=? WHERE id=? AND is_delete=false"; PreparedStatement uState = conn.prepareStatement(updateCommentSql); uState.setString(1, newDb.getComment()); uState.setInt(2, id); uState.executeUpdate(); uState.close(); if (newDb.getProperties() != null && newDb.getProperties().size() > 0) { - String upsertSql = "insert into metadata_database_property (database_id, key, value) " - + "values (?,?,?) " - + "on CONFLICT (database_id, \"key\") do update set value = excluded.value, update_time = now()"; + String upsertSql = "INSERT INTO metadata_database_property (database_id, key, value) " + + "VALUES (?, ?, ?) " + + "ON CONFLICT (database_id, \"key\") DO UPDATE SET value = excluded.value, update_time = now() " + + "WHERE is_delete=false"; PreparedStatement pstat = conn.prepareStatement(upsertSql); for (Map.Entry entry : newDb.getProperties().entrySet()) { pstat.setInt(1, id); @@ -593,7 +594,7 @@ protected List listTablesViews(String databaseName, String tableType) // get all schemas // To give table or view - String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?"; + String querySql = "SELECT table_name FROM metadata_table WHERE database_name=? AND object_type=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, tableType); @@ -634,7 +635,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep Connection conn = getConnection(); try { - String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=?"; + String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=? AND is_delete=false"; PreparedStatement ps = conn.prepareStatement(queryTable); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); @@ -650,7 +651,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } if (tableType.equals(ObjectType.TABLE)) { // This is table - String propSql = "SELECT \"key\", \"value\" from metadata_table_property " + "WHERE table_id=?"; + String propSql = "SELECT \"key\", \"value\" from metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement pState = conn.prepareStatement(propSql); pState.setInt(1, id); ResultSet prs = pState.executeQuery(); @@ -667,7 +668,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep // 1. Get table information from the library. (Already done before) // 2. Take out the field. String colSql = - "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=?"; + "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=? AND is_delete=false"; PreparedStatement cStat = conn.prepareStatement(colSql); cStat.setInt(1, id); ResultSet crs = cStat.executeQuery(); @@ -685,7 +686,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } cStat.close(); // 3、Take out the query - String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? "; + String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement qStat = conn.prepareStatement(qSql); qStat.setInt(1, id); ResultSet qrs = qStat.executeQuery(); @@ -740,7 +741,7 @@ private Integer getTableId(ObjectPath tablePath) { return null; } // 获取id - String getIdSql = "select id from metadata_table where table_name=? and database_id=?"; + String getIdSql = "select id from metadata_table where table_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, tablePath.getObjectName()); @@ -777,17 +778,21 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { // todo: Now it is actually deleted, whether records will be retained for subsequent designs. conn.setAutoCommit(false); - String deletePropSql = "delete from metadata_table_property where table_id=?"; +// String deletePropSql = "delete from metadata_table_property where table_id=?"; + String deletePropSql = "UPDATE metadata_table_property SET is_delete=true WHERE table_id=? and is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(deletePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteColSql = "delete from metadata_column where table_id=?"; +// String deleteColSql = "delete from metadata_column where table_id=?"; + String deleteColSql = "UPDATE metadata_column SET is_delete=true WHERE table_id=? and is_delete=false"; dStat = conn.prepareStatement(deleteColSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_table where id=?"; +// String deleteDbSql = "delete from metadata_table where id=?"; + String deleteDbSql = "UPDATE metadata_table SET is_delete=true WHERE id=? and is_delete=false"; dStat = conn.prepareStatement(deleteDbSql); dStat.setInt(1, id); dStat.executeUpdate(); @@ -822,7 +827,7 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor if (tableExists(newPath)) { throw new TableAlreadyExistException(getName(), newPath); } - String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?"; + String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { ps.setString(1, newTableName); @@ -999,7 +1004,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean String updateSql = "INSERT INTO metadata_table_property(table_id," + "key, value) values (?,?,?) " - + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()"; + + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()" + + "WHERE is_delete=false"; + Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { for (Map.Entry entry : opts.entrySet()) { @@ -1178,7 +1185,7 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio if (null == dbId) { throw new DatabaseNotExistException(getName(), dbName); } - String querySql = "SELECT function_name from metadata_function WHERE database_id=?"; + String querySql = "SELECT function_name from metadata_function WHERE database_id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { @@ -1211,7 +1218,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx throw new FunctionNotExistException(getName(), functionPath); } - String querySql = "SELECT class_name,function_language from metadata_function WHERE id=?"; + String querySql = "SELECT class_name,function_language from metadata_function WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { gStat.setInt(1, id); @@ -1256,7 +1263,7 @@ private Integer getFunctionId(ObjectPath functionPath) { return null; } // Get id - String getIdSql = "select id from metadata_function where function_name=? and database_id=?"; + String getIdSql = "select id from metadata_function where function_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, functionPath.getObjectName()); @@ -1334,7 +1341,7 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, } Connection conn = getConnection(); - String insertSql = "update metadata_function set class_name =?, function_language=? where id=?"; + String insertSql = "update metadata_function set class_name =?, function_language=? where id=? and is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setString(1, newFunction.getClassName()); ps.setString(2, newFunction.getFunctionLanguage().toString()); @@ -1366,7 +1373,8 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } Connection conn = getConnection(); - String insertSql = "delete from metadata_function where id=?"; +// String insertSql = "delete from metadata_function where id=?"; + String insertSql = "UPDATE metadata_function SET is_delete=true WHERE id=? AND is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setInt(1, id); ps.executeUpdate(); diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.18/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.18/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index 5357b49a70..28eae05ecf 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.18/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.18/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -276,7 +276,7 @@ protected Connection getConnection() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; + String querySql = "SELECT database_name FROM metadata_database WHERE is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { @@ -302,7 +302,7 @@ public List listDatabases() throws CatalogException { */ @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?"; + String querySql = "SELECT id, database_name, description FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -314,7 +314,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE Map map = new HashMap<>(); - String sql = "select \"key\", \"value\" from metadata_database_property where database_id=? "; + String sql = "SELECT \"key\", \"value\" FROM metadata_database_property WHERE database_id=? AND is_delete=false"; try (PreparedStatement pStat = conn.prepareStatement(sql)) { pStat.setInt(1, id); ResultSet prs = pStat.executeQuery(); @@ -357,7 +357,7 @@ public boolean databaseExists(String databaseName) throws CatalogException { * @throws CatalogException This exception is thrown if an error occurs while getting the database ID */ private Integer getDatabaseId(String databaseName) throws CatalogException { - String querySql = "select id from metadata_database where database_name=?"; + String querySql = "SELECT id FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -479,13 +479,13 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade } } // todo: Now it is actually deleted, whether records will be retained for subsequent designs. - String deletePropSql = "delete from metadata_database_property where database_id=?"; - PreparedStatement dStat = conn.prepareStatement(deletePropSql); + String updatePropSql = "UPDATE metadata_database_property SET is_delete=true WHERE database_id=? AND is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(updatePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_database where id=?"; - dStat = conn.prepareStatement(deleteDbSql); + String updateDbSql = "UPDATE metadata_database SET is_delete=true WHERE id=? AND is_delete=false"; + dStat = conn.prepareStatement(updateDbSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); @@ -523,16 +523,17 @@ public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNo try { conn.setAutoCommit(false); // 1. The name cannot be changed and the type cannot be changed. Only notes can be changed - String updateCommentSql = "update metadata_database set description=? where id=?"; + String updateCommentSql = "UPDATE metadata_database SET description=? WHERE id=? AND is_delete=false"; PreparedStatement uState = conn.prepareStatement(updateCommentSql); uState.setString(1, newDb.getComment()); uState.setInt(2, id); uState.executeUpdate(); uState.close(); if (newDb.getProperties() != null && newDb.getProperties().size() > 0) { - String upsertSql = "insert into metadata_database_property (database_id, key, value) " - + "values (?,?,?) " - + "on CONFLICT (database_id, \"key\") do update set value = excluded.value, update_time = now()"; + String upsertSql = "INSERT INTO metadata_database_property (database_id, key, value) " + + "VALUES (?, ?, ?) " + + "ON CONFLICT (database_id, \"key\") DO UPDATE SET value = excluded.value, update_time = now() " + + "WHERE is_delete=false"; PreparedStatement pstat = conn.prepareStatement(upsertSql); for (Map.Entry entry : newDb.getProperties().entrySet()) { pstat.setInt(1, id); @@ -593,7 +594,7 @@ protected List listTablesViews(String databaseName, String tableType) // get all schemas // To give table or view - String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?"; + String querySql = "SELECT table_name FROM metadata_table WHERE database_name=? AND object_type=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, tableType); @@ -634,7 +635,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep Connection conn = getConnection(); try { - String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=?"; + String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=? AND is_delete=false"; PreparedStatement ps = conn.prepareStatement(queryTable); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); @@ -650,7 +651,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } if (tableType.equals(ObjectType.TABLE)) { // This is table - String propSql = "SELECT \"key\", \"value\" from metadata_table_property " + "WHERE table_id=?"; + String propSql = "SELECT \"key\", \"value\" from metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement pState = conn.prepareStatement(propSql); pState.setInt(1, id); ResultSet prs = pState.executeQuery(); @@ -667,7 +668,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep // 1. Get table information from the library. (Already done before) // 2. Take out the field. String colSql = - "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=?"; + "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=? AND is_delete=false"; PreparedStatement cStat = conn.prepareStatement(colSql); cStat.setInt(1, id); ResultSet crs = cStat.executeQuery(); @@ -685,7 +686,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } cStat.close(); // 3、Take out the query - String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? "; + String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement qStat = conn.prepareStatement(qSql); qStat.setInt(1, id); ResultSet qrs = qStat.executeQuery(); @@ -740,7 +741,7 @@ private Integer getTableId(ObjectPath tablePath) { return null; } // 获取id - String getIdSql = "select id from metadata_table where table_name=? and database_id=?"; + String getIdSql = "select id from metadata_table where table_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, tablePath.getObjectName()); @@ -777,17 +778,21 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { // todo: Now it is actually deleted, whether records will be retained for subsequent designs. conn.setAutoCommit(false); - String deletePropSql = "delete from metadata_table_property where table_id=?"; +// String deletePropSql = "delete from metadata_table_property where table_id=?"; + String deletePropSql = "UPDATE metadata_table_property SET is_delete=true WHERE table_id=? and is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(deletePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteColSql = "delete from metadata_column where table_id=?"; +// String deleteColSql = "delete from metadata_column where table_id=?"; + String deleteColSql = "UPDATE metadata_column SET is_delete=true WHERE table_id=? and is_delete=false"; dStat = conn.prepareStatement(deleteColSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_table where id=?"; +// String deleteDbSql = "delete from metadata_table where id=?"; + String deleteDbSql = "UPDATE metadata_table SET is_delete=true WHERE id=? and is_delete=false"; dStat = conn.prepareStatement(deleteDbSql); dStat.setInt(1, id); dStat.executeUpdate(); @@ -822,7 +827,7 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor if (tableExists(newPath)) { throw new TableAlreadyExistException(getName(), newPath); } - String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?"; + String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { ps.setString(1, newTableName); @@ -999,7 +1004,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean String updateSql = "INSERT INTO metadata_table_property(table_id," + "key, value) values (?,?,?) " - + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()"; + + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()" + + "WHERE is_delete=false"; + Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { for (Map.Entry entry : opts.entrySet()) { @@ -1178,7 +1185,7 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio if (null == dbId) { throw new DatabaseNotExistException(getName(), dbName); } - String querySql = "SELECT function_name from metadata_function WHERE database_id=?"; + String querySql = "SELECT function_name from metadata_function WHERE database_id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { @@ -1211,7 +1218,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx throw new FunctionNotExistException(getName(), functionPath); } - String querySql = "SELECT class_name,function_language from metadata_function WHERE id=?"; + String querySql = "SELECT class_name,function_language from metadata_function WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { gStat.setInt(1, id); @@ -1256,7 +1263,7 @@ private Integer getFunctionId(ObjectPath functionPath) { return null; } // Get id - String getIdSql = "select id from metadata_function where function_name=? and database_id=?"; + String getIdSql = "select id from metadata_function where function_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, functionPath.getObjectName()); @@ -1334,7 +1341,7 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, } Connection conn = getConnection(); - String insertSql = "update metadata_function set class_name =?, function_language=? where id=?"; + String insertSql = "update metadata_function set class_name =?, function_language=? where id=? and is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setString(1, newFunction.getClassName()); ps.setString(2, newFunction.getFunctionLanguage().toString()); @@ -1366,7 +1373,8 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } Connection conn = getConnection(); - String insertSql = "delete from metadata_function where id=?"; +// String insertSql = "delete from metadata_function where id=?"; + String insertSql = "UPDATE metadata_function SET is_delete=true WHERE id=? AND is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setInt(1, id); ps.executeUpdate(); diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index 5357b49a70..28eae05ecf 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -276,7 +276,7 @@ protected Connection getConnection() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; + String querySql = "SELECT database_name FROM metadata_database WHERE is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { @@ -302,7 +302,7 @@ public List listDatabases() throws CatalogException { */ @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?"; + String querySql = "SELECT id, database_name, description FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -314,7 +314,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE Map map = new HashMap<>(); - String sql = "select \"key\", \"value\" from metadata_database_property where database_id=? "; + String sql = "SELECT \"key\", \"value\" FROM metadata_database_property WHERE database_id=? AND is_delete=false"; try (PreparedStatement pStat = conn.prepareStatement(sql)) { pStat.setInt(1, id); ResultSet prs = pStat.executeQuery(); @@ -357,7 +357,7 @@ public boolean databaseExists(String databaseName) throws CatalogException { * @throws CatalogException This exception is thrown if an error occurs while getting the database ID */ private Integer getDatabaseId(String databaseName) throws CatalogException { - String querySql = "select id from metadata_database where database_name=?"; + String querySql = "SELECT id FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -479,13 +479,13 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade } } // todo: Now it is actually deleted, whether records will be retained for subsequent designs. - String deletePropSql = "delete from metadata_database_property where database_id=?"; - PreparedStatement dStat = conn.prepareStatement(deletePropSql); + String updatePropSql = "UPDATE metadata_database_property SET is_delete=true WHERE database_id=? AND is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(updatePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_database where id=?"; - dStat = conn.prepareStatement(deleteDbSql); + String updateDbSql = "UPDATE metadata_database SET is_delete=true WHERE id=? AND is_delete=false"; + dStat = conn.prepareStatement(updateDbSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); @@ -523,16 +523,17 @@ public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNo try { conn.setAutoCommit(false); // 1. The name cannot be changed and the type cannot be changed. Only notes can be changed - String updateCommentSql = "update metadata_database set description=? where id=?"; + String updateCommentSql = "UPDATE metadata_database SET description=? WHERE id=? AND is_delete=false"; PreparedStatement uState = conn.prepareStatement(updateCommentSql); uState.setString(1, newDb.getComment()); uState.setInt(2, id); uState.executeUpdate(); uState.close(); if (newDb.getProperties() != null && newDb.getProperties().size() > 0) { - String upsertSql = "insert into metadata_database_property (database_id, key, value) " - + "values (?,?,?) " - + "on CONFLICT (database_id, \"key\") do update set value = excluded.value, update_time = now()"; + String upsertSql = "INSERT INTO metadata_database_property (database_id, key, value) " + + "VALUES (?, ?, ?) " + + "ON CONFLICT (database_id, \"key\") DO UPDATE SET value = excluded.value, update_time = now() " + + "WHERE is_delete=false"; PreparedStatement pstat = conn.prepareStatement(upsertSql); for (Map.Entry entry : newDb.getProperties().entrySet()) { pstat.setInt(1, id); @@ -593,7 +594,7 @@ protected List listTablesViews(String databaseName, String tableType) // get all schemas // To give table or view - String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?"; + String querySql = "SELECT table_name FROM metadata_table WHERE database_name=? AND object_type=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, tableType); @@ -634,7 +635,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep Connection conn = getConnection(); try { - String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=?"; + String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=? AND is_delete=false"; PreparedStatement ps = conn.prepareStatement(queryTable); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); @@ -650,7 +651,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } if (tableType.equals(ObjectType.TABLE)) { // This is table - String propSql = "SELECT \"key\", \"value\" from metadata_table_property " + "WHERE table_id=?"; + String propSql = "SELECT \"key\", \"value\" from metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement pState = conn.prepareStatement(propSql); pState.setInt(1, id); ResultSet prs = pState.executeQuery(); @@ -667,7 +668,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep // 1. Get table information from the library. (Already done before) // 2. Take out the field. String colSql = - "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=?"; + "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=? AND is_delete=false"; PreparedStatement cStat = conn.prepareStatement(colSql); cStat.setInt(1, id); ResultSet crs = cStat.executeQuery(); @@ -685,7 +686,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } cStat.close(); // 3、Take out the query - String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? "; + String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement qStat = conn.prepareStatement(qSql); qStat.setInt(1, id); ResultSet qrs = qStat.executeQuery(); @@ -740,7 +741,7 @@ private Integer getTableId(ObjectPath tablePath) { return null; } // 获取id - String getIdSql = "select id from metadata_table where table_name=? and database_id=?"; + String getIdSql = "select id from metadata_table where table_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, tablePath.getObjectName()); @@ -777,17 +778,21 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { // todo: Now it is actually deleted, whether records will be retained for subsequent designs. conn.setAutoCommit(false); - String deletePropSql = "delete from metadata_table_property where table_id=?"; +// String deletePropSql = "delete from metadata_table_property where table_id=?"; + String deletePropSql = "UPDATE metadata_table_property SET is_delete=true WHERE table_id=? and is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(deletePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteColSql = "delete from metadata_column where table_id=?"; +// String deleteColSql = "delete from metadata_column where table_id=?"; + String deleteColSql = "UPDATE metadata_column SET is_delete=true WHERE table_id=? and is_delete=false"; dStat = conn.prepareStatement(deleteColSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_table where id=?"; +// String deleteDbSql = "delete from metadata_table where id=?"; + String deleteDbSql = "UPDATE metadata_table SET is_delete=true WHERE id=? and is_delete=false"; dStat = conn.prepareStatement(deleteDbSql); dStat.setInt(1, id); dStat.executeUpdate(); @@ -822,7 +827,7 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor if (tableExists(newPath)) { throw new TableAlreadyExistException(getName(), newPath); } - String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?"; + String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { ps.setString(1, newTableName); @@ -999,7 +1004,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean String updateSql = "INSERT INTO metadata_table_property(table_id," + "key, value) values (?,?,?) " - + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()"; + + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()" + + "WHERE is_delete=false"; + Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { for (Map.Entry entry : opts.entrySet()) { @@ -1178,7 +1185,7 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio if (null == dbId) { throw new DatabaseNotExistException(getName(), dbName); } - String querySql = "SELECT function_name from metadata_function WHERE database_id=?"; + String querySql = "SELECT function_name from metadata_function WHERE database_id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { @@ -1211,7 +1218,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx throw new FunctionNotExistException(getName(), functionPath); } - String querySql = "SELECT class_name,function_language from metadata_function WHERE id=?"; + String querySql = "SELECT class_name,function_language from metadata_function WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { gStat.setInt(1, id); @@ -1256,7 +1263,7 @@ private Integer getFunctionId(ObjectPath functionPath) { return null; } // Get id - String getIdSql = "select id from metadata_function where function_name=? and database_id=?"; + String getIdSql = "select id from metadata_function where function_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, functionPath.getObjectName()); @@ -1334,7 +1341,7 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, } Connection conn = getConnection(); - String insertSql = "update metadata_function set class_name =?, function_language=? where id=?"; + String insertSql = "update metadata_function set class_name =?, function_language=? where id=? and is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setString(1, newFunction.getClassName()); ps.setString(2, newFunction.getFunctionLanguage().toString()); @@ -1366,7 +1373,8 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } Connection conn = getConnection(); - String insertSql = "delete from metadata_function where id=?"; +// String insertSql = "delete from metadata_function where id=?"; + String insertSql = "UPDATE metadata_function SET is_delete=true WHERE id=? AND is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setInt(1, id); ps.executeUpdate(); diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.20/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.20/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index 5357b49a70..28eae05ecf 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.20/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.20/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -276,7 +276,7 @@ protected Connection getConnection() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; + String querySql = "SELECT database_name FROM metadata_database WHERE is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { @@ -302,7 +302,7 @@ public List listDatabases() throws CatalogException { */ @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = "SELECT id, database_name,description FROM metadata_database where database_name=?"; + String querySql = "SELECT id, database_name, description FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -314,7 +314,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE Map map = new HashMap<>(); - String sql = "select \"key\", \"value\" from metadata_database_property where database_id=? "; + String sql = "SELECT \"key\", \"value\" FROM metadata_database_property WHERE database_id=? AND is_delete=false"; try (PreparedStatement pStat = conn.prepareStatement(sql)) { pStat.setInt(1, id); ResultSet prs = pStat.executeQuery(); @@ -357,7 +357,7 @@ public boolean databaseExists(String databaseName) throws CatalogException { * @throws CatalogException This exception is thrown if an error occurs while getting the database ID */ private Integer getDatabaseId(String databaseName) throws CatalogException { - String querySql = "select id from metadata_database where database_name=?"; + String querySql = "SELECT id FROM metadata_database WHERE database_name=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, databaseName); @@ -479,13 +479,13 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade } } // todo: Now it is actually deleted, whether records will be retained for subsequent designs. - String deletePropSql = "delete from metadata_database_property where database_id=?"; - PreparedStatement dStat = conn.prepareStatement(deletePropSql); + String updatePropSql = "UPDATE metadata_database_property SET is_delete=true WHERE database_id=? AND is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(updatePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_database where id=?"; - dStat = conn.prepareStatement(deleteDbSql); + String updateDbSql = "UPDATE metadata_database SET is_delete=true WHERE id=? AND is_delete=false"; + dStat = conn.prepareStatement(updateDbSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); @@ -523,16 +523,17 @@ public void alterDatabase(String name, CatalogDatabase newDb, boolean ignoreIfNo try { conn.setAutoCommit(false); // 1. The name cannot be changed and the type cannot be changed. Only notes can be changed - String updateCommentSql = "update metadata_database set description=? where id=?"; + String updateCommentSql = "UPDATE metadata_database SET description=? WHERE id=? AND is_delete=false"; PreparedStatement uState = conn.prepareStatement(updateCommentSql); uState.setString(1, newDb.getComment()); uState.setInt(2, id); uState.executeUpdate(); uState.close(); if (newDb.getProperties() != null && newDb.getProperties().size() > 0) { - String upsertSql = "insert into metadata_database_property (database_id, key, value) " - + "values (?,?,?) " - + "on CONFLICT (database_id, \"key\") do update set value = excluded.value, update_time = now()"; + String upsertSql = "INSERT INTO metadata_database_property (database_id, key, value) " + + "VALUES (?, ?, ?) " + + "ON CONFLICT (database_id, \"key\") DO UPDATE SET value = excluded.value, update_time = now() " + + "WHERE is_delete=false"; PreparedStatement pstat = conn.prepareStatement(upsertSql); for (Map.Entry entry : newDb.getProperties().entrySet()) { pstat.setInt(1, id); @@ -593,7 +594,7 @@ protected List listTablesViews(String databaseName, String tableType) // get all schemas // To give table or view - String querySql = "SELECT table_name FROM metadata_table where table_type=? and database_id = ?"; + String querySql = "SELECT table_name FROM metadata_table WHERE database_name=? AND object_type=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(querySql)) { ps.setString(1, tableType); @@ -634,7 +635,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep Connection conn = getConnection(); try { - String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=?"; + String queryTable = "SELECT table_name ,description, table_type FROM metadata_table where id=? AND is_delete=false"; PreparedStatement ps = conn.prepareStatement(queryTable); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); @@ -650,7 +651,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } if (tableType.equals(ObjectType.TABLE)) { // This is table - String propSql = "SELECT \"key\", \"value\" from metadata_table_property " + "WHERE table_id=?"; + String propSql = "SELECT \"key\", \"value\" from metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement pState = conn.prepareStatement(propSql); pState.setInt(1, id); ResultSet prs = pState.executeQuery(); @@ -667,7 +668,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep // 1. Get table information from the library. (Already done before) // 2. Take out the field. String colSql = - "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=?"; + "SELECT column_name, column_type, data_type, description FROM metadata_column WHERE table_id=? AND is_delete=false"; PreparedStatement cStat = conn.prepareStatement(colSql); cStat.setInt(1, id); ResultSet crs = cStat.executeQuery(); @@ -685,7 +686,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep } cStat.close(); // 3、Take out the query - String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? "; + String qSql = "SELECT \"key\", \"value\" FROM metadata_table_property WHERE table_id=? AND is_delete=false"; PreparedStatement qStat = conn.prepareStatement(qSql); qStat.setInt(1, id); ResultSet qrs = qStat.executeQuery(); @@ -740,7 +741,7 @@ private Integer getTableId(ObjectPath tablePath) { return null; } // 获取id - String getIdSql = "select id from metadata_table where table_name=? and database_id=?"; + String getIdSql = "select id from metadata_table where table_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, tablePath.getObjectName()); @@ -777,17 +778,21 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) try { // todo: Now it is actually deleted, whether records will be retained for subsequent designs. conn.setAutoCommit(false); - String deletePropSql = "delete from metadata_table_property where table_id=?"; +// String deletePropSql = "delete from metadata_table_property where table_id=?"; + String deletePropSql = "UPDATE metadata_table_property SET is_delete=true WHERE table_id=? and is_delete=false"; + PreparedStatement dStat = conn.prepareStatement(deletePropSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteColSql = "delete from metadata_column where table_id=?"; +// String deleteColSql = "delete from metadata_column where table_id=?"; + String deleteColSql = "UPDATE metadata_column SET is_delete=true WHERE table_id=? and is_delete=false"; dStat = conn.prepareStatement(deleteColSql); dStat.setInt(1, id); dStat.executeUpdate(); dStat.close(); - String deleteDbSql = "delete from metadata_table where id=?"; +// String deleteDbSql = "delete from metadata_table where id=?"; + String deleteDbSql = "UPDATE metadata_table SET is_delete=true WHERE id=? and is_delete=false"; dStat = conn.prepareStatement(deleteDbSql); dStat.setInt(1, id); dStat.executeUpdate(); @@ -822,7 +827,7 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor if (tableExists(newPath)) { throw new TableAlreadyExistException(getName(), newPath); } - String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=?"; + String updateSql = "UPDATE metadata_table SET table_name=? WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { ps.setString(1, newTableName); @@ -999,7 +1004,9 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean String updateSql = "INSERT INTO metadata_table_property(table_id," + "key, value) values (?,?,?) " - + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()"; + + "on CONFLICT (table_id, \"key\") do update set value = excluded.value, update_time = now()" + + "WHERE is_delete=false"; + Connection conn = getConnection(); try (PreparedStatement ps = conn.prepareStatement(updateSql)) { for (Map.Entry entry : opts.entrySet()) { @@ -1178,7 +1185,7 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio if (null == dbId) { throw new DatabaseNotExistException(getName(), dbName); } - String querySql = "SELECT function_name from metadata_function WHERE database_id=?"; + String querySql = "SELECT function_name from metadata_function WHERE database_id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { @@ -1211,7 +1218,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx throw new FunctionNotExistException(getName(), functionPath); } - String querySql = "SELECT class_name,function_language from metadata_function WHERE id=?"; + String querySql = "SELECT class_name,function_language from metadata_function WHERE id=? AND is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(querySql)) { gStat.setInt(1, id); @@ -1256,7 +1263,7 @@ private Integer getFunctionId(ObjectPath functionPath) { return null; } // Get id - String getIdSql = "select id from metadata_function where function_name=? and database_id=?"; + String getIdSql = "select id from metadata_function where function_name=? and database_id=? and is_delete=false"; Connection conn = getConnection(); try (PreparedStatement gStat = conn.prepareStatement(getIdSql)) { gStat.setString(1, functionPath.getObjectName()); @@ -1334,7 +1341,7 @@ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, } Connection conn = getConnection(); - String insertSql = "update metadata_function set class_name =?, function_language=? where id=?"; + String insertSql = "update metadata_function set class_name =?, function_language=? where id=? and is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setString(1, newFunction.getClassName()); ps.setString(2, newFunction.getFunctionLanguage().toString()); @@ -1366,7 +1373,8 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) } Connection conn = getConnection(); - String insertSql = "delete from metadata_function where id=?"; +// String insertSql = "delete from metadata_function where id=?"; + String insertSql = "UPDATE metadata_function SET is_delete=true WHERE id=? AND is_delete=false"; try (PreparedStatement ps = conn.prepareStatement(insertSql)) { ps.setInt(1, id); ps.executeUpdate();