Skip to content

Commit

Permalink
Fix clone table.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyujiang committed Aug 5, 2024
1 parent f4b1589 commit 27eaf8d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,4 @@ public Schema build() {
return new Schema(columns, partitionKeys, primaryKeys, options, comment);
}
}

public static Schema fromTableSchema(TableSchema tableSchema) {
return new Schema(
tableSchema.fields(),
tableSchema.partitionKeys(),
tableSchema.primaryKeys(),
tableSchema.options(),
tableSchema.comment());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

package org.apache.paimon.flink.clone;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

import com.google.common.collect.Iterables;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand All @@ -37,6 +43,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Pick the files to be cloned of a table based on the input record. The record type it produce is
Expand Down Expand Up @@ -77,7 +84,7 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
targetCatalog.createTable(
targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true);
targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true);

List<CloneFileInfo> result =
toCloneFileInfos(
Expand All @@ -95,6 +102,18 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
}
}

private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
return new Schema(
ImmutableList.copyOf(tableSchema.fields()),
ImmutableList.copyOf(tableSchema.partitionKeys()),
ImmutableList.copyOf(tableSchema.primaryKeys()),
ImmutableMap.copyOf(
Iterables.filter(
tableSchema.options().entrySet(),
entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))),
tableSchema.comment());
}

private List<CloneFileInfo> toCloneFileInfos(
List<Path> files,
Path sourceTableRoot,
Expand Down

0 comments on commit 27eaf8d

Please sign in to comment.