Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add database API implementation in RESTCatalog #4676

Merged
merged 19 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.paimon.rest;

import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
Expand All @@ -28,6 +30,7 @@

/** Default error handler. */
public class DefaultErrorHandler extends ErrorHandler {

private static final ErrorHandler INSTANCE = new DefaultErrorHandler();

public static ErrorHandler getInstance() {
Expand All @@ -36,26 +39,32 @@ public static ErrorHandler getInstance() {

@Override
public void accept(ErrorResponse error) {
int code = error.code();
int code = error.getCode();
String message = error.getMessage();
switch (code) {
case 400:
throw new BadRequestException(
String.format("Malformed request: %s", error.message()));
throw new BadRequestException(String.format("Malformed request: %s", message));
case 401:
throw new NotAuthorizedException("Not authorized: %s", error.message());
throw new NotAuthorizedException("Not authorized: %s", message);
case 403:
throw new ForbiddenException("Forbidden: %s", error.message());
throw new ForbiddenException("Forbidden: %s", message);
case 404:
throw new NoSuchResourceException("%s", message);
case 405:
case 406:
break;
case 409:
throw new AlreadyExistsException("%s", message);
case 500:
throw new ServiceFailureException("Server error: %s", error.message());
throw new ServiceFailureException("Server error: %s", message);
case 501:
throw new UnsupportedOperationException(error.message());
throw new UnsupportedOperationException(message);
case 503:
throw new ServiceUnavailableException("Service unavailable: %s", error.message());
throw new ServiceUnavailableException("Service unavailable: %s", message);
default:
break;
}

throw new RESTException("Unable to process: %s", error.message());
throw new RESTException("Unable to process: %s", message);
}
}
24 changes: 22 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ public <T extends RESTResponse> T post(
}
}

@Override
public <T extends RESTResponse> T delete(
String path, RESTRequest body, Map<String, String> headers) {
try {
RequestBody requestBody = buildRequestBody(body);
Request request =
new Request.Builder()
.url(uri + path)
.delete(requestBody)
.headers(Headers.of(headers))
.build();
return exec(request, null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws IOException {
okHttpClient.dispatcher().cancelAll();
Expand All @@ -111,10 +128,13 @@ private <T extends RESTResponse> T exec(Request request, Class<T> responseType)
response.code());
errorHandler.accept(error);
}
if (responseBodyStr == null) {
if (responseType != null && responseBodyStr != null) {
return mapper.readValue(responseBodyStr, responseType);
} else if (responseType == null) {
return null;
} else {
throw new RESTException("response body is null.");
}
return mapper.readValue(responseBodyStr, responseType);
} catch (Exception e) {
throw new RESTException(e, "rest exception");
}
Expand Down
45 changes: 40 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,29 @@
import org.apache.paimon.rest.auth.AuthSession;
import org.apache.paimon.rest.auth.CredentialsProvider;
import org.apache.paimon.rest.auth.CredentialsProviderFactory;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.DropDatabaseRequest;
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.DatabaseName;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;

import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;

Expand Down Expand Up @@ -113,24 +123,49 @@ public FileIO fileIO() {

@Override
public List<String> listDatabases() {
throw new UnsupportedOperationException();
ListDatabasesResponse response =
client.get(resourcePaths.databases(), ListDatabasesResponse.class, headers());
if (response.getDatabases() != null) {
return response.getDatabases().stream()
.map(DatabaseName::getName)
.collect(Collectors.toList());
}
return ImmutableList.of();
}

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
throw new UnsupportedOperationException();
CreateDatabaseRequest request = new CreateDatabaseRequest(name, ignoreIfExists, properties);
try {
client.post(
resourcePaths.databases(), request, CreateDatabaseResponse.class, headers());
} catch (AlreadyExistsException e) {
throw new DatabaseAlreadyExistException(name);
}
}

@Override
public Database getDatabase(String name) throws DatabaseNotExistException {
throw new UnsupportedOperationException();
try {
GetDatabaseResponse response =
client.get(resourcePaths.database(name), GetDatabaseResponse.class, headers());
return new Database.DatabaseImpl(
name, response.options(), response.comment().orElseGet(() -> null));
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(name);
}
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
throw new UnsupportedOperationException();
DropDatabaseRequest request = new DropDatabaseRequest(ignoreIfNotExists, cascade);
try {
client.delete(resourcePaths.database(name), request, headers());
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(name);
}
}

@Override
Expand Down Expand Up @@ -208,7 +243,7 @@ public void close() throws Exception {
Map<String, String> fetchOptionsFromServer(
Map<String, String> headers, Map<String, String> clientProperties) {
ConfigResponse response =
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers());
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers);
return response.merge(clientProperties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public class RESTCatalogInternalOptions {
.stringType()
.noDefaultValue()
.withDescription("REST Catalog auth credentials provider.");
public static final ConfigOption<String> DATABASE_COMMENT =
ConfigOptions.key("comment")
.stringType()
.defaultValue(null)
.withDescription("REST Catalog database comment.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface RESTClient extends Closeable {

<T extends RESTResponse> T post(
String path, RESTRequest body, Class<T> responseType, Map<String, String> headers);

<T extends RESTResponse> T delete(String path, RESTRequest body, Map<String, String> headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@

package org.apache.paimon.rest;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/** Interface to mark both REST requests and responses. */
@JsonIgnoreProperties(ignoreUnknown = true)
public interface RESTMessage {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.paimon.rest;

import java.util.StringJoiner;

/** Resource paths for REST catalog. */
public class ResourcePaths {

public static final String V1_CONFIG = "/api/v1/config";
private static final StringJoiner SLASH = new StringJoiner("/");

public static ResourcePaths forCatalogProperties(String prefix) {
return new ResourcePaths(prefix);
Expand All @@ -32,4 +35,12 @@ public static ResourcePaths forCatalogProperties(String prefix) {
public ResourcePaths(String prefix) {
this.prefix = prefix;
}

public String databases() {
return SLASH.add("api").add("v1").add(prefix).add("databases").toString();
}

public String database(String databaseName) {
return SLASH.add("api").add("v1").add(prefix).add("databases").add(databaseName).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@
public class AuthSession {

static final int TOKEN_REFRESH_NUM_RETRIES = 5;
static final long MIN_REFRESH_WAIT_MILLIS = 10;
static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes

private static final Logger log = LoggerFactory.getLogger(AuthSession.class);
private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes
private static final long MIN_REFRESH_WAIT_MILLIS = 10;
private final CredentialsProvider credentialsProvider;
private volatile Map<String, String> headers;

Expand Down Expand Up @@ -76,25 +77,46 @@ public Map<String, String> getHeaders() {
return headers;
}

public Boolean refresh() {
if (this.credentialsProvider.supportRefresh()
&& this.credentialsProvider.keepRefreshed()
&& this.credentialsProvider.expiresInMills().isPresent()) {
boolean isSuccessful = this.credentialsProvider.refresh();
if (isSuccessful) {
Map<String, String> currentHeaders = this.headers;
this.headers =
RESTUtil.merge(currentHeaders, this.credentialsProvider.authHeader());
}
return isSuccessful;
}

return false;
}

@VisibleForTesting
static void scheduleTokenRefresh(
ScheduledExecutorService executor, AuthSession session, long expiresAtMillis) {
scheduleTokenRefresh(executor, session, expiresAtMillis, 0);
}

@VisibleForTesting
static long getTimeToWaitByExpiresInMills(long expiresInMillis) {
// how much ahead of time to start the refresh to allow it to complete
long refreshWindowMillis = Math.min(expiresInMillis, MAX_REFRESH_WINDOW_MILLIS);
// how much time to wait before expiration
long waitIntervalMillis = expiresInMillis - refreshWindowMillis;
// how much time to actually wait
return Math.max(waitIntervalMillis, MIN_REFRESH_WAIT_MILLIS);
}

private static void scheduleTokenRefresh(
ScheduledExecutorService executor,
AuthSession session,
long expiresAtMillis,
int retryTimes) {
if (retryTimes < TOKEN_REFRESH_NUM_RETRIES) {
long expiresInMillis = expiresAtMillis - System.currentTimeMillis();
// how much ahead of time to start the refresh to allow it to complete
long refreshWindowMillis = Math.min(expiresInMillis, MAX_REFRESH_WINDOW_MILLIS);
// how much time to wait before expiration
long waitIntervalMillis = expiresInMillis - refreshWindowMillis;
// how much time to actually wait
long timeToWait = Math.max(waitIntervalMillis, MIN_REFRESH_WAIT_MILLIS);
long timeToWait = getTimeToWaitByExpiresInMills(expiresInMillis);

executor.schedule(
() -> {
Expand All @@ -118,20 +140,4 @@ private static void scheduleTokenRefresh(
log.warn("Failed to refresh token after {} retries.", TOKEN_REFRESH_NUM_RETRIES);
}
}

public Boolean refresh() {
if (this.credentialsProvider.supportRefresh()
&& this.credentialsProvider.keepRefreshed()
&& this.credentialsProvider.expiresInMills().isPresent()) {
boolean isSuccessful = this.credentialsProvider.refresh();
if (isSuccessful) {
Map<String, String> currentHeaders = this.headers;
this.headers =
RESTUtil.merge(currentHeaders, this.credentialsProvider.authHeader());
}
return isSuccessful;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.exceptions;

/** Exception thrown on HTTP 409 means a resource already exists. */
public class AlreadyExistsException extends RESTException {

public AlreadyExistsException(String message, Object... args) {
super(message, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.exceptions;

/** Exception thrown on HTTP 404 means a resource not exists. */
public class NoSuchResourceException extends RESTException {

public NoSuchResourceException(String message, Object... args) {
super(message, args);
}
}
Loading
Loading