Skip to content

Commit

Permalink
[core] Remove Catalog.getTableLocation interface (#4718)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 16, 2024
1 parent 5309ccc commit 7767020
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,6 @@ public Optional<MetastoreClient.Factory> metastoreClientFactory(
return Optional.empty();
}

@Override
public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -136,14 +135,6 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
*/
Table getTable(Identifier identifier) throws TableNotExistException;

/**
* Get the table location in this catalog. If the table exists, return the location of the
* table; If the table does not exist, construct the location for table.
*
* @return the table location
*/
Path getTableLocation(Identifier identifier);

/**
* Get names of all tables under this database. An empty list is returned if none exists.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -147,11 +146,6 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN
wrapped.renameView(fromView, toView, ignoreIfNotExists);
}

@Override
public Path getTableLocation(Identifier identifier) {
return wrapped.getTableLocation(identifier);
}

@Override
public void createPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -177,11 +176,6 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
}

@Override
public Path getTableLocation(Identifier identifier) {
throw new UnsupportedOperationException();
}

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
return new ArrayList<String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -525,20 +524,6 @@ protected Schema buildPaimonSchema(
registerLogSystem(catalog, identifier, options, classLoader);
}

// remove table path
String path = options.remove(PATH.key());
if (path != null) {
Path expectedPath = catalog.getTableLocation(identifier);
if (!new Path(path).equals(expectedPath)) {
throw new CatalogException(
String.format(
"You specified the Path when creating the table, "
+ "but the Path '%s' is different from where it should be '%s'. "
+ "Please remove the Path.",
path, expectedPath));
}
}

if (catalogTable instanceof CatalogTable) {
return fromCatalogTable(((CatalogTable) catalogTable).copy(options));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.paimon.flink.clone;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand All @@ -32,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/** A Operator to copy files. */
Expand All @@ -43,8 +46,11 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;

private Catalog sourceCatalog;
private Catalog targetCatalog;
private transient Catalog sourceCatalog;
private transient Catalog targetCatalog;

private transient Map<String, Path> srcLocations;
private transient Map<String, Path> targetLocations;

public CopyFileOperator(
Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
Expand All @@ -58,6 +64,8 @@ public void open() throws Exception {
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
targetCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
srcLocations = new HashMap<>();
targetLocations = new HashMap<>();
}

@Override
Expand All @@ -66,12 +74,29 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce

FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();

Path sourceTableRootPath =
sourceCatalog.getTableLocation(
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
srcLocations.computeIfAbsent(
cloneFileInfo.getSourceIdentifier(),
key -> {
try {
return pathOfTable(
sourceCatalog.getTable(Identifier.fromString(key)));
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
});
Path targetTableRootPath =
targetCatalog.getTableLocation(
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
targetLocations.computeIfAbsent(
cloneFileInfo.getTargetIdentifier(),
key -> {
try {
return pathOfTable(
targetCatalog.getTable(Identifier.fromString(key)));
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
});

String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);
Expand Down Expand Up @@ -110,6 +135,10 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce
output.collect(streamRecord);
}

private Path pathOfTable(Table table) {
return new Path(table.options().get(CoreOptions.PATH.key()));
}

@Override
public void close() throws Exception {
if (sourceCatalog != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@

/** Test for {@link FlinkCatalog}. */
public class FlinkCatalogTest {

private static final String TESTING_LOG_STORE = "testing";

private final ObjectPath path1 = new ObjectPath("db1", "t1");
Expand Down Expand Up @@ -348,12 +349,7 @@ public void testCreateFlinkTableWithPath() throws Exception {
CatalogTable table1 = createTable(options);
assertThatThrownBy(() -> catalog.createTable(this.path1, table1, false))
.hasMessageContaining(
"You specified the Path when creating the table, "
+ "but the Path '/unknown/path' is different from where it should be");

options.put(PATH.key(), warehouse + "/db1.db/t1");
CatalogTable table2 = createTable(options);
catalog.createTable(this.path1, table2, false);
"The current catalog FileSystemCatalog does not support specifying the table path when creating a table.");
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void innerTest(boolean deletionVectors) throws Exception {
.build();
catalog.createTable(identifier, schema, true);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
Path location = catalog.getTableLocation(identifier);
Path location = table.location();
Path successFile = new Path(location, "a=0/_SUCCESS");
PartitionMarkDone markDone =
PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table).get();
Expand Down

0 comments on commit 7767020

Please sign in to comment.