Skip to content

Commit

Permalink
[core] add interface in Catalog for CloneAction adapter CachingCatalog (
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored Aug 5, 2024
1 parent fdaff8c commit b3f532c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -169,6 +170,13 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
*/
List<String> listTables(String databaseName) throws DatabaseNotExistException;

/**
* Get the table location in this catalog.
*
* @return the table location
*/
Path getDataTableLocation(Identifier identifier);

/**
* Check if a table exists in this catalog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -134,6 +135,11 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
return wrapped.getTable(identifier);
}

@Override
public Path getDataTableLocation(Identifier identifier) {
return wrapped.getDataTableLocation(identifier);
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

import java.util.Map;

import static org.apache.paimon.CoreOptions.PATH;

/** A Operator to copy files. */
public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
Expand Down Expand Up @@ -69,19 +67,11 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce
FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();
Path sourceTableRootPath =
new Path(
sourceCatalog
.getTable(
Identifier.fromString(cloneFileInfo.getSourceIdentifier()))
.options()
.get(PATH.key()));
sourceCatalog.getDataTableLocation(
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
Path targetTableRootPath =
new Path(
targetCatalog
.getTable(
Identifier.fromString(cloneFileInfo.getTargetIdentifier()))
.options()
.get(PATH.key()));
targetCatalog.getDataTableLocation(
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));

String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);
Expand Down

0 comments on commit b3f532c

Please sign in to comment.