diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
new file mode 100644
index 000000000000..72d6f0fe2171
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+
+import java.util.Map;
+
+/**
+ * This is a system table to display all the sink table lineages.
+ *
+ *
+ * For example:
+ * If we select * from sys.sink_table_lineage, we will get
+ * database_name table_name job_name create_time
+ * default test0 job1 2023-10-22 20:35:12
+ * database1 test1 job1 2023-10-28 21:35:52
+ * ... ... ... ...
+ * We can write sql to fetch the information we need.
+ *
+ */
+public class SinkTableLineageTable extends TableLineageTable {
+
+ public static final String SINK_TABLE_LINEAGE = "sink_table_lineage";
+
+ public SinkTableLineageTable(
+ LineageMetaFactory lineageMetaFactory, Map options) {
+ super(lineageMetaFactory, options);
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new TableLineageRead(lineageMetaFactory, options, LineageMeta::sinkTableLineages);
+ }
+
+ @Override
+ public String name() {
+ return SINK_TABLE_LINEAGE;
+ }
+
+ @Override
+ public Table copy(Map dynamicOptions) {
+ return new SinkTableLineageTable(lineageMetaFactory, options);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
index 0cac3852feb6..bdd6af65b727 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
@@ -18,44 +18,13 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.ReadOnceTableScan;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.IteratorRecordReader;
-import org.apache.paimon.utils.ProjectedRow;
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-
/**
* This is a system table to display all the source table lineages.
*
@@ -69,38 +38,18 @@
* We can write sql to fetch the information we need.
*
*/
-public class SourceTableLineageTable implements ReadonlyTable {
+public class SourceTableLineageTable extends TableLineageTable {
public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
- private final LineageMetaFactory lineageMetaFactory;
- private final Map options;
-
public SourceTableLineageTable(
LineageMetaFactory lineageMetaFactory, Map options) {
- this.lineageMetaFactory = lineageMetaFactory;
- this.options = options;
- }
-
- @Override
- public InnerTableScan newScan() {
- return new ReadOnceTableScan() {
- @Override
- public InnerTableScan withFilter(Predicate predicate) {
- return this;
- }
-
- @Override
- protected Plan innerPlan() {
- /// TODO get the real row count for plan.
- return () -> Collections.singletonList((Split) () -> 1L);
- }
- };
+ super(lineageMetaFactory, options);
}
@Override
public InnerTableRead newRead() {
- return new SourceTableLineageRead(lineageMetaFactory, options);
+ return new TableLineageRead(lineageMetaFactory, options, LineageMeta::sourceTableLineages);
}
@Override
@@ -108,83 +57,8 @@ public String name() {
return SOURCE_TABLE_LINEAGE;
}
- @Override
- public RowType rowType() {
- List fields = new ArrayList<>();
- fields.add(new DataField(0, "database_name", new VarCharType(VarCharType.MAX_LENGTH)));
- fields.add(new DataField(1, "table_name", new VarCharType(VarCharType.MAX_LENGTH)));
- fields.add(new DataField(2, "job_name", new VarCharType(VarCharType.MAX_LENGTH)));
- fields.add(new DataField(3, "create_time", new TimestampType()));
- return new RowType(fields);
- }
-
- @Override
- public List primaryKeys() {
- return Arrays.asList("database_name", "table_name", "job_name");
- }
-
@Override
public Table copy(Map dynamicOptions) {
return new SourceTableLineageTable(lineageMetaFactory, options);
}
-
- /** Source table lineage read. */
- private static class SourceTableLineageRead implements InnerTableRead {
- private final LineageMetaFactory lineageMetaFactory;
- private final Map options;
- @Nullable private Predicate predicate;
- private int[][] projection;
-
- private SourceTableLineageRead(
- LineageMetaFactory lineageMetaFactory, Map options) {
- this.lineageMetaFactory = lineageMetaFactory;
- this.options = options;
- this.predicate = null;
- }
-
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
- this.predicate = predicate;
- return this;
- }
-
- @Override
- public InnerTableRead withProjection(int[][] projection) {
- this.projection = projection;
- return this;
- }
-
- @Override
- public TableRead withIOManager(IOManager ioManager) {
- return this;
- }
-
- @Override
- public RecordReader createReader(Split split) throws IOException {
- try (LineageMeta lineageMeta =
- lineageMetaFactory.create(() -> Options.fromMap(options))) {
- Iterator sourceTableLineages =
- lineageMeta.sourceTableLineages(predicate);
- return new IteratorRecordReader<>(
- Iterators.transform(
- sourceTableLineages,
- entity -> {
- checkNotNull(entity);
- GenericRow row =
- GenericRow.of(
- BinaryString.fromString(entity.getDatabase()),
- BinaryString.fromString(entity.getTable()),
- BinaryString.fromString(entity.getJob()),
- entity.getCreateTime());
- if (projection != null) {
- return ProjectedRow.from(projection).replaceRow(row);
- } else {
- return row;
- }
- }));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 91c7b92f7983..3079c728441b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -39,6 +39,7 @@
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
+import static org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
import static org.apache.paimon.table.system.SourceTableLineageTable.SOURCE_TABLE_LINEAGE;
import static org.apache.paimon.table.system.TagsTable.TAGS;
@@ -95,6 +96,15 @@ public static Table loadGlobal(
LINEAGE_META.key()));
return new SourceTableLineageTable(lineageMetaFactory, catalogOptions);
}
+ case SINK_TABLE_LINEAGE:
+ {
+ checkNotNull(
+ lineageMetaFactory,
+ String.format(
+ "Lineage meta should be configured for catalog with %s",
+ LINEAGE_META.key()));
+ return new SinkTableLineageTable(lineageMetaFactory, catalogOptions);
+ }
default:
return null;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
new file mode 100644
index 000000000000..3f43a764b47e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.lineage.TableLineageEntity;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Base lineage table for source and sink table lineage. */
+public abstract class TableLineageTable implements ReadonlyTable {
+ protected final LineageMetaFactory lineageMetaFactory;
+ protected final Map options;
+
+ protected TableLineageTable(
+ LineageMetaFactory lineageMetaFactory, Map options) {
+ this.lineageMetaFactory = lineageMetaFactory;
+ this.options = options;
+ }
+
+ @Override
+ public InnerTableScan newScan() {
+ return new ReadOnceTableScan() {
+ @Override
+ public InnerTableScan withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ protected Plan innerPlan() {
+ /// TODO get the real row count for plan.
+ return () -> Collections.singletonList((Split) () -> 1L);
+ }
+ };
+ }
+
+ @Override
+ public RowType rowType() {
+ List fields = new ArrayList<>();
+ fields.add(new DataField(0, "database_name", new VarCharType(VarCharType.MAX_LENGTH)));
+ fields.add(new DataField(1, "table_name", new VarCharType(VarCharType.MAX_LENGTH)));
+ fields.add(new DataField(2, "job_name", new VarCharType(VarCharType.MAX_LENGTH)));
+ fields.add(new DataField(3, "create_time", new TimestampType()));
+ return new RowType(fields);
+ }
+
+ @Override
+ public List primaryKeys() {
+ return Arrays.asList("database_name", "table_name", "job_name");
+ }
+
+ /** Table lineage read with lineage meta query. */
+ protected static class TableLineageRead implements InnerTableRead {
+ private final LineageMetaFactory lineageMetaFactory;
+ private final Map options;
+ private final BiFunction>
+ tableLineageQuery;
+ @Nullable private Predicate predicate;
+ private int[][] projection;
+
+ protected TableLineageRead(
+ LineageMetaFactory lineageMetaFactory,
+ Map options,
+ BiFunction>
+ tableLineageQuery) {
+ this.lineageMetaFactory = lineageMetaFactory;
+ this.options = options;
+ this.tableLineageQuery = tableLineageQuery;
+ this.predicate = null;
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ this.predicate = predicate;
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withProjection(int[][] projection) {
+ this.projection = projection;
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public RecordReader createReader(Split split) throws IOException {
+ try (LineageMeta lineageMeta =
+ lineageMetaFactory.create(() -> Options.fromMap(options))) {
+ Iterator sourceTableLineages =
+ tableLineageQuery.apply(lineageMeta, predicate);
+ return new IteratorRecordReader<>(
+ Iterators.transform(
+ sourceTableLineages,
+ entity -> {
+ checkNotNull(entity);
+ GenericRow row =
+ GenericRow.of(
+ BinaryString.fromString(entity.getDatabase()),
+ BinaryString.fromString(entity.getTable()),
+ BinaryString.fromString(entity.getJob()),
+ entity.getCreateTime());
+ if (projection != null) {
+ return ProjectedRow.from(projection).replaceRow(row);
+ } else {
+ return row;
+ }
+ }));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
index 0625facbfe60..5b61d5272f80 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
@@ -48,6 +48,8 @@ public class FlinkLineageITCase extends CatalogITCaseBase {
private static final String THROWING_META = "throwing-meta";
private static final Map> jobSourceTableLineages =
new HashMap<>();
+ private static final Map> jobSinkTableLineages =
+ new HashMap<>();
@Override
protected List ddl() {
@@ -72,10 +74,24 @@ public void testTableLineage() throws Exception {
// Call storeSinkTableLineage and storeSourceTableLineage methods
tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "insert_t_job");
- assertThatThrownBy(
- () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await())
- .hasRootCauseInstanceOf(UnsupportedOperationException.class)
- .hasRootCauseMessage("Method saveSinkTableLineage is not supported");
+ tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
+ assertThat(jobSinkTableLineages).isNotEmpty();
+ TableLineageEntity sinkTableLineage =
+ jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job");
+ assertThat(sinkTableLineage.getTable()).isEqualTo("T");
+
+ List sinkTableRows = new ArrayList<>();
+ try (CloseableIterator iterator =
+ tEnv.executeSql("SELECT * FROM sys.sink_table_lineage").collect()) {
+ while (iterator.hasNext()) {
+ sinkTableRows.add(iterator.next());
+ }
+ }
+ assertThat(sinkTableRows.size()).isEqualTo(1);
+ Row sinkTableRow = sinkTableRows.get(0);
+ assertThat(sinkTableRow.getField("database_name")).isEqualTo("default");
+ assertThat(sinkTableRow.getField("table_name")).isEqualTo("T");
+ assertThat(sinkTableRow.getField("job_name")).isEqualTo("insert_t_job");
tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "select_t_job");
tEnv.executeSql("SELECT * FROM T").collect().close();
@@ -84,18 +100,18 @@ public void testTableLineage() throws Exception {
jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job");
assertThat(sourceTableLineage.getTable()).isEqualTo("T");
- List lineageRows = new ArrayList<>();
+ List sourceTableRows = new ArrayList<>();
try (CloseableIterator iterator =
tEnv.executeSql("SELECT * FROM sys.source_table_lineage").collect()) {
while (iterator.hasNext()) {
- lineageRows.add(iterator.next());
+ sourceTableRows.add(iterator.next());
}
}
- assertThat(lineageRows.size()).isEqualTo(1);
- Row lineageRow = lineageRows.get(0);
- assertThat(lineageRow.getField("database_name")).isEqualTo("default");
- assertThat(lineageRow.getField("table_name")).isEqualTo("T");
- assertThat(lineageRow.getField("job_name")).isEqualTo("select_t_job");
+ assertThat(sourceTableRows.size()).isEqualTo(1);
+ Row sourceTableRow = sourceTableRows.get(0);
+ assertThat(sourceTableRow.getField("database_name")).isEqualTo("default");
+ assertThat(sourceTableRow.getField("table_name")).isEqualTo("T");
+ assertThat(sourceTableRow.getField("job_name")).isEqualTo("select_t_job");
}
private static String getTableLineageKey(TableLineageEntity entity) {
@@ -144,17 +160,21 @@ public void saveSinkTableLineage(TableLineageEntity entity) {
assertThat(entity.getJob()).isEqualTo("insert_t_job");
assertThat(entity.getTable()).isEqualTo("T");
assertThat(entity.getDatabase()).isEqualTo("default");
- throw new UnsupportedOperationException("Method saveSinkTableLineage is not supported");
+ jobSinkTableLineages
+ .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
+ .put(getTableLineageKey(entity), entity);
}
@Override
public Iterator sinkTableLineages(@Nullable Predicate predicate) {
- throw new UnsupportedOperationException();
+ return jobSinkTableLineages.values().stream()
+ .flatMap(v -> v.values().stream())
+ .iterator();
}
@Override
public void deleteSinkTableLineage(String job) {
- throw new UnsupportedOperationException();
+ jobSinkTableLineages.remove(job);
}
@Override