Skip to content

Commit

Permalink
[hive] Fix flink not synchronizing to hive metastore when deleting ta…
Browse files Browse the repository at this point in the history
…ble partitions using hive catalog (apache#3411)
  • Loading branch information
zhuangchong authored May 29, 2024
1 parent 8cc348a commit 8791c5d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,7 @@ public static RowType addKeyNamePrefix(RowType type) {

public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
return keyFields.stream()
.map(
f ->
new DataField(
f.id(),
KEY_FIELD_PREFIX + f.name(),
f.type(),
f.description()))
.map(f -> f.newName(KEY_FIELD_PREFIX + f.name()))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -268,6 +269,25 @@ public Map<String, String> loadDatabasePropertiesImpl(String name) {
}
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
TableSchema tableSchema = getDataTableSchema(identifier);
if (!tableSchema.partitionKeys().isEmpty()
&& new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
try {
// Do not close client, it is for HiveCatalog
@SuppressWarnings("resource")
HiveMetastoreClient metastoreClient =
new HiveMetastoreClient(identifier, tableSchema, client);
metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
super.dropPartition(identifier, partitionSpec);
}

private Map<String, String> convertToProperties(Database database) {
Map<String, String> properties = new HashMap<>(database.getParameters());
if (database.getLocationUri() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,19 @@ public void testAddPartitionsToMetastoreForUnpartitionedTable() throws Exception
.containsExactlyInAnyOrder("1\t10", "2\t20");
}

@Test
public void testDropPartitionsToMetastore() throws Exception {
prepareTestAddPartitionsToMetastore();

// drop partition
tEnv.executeSql(
"ALTER TABLE t DROP PARTITION (ptb = '1a', pta = 1), PARTITION (ptb = '1b', pta = 1)")
.await();
assertThat(hiveShell.executeQuery("show partitions t"))
.containsExactlyInAnyOrder(
"ptb=2a/pta=2", "ptb=2b/pta=2", "ptb=3a/pta=3", "ptb=3b/pta=3");
}

@Test
public void testAddPartitionsForTag() throws Exception {
tEnv.executeSql(
Expand Down

0 comments on commit 8791c5d

Please sign in to comment.