Skip to content

Commit

Permalink
support data token
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Jan 17, 2025
1 parent 587fa28 commit 74b509e
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,10 @@ public class CatalogOptions {
"Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. "
+ "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in"
+ " the metastore and need to be manually added as separate partition operations.");

public static final ConfigOption<Boolean> FILE_IO_REFRESH_CREDENTIAL_ENABLE =
ConfigOptions.key("file-io-refresh-credential.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to support file io refresh credential.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
SnapshotCommit.Factory commitFactory =
new RenamingSnapshotCommit.Factory(
lockFactory().orElse(null), lockContext().orElse(null));
return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, commitFactory);
return CatalogUtils.loadTable(
this, this.fileIO(), identifier, this::loadTableMetadata, commitFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public static List<Partition> listPartitionsFromFileSystem(Table table) {
*/
public static Table loadTable(
Catalog catalog,
FileIO fileIO,
Identifier identifier,
TableMetadata.Loader metadataLoader,
SnapshotCommit.Factory commitFactory)
Expand All @@ -189,11 +190,10 @@ public static Table loadTable(
new CatalogEnvironment(
identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory);
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table =
FileStoreTableFactory.create(catalog.fileIO(), path, schema, catalogEnv);
FileStoreTable table = FileStoreTableFactory.create(fileIO, path, schema, catalogEnv);

if (options.type() == TableType.OBJECT_TABLE) {
table = toObjectTable(catalog, table);
table = toObjectTable(fileIO, table);
}

if (identifier.isSystemTable()) {
Expand Down Expand Up @@ -265,10 +265,11 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche
.build();
}

private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) {
private static ObjectTable toObjectTable(FileIO fileIO, FileStoreTable underlyingTable) {
CoreOptions options = underlyingTable.coreOptions();
String objectLocation = options.objectLocation();
FileIO objectFileIO = catalog.fileIO(new Path(objectLocation));
// todo: check whether here is ok.
FileIO objectFileIO = fileIO;
return ObjectTable.builder()
.underlyingTable(underlyingTable)
.objectLocation(objectLocation)
Expand Down
40 changes: 34 additions & 6 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class RESTCatalog implements Catalog {
private final ResourcePaths resourcePaths;
private final AuthSession catalogAuth;
private final Options options;
private final boolean fileIORefreshCredentialEnable;
private final FileIO fileIO;

private volatile ScheduledExecutorService refreshExecutor = null;
Expand All @@ -130,13 +131,20 @@ public RESTCatalog(CatalogContext context) {
.merge(context.options().toMap()));
this.resourcePaths = ResourcePaths.forCatalogProperties(options);

this.fileIORefreshCredentialEnable =
options.get(CatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE);
try {
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
this.fileIO =
FileIO.get(
new Path(warehouseStr),
CatalogContext.create(
options, context.preferIO(), context.fallbackIO()));
if (fileIORefreshCredentialEnable) {
// todo: check whether is ok
this.fileIO = null;
} else {
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
this.fileIO =
FileIO.get(
new Path(warehouseStr),
CatalogContext.create(
options, context.preferIO(), context.fallbackIO()));
}
} catch (IOException e) {
LOG.warn("Can not get FileIO from options.");
throw new RuntimeException(e);
Expand All @@ -149,6 +157,9 @@ protected RESTCatalog(Options options, FileIO fileIO) {
this.options = options;
this.resourcePaths = ResourcePaths.forCatalogProperties(options);
this.fileIO = fileIO;
this.fileIORefreshCredentialEnable =
options.get(CatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE);
;
}

@Override
Expand All @@ -168,11 +179,27 @@ public RESTCatalogLoader catalogLoader() {

@Override
public FileIO fileIO() {
if (fileIORefreshCredentialEnable) {
throw new UnsupportedOperationException();
}
return fileIO;
}

// todo: need cache table identifier location
@Override
public FileIO fileIO(Path path) {
if (fileIORefreshCredentialEnable) {
throw new UnsupportedOperationException();
}
return fileIO;
}

// todo: need cache table identifier fileIO
public FileIO fileIO(Identifier identifier) {
if (fileIORefreshCredentialEnable) {
return new RefreshCredentialFileIO(
resourcePaths, catalogAuth, options, client, identifier);
}
return fileIO;
}

Expand Down Expand Up @@ -288,6 +315,7 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
public Table getTable(Identifier identifier) throws TableNotExistException {
return CatalogUtils.loadTable(
this,
this.fileIO(identifier),
identifier,
this::loadTableMetadata,
new RESTSnapshotCommitFactory(catalogLoader()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.auth.AuthSession;
import org.apache.paimon.rest.responses.GetTableCredentialsResponse;

import java.io.IOException;
import java.util.Date;
import java.util.Map;

/** A {@link FileIO} to support refresh credential. */
public class RefreshCredentialFileIO implements FileIO {
private final ResourcePaths resourcePaths;
private final AuthSession catalogAuth;
private RESTClient client;
protected Options options;
private Identifier identifier;
private Date expireAt;
private transient volatile FileIO lazyFileIO;

public RefreshCredentialFileIO(
ResourcePaths resourcePaths,
AuthSession catalogAuth,
Options options,
RESTClient client,
Identifier identifier) {
this.resourcePaths = resourcePaths;
this.catalogAuth = catalogAuth;
this.options = options;
this.identifier = identifier;
this.client = client;
}

@Override
public void configure(CatalogContext context) {
// Do not get Hadoop Configuration in CatalogOptions
// The class is in different classloader from pluginClassLoader!
this.options = context.options();
}

@Override
public SeekableInputStream newInputStream(Path path) throws IOException {
return fileIO().newInputStream(path);
}

@Override
public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
return fileIO().newOutputStream(path, overwrite);
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
return fileIO().getFileStatus(path);
}

@Override
public FileStatus[] listStatus(Path path) throws IOException {
return fileIO().listStatus(path);
}

@Override
public boolean exists(Path path) throws IOException {
return fileIO().exists(path);
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
return fileIO().delete(path, recursive);
}

@Override
public boolean mkdirs(Path path) throws IOException {
return fileIO().mkdirs(path);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
return fileIO().rename(src, dst);
}

@Override
public boolean isObjectStore() {
try {
return fileIO().isObjectStore();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private FileIO fileIO() throws IOException {
if (lazyFileIO == null || shouldRefresh()) {
synchronized (this) {
if (lazyFileIO == null || shouldRefresh()) {
GetTableCredentialsResponse response = getCredential();
expireAt = response.getExpiresAt();
Map<String, String> conf =
RESTUtil.merge(options.toMap(), response.getCredential());
Options updateCredentialOption = new Options(conf);
lazyFileIO =
FileIO.get(
new Path(updateCredentialOption.get(CatalogOptions.WAREHOUSE)),
CatalogContext.create(updateCredentialOption));
}
}
}
return lazyFileIO;
}

private GetTableCredentialsResponse getCredential() {
return client.get(
resourcePaths.tableCredentials(
identifier.getDatabaseName(), identifier.getObjectName()),
GetTableCredentialsResponse.class,
catalogAuth.getHeaders());
}

private boolean shouldRefresh() {
return expireAt != null && expireAt.getTime() > System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public String commitTable(String databaseName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, "commit");
}

public String tableCredentials(String databaseName, String tableName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "credentials");
}

public String partitions(String databaseName, String tableName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.responses;

import org.apache.paimon.rest.RESTResponse;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Date;
import java.util.Map;

/** Response for table credentials. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class GetTableCredentialsResponse implements RESTResponse {

private static final String FIELD_CREDENTIAL = "credential";
private static final String FIELD_EXPIREAT = "expiresAt";

@JsonProperty(FIELD_CREDENTIAL)
private final Map<String, String> credential;

@JsonProperty(FIELD_EXPIREAT)
private Date expiresAt;

@JsonCreator
public GetTableCredentialsResponse(
@JsonProperty(FIELD_EXPIREAT) Date expiresAt,
@JsonProperty(FIELD_CREDENTIAL) Map<String, String> credential) {
this.expiresAt = expiresAt;
this.credential = credential;
}

@JsonGetter(FIELD_CREDENTIAL)
public Map<String, String> getCredential() {
return credential;
}

@JsonGetter(FIELD_EXPIREAT)
public Date getExpiresAt() {
return expiresAt;
}
}

0 comments on commit 74b509e

Please sign in to comment.