-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Support auth in REST Catalog #4648
Changes from 17 commits
e5ce67b
d2e120b
1cc75d9
9ee1aa1
94c1ab3
c4e98e8
6f3391a
f733f27
e27991d
8478865
ebe0bb5
bb46ae3
b91d797
ec074c2
9aafd86
6e7f2cd
2a422b5
b754f0b
858f13b
c2c3046
7498e7e
314f965
4a08b0b
5e09533
e348da1
944a452
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
|
||
package org.apache.paimon.rest; | ||
|
||
import org.apache.paimon.annotation.VisibleForTesting; | ||
import org.apache.paimon.catalog.Catalog; | ||
import org.apache.paimon.catalog.Database; | ||
import org.apache.paimon.catalog.Identifier; | ||
|
@@ -27,37 +26,44 @@ | |
import org.apache.paimon.manifest.PartitionEntry; | ||
import org.apache.paimon.options.CatalogOptions; | ||
import org.apache.paimon.options.Options; | ||
import org.apache.paimon.rest.auth.AuthOptions; | ||
import org.apache.paimon.rest.auth.AuthSession; | ||
import org.apache.paimon.rest.auth.CredentialsProvider; | ||
import org.apache.paimon.rest.auth.CredentialsProviderFactory; | ||
import org.apache.paimon.rest.responses.ConfigResponse; | ||
import org.apache.paimon.schema.Schema; | ||
import org.apache.paimon.schema.SchemaChange; | ||
import org.apache.paimon.table.Table; | ||
|
||
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; | ||
import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting; | ||
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
|
||
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; | ||
|
||
/** A catalog implementation for REST. */ | ||
public class RESTCatalog implements Catalog { | ||
private RESTClient client; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. close this in catalog.close? |
||
private String token; | ||
private ResourcePaths resourcePaths; | ||
private Map<String, String> options; | ||
private Map<String, String> baseHeader; | ||
// a lazy thread pool for token refresh | ||
private final AuthSession catalogAuth; | ||
private volatile ScheduledExecutorService refreshExecutor = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. close this in catalog.close? |
||
private boolean keepTokenRefreshed; | ||
|
||
private static final ObjectMapper objectMapper = RESTObjectMapper.create(); | ||
static final String AUTH_HEADER = "Authorization"; | ||
static final String AUTH_HEADER_VALUE_FORMAT = "Bearer %s"; | ||
|
||
public RESTCatalog(Options options) { | ||
if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { | ||
throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); | ||
} | ||
String uri = options.get(RESTCatalogOptions.URI); | ||
token = options.get(RESTCatalogOptions.TOKEN); | ||
Optional<Duration> connectTimeout = | ||
options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT); | ||
Optional<Duration> readTimeout = options.getOptional(RESTCatalogOptions.READ_TIMEOUT); | ||
|
@@ -71,12 +77,22 @@ public RESTCatalog(Options options) { | |
threadPoolSize, | ||
DefaultErrorHandler.getInstance()); | ||
this.client = new HttpClient(httpClientOptions); | ||
Map<String, String> authHeaders = | ||
ImmutableMap.of(AUTH_HEADER, String.format(AUTH_HEADER_VALUE_FORMAT, token)); | ||
this.baseHeader = configHeaders(options.toMap()); | ||
CredentialsProvider credentialsProvider = | ||
CredentialsProviderFactory.createCredentialsProvider( | ||
options, RESTCatalog.class.getClassLoader()); | ||
this.keepTokenRefreshed = options.get(AuthOptions.TOKEN_REFRESH_ENABLED); | ||
if (keepTokenRefreshed) { | ||
this.catalogAuth = | ||
AuthSession.fromRefreshCredentialsProvider( | ||
tokenRefreshExecutor(), this.baseHeader, credentialsProvider); | ||
|
||
} else { | ||
this.catalogAuth = new AuthSession(this.baseHeader, credentialsProvider); | ||
} | ||
Map<String, String> initHeaders = | ||
RESTUtil.merge(configHeaders(options.toMap()), authHeaders); | ||
RESTUtil.merge(configHeaders(options.toMap()), this.catalogAuth.getHeaders()); | ||
this.options = fetchOptionsFromServer(initHeaders, options.toMap()); | ||
this.baseHeader = configHeaders(this.options()); | ||
this.resourcePaths = | ||
ResourcePaths.forCatalogProperties( | ||
this.options.get(RESTCatalogInternalOptions.PREFIX)); | ||
|
@@ -187,11 +203,31 @@ public void close() throws Exception {} | |
Map<String, String> fetchOptionsFromServer( | ||
Map<String, String> headers, Map<String, String> clientProperties) { | ||
ConfigResponse response = | ||
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers); | ||
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers()); | ||
return response.merge(clientProperties); | ||
} | ||
|
||
private static Map<String, String> configHeaders(Map<String, String> properties) { | ||
return RESTUtil.extractPrefixMap(properties, "header."); | ||
} | ||
|
||
private Map<String, String> headers() { | ||
return catalogAuth.getHeaders(); | ||
} | ||
|
||
private ScheduledExecutorService tokenRefreshExecutor() { | ||
if (!keepTokenRefreshed) { | ||
return null; | ||
} | ||
|
||
if (refreshExecutor == null) { | ||
synchronized (this) { | ||
if (refreshExecutor == null) { | ||
this.refreshExecutor = createScheduledThreadPool(1, "token-refresh-thread"); | ||
} | ||
} | ||
} | ||
|
||
return refreshExecutor; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.rest.auth; | ||
|
||
import org.apache.paimon.options.ConfigOption; | ||
import org.apache.paimon.options.ConfigOptions; | ||
|
||
import java.time.Duration; | ||
|
||
/** Options for REST Catalog Auth. */ | ||
public class AuthOptions { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is better to move all options to |
||
public static final ConfigOption<String> TOKEN = | ||
ConfigOptions.key("token") | ||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("REST Catalog auth token."); | ||
public static final ConfigOption<Duration> TOKEN_EXPIRES_IN = | ||
ConfigOptions.key("token-expires-in") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. token.expiration-time |
||
.durationType() | ||
.defaultValue(Duration.ofHours(1)) | ||
.withDescription("REST Catalog auth token expires in."); | ||
public static final ConfigOption<Boolean> TOKEN_REFRESH_ENABLED = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need this option? Maybe just remove this. |
||
ConfigOptions.key("token-refresh-enabled") | ||
.booleanType() | ||
.defaultValue(false) | ||
.withDescription("REST Catalog auth token refresh enable."); | ||
public static final ConfigOption<String> TOKEN_FILE_PATH = | ||
ConfigOptions.key("token-file-path") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("REST Catalog auth token file path."); | ||
public static final ConfigOption<String> CREDENTIALS_PROVIDER = | ||
ConfigOptions.key("credentials_provider") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems we don't need to provide this now, we can infer the provider from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move to RESTCatalogInternalOptions, if the user defines this, will use it. If not we'll use the option to get provider. |
||
.stringType() | ||
.noDefaultValue() | ||
.withDescription("REST Catalog auth credentials provider."); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.rest.auth; | ||
|
||
import org.apache.paimon.rest.RESTUtil; | ||
import org.apache.paimon.utils.Pair; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** Auth session. */ | ||
public class AuthSession { | ||
private static final Logger log = LoggerFactory.getLogger(AuthSession.class); | ||
private static final int TOKEN_REFRESH_NUM_RETRIES = 5; | ||
private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes | ||
private static final long MIN_REFRESH_WAIT_MILLIS = 10; | ||
private final CredentialsProvider credentialsProvider; | ||
private volatile Map<String, String> headers; | ||
|
||
public AuthSession(Map<String, String> headers, CredentialsProvider credentialsProvider) { | ||
this.headers = headers; | ||
this.credentialsProvider = credentialsProvider; | ||
} | ||
|
||
public static AuthSession fromRefreshCredentialsProvider( | ||
ScheduledExecutorService executor, | ||
Map<String, String> headers, | ||
CredentialsProvider credentialsProvider) { | ||
AuthSession session = new AuthSession(headers, credentialsProvider); | ||
|
||
long startTimeMillis = System.currentTimeMillis(); | ||
Optional<Long> expiresAtMillisOpt = credentialsProvider.expiresAtMillis(); | ||
|
||
if (expiresAtMillisOpt.isPresent() && expiresAtMillisOpt.get() <= startTimeMillis) { | ||
Pair<Boolean, Long> refreshResult = session.refresh(); | ||
|
||
// if expiration is non-null, then token refresh was successful | ||
boolean isSuccessful = refreshResult.getKey(); | ||
if (isSuccessful) { | ||
if (session.credentialsProvider.expiresAtMillis().isPresent()) { | ||
// use the new expiration time from the refreshed token | ||
expiresAtMillisOpt = session.credentialsProvider.expiresAtMillis(); | ||
} else { | ||
// otherwise use the expiration time from the token response | ||
expiresAtMillisOpt = Optional.of(startTimeMillis + refreshResult.getValue()); | ||
} | ||
} else { | ||
// token refresh failed, don't reattempt with the original expiration | ||
expiresAtMillisOpt = Optional.empty(); | ||
} | ||
} | ||
|
||
if (null != executor && expiresAtMillisOpt.isPresent()) { | ||
scheduleTokenRefresh(executor, session, expiresAtMillisOpt.get()); | ||
} | ||
|
||
return session; | ||
} | ||
|
||
public Map<String, String> getHeaders() { | ||
if (this.credentialsProvider.keepRefreshed() && this.credentialsProvider.willSoonExpire()) { | ||
refresh(); | ||
} | ||
return headers; | ||
} | ||
|
||
private static void scheduleTokenRefresh( | ||
ScheduledExecutorService executor, AuthSession session, long expiresAtMillis) { | ||
scheduleTokenRefresh(executor, session, expiresAtMillis, 0); | ||
} | ||
|
||
private static void scheduleTokenRefresh( | ||
ScheduledExecutorService executor, | ||
AuthSession session, | ||
long expiresAtMillis, | ||
int retryTimes) { | ||
if (retryTimes < TOKEN_REFRESH_NUM_RETRIES) { | ||
long expiresInMillis = expiresAtMillis - System.currentTimeMillis(); | ||
// how much ahead of time to start the request to allow it to complete | ||
long refreshWindowMillis = Math.min(expiresInMillis / 10, MAX_REFRESH_WINDOW_MILLIS); | ||
// how much time to wait before expiration | ||
long waitIntervalMillis = expiresInMillis - refreshWindowMillis; | ||
// how much time to actually wait | ||
long timeToWait = Math.max(waitIntervalMillis, MIN_REFRESH_WAIT_MILLIS); | ||
|
||
executor.schedule( | ||
() -> { | ||
long refreshStartTime = System.currentTimeMillis(); | ||
Pair<Boolean, Long> refreshResult = session.refresh(); | ||
boolean isSuccessful = refreshResult.getKey(); | ||
if (isSuccessful) { | ||
scheduleTokenRefresh( | ||
executor, | ||
session, | ||
refreshStartTime + refreshResult.getValue(), | ||
0); | ||
} else { | ||
scheduleTokenRefresh( | ||
executor, session, expiresAtMillis, retryTimes + 1); | ||
} | ||
}, | ||
timeToWait, | ||
TimeUnit.MILLISECONDS); | ||
} else { | ||
log.warn("Failed to refresh token after {} retries.", TOKEN_REFRESH_NUM_RETRIES); | ||
} | ||
} | ||
|
||
public Pair<Boolean, Long> refresh() { | ||
if (this.credentialsProvider.supportRefresh() | ||
&& this.credentialsProvider.keepRefreshed() | ||
&& this.credentialsProvider.expiresInMills().isPresent()) { | ||
boolean isSuccessful = this.credentialsProvider.refresh(); | ||
if (!isSuccessful) { | ||
return Pair.of(false, 0L); | ||
} | ||
Map<String, String> currentHeaders = this.headers; | ||
this.headers = RESTUtil.merge(currentHeaders, this.credentialsProvider.authHeader()); | ||
return Pair.of(true, credentialsProvider.expiresInMills().get()); | ||
} | ||
|
||
return Pair.of(false, 0L); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.rest.auth; | ||
|
||
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; | ||
|
||
import java.util.Map; | ||
|
||
/** Base bear token credentials provider. */ | ||
public abstract class BaseBearTokenCredentialsProvider implements CredentialsProvider { | ||
private static final String AUTHORIZATION_HEADER = "Authorization"; | ||
private static final String BEARER_PREFIX = "Bearer "; | ||
|
||
@Override | ||
public Map<String, String> authHeader() { | ||
return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token()); | ||
} | ||
|
||
abstract String token(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newDaemonThreadFactory?