Skip to content

Commit

Permalink
Revert "fix the clone tests"
Browse files Browse the repository at this point in the history
This reverts commit 3a00acc.
  • Loading branch information
neuyilan committed Jan 3, 2025
1 parent f3303c1 commit 3a63857
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 194 deletions.
22 changes: 0 additions & 22 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,28 +484,6 @@ public DataFileMeta copy(List<String> newExtraFiles) {
externalPath);
}

public DataFileMeta copy(String newExternalPath) {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols,
newExternalPath);
}

public DataFileMeta copy(byte[] newEmbeddedIndex) {
return new DataFileMeta(
fileName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,7 @@ public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter()
suggestedFileSize);
}

public ManifestEntryWriter createManifestEntryWriter(Path manifestPath) {
return new ManifestEntryWriter(writerFactory, manifestPath, compression);
}

/** Writer for {@link ManifestEntry}. */
public class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {
private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {

private final SimpleStatsCollector partitionStatsCollector;
private final SimpleStatsConverter partitionStatsSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,16 @@ public class CloneFileInfo {
private final String filePathExcludeTableRoot;
private final String sourceIdentifier;
private final String targetIdentifier;
private final FileType fileType;

public CloneFileInfo(
String sourceFilePath,
String filePathExcludeTableRoot,
String sourceIdentifier,
String targetIdentifier,
FileType fileType) {
String targetIdentifier) {
this.sourceFilePath = sourceFilePath;
this.filePathExcludeTableRoot = filePathExcludeTableRoot;
this.sourceIdentifier = sourceIdentifier;
this.targetIdentifier = targetIdentifier;
this.fileType = fileType;
}

public String getSourceFilePath() {
Expand All @@ -56,18 +53,10 @@ public String getTargetIdentifier() {
return targetIdentifier;
}

public FileType getFileType() {
return fileType;
}

@Override
public String toString() {
return String.format(
"{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s, fileType: %s }",
sourceFilePath,
filePathExcludeTableRoot,
sourceIdentifier,
targetIdentifier,
fileType);
"{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s }",
sourceFilePath, filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@
package org.apache.paimon.flink.clone;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFile.ManifestEntryWriter;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;

Expand All @@ -40,10 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** A Operator to copy files. */
Expand Down Expand Up @@ -83,6 +74,18 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce

FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();

Path sourceTableRootPath =
srcLocations.computeIfAbsent(
cloneFileInfo.getSourceIdentifier(),
key -> {
try {
return pathOfTable(
sourceCatalog.getTable(Identifier.fromString(key)));
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
});
Path targetTableRootPath =
targetLocations.computeIfAbsent(
cloneFileInfo.getTargetIdentifier(),
Expand Down Expand Up @@ -122,14 +125,9 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce
if (LOG.isDebugEnabled()) {
LOG.debug("Begin copy file from {} to {}.", sourcePath, targetPath);
}

if (cloneFileInfo.getFileType() == FileType.MANIFEST_FILE) {
copyManifestFile(sourcePath, targetPath, cloneFileInfo);
} else {
IOUtils.copyBytes(
sourceTableFileIO.newInputStream(sourcePath),
targetTableFileIO.newOutputStream(targetPath, true));
}
IOUtils.copyBytes(
sourceTableFileIO.newInputStream(sourcePath),
targetTableFileIO.newOutputStream(targetPath, true));
if (LOG.isDebugEnabled()) {
LOG.debug("End copy file from {} to {}.", sourcePath, targetPath);
}
Expand All @@ -141,36 +139,6 @@ private Path pathOfTable(Table table) {
return new Path(table.options().get(CoreOptions.PATH.key()));
}

private void copyManifestFile(Path sourcePath, Path targetPath, CloneFileInfo cloneFileInfo)
throws IOException, TableNotExistException {
Identifier sourceIdentifier = Identifier.fromString(cloneFileInfo.getSourceIdentifier());
FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
FileStore<?> store = sourceTable.store();
ManifestFile manifestFile = store.manifestFileFactory().create();

List<ManifestEntry> manifestEntries =
manifestFile.readWithIOException(sourcePath.getName());
List<ManifestEntry> targetManifestEntries = new ArrayList<>(manifestEntries.size());

// clone job will clone the source path to target warehouse path, so the target external
// path is null
for (ManifestEntry manifestEntry : manifestEntries) {
ManifestEntry newManifestEntry =
new ManifestEntry(
manifestEntry.kind(),
manifestEntry.partition(),
manifestEntry.bucket(),
manifestEntry.totalBuckets(),
manifestEntry.file().copy((String) null));
targetManifestEntries.add(newManifestEntry);
}

ManifestEntryWriter manifestEntryWriter =
manifestFile.createManifestEntryWriter(targetPath);
manifestEntryWriter.write(targetManifestEntries);
manifestEntryWriter.close();
}

@Override
public void close() throws Exception {
if (sourceCatalog != null) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,19 @@ private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
}

private List<CloneFileInfo> toCloneFileInfos(
Map<FileType, List<Path>> filesMap,
List<Path> files,
Path sourceTableRoot,
String sourceIdentifier,
String targetIdentifier) {
List<CloneFileInfo> result = new ArrayList<>();
for (Map.Entry<FileType, List<Path>> entry : filesMap.entrySet()) {
for (Path file : entry.getValue()) {
Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
result.add(
new CloneFileInfo(
file.toUri().toString(),
relativePath.toString(),
sourceIdentifier,
targetIdentifier,
entry.getKey()));
}
for (Path file : files) {
Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
result.add(
new CloneFileInfo(
file.toUri().toString(),
relativePath.toString(),
sourceIdentifier,
targetIdentifier));
}
return result;
}
Expand Down
Loading

0 comments on commit 3a63857

Please sign in to comment.