Skip to content

Commit

Permalink
LakeFSFS - TokenProvider AWS (no login yet) (#7604)
Browse files Browse the repository at this point in the history
  • Loading branch information
Isan-Rivkin authored Apr 10, 2024
1 parent bde5176 commit 99a60cd
Show file tree
Hide file tree
Showing 17 changed files with 1,156 additions and 24 deletions.
2 changes: 1 addition & 1 deletion clients/hadoopfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ To export to S3:
<dependency>
<groupId>io.lakefs</groupId>
<artifactId>sdk</artifactId>
<version>1.12.0</version>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
11 changes: 11 additions & 0 deletions clients/hadoopfs/src/main/java/io/lakefs/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,22 @@
public class Constants {
public static final String DEFAULT_SCHEME = "lakefs";
public static final String DEFAULT_CLIENT_ENDPOINT = "http://localhost:8000/api/v1";
public static final String DEFAULT_AUTH_PROVIDER_SERVER_ID_HEADER = "X-Lakefs-Server-ID";
public static final String ACCESS_KEY_KEY_SUFFIX = "access.key";
public static final String SECRET_KEY_KEY_SUFFIX = "secret.key";
public static final String ENDPOINT_KEY_SUFFIX = "endpoint";
public static final String LIST_AMOUNT_KEY_SUFFIX = "list.amount";
public static final String ACCESS_MODE_KEY_SUFFIX = "access.mode";
// io.lakefs.auth.TemporaryAWSCredentialsLakeFSTokenProvider, io.lakefs.auth.InstanceProfileAWSCredentialsLakeFSTokenProvider
public static final String LAKEFS_AUTH_PROVIDER_KEY_SUFFIX = "auth.provider";
// TODO(isan) document all configuration fields before merge.
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_ACCESS_KEY_SUFFIX = "token.aws.access.key";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_SECRET_KEY_SUFFIX = "token.aws.secret.key";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_SESSION_TOKEN_KEY_SUFFIX = "token.aws.session.token";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_TOKEN_DURATION_SECONDS = "token.aws.sts.duration_seconds";
public static final String TOKEN_AWS_CREDENTIALS_PROVIDER_ADDITIONAL_HEADERS = "token.sts.additional_headers";
public static final String TOKEN_AWS_STS_ENDPOINT = "token.aws.sts.endpoint";

public static final String SESSION_ID = "session_id";

public static enum AccessMode {
Expand Down
25 changes: 25 additions & 0 deletions clients/hadoopfs/src/main/java/io/lakefs/FSConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public final class FSConfiguration {

private static String formatFSConfigurationKey(String scheme, String key) {
Expand Down Expand Up @@ -46,4 +50,25 @@ public static int getInt(Configuration conf, String scheme, String keySuffix, in
String valueString = get(conf, scheme, keySuffix);
return (valueString == null) ? defaultValue : Integer.parseInt(valueString);
}
/**
 * Looks up a configuration key and parses its value as a map.
 * The value is expected to be in the format "key1:value1,key2:value2".
 * Each entry is split on its first ':' only, so a value may itself contain ':'
 * (for example a URL). Whitespace around entries, keys and values is trimmed and
 * empty entries (e.g. a trailing ',') are ignored.
 *
 * @param conf      configuration object to get the value from
 * @param scheme    used to format key for lookup
 * @param keySuffix key suffix to lookup
 * @return the parsed map, or null if the key is not set
 * @throws IllegalArgumentException if an entry has no ':' separator or an empty key
 * @throws IllegalStateException    if the same key appears more than once
 *                                  (raised by {@link Collectors#toMap})
 */
public static Map<String,String> getMap(Configuration conf, String scheme, String keySuffix) {
    String valueString = get(conf, scheme, keySuffix);
    if (valueString == null) {
        // callers distinguish "not configured" from "configured but empty" via null
        return null;
    }
    return Arrays.stream(valueString.split(","))
            .map(String::trim)
            .filter(entry -> !entry.isEmpty())
            .map(entry -> {
                // limit=2 keeps any further ':' characters as part of the value
                String[] keyValue = entry.split(":", 2);
                if (keyValue.length != 2 || keyValue[0].trim().isEmpty()) {
                    throw new IllegalArgumentException(
                            "Malformed map entry '" + entry + "' in configuration key suffix '" + keySuffix
                                    + "' (scheme '" + scheme + "'), expected key:value");
                }
                return keyValue;
            })
            .collect(Collectors.toMap(
                    keyValue -> keyValue[0].trim(),
                    keyValue -> keyValue[1].trim()
            ));
}
}
83 changes: 60 additions & 23 deletions clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package io.lakefs;

import io.lakefs.auth.LakeFSTokenProvider;
import io.lakefs.auth.LakeFSTokenProviderFactory;
import io.lakefs.clients.sdk.*;
import io.lakefs.clients.sdk.auth.HttpBasicAuth;
import io.lakefs.clients.sdk.auth.HttpBearerAuth;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

import static io.lakefs.auth.LakeFSTokenProviderFactory.newLakeFSTokenProvider;

/**
* Provides access to lakeFS API using client library.
* This class uses the configuration to initialize API client and instance per API interface we expose.
*/
public class LakeFSClient {
private static final String BASIC_AUTH = "basic_auth";
private static final String JWT_TOKEN_AUTH = "jwt_token";

private final ObjectsApi objectsApi;
private final StagingApi stagingApi;
Expand All @@ -21,49 +27,80 @@ public class LakeFSClient {
private final InternalApi internalApi;

/**
 * Builds the API client from the file-system configuration and creates one API
 * instance per lakeFS API interface exposed by this class.
 *
 * The auth provider is read from the configuration, with {@code basic_auth} passed
 * as the default. Only {@code basic_auth} is supported at the moment; it requires
 * both the access key and the secret key to be configured.
 *
 * @param scheme file-system scheme used to format the configuration keys
 * @param conf   Hadoop configuration to read the settings from
 * @throws IOException if the access key or secret key is missing, or if an
 *                     unsupported auth provider is configured
 */
public LakeFSClient(String scheme, Configuration conf) throws IOException {
    String authProvider = FSConfiguration.get(conf, scheme, Constants.LAKEFS_AUTH_PROVIDER_KEY_SUFFIX, LakeFSClient.BASIC_AUTH);
    ApiClient apiClient;

    // Compare by value, not by reference: a value read from the configuration is
    // generally a different String object than the BASIC_AUTH constant, so '=='
    // would reject an explicitly configured "basic_auth".
    if (BASIC_AUTH.equals(authProvider)) {
        String accessKey = FSConfiguration.get(conf, scheme, Constants.ACCESS_KEY_KEY_SUFFIX);
        if (accessKey == null) {
            throw new IOException("Missing lakeFS access key");
        }

        String secretKey = FSConfiguration.get(conf, scheme, Constants.SECRET_KEY_KEY_SUFFIX);
        if (secretKey == null) {
            throw new IOException("Missing lakeFS secret key");
        }
        apiClient = newApiClientNoAuth(scheme, conf);
        HttpBasicAuth basicAuth = (HttpBasicAuth) apiClient.getAuthentication(BASIC_AUTH);
        basicAuth.setUsername(accessKey);
        basicAuth.setPassword(secretKey);
    } else {
        // TODO(isan) depends on missing functionality PR https://github.com/treeverse/lakeFS/pull/7578 being merged.
        // once merged, we can use the following code to get the token
        throw new IOException("Unsupported auth provider: " + authProvider + ". Only basic_auth is supported at the moment.");
        // LakeFSTokenProvider tokenProvider = newLakeFSTokenProvider(scheme, conf);
        // String jwt = tokenProvider.getToken();
        // apiClient = newApiClientNoAuth(scheme, conf);
        // HttpBearerAuth tokenAuth = (HttpBearerAuth)apiClient.getAuthentication(JWT_TOKEN_AUTH);
        // tokenAuth.setBearerToken(jwt);
    }

    this.objectsApi = new ObjectsApi(apiClient);
    this.stagingApi = new StagingApi(apiClient);
    this.repositoriesApi = new RepositoriesApi(apiClient);
    this.branchesApi = new BranchesApi(apiClient);
    this.configApi = new ConfigApi(apiClient);
    this.internalApi = new InternalApi(apiClient);
}

/**
 * Creates an API client configured with the lakeFS endpoint, the client
 * identification header and the optional session cookie, but without any
 * credentials; the caller is responsible for setting up authentication.
 *
 * @param scheme file-system scheme used to format the configuration keys
 * @param conf   Hadoop configuration to read the settings from
 * @return the configured API client
 */
ApiClient newApiClientNoAuth(String scheme, Configuration conf) {
    // NOTE(review): getDefaultApiClient() presumably returns a shared default instance,
    // in which case the settings below apply to every user of it - confirm this is intended.
    ApiClient apiClient = io.lakefs.clients.sdk.Configuration.getDefaultApiClient();

    String endpoint = FSConfiguration.get(conf, scheme, Constants.ENDPOINT_KEY_SUFFIX, Constants.DEFAULT_CLIENT_ENDPOINT);
    if (endpoint.endsWith(Constants.SEPARATOR)) {
        // drop the trailing separator from the base path
        endpoint = endpoint.substring(0, endpoint.length() - 1);
    }
    apiClient.setBasePath(endpoint);
    apiClient.addDefaultHeader("X-Lakefs-Client", "lakefs-hadoopfs/" + getClass().getPackage().getImplementationVersion());

    String sessionId = FSConfiguration.get(conf, scheme, Constants.SESSION_ID);
    if (sessionId != null) {
        apiClient.addDefaultCookie("sessionId", sessionId);
    }

    return apiClient;
}

/**
 * @return the objects API instance created in the constructor
 */
public ObjectsApi getObjectsApi() {
    return objectsApi;
}

public StagingApi getStagingApi() { return stagingApi; }
public StagingApi getStagingApi() {
return stagingApi;
}

/**
 * @return the repositories API instance created in the constructor
 */
public RepositoriesApi getRepositoriesApi() {
    return repositoriesApi;
}

/**
 * @return the branches API instance created in the constructor
 */
public BranchesApi getBranchesApi() {
    return branchesApi;
}

/**
 * @return the config API instance created in the constructor
 */
public ConfigApi getConfigApi() {
    return configApi;
}

/**
 * @return the internal API instance created in the constructor
 */
public InternalApi getInternalApi() {
    return internalApi;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package io.lakefs.auth;
import com.amazonaws.auth.AWSCredentialsProvider;
import io.lakefs.Constants;
import io.lakefs.FSConfiguration;
import io.lakefs.clients.sdk.ApiClient;
import io.lakefs.clients.sdk.model.AuthenticationToken;
import org.apache.commons.codec.binary.Base64;

import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;


public class AWSLakeFSTokenProvider implements LakeFSTokenProvider {
STSGetCallerIdentityPresigner stsPresigner;
AWSCredentialsProvider awsProvider;
AuthenticationToken lakeFSAuthToken = null;
String stsEndpoint;
Map<String, String> stsAdditionalHeaders;
int stsExpirationInSeconds;
ApiClient lakeFSApi;

AWSLakeFSTokenProvider() {
}

public AWSLakeFSTokenProvider(AWSCredentialsProvider awsProvider, ApiClient lakeFSClient, STSGetCallerIdentityPresigner stsPresigner, String stsEndpoint, Map<String, String> stsAdditionalHeaders, int stsExpirationInSeconds) {
this.awsProvider = awsProvider;
this.stsPresigner = stsPresigner;
this.lakeFSApi = lakeFSClient;
this.stsEndpoint = stsEndpoint;
this.stsAdditionalHeaders = stsAdditionalHeaders;
this.stsExpirationInSeconds = stsExpirationInSeconds;
}

protected void initialize(AWSCredentialsProvider awsProvider, String scheme, Configuration conf) throws IOException {
// aws credentials provider
this.awsProvider = awsProvider;

// sts endpoint to call STS
this.stsEndpoint = FSConfiguration.get(conf, scheme, Constants.TOKEN_AWS_STS_ENDPOINT);

if (this.stsEndpoint == null) {
throw new IOException("Missing sts endpoint");
}

// Expiration for each identity token generated (they are very short-lived and only used for exchange, the value is part of the signature)
this.stsExpirationInSeconds = FSConfiguration.getInt(conf, scheme, Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_TOKEN_DURATION_SECONDS, 60);

// initialize the presigner
this.stsPresigner = new GetCallerIdentityV4Presigner();

// initialize a lakeFS api client

this.lakeFSApi = io.lakefs.clients.sdk.Configuration.getDefaultApiClient();
this.lakeFSApi.addDefaultHeader("X-Lakefs-Client", "lakefs-hadoopfs/" + getClass().getPackage().getImplementationVersion());
String endpoint = FSConfiguration.get(conf, scheme, Constants.ENDPOINT_KEY_SUFFIX, Constants.DEFAULT_CLIENT_ENDPOINT);
if (endpoint.endsWith(Constants.SEPARATOR)) {
endpoint = endpoint.substring(0, endpoint.length() - 1);
}
String sessionId = FSConfiguration.get(conf, scheme, Constants.SESSION_ID);
if (sessionId != null) {
this.lakeFSApi.addDefaultCookie("sessionId", sessionId);
}
this.lakeFSApi.setBasePath(endpoint);

// set additional headers (non-canonical) to sign with each request to STS
// non-canonical headers are signed by the presigner and sent to STS for verification in the requests by lakeFS to exchange the token
Map<String, String> additionalHeaders = FSConfiguration.getMap(conf, scheme, Constants.TOKEN_AWS_CREDENTIALS_PROVIDER_ADDITIONAL_HEADERS);
if (additionalHeaders == null) {
additionalHeaders = new HashMap<String, String>() {{
put(Constants.DEFAULT_AUTH_PROVIDER_SERVER_ID_HEADER, new URL(lakeFSApi.getBasePath()).getHost());
}};
// default header to sign is the lakeFS server host name
additionalHeaders.put(Constants.DEFAULT_AUTH_PROVIDER_SERVER_ID_HEADER, new URL(endpoint).getHost());
}
this.stsAdditionalHeaders = additionalHeaders;
}

@Override
public String getToken() {
if (needsNewToken()) {
refresh();
}
return this.lakeFSAuthToken.getToken();
}

private boolean needsNewToken() {
return this.lakeFSAuthToken == null || this.lakeFSAuthToken.getTokenExpiration() < System.currentTimeMillis();
}

public GeneratePresignGetCallerIdentityResponse newPresignedRequest() throws Exception {
GeneratePresignGetCallerIdentityRequest stsReq = new GeneratePresignGetCallerIdentityRequest(
new URI(this.stsEndpoint),
this.awsProvider.getCredentials(),
this.stsAdditionalHeaders,
this.stsExpirationInSeconds
);
return this.stsPresigner.presignRequest(stsReq);
}

public String newPresignedGetCallerIdentityToken() throws Exception {
GeneratePresignGetCallerIdentityResponse signedRequest = this.newPresignedRequest();

// generate token parameters object
LakeFSExternalPrincipalIdentityRequest identityTokenParams = new LakeFSExternalPrincipalIdentityRequest(
signedRequest.getHTTPMethod(),
signedRequest.getHost(),
signedRequest.getRegion(),
signedRequest.getAction(),
signedRequest.getDate(),
signedRequest.getExpires(),
signedRequest.getAccessKeyId(),
signedRequest.getSignature(),
Arrays.asList(signedRequest.getSignedHeadersParam().split(";")),
signedRequest.getVersion(),
signedRequest.getAlgorithm(),
signedRequest.getSecurityToken()
);

// base64 encode
return Base64.encodeBase64String(identityTokenParams.toJSON().getBytes());
}

private void newToken() throws Exception {
String identityToken = this.newPresignedGetCallerIdentityToken();
/*
TODO(isan)
depends on missing functionality PR https://github.com/treeverse/lakeFS/pull/7578 being merged.
before merging this code - implement the call to lakeFS.
it will introduce the functionality in the generated client of actually doing the login.
call lakeFS to exchange the token for a lakeFS token
The flow will be:
1. use this.lakeFSApi Client with ExternalPrincipal API class (no auth required)
2. this.lakeFSAuthToken = call api.ExternalPrincipalLogin(identityToken, <lakeFS Token optional TTL>)
*/
// dummy initiation
this.lakeFSAuthToken = new AuthenticationToken();
this.lakeFSAuthToken.setTokenExpiration(System.currentTimeMillis() + 60);
}

// refresh can be called to create a new token regardless if the current token is expired or not or does not exist.
@Override
public void refresh() {
synchronized (this) {
try {
newToken();
} catch (Exception e) {
throw new RuntimeException("Failed to refresh token", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.lakefs.auth;

import com.amazonaws.auth.AWSCredentials;

import java.net.URI;
import java.util.Map;

/**
 * Value holder for the inputs needed to presign an STS GetCallerIdentity request:
 * the STS endpoint, the AWS credentials, the additional headers and the expiration.
 * All fields are assigned once in the constructor and never modified by this class.
 */
class GeneratePresignGetCallerIdentityRequest {
    private final Map<String, String> additionalHeaders;
    private final int expirationInSeconds;
    private final URI stsEndpoint;
    // final like the other fields: there is no setter and it is assigned only in the constructor
    private final AWSCredentials credentials;

    /**
     * @param stsEndpoint         STS endpoint the request is generated for
     * @param credentials         AWS credentials carried by the request
     * @param additionalHeaders   additional headers carried by the request
     * @param expirationInSeconds expiration carried by the request, in seconds
     */
    public GeneratePresignGetCallerIdentityRequest(URI stsEndpoint, AWSCredentials credentials, Map<String, String> additionalHeaders, int expirationInSeconds) {
        this.credentials = credentials;
        this.stsEndpoint = stsEndpoint;
        this.additionalHeaders = additionalHeaders;
        this.expirationInSeconds = expirationInSeconds;
    }

    /**
     * @return the additional headers map passed to the constructor (not copied)
     */
    public Map<String, String> getAdditionalHeaders() {
        return additionalHeaders;
    }

    /**
     * @return the expiration in seconds passed to the constructor
     */
    public int getExpirationInSeconds() {
        return expirationInSeconds;
    }

    /**
     * @return the STS endpoint passed to the constructor
     */
    public URI getStsEndpoint() {
        return stsEndpoint;
    }

    /**
     * @return the AWS credentials passed to the constructor
     */
    public AWSCredentials getCredentials() {
        return credentials;
    }

}
Loading

0 comments on commit 99a60cd

Please sign in to comment.