Skip to content

Commit

Permalink
add AlertDatabase request and response, and remove ignoreIfExists in …
Browse files Browse the repository at this point in the history
…CreateDatabaseReques
  • Loading branch information
jerry-024 committed Dec 13, 2024
1 parent 6b646be commit 8cb2d26
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;

/** define change to the database property. */
public interface DatabaseChange {
static DatabaseChange setProperty(String property, String value) {
return new SetProperty(property, value);
Expand Down
27 changes: 17 additions & 10 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,16 +424,14 @@ public static boolean deleteProperties(
Stream.concat(Stream.of(storeKey, databaseName), properties.stream())
.toArray(String[]::new);

return execute(connections, JdbcUtils.deletePropertiesStatement(properties), args) > 0;
}

private static String deletePropertiesStatement(Set<String> properties) {
StringBuilder sqlStatement = new StringBuilder(JdbcUtils.DELETE_DATABASE_PROPERTIES_SQL);
String values =
String.join(",", Collections.nCopies(properties.size(), String.valueOf('?')));
sqlStatement.append("(").append(values).append(")");

return sqlStatement.toString();
int deleteRecords =
execute(connections, JdbcUtils.deletePropertiesStatement(properties), args);
if (deleteRecords > 0) {
return true;
}
throw new IllegalStateException(
String.format(
"Failed to delete: %d of %d succeeded", deleteRecords, properties.size()));
}

public static void createDistributedLockTable(JdbcClientPool connections, Options options)
Expand All @@ -460,4 +458,13 @@ public static void release(JdbcClientPool connections, String lockId)
DistributedLockDialectFactory.create(connections.getProtocol())
.releaseLock(connections, lockId);
}

private static String deletePropertiesStatement(Set<String> properties) {
StringBuilder sqlStatement = new StringBuilder(JdbcUtils.DELETE_DATABASE_PROPERTIES_SQL);
String values =
String.join(",", Collections.nCopies(properties.size(), String.valueOf('?')));
sqlStatement.append("(").append(values).append(")");

return sqlStatement.toString();
}
}
53 changes: 41 additions & 12 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import org.apache.paimon.rest.auth.CredentialsProviderFactory;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.requests.AlertDatabaseRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.DropDatabaseRequest;
import org.apache.paimon.rest.responses.AlertDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.DatabaseName;
Expand All @@ -44,10 +47,11 @@

import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -137,12 +141,14 @@ public List<String> listDatabases() {
@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
CreateDatabaseRequest request = new CreateDatabaseRequest(name, ignoreIfExists, properties);
CreateDatabaseRequest request = new CreateDatabaseRequest(name, properties);
try {
client.post(
resourcePaths.databases(), request, CreateDatabaseResponse.class, headers());
} catch (AlreadyExistsException e) {
throw new DatabaseAlreadyExistException(name);
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(name);
}
}
}

Expand All @@ -161,22 +167,45 @@ public Database getDatabase(String name) throws DatabaseNotExistException {
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
DropDatabaseRequest request = new DropDatabaseRequest(ignoreIfNotExists, cascade);
try {
if (!cascade && !this.listTables(name).isEmpty()) {
throw new DatabaseNotEmptyException(name);
}
client.delete(resourcePaths.database(name), headers());
client.delete(resourcePaths.database(name), request, headers());
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(name);
}
throw new DatabaseNotExistException(name);
}
}

@Override
public void alertDatabase(String name, List<DatabaseChange> changes, boolean ignoreIfNotExists)
throws DatabaseNotExistException {
throw new UnsupportedOperationException();
try {
Map<String, String> insertProperties = Maps.newHashMap();
List<String> removeProperties = Lists.newArrayList();
changes.forEach(
change -> {
if (change instanceof DatabaseChange.SetProperty) {
DatabaseChange.SetProperty setProperty =
(DatabaseChange.SetProperty) change;
insertProperties.put(setProperty.property(), setProperty.value());
} else {
removeProperties.add(
((DatabaseChange.RemoveProperty) change).property());
}
});
AlertDatabaseRequest request =
new AlertDatabaseRequest(removeProperties, insertProperties);
AlertDatabaseResponse response =
client.post(
resourcePaths.database(name),
request,
AlertDatabaseResponse.class,
headers());
if (response.getUpdated().isEmpty()) {
throw new IllegalStateException("Failed to update properties");
}
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(name);
}
}

@Override
Expand All @@ -191,7 +220,7 @@ public Path getTableLocation(Identifier identifier) {

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
return new ArrayList<String>();
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.requests;

import org.apache.paimon.rest.RESTRequest;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Map;

/** Request for alerting database. */
public class AlertDatabaseRequest implements RESTRequest {

private static final String FIELD_REMOVALS = "removals";
private static final String FIELD_UPDATES = "updates";

@JsonProperty(FIELD_REMOVALS)
private List<String> removals;

@JsonProperty(FIELD_UPDATES)
private Map<String, String> updates;

@JsonCreator
public AlertDatabaseRequest(
@JsonProperty(FIELD_REMOVALS) List<String> removals,
@JsonProperty(FIELD_UPDATES) Map<String, String> updates) {
this.removals = removals;
this.updates = updates;
}

@JsonGetter(FIELD_REMOVALS)
public List<String> getRemovals() {
return removals;
}

@JsonGetter(FIELD_UPDATES)
public Map<String, String> getUpdates() {
return updates;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,19 @@
public class CreateDatabaseRequest implements RESTRequest {

private static final String FIELD_NAME = "name";
private static final String FIELD_IGNORE_IF_EXISTS = "ignoreIfExists";
private static final String FIELD_OPTIONS = "options";

@JsonProperty(FIELD_NAME)
private String name;

@JsonProperty(FIELD_IGNORE_IF_EXISTS)
private boolean ignoreIfExists;

@JsonProperty(FIELD_OPTIONS)
private Map<String, String> options;

@JsonCreator
public CreateDatabaseRequest(
@JsonProperty(FIELD_NAME) String name,
@JsonProperty(FIELD_IGNORE_IF_EXISTS) boolean ignoreIfExists,
@JsonProperty(FIELD_OPTIONS) Map<String, String> options) {
this.name = name;
this.ignoreIfExists = ignoreIfExists;
this.options = options;
}

Expand All @@ -57,11 +51,6 @@ public String getName() {
return name;
}

@JsonGetter(FIELD_IGNORE_IF_EXISTS)
public boolean getIgnoreIfExists() {
return ignoreIfExists;
}

@JsonGetter(FIELD_OPTIONS)
public Map<String, String> getOptions() {
return options;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.responses;

import org.apache.paimon.rest.RESTResponse;

import java.util.List;

/** Response for alerting database. */
public class AlertDatabaseResponse implements RESTResponse {

// List of namespace property keys that were removed
private List<String> removed;
// List of namespace property keys that were added or updated
private List<String> updated;
// List of properties that were requested for removal that were not found in the namespace's
// properties
private List<String> missing;

public AlertDatabaseResponse(List<String> removed, List<String> updated, List<String> missing) {
this.removed = removed;
this.updated = updated;
this.missing = missing;
}

public List<String> getRemoved() {
return removed;
}

public List<String> getUpdated() {
return updated;
}

public List<String> getMissing() {
return missing;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ public static String databaseName() {
}

public static CreateDatabaseRequest createDatabaseRequest(String name) {
boolean ignoreIfExists = true;
Map<String, String> options = new HashMap<>();
options.put("a", "b");
return new CreateDatabaseRequest(name, ignoreIfExists, options);
return new CreateDatabaseRequest(name, options);
}

public static CreateDatabaseResponse createDatabaseResponse(String name) {
Expand Down

0 comments on commit 8cb2d26

Please sign in to comment.