Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Remove Catalog.getTableLocation interface #4718

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,6 @@ public Optional<MetastoreClient.Factory> metastoreClientFactory(
return Optional.empty();
}

@Override
public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -136,14 +135,6 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
*/
Table getTable(Identifier identifier) throws TableNotExistException;

/**
* Get the table location in this catalog. If the table exists, return the location of the
* table; If the table does not exist, construct the location for table.
*
* @return the table location
*/
Path getTableLocation(Identifier identifier);

/**
* Get names of all tables under this database. An empty list is returned if none exists.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -147,11 +146,6 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN
wrapped.renameView(fromView, toView, ignoreIfNotExists);
}

@Override
public Path getTableLocation(Identifier identifier) {
return wrapped.getTableLocation(identifier);
}

@Override
public void createPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -177,11 +176,6 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
}

@Override
public Path getTableLocation(Identifier identifier) {
throw new UnsupportedOperationException();
}

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
return new ArrayList<String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -525,20 +524,6 @@ protected Schema buildPaimonSchema(
registerLogSystem(catalog, identifier, options, classLoader);
}

// remove table path
String path = options.remove(PATH.key());
if (path != null) {
Path expectedPath = catalog.getTableLocation(identifier);
if (!new Path(path).equals(expectedPath)) {
throw new CatalogException(
String.format(
"You specified the Path when creating the table, "
+ "but the Path '%s' is different from where it should be '%s'. "
+ "Please remove the Path.",
path, expectedPath));
}
}

if (catalogTable instanceof CatalogTable) {
return fromCatalogTable(((CatalogTable) catalogTable).copy(options));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.paimon.flink.clone;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand All @@ -32,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/** A Operator to copy files. */
Expand All @@ -43,8 +46,11 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;

private Catalog sourceCatalog;
private Catalog targetCatalog;
private transient Catalog sourceCatalog;
private transient Catalog targetCatalog;

private transient Map<String, Path> srcLocations;
private transient Map<String, Path> targetLocations;

public CopyFileOperator(
Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
Expand All @@ -58,6 +64,8 @@ public void open() throws Exception {
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
targetCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
srcLocations = new HashMap<>();
targetLocations = new HashMap<>();
}

@Override
Expand All @@ -66,12 +74,29 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce

FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();

Path sourceTableRootPath =
sourceCatalog.getTableLocation(
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
srcLocations.computeIfAbsent(
cloneFileInfo.getSourceIdentifier(),
key -> {
try {
return pathOfTable(
sourceCatalog.getTable(Identifier.fromString(key)));
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
});
Path targetTableRootPath =
targetCatalog.getTableLocation(
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
targetLocations.computeIfAbsent(
cloneFileInfo.getTargetIdentifier(),
key -> {
try {
return pathOfTable(
targetCatalog.getTable(Identifier.fromString(key)));
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
});

String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);
Expand Down Expand Up @@ -110,6 +135,10 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce
output.collect(streamRecord);
}

private Path pathOfTable(Table table) {
return new Path(table.options().get(CoreOptions.PATH.key()));
}

@Override
public void close() throws Exception {
if (sourceCatalog != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@

/** Test for {@link FlinkCatalog}. */
public class FlinkCatalogTest {

private static final String TESTING_LOG_STORE = "testing";

private final ObjectPath path1 = new ObjectPath("db1", "t1");
Expand Down Expand Up @@ -348,12 +349,7 @@ public void testCreateFlinkTableWithPath() throws Exception {
CatalogTable table1 = createTable(options);
assertThatThrownBy(() -> catalog.createTable(this.path1, table1, false))
.hasMessageContaining(
"You specified the Path when creating the table, "
+ "but the Path '/unknown/path' is different from where it should be");

options.put(PATH.key(), warehouse + "/db1.db/t1");
CatalogTable table2 = createTable(options);
catalog.createTable(this.path1, table2, false);
"The current catalog FileSystemCatalog does not support specifying the table path when creating a table.");
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void innerTest(boolean deletionVectors) throws Exception {
.build();
catalog.createTable(identifier, schema, true);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
Path location = catalog.getTableLocation(identifier);
Path location = table.location();
Path successFile = new Path(location, "a=0/_SUCCESS");
PartitionMarkDone markDone =
PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table).get();
Expand Down
Loading