From f15f6b1bf7f054b05193f2b726439f0466757653 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 25 Sep 2023 17:09:26 +0800 Subject: [PATCH] 4 --- .../java/org/apache/doris/common/Config.java | 25 +++-- .../arrowflight/DorisFlightSqlProducer.java | 14 ++- .../arrowflight/DorisFlightSqlService.java | 26 ++--- .../arrowflight/FlightStatementExecutor.java | 19 +--- ...sAuthResult.java => FlightAuthResult.java} | 6 +- .../arrowflight/auth2/FlightAuthUtils.java | 34 +++--- .../auth2/FlightBearerTokenAuthenticator.java | 25 +++-- .../auth2/FlightCookieMiddleware.java | 105 ------------------ .../auth2/FlightCredentialValidator.java | 36 +++--- .../sessions/FlightSessionsManager.java | 43 +++++++ .../FlightSessionsWithTokenManager.java | 96 ++++++++++++++++ .../FlightUserSession.java} | 23 ++-- .../tokens/FlightTokenDetails.java | 104 +++++++++++++++++ ...enManager.java => FlightTokenManager.java} | 12 +- ...rImpl.java => FlightTokenManagerImpl.java} | 49 ++++---- .../arrowflight/tokens/TokenDetails.java | 42 ------- 16 files changed, 380 insertions(+), 279 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/{DorisAuthResult.java => FlightAuthResult.java} (81%) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCookieMiddleware.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java rename fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/{tokens/SessionState.java => sessions/FlightUserSession.java} (81%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenDetails.java rename fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/{TokenManager.java => FlightTokenManager.java} (75%) rename fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/{TokenManagerImpl.java => FlightTokenManagerImpl.java} (63%) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenDetails.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 3856e6bc4f92282..4df8e6c27083bea 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2190,13 +2190,24 @@ public class Config extends ConfigBase { }) public static long auto_analyze_job_record_count = 20000; - @ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰", + @ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为2000", "The cache limit of all user tokens in Arrow Flight Server. which will be eliminated by" - + "LRU rules after exceeding the limit."}) - public static int arrow_flight_token_cache_size = -1; - - @ConfField(description = {"Arrow Flight Server中用户token的存活时间,单位分钟", - "The alive time of the user token in Arrow Flight Server, unit minutes"}) - public static int arrow_flight_token_alive_time = -1; + + "LRU rules after exceeding the limit, the default value is 2000."}) + public static int arrow_flight_token_cache_size = 2000; + + @ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天", + "The alive time of the user token in Arrow Flight Server, expire after write, unit minutes," + + "the default value is 4320, which is 3 days"}) + public static int arrow_flight_token_alive_time = 4320; + + @ConfField(description = {"Arrow Flight Server中所有用户session的缓存上限,超过后LRU淘汰,默认值为1000", + "The cache limit of all user sessions in Arrow Flight Server. which will be eliminated by" + + "LRU rules after exceeding the limit, the default value is 1000."}) + public static int arrow_flight_session_cache_size = 1000; + + @ConfField(description = {"Arrow Flight Server中用户session的存活时间,自上次访问后过期时间,单位分钟,默认值为120", + "The alive time of the user token in Arrow Flight Server, expire after access, unit minutes," + + "the default value is 120"}) + public static int arrow_flight_session_alive_time = 120; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index edc37fe8bed46db..9cf9e18d4331cac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -22,7 +22,8 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; -import org.apache.doris.service.arrowflight.tokens.TokenManager; +import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; +import org.apache.doris.service.arrowflight.sessions.FlightUserSession; import com.google.protobuf.Any; import com.google.protobuf.ByteString; @@ -73,11 +74,11 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable private final Location location; private final BufferAllocator rootAllocator = new RootAllocator(); private final SqlInfoBuilder sqlInfoBuilder; - private final TokenManager tokenManager; + private final FlightSessionsManager flightSessionsManager; - public DorisFlightSqlProducer(final Location location, TokenManager tokenManager) { + public DorisFlightSqlProducer(final Location location, FlightSessionsManager flightSessionsManager) { this.location = location; - this.tokenManager = tokenManager; + this.flightSessionsManager = flightSessionsManager; sqlInfoBuilder = new SqlInfoBuilder(); sqlInfoBuilder.withFlightSqlServerName("DorisFE") .withFlightSqlServerVersion("1.0") @@ -107,9 +108,10 @@ public void closePreparedStatement(final ActionClosePreparedStatementRequest req public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context, final FlightDescriptor descriptor) { try { - tokenManager.validateToken(context.peerIdentity()); + FlightUserSession flightUserSession = flightSessionsManager.getUserSession(context.peerIdentity()); final String query = request.getQuery(); - final FlightStatementExecutor flightStatementExecutor = new FlightStatementExecutor(query); + final FlightStatementExecutor flightStatementExecutor = new FlightStatementExecutor(query, + flightUserSession.getConnectContext()); flightStatementExecutor.executeQuery(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java index edf77526f1a191e..dfc53c793819511 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java @@ -19,12 +19,12 @@ import org.apache.doris.common.Config; import org.apache.doris.service.arrowflight.auth2.FlightBearerTokenAuthenticator; -import org.apache.doris.service.arrowflight.auth2.FlightCookieMiddleware; -import org.apache.doris.service.arrowflight.tokens.TokenManager; -import org.apache.doris.service.arrowflight.tokens.TokenManagerImpl; +import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; +import org.apache.doris.service.arrowflight.sessions.FlightSessionsWithTokenManager; +import org.apache.doris.service.arrowflight.tokens.FlightTokenManager; +import org.apache.doris.service.arrowflight.tokens.FlightTokenManagerImpl; import org.apache.arrow.flight.FlightServer; -import org.apache.arrow.flight.FlightServerMiddleware; import org.apache.arrow.flight.Location; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -40,23 +40,21 @@ public class DorisFlightSqlService { private static final Logger LOG = LogManager.getLogger(DorisFlightSqlService.class); private final FlightServer flightServer; private volatile boolean running; - private final TokenManager tokenManager; - public static final String FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE = "client-properties-middleware"; - public static final FlightServerMiddleware.Key FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY - = FlightServerMiddleware.Key.of(FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE); + private final FlightTokenManager flightTokenManager; + private final FlightSessionsManager flightSessionsManager; public DorisFlightSqlService(int port) { BufferAllocator allocator = new RootAllocator(); Location location = Location.forGrpcInsecure("0.0.0.0", port); - this.tokenManager = new TokenManagerImpl(Config.arrow_flight_token_cache_size, + this.flightTokenManager = new FlightTokenManagerImpl(Config.arrow_flight_token_cache_size, Config.arrow_flight_token_alive_time); + this.flightSessionsManager = new FlightSessionsWithTokenManager(flightTokenManager, + Config.arrow_flight_session_cache_size, + Config.arrow_flight_session_alive_time); - DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location, tokenManager); + DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location, flightSessionsManager); flightServer = FlightServer.builder(allocator, location, producer) - .headerAuthenticator(new FlightBearerTokenAuthenticator(tokenManager)).build(); - // .middleware(FLIGHT_CLIENT_PROPERTIES_MIDDLEWARE_KEY, - // new FlightServerCookieMiddleware.Factory()) - // .authHandler(new BasicServerAuthHandler(new FlightServerBasicAuthValidator())).build(); + .headerAuthenticator(new FlightBearerTokenAuthenticator(flightTokenManager)).build(); } // start Arrow Flight SQL service, return true if success, otherwise false diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java index ced03350de4e058..58b660cdc187407 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java @@ -64,9 +64,9 @@ public final class FlightStatementExecutor { private TNetworkAddress resultInternalServiceAddr; private ArrayList resultOutputExprs; - public FlightStatementExecutor(final String query) { + public FlightStatementExecutor(final String query, ConnectContext connectContext) { this.query = query; - acConnectContext = buildConnectContext(); + this.acConnectContext = new AutoCloseConnectContext(connectContext); } public void setQueryId(TUniqueId queryId) { @@ -126,21 +126,6 @@ public int hashCode() { return Objects.hash(this); } - public static AutoCloseConnectContext buildConnectContext() { - ConnectContext connectContext = new ConnectContext(); - SessionVariable sessionVariable = connectContext.getSessionVariable(); - sessionVariable.internalSession = true; - sessionVariable.setEnablePipelineEngine(false); // TODO - sessionVariable.setEnablePipelineXEngine(false); // TODO - connectContext.setEnv(Env.getCurrentEnv()); - connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser()); // TODO - connectContext.setCurrentUserIdentity(UserIdentity.ROOT); // TODO - connectContext.setStartTime(); - connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER); - connectContext.setResultSinkType(TResultSinkType.ARROW_FLIGHT_PROTOCAL); - return new AutoCloseConnectContext(connectContext); - } - public void executeQuery() { try { UUID uuid = UUID.randomUUID(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/DorisAuthResult.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthResult.java similarity index 81% rename from fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/DorisAuthResult.java rename to fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthResult.java index 6ec68973c42ff3e..f7e8e3400639ea6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/DorisAuthResult.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthResult.java @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/ServerCookieMiddleware.java -// and modified by Doris package org.apache.doris.service.arrowflight.auth2; @@ -28,14 +26,14 @@ * Result of Authentication. */ @Value.Immutable -public interface DorisAuthResult { +public interface FlightAuthResult { String getUserName(); UserIdentity getUserIdentity(); String getRemoteIp(); - static DorisAuthResult of(String userName, UserIdentity userIdentity, String remoteIp) { + static FlightAuthResult of(String userName, UserIdentity userIdentity, String remoteIp) { return ImmutableDorisAuthResult.builder() .userName(userName) .userIdentity(userIdentity) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java index 66e59fba13f658c..b605dff66b6a21d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java @@ -20,7 +20,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; import org.apache.doris.common.AuthenticationException; -import org.apache.doris.service.arrowflight.tokens.TokenManager; +import org.apache.doris.service.arrowflight.tokens.FlightTokenManager; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -30,48 +30,46 @@ import java.util.List; /** - * A collection of common Dremio Flight server authentication methods. + * A collection of common Flight server authentication methods. */ public final class FlightAuthUtils { private FlightAuthUtils() { } /** - * Authenticate against Dremio with the provided credentials. + * Authenticate against with the provided credentials. * - * @param username Dremio username. - * @param password Dremio password. + * @param username username. + * @param password password. * @param logger the slf4j logger for logging. - * @throws org.apache.arrow.flight.FlightRuntimeException if unable to authenticate against Dremio + * @throws org.apache.arrow.flight.FlightRuntimeException if unable to authenticate against * with the provided credentials. */ - public static DorisAuthResult authenticateCredentials(String username, String password, String remoteIp, + public static FlightAuthResult authenticateCredentials(String username, String password, String remoteIp, Logger logger) { try { List currentUserIdentity = Lists.newArrayList(); Env.getCurrentEnv().getAuth().checkPlainPassword(username, remoteIp, password, currentUserIdentity); Preconditions.checkState(currentUserIdentity.size() == 1); - return DorisAuthResult.of(username, currentUserIdentity.get(0), remoteIp); + return FlightAuthResult.of(username, currentUserIdentity.get(0), remoteIp); } catch (AuthenticationException e) { logger.error("Unable to authenticate user {}", username, e); - final String errorMessage = "Unable to authenticate user " + username + ", exception: " + e.getMessage(); - throw CallStatus.UNAUTHENTICATED.withCause(e).withDescription(errorMessage).toRuntimeException(); + final String errMsg = "Unable to authenticate user " + username + ", exception: " + e.getMessage(); + throw CallStatus.UNAUTHENTICATED.withCause(e).withDescription(errMsg).toRuntimeException(); } } /** - * Create a new token with the TokenManager and create a new UserSession object associated with - * the authenticated username. * Creates a new Bearer Token. Returns the bearer token associated with the User. * - * @param tokenManager the TokenManager. + * @param flightTokenManager the TokenManager. * @param username the user to create a Flight server session for. - * @param dorisAuthResult tht DorisAuthResult. - * @return the token associated with the UserSession created. + * @param flightAuthResult the FlightAuthResult. + * @return the token associated with the FlightTokenDetails created. */ - public static String createToken(TokenManager tokenManager, String username, DorisAuthResult dorisAuthResult) { - // TODO: DX-25278: Add ClientAddress information while creating a Token in DremioFlightServerAuthValidator - return tokenManager.createToken(username, dorisAuthResult).token; + public static String createToken(FlightTokenManager flightTokenManager, String username, + FlightAuthResult flightAuthResult) { + return flightTokenManager.createToken(username, flightAuthResult).getToken(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java index a5c1d9236b213cb..ef6e28b034dd5e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/ServerCookieMiddleware.java +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/auth2/DremioBearerTokenAuthenticator.java // and modified by Doris package org.apache.doris.service.arrowflight.auth2; -import org.apache.doris.service.arrowflight.tokens.TokenManager; +import org.apache.doris.service.arrowflight.tokens.FlightTokenManager; import org.apache.arrow.flight.CallHeaders; import org.apache.arrow.flight.CallStatus; @@ -32,9 +32,9 @@ import org.apache.logging.log4j.Logger; /** - * Dremio's custom implementation of CallHeaderAuthenticator for bearer token authentication. - * This class implements CallHeaderAuthenticator rather than BearerTokenAuthenticator. Dremio - * creates UserSession objects when the bearer token is created and requires access to the CallHeaders + * Doris's custom implementation of CallHeaderAuthenticator for bearer token authentication. + * This class implements CallHeaderAuthenticator rather than BearerTokenAuthenticator. Doris + * creates FlightTokenDetails objects when the bearer token is created and requires access to the CallHeaders * in getAuthResultWithBearerToken. */ @@ -42,17 +42,18 @@ public class FlightBearerTokenAuthenticator implements CallHeaderAuthenticator { private static final Logger LOG = LogManager.getLogger(FlightBearerTokenAuthenticator.class); private final CallHeaderAuthenticator initialAuthenticator; - private final TokenManager tokenManager; + private final FlightTokenManager flightTokenManager; - public FlightBearerTokenAuthenticator(TokenManager tokenManager) { - this.tokenManager = tokenManager; - this.initialAuthenticator = new BasicCallHeaderAuthenticator(new FlightCredentialValidator(this.tokenManager)); + public FlightBearerTokenAuthenticator(FlightTokenManager flightTokenManager) { + this.flightTokenManager = flightTokenManager; + this.initialAuthenticator = new BasicCallHeaderAuthenticator( + new FlightCredentialValidator(this.flightTokenManager)); } /** * If no bearer token is provided, the method initiates initial password and username * authentication. Once authenticated, client properties are retrieved from incoming CallHeaders. - * Then it generates a token and creates a UserSession with the retrieved client properties. + * Then it generates a token and creates a FlightTokenDetails with the retrieved client properties. * associated with it. *

* If a bearer token is provided, the method validates the provided token. @@ -81,7 +82,7 @@ public AuthResult authenticate(CallHeaders incomingHeaders) { */ AuthResult validateBearer(String token) { try { - tokenManager.validateToken(token); + flightTokenManager.validateToken(token); return createAuthResultWithBearerToken(token); } catch (IllegalArgumentException e) { LOG.error("Bearer token validation failed.", e); @@ -93,7 +94,7 @@ AuthResult validateBearer(String token) { /** * Helper method to create an AuthResult. * - * @param token the token to create a UserSession for. + * @param token the token to create a FlightTokenDetails for. * @return a new AuthResult with functionality to add given bearer token to the outgoing header. */ private AuthResult createAuthResultWithBearerToken(String token) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCookieMiddleware.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCookieMiddleware.java deleted file mode 100644 index 56dd2f14a67265c..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCookieMiddleware.java +++ /dev/null @@ -1,105 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/ServerCookieMiddleware.java -// and modified by Doris - -package org.apache.doris.service.arrowflight.auth2; - -import org.apache.arrow.flight.CallHeaders; -import org.apache.arrow.flight.CallInfo; -import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.FlightServerMiddleware; -import org.apache.arrow.flight.RequestContext; - -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; -import javax.validation.constraints.NotNull; - -/** - * ServerCookieMiddleware allows a FlightServer to retrieve cookies from the request as well as set outgoing cookies - * TODO - */ -public final class FlightCookieMiddleware implements FlightServerMiddleware { - private RequestContext requestContext; - private Map cookieValues; - private final CallHeaders incomingHeaders; - - public static class Factory implements FlightServerMiddleware.Factory { - /** - * Construct a factory for receiving call headers. - */ - public Factory() { - } - - @Override - public FlightCookieMiddleware onCallStarted(CallInfo callInfo, CallHeaders incomingHeaders, - RequestContext context) { - return new FlightCookieMiddleware(callInfo, incomingHeaders, context); - } - } - - - private FlightCookieMiddleware(CallInfo callInfo, CallHeaders incomingHeaders, - RequestContext requestContext) { - this.incomingHeaders = incomingHeaders; - this.requestContext = requestContext; - this.cookieValues = new HashMap(); - cookieValues.put("SESSION_ID", "1"); - } - - /** - * Retrieve the headers for this call. - */ - public CallHeaders headers() { - return this.incomingHeaders; - } - - public void addCookie(@NotNull String key, @NotNull String value) { - // Add to the internal map of values - this.cookieValues.put(key, value); - } - - public RequestContext getRequestContext() { - return this.requestContext; - } - - @Override - public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { - // if internal values are not empty - if (cookieValues.isEmpty()) { - return; - } - - final String cookies = cookieValues.entrySet() - .stream() - .map((entry) -> String.format("%s=%s", entry.getKey(), entry.getValue())) - .collect(Collectors.joining(";")); - - // set it in the headers - outgoingHeaders.insert("Set-Cookie", cookies); - } - - @Override - public void onCallCompleted(CallStatus status) { - } - - @Override - public void onCallErrored(Throwable err) { - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java index 820042b0c22a6d7..6676e8526ef0f54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightCredentialValidator.java @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. // This file is copied from -// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/ServerCookieMiddleware.java +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/auth2/DremioCredentialValidator.java // and modified by Doris package org.apache.doris.service.arrowflight.auth2; -import org.apache.doris.service.arrowflight.tokens.TokenManager; +import org.apache.doris.service.arrowflight.tokens.FlightTokenManager; import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator; import org.apache.arrow.flight.auth2.CallHeaderAuthenticator.AuthResult; @@ -28,43 +28,43 @@ import org.apache.logging.log4j.Logger; /** - * Dremio authentication specialized CredentialValidator implementation. + * Authentication specialized CredentialValidator implementation. */ public class FlightCredentialValidator implements BasicCallHeaderAuthenticator.CredentialValidator { private static final Logger LOG = LogManager.getLogger(FlightCredentialValidator.class); - private final TokenManager tokenManager; + private final FlightTokenManager flightTokenManager; - public FlightCredentialValidator(TokenManager tokenManager) { - this.tokenManager = tokenManager; + public FlightCredentialValidator(FlightTokenManager flightTokenManager) { + this.flightTokenManager = flightTokenManager; } /** - * Authenticates against Dremio with the provided username and password. + * Authenticates against with the provided username and password. * - * @param username Dremio username. - * @param password Dremio user password. + * @param username username. + * @param password user password. * @return AuthResult with username as the peer identity. */ @Override public AuthResult validate(String username, String password) { - // TODO Add ClientAddress information while creating a Token in FlightServerBasicAuthValidator + // TODO Add ClientAddress information while creating a Token String remoteIp = "0.0.0.0"; - DorisAuthResult dorisAuthResult = FlightAuthUtils.authenticateCredentials(username, password, remoteIp, LOG); - return getAuthResultWithBearerToken(dorisAuthResult); + FlightAuthResult flightAuthResult = FlightAuthUtils.authenticateCredentials(username, password, remoteIp, LOG); + return getAuthResultWithBearerToken(flightAuthResult); } /** * Generates a bearer token, parses client properties from incoming headers, then creates a - * UserSession associated with the generated token and client properties. + * FlightTokenDetails associated with the generated token and client properties. * - * @param dorisAuthResult the DorisAuthResult from initial authentication, with peer identity captured. - * @return an an AuthResult with the bearer token and peer identity. + * @param flightAuthResult the FlightAuthResult from initial authentication, with peer identity captured. + * @return an FlightAuthResult with the bearer token and peer identity. */ - AuthResult getAuthResultWithBearerToken(DorisAuthResult dorisAuthResult) { - final String username = dorisAuthResult.getUserName(); - final String token = FlightAuthUtils.createToken(tokenManager, username, dorisAuthResult); + AuthResult getAuthResultWithBearerToken(FlightAuthResult flightAuthResult) { + final String username = flightAuthResult.getUserName(); + final String token = FlightAuthUtils.createToken(flightTokenManager, username, flightAuthResult); return () -> token; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java new file mode 100644 index 000000000000000..d5ed398a9ce312a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsManager.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/DremioFlightSessionsManager.java +// and modified by Doris + +package org.apache.doris.service.arrowflight.sessions; + +/** + * Manages UserSession creation and UserSession cache. + */ +public interface FlightSessionsManager extends AutoCloseable { + + /** + * Resolves an existing UserSession for the given token. + *

+ * + * @param peerIdentity identity after authorization + * @return The UserSession or null if no sessionId is given. + */ + FlightUserSession getUserSession(String peerIdentity); + + /** + * Creates a UserSession object and store it in the local cache, assuming that the peerIdentity was already validated. + * + * @param peerIdentity identity after authorization + */ + FlightUserSession createUserSession(String peerIdentity); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java new file mode 100644 index 000000000000000..78d8bd9a9e0f647 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/flight/TokenCacheFlightSessionManager.java +// and modified by Doris + +package org.apache.doris.service.arrowflight.sessions; + +import org.apache.doris.service.arrowflight.tokens.FlightTokenDetails; +import org.apache.doris.service.arrowflight.tokens.FlightTokenManager; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import org.apache.arrow.flight.CallStatus; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.TimeUnit; + +public class FlightSessionsWithTokenManager implements FlightSessionsManager { + private static final Logger LOG = LogManager.getLogger(FlightSessionsWithTokenManager.class); + + private final Cache userSessions; + + private final int cacheExpiration; + + private final FlightTokenManager flightTokenManager; + + public FlightSessionsWithTokenManager(FlightTokenManager flightTokenManager, final int cacheSize, + final int cacheExpiration) { + this.flightTokenManager = flightTokenManager; + this.cacheExpiration = cacheExpiration; + this.userSessions = CacheBuilder.newBuilder() + .maximumSize(cacheSize) + .expireAfterAccess(cacheExpiration, TimeUnit.MINUTES) + .build(new CacheLoader() { + @Override + public FlightUserSession load(String key) { + return new FlightUserSession(); + } + }); + } + + @Override + public FlightUserSession getUserSession(String peerIdentity) { + FlightUserSession userSession = userSessions.getIfPresent(peerIdentity); + if (null == userSession) { + userSession = createUserSession(peerIdentity); + if (null == userSession) { + flightTokenManager.invalidateToken(peerIdentity); + String err = "UserSession expire after access " + cacheExpiration + " minutes ago, reauthorize."; + LOG.error(err); + throw CallStatus.UNAUTHENTICATED.withDescription(err).toRuntimeException(); + } + return userSession; + } + return userSession; + } + + @Override + public FlightUserSession createUserSession(String peerIdentity) { + try { + final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity); + if (flightTokenDetails.getCreatedSession()) { + return null; + } + final FlightUserSession flightUserSession = new FlightUserSession(flightTokenDetails.getUsername(), + flightTokenDetails.getIssuedAt(), flightTokenDetails.getExpiresAt()); + flightUserSession.setAuthResult(flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp()); + userSessions.put(peerIdentity, flightUserSession); + return flightUserSession; + } catch (IllegalArgumentException e) { + LOG.error("Bearer token validation failed.", e); + throw CallStatus.UNAUTHENTICATED.toRuntimeException(); + } + } + + @Override + public void close() throws Exception { + userSessions.invalidateAll(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/SessionState.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightUserSession.java similarity index 81% rename from fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/SessionState.java rename to fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightUserSession.java index e4658c91bb61454..da55ba631e2cba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/SessionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightUserSession.java @@ -14,31 +14,34 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is copied from -package org.apache.doris.service.arrowflight.tokens; +package org.apache.doris.service.arrowflight.sessions; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; -import org.apache.doris.service.arrowflight.auth2.DorisAuthResult; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResultSinkType; -public class SessionState { +/** + * Object holding UserSesssion + */ +public class FlightUserSession { private final String username; private final long issuedAt; private final long expiresAt; private final ConnectContext connectContext; - public SessionState() { + public FlightUserSession() { this.username = ""; this.issuedAt = 0; this.expiresAt = 0; - this.connectContext = buildConnectContext(); + this.connectContext = new ConnectContext(); } - public SessionState(String username, long issuedAt, long expiresAt) { + public FlightUserSession(String username, long issuedAt, long expiresAt) { this.username = username; this.issuedAt = issuedAt; this.expiresAt = expiresAt; @@ -61,10 +64,10 @@ public ConnectContext getConnectContext() { return connectContext; } - public void setAuthResult(final DorisAuthResult authResult) { - connectContext.setQualifiedUser(authResult.getUserIdentity().getQualifiedUser()); - connectContext.setCurrentUserIdentity(authResult.getUserIdentity()); - connectContext.setRemoteIP(authResult.getRemoteIp()); + public void setAuthResult(final UserIdentity userIdentity, final String remoteIpy) { + connectContext.setQualifiedUser(userIdentity.getQualifiedUser()); + connectContext.setCurrentUserIdentity(userIdentity); + connectContext.setRemoteIP(remoteIpy); } public static ConnectContext buildConnectContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenDetails.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenDetails.java new file mode 100644 index 000000000000000..16af05a66544987 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenDetails.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight.tokens; + +import org.apache.doris.analysis.UserIdentity; + +import com.google.common.base.Preconditions; + +/** + * Details of a token. + */ +public final class FlightTokenDetails { + + private final String token; + private final String username; + private final long issuedAt; + private final long expiresAt; + private final String remoteIp; + private final UserIdentity userIdentity; + private boolean createdSession = false; + + public FlightTokenDetails() { + this.token = ""; + this.username = ""; + this.issuedAt = 0; + this.expiresAt = 0; + this.remoteIp = ""; + this.userIdentity = new UserIdentity(username, remoteIp); + } + + public FlightTokenDetails(String token, String username, long issuedAt, long expiresAt, UserIdentity userIdentity, + String remoteIp) { + Preconditions.checkNotNull(token); + Preconditions.checkNotNull(username); + this.token = token; + this.username = username; + this.issuedAt = issuedAt; + this.expiresAt = expiresAt; + this.remoteIp = remoteIp; + this.userIdentity = userIdentity; + } + + public FlightTokenDetails(String token, String username, long expiresAt) { + Preconditions.checkNotNull(token); + Preconditions.checkNotNull(username); + this.token = token; + this.username = username; + this.expiresAt = expiresAt; + this.issuedAt = 0; + this.remoteIp = ""; + this.userIdentity = new UserIdentity(username, remoteIp); + } + + public String getToken() { + return token; + } + + public String getUsername() { + return username; + } + + public long getIssuedAt() { + return issuedAt; + } + + public long getExpiresAt() { + return expiresAt; + } + + public String getRemoteIp() { + return remoteIp; + } + + public UserIdentity getUserIdentity() { + return userIdentity; + } + + public void setCreatedSession(boolean createdSession) { + this.createdSession = createdSession; + } + + public boolean getCreatedSession() { + return createdSession; + } + + public static FlightTokenDetails of(String token, String username, long expiresAt) { + return new FlightTokenDetails(token, username, expiresAt); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManager.java similarity index 75% rename from fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenManager.java rename to fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManager.java index 235cba10d1542b9..23435c3c0460db5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManager.java @@ -14,15 +14,17 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/tokens/TokenManager.java +// and modified by Doris package org.apache.doris.service.arrowflight.tokens; -import org.apache.doris.service.arrowflight.auth2.DorisAuthResult; +import org.apache.doris.service.arrowflight.auth2.FlightAuthResult; /** * Token manager. */ -public interface TokenManager { +public interface FlightTokenManager extends AutoCloseable { /** * Generate a securely random token. @@ -35,10 +37,10 @@ public interface TokenManager { * Create a token for the session, and return details about the token. * * @param username user name - * @param dorisAuthResult auth result + * @param flightAuthResult auth result * @return token details */ - TokenDetails createToken(String username, DorisAuthResult dorisAuthResult); + FlightTokenDetails createToken(String username, FlightAuthResult flightAuthResult); /** * Validate the token, and return details about the token. @@ -47,7 +49,7 @@ public interface TokenManager { * @return token details * @throws IllegalArgumentException if the token is invalid or expired */ - TokenDetails validateToken(String token) throws IllegalArgumentException; + FlightTokenDetails validateToken(String token) throws IllegalArgumentException; /** * Invalidate the token. diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenManagerImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java similarity index 63% rename from fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenManagerImpl.java rename to fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java index 03abeda931f200c..16fd8d74cf2cc05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenManagerImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java @@ -14,10 +14,12 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// https://github.com/dremio/dremio-oss/blob/master/services/arrow-flight/src/main/java/com/dremio/service/tokens/TokenManagerImpl.java +// and modified by Doris package org.apache.doris.service.arrowflight.tokens; -import org.apache.doris.service.arrowflight.auth2.DorisAuthResult; +import org.apache.doris.service.arrowflight.auth2.FlightAuthResult; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -33,24 +35,24 @@ /** * Token manager implementation. */ -public class TokenManagerImpl implements TokenManager { - private static final Logger LOG = LogManager.getLogger(TokenManagerImpl.class); +public class FlightTokenManagerImpl implements FlightTokenManager { + private static final Logger LOG = LogManager.getLogger(FlightTokenManagerImpl.class); private final SecureRandom generator = new SecureRandom(); private final int cacheExpiration; - private LoadingCache tokenCache; + private LoadingCache tokenCache; - public TokenManagerImpl(final int cacheSize, final int cacheExpiration) { + public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) { this.cacheExpiration = cacheExpiration; this.tokenCache = CacheBuilder.newBuilder() .maximumSize(cacheSize) .expireAfterWrite(cacheExpiration, TimeUnit.MINUTES) - .build(new CacheLoader() { + .build(new CacheLoader() { @Override - public SessionState load(String key) { - return new SessionState(); + public FlightTokenDetails load(String key) { + return new FlightTokenDetails(); } }); } @@ -66,40 +68,40 @@ public String newToken() { } @Override - public TokenDetails createToken(final String username, final DorisAuthResult dorisAuthResult) { + public FlightTokenDetails createToken(final String username, final FlightAuthResult flightAuthResult) { final String token = newToken(); final long now = System.currentTimeMillis(); final long expires = now + TimeUnit.MILLISECONDS.convert(cacheExpiration, TimeUnit.MINUTES); - final SessionState state = new SessionState(username, now, expires); - state.setAuthResult(dorisAuthResult); + final FlightTokenDetails flightTokenDetails = new FlightTokenDetails(token, username, now, expires, + flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp()); - tokenCache.put(token, state); - LOG.trace("Created token for user: {}", username); - return TokenDetails.of(token, username, expires); + tokenCache.put(token, flightTokenDetails); + LOG.trace("Created flight token for user: {}", username); + return FlightTokenDetails.of(token, username, expires); } @Override - public TokenDetails validateToken(final String token) throws IllegalArgumentException { - final SessionState value = getSessionState(token); + public FlightTokenDetails validateToken(final String token) throws IllegalArgumentException { + final FlightTokenDetails value = getTokenDetails(token); if (System.currentTimeMillis() >= value.getExpiresAt()) { tokenCache.invalidate(token); // removes from the store as well throw new IllegalArgumentException("token expired"); } - LOG.trace("Validated token for user: {}", value.getUsername()); - return TokenDetails.of(token, value.getUsername(), value.getExpiresAt()); + LOG.trace("Validated flight token for user: {}", value.getUsername()); + return FlightTokenDetails.of(token, value.getUsername(), value.getExpiresAt()); } @Override public void invalidateToken(final String token) { - LOG.trace("Invalidate token"); + LOG.trace("Invalidate flight token, {}", token); tokenCache.invalidate(token); // removes from the store as well } - private SessionState getSessionState(final String token) { + private FlightTokenDetails getTokenDetails(final String token) { Preconditions.checkNotNull(token, "invalid token"); - final SessionState value; + final FlightTokenDetails value; try { value = tokenCache.getUnchecked(token); } catch (CacheLoader.InvalidCacheLoadException ignored) { @@ -108,4 +110,9 @@ private SessionState getSessionState(final String token) { return value; } + + @Override + public void close() throws Exception { + tokenCache.invalidateAll(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenDetails.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenDetails.java deleted file mode 100644 index 762601e47edcbba..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/TokenDetails.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.service.arrowflight.tokens; - -import com.google.common.base.Preconditions; - -/** - * Details of a token. - */ -public final class TokenDetails { - - public final String token; - public final String username; - public final long expiresAt; - - private TokenDetails(String token, String username, long expiresAt) { - Preconditions.checkNotNull(token); - Preconditions.checkNotNull(username); - this.token = token; - this.username = username; - this.expiresAt = expiresAt; - } - - public static TokenDetails of(String token, String username, long expiresAt) { - return new TokenDetails(token, username, expiresAt); - } -}