From 4090a8860061f58748e3faa6804094f90d3575f3 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Sun, 10 Dec 2023 22:57:32 +0100 Subject: [PATCH] Core: Add REST catalog table session cache (#8920) The purpose of caching auth session for tables is mainly so that we can stop refreshing the session when it isn't used anymore --- .../iceberg/rest/RESTSessionCatalog.java | 56 +++++++++++++++---- .../apache/iceberg/rest/TestRESTCatalog.java | 2 +- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index a2c2b7dd7030..5f660f0f4fe8 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -92,6 +92,7 @@ import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.EnvironmentUtil; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.ThreadPools; import org.apache.iceberg.view.BaseView; @@ -123,6 +124,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private final Function, RESTClient> clientBuilder; private final BiFunction, FileIO> ioBuilder; private Cache sessions = null; + private Cache tableSessions = null; private Cache fileIOCloser; private AuthSession catalogAuth = null; private boolean keepTokenRefreshed = true; @@ -197,6 +199,7 @@ public void initialize(String name, Map unresolved) { Map baseHeaders = configHeaders(mergedProps); this.sessions = newSessionCache(mergedProps); + this.tableSessions = newSessionCache(mergedProps); this.keepTokenRefreshed = PropertyUtil.propertyAsBoolean( mergedProps, @@ -242,7 +245,15 @@ private AuthSession session(SessionContext context) { AuthSession session = sessions.get( context.sessionId(), - id -> newSession(context.credentials(), context.properties(), catalogAuth)); + id -> { + Pair> newSession = + newSession(context.credentials(), context.properties(), catalogAuth); + if (null != newSession) { + return newSession.second().get(); + } + + return null; + }); return session != null ? session : catalogAuth; } @@ -859,7 +870,12 @@ private FileIO tableFileIO(SessionContext context, Map config) { } private AuthSession tableSession(Map tableConf, AuthSession parent) { - AuthSession session = newSession(tableConf, tableConf, parent); + Pair> newSession = newSession(tableConf, tableConf, parent); + if (null == newSession) { + return parent; + } + + AuthSession session = tableSessions.get(newSession.first(), id -> newSession.second().get()); return session != null ? session : parent; } @@ -889,30 +905,46 @@ private static ConfigResponse fetchConfig( return configResponse; } - private AuthSession newSession( + private Pair> newSession( Map credentials, Map properties, AuthSession parent) { if (credentials != null) { // use the bearer token without exchanging if (credentials.containsKey(OAuth2Properties.TOKEN)) { - return AuthSession.fromAccessToken( - client, - tokenRefreshExecutor(), + return Pair.of( credentials.get(OAuth2Properties.TOKEN), - expiresAtMillis(properties), - parent); + () -> + AuthSession.fromAccessToken( + client, + tokenRefreshExecutor(), + credentials.get(OAuth2Properties.TOKEN), + expiresAtMillis(properties), + parent)); } if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) { // fetch a token using the client credentials flow - return AuthSession.fromCredential( - client, tokenRefreshExecutor(), credentials.get(OAuth2Properties.CREDENTIAL), parent); + return Pair.of( + credentials.get(OAuth2Properties.CREDENTIAL), + () -> + AuthSession.fromCredential( + client, + tokenRefreshExecutor(), + credentials.get(OAuth2Properties.CREDENTIAL), + parent)); } for (String tokenType : TOKEN_PREFERENCE_ORDER) { if (credentials.containsKey(tokenType)) { // exchange the token for an access token using the token exchange flow - return AuthSession.fromTokenExchange( - client, tokenRefreshExecutor(), credentials.get(tokenType), tokenType, parent); + return Pair.of( + credentials.get(tokenType), + () -> + AuthSession.fromTokenExchange( + client, + tokenRefreshExecutor(), + credentials.get(tokenType), + tokenType, + parent)); } } } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 3de9c7b1d3f7..d0634be16c9f 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -1162,7 +1162,7 @@ public void testTableAuth( // if the table returned a bearer token, there will be no token request if (!tableConfig.containsKey("token")) { // client credentials or token exchange to get a table token - Mockito.verify(adapter, times(2)) + Mockito.verify(adapter, times(1)) .execute( eq(HTTPMethod.POST), eq("v1/oauth/tokens"),