Skip to content

Commit

Permalink
[lineage] Introduce jdbc lineage meta implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs committed Nov 6, 2023
1 parent a7047f0 commit de38dc2
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 21 deletions.
8 changes: 8 additions & 0 deletions paimon-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<!-- For jdbc lineage meta tests -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.lineage;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.EncodingUtils;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static org.apache.paimon.lineage.LineageMetaUtils.SINK_TABLE_LINEAGE;
import static org.apache.paimon.lineage.LineageMetaUtils.SOURCE_TABLE_LINEAGE;
import static org.apache.paimon.lineage.LineageMetaUtils.tableLineagePrimaryKeys;
import static org.apache.paimon.lineage.LineageMetaUtils.tableLineageRowType;
import static org.apache.paimon.options.CatalogOptions.JDBC_AUTO_DDL;
import static org.apache.paimon.options.CatalogOptions.JDBC_PASSWORD;
import static org.apache.paimon.options.CatalogOptions.JDBC_URL;
import static org.apache.paimon.options.CatalogOptions.JDBC_USER;

/**
 * Uses JDBC to store Paimon meta information such as table and data lineage.
 *
 * <p>An instance owns one {@link Connection} and one {@link Statement} created from it; both are
 * released by {@link #close()}. At the moment only the optional table creation performed in the
 * constructor is implemented: every lineage read/write method throws {@link
 * UnsupportedOperationException}.
 */
public class JdbcLineageMeta implements LineageMeta {
    private final Connection connection;
    private final Statement statement;

    /**
     * Opens the JDBC connection described by {@code options} and, when {@code JDBC_AUTO_DDL} is
     * enabled, creates the source and sink table lineage tables.
     *
     * <p>The user and password are only passed to the driver when both are configured; otherwise
     * the connection is opened from the url alone.
     *
     * @param options catalog options holding the jdbc url, optional credentials and auto-ddl flag
     * @throws Exception if the connection cannot be opened or table creation fails; any resource
     *     opened so far is closed before the exception is rethrown
     */
    public JdbcLineageMeta(Options options) throws Exception {
        String url = options.get(JDBC_URL);
        String user = options.getOptional(JDBC_USER).orElse(null);
        String password = options.getOptional(JDBC_PASSWORD).orElse(null);

        Connection openedConnection =
                user != null && password != null
                        ? DriverManager.getConnection(url, user, password)
                        : DriverManager.getConnection(url);
        Statement openedStatement = null;
        try {
            openedStatement = openedConnection.createStatement();
            if (options.get(JDBC_AUTO_DDL)) {
                initializeTables(openedStatement);
            }
        } catch (Exception e) {
            // The caller never receives this instance when construction fails, so nobody else
            // could close these resources: release them here to avoid leaking the connection.
            closeQuietly(openedStatement, e);
            closeQuietly(openedConnection, e);
            throw e;
        }

        this.connection = openedConnection;
        this.statement = openedStatement;
    }

    /**
     * Creates the source and sink table lineage tables, which share the same row type and primary
     * keys.
     *
     * <p>NOTE(review): the generated DDL is a plain {@code CREATE TABLE}, so this presumably fails
     * when the tables already exist (e.g. on a second start with auto-ddl enabled) - confirm
     * whether that is intended.
     */
    private void initializeTables(Statement ddlStatement) throws Exception {
        String sourceTableLineageSql =
                buildDDL(SOURCE_TABLE_LINEAGE, tableLineageRowType(), tableLineagePrimaryKeys());
        ddlStatement.execute(sourceTableLineageSql);

        String sinkTableLineageSql =
                buildDDL(SINK_TABLE_LINEAGE, tableLineageRowType(), tableLineagePrimaryKeys());
        ddlStatement.execute(sinkTableLineageSql);
    }

    /**
     * Builds a {@code CREATE TABLE} statement for the given table.
     *
     * @param tableName name of the table to create
     * @param rowType fields of the table; each one is rendered with {@code asSQLString()}
     * @param primaryKeys primary key column names; no {@code PRIMARY KEY} clause is added when
     *     empty
     * @return the DDL text
     */
    private String buildDDL(String tableName, RowType rowType, List<String> primaryKeys) {
        List<String> fieldSqlList = new ArrayList<>();
        for (DataField dataField : rowType.getFields()) {
            fieldSqlList.add(
                    dataField
                            .asSQLString()
                            // Use the raw column name instead of the escaped identifier.
                            .replace(
                                    EncodingUtils.escapeIdentifier(dataField.name()),
                                    dataField.name())
                            // Drop the precision from TIMESTAMP(n); presumably the target
                            // databases do not accept it - TODO confirm.
                            .replaceAll("TIMESTAMP\\([0-9]+\\)", "TIMESTAMP"));
        }
        if (!primaryKeys.isEmpty()) {
            fieldSqlList.add(
                    String.format(
                            "PRIMARY KEY(%s)", StringUtils.join(primaryKeys.iterator(), ",")));
        }
        return String.format(
                "CREATE TABLE %s ( %s )",
                tableName, StringUtils.join(fieldSqlList.iterator(), ",\n"));
    }

    /**
     * Closes {@code resource} if it is non-null, recording a close failure as suppressed on {@code
     * primary} so the original error is the one that propagates.
     */
    private static void closeQuietly(@Nullable AutoCloseable resource, Exception primary) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception closeFailure) {
            primary.addSuppressed(closeFailure);
        }
    }

    @Override
    public void saveSourceTableLineage(TableLineageEntity entity) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void deleteSourceTableLineage(String job) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void saveSinkTableLineage(TableLineageEntity entity) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void deleteSinkTableLineage(String job) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void saveSourceDataLineage(DataLineageEntity entity) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void saveSinkDataLineage(DataLineageEntity entity) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate) {
        throw new UnsupportedOperationException();
    }

    /** Exposes the statement owned by this instance so tests can run queries on the connection. */
    @VisibleForTesting
    Statement statement() {
        return statement;
    }

    /**
     * Closes the statement and then the connection.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.statement.close();
        } finally {
            // Always release the connection, even when closing the statement failed.
            this.connection.close();
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.lineage;

/** {@link LineageMetaFactory} registered under the {@code jdbc} identifier. */
public class JdbcLineageMetaFactory implements LineageMetaFactory {
    private static final String IDENTIFIER = "jdbc";

    /** Returns the identifier of this factory, {@code jdbc}. */
    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Builds a {@link JdbcLineageMeta} from the options of the given context.
     *
     * @param context supplies the options handed to the {@link JdbcLineageMeta} constructor
     * @return the created lineage meta
     * @throws RuntimeException wrapping any exception thrown while constructing the lineage meta
     */
    @Override
    public LineageMeta create(LineageMetaContext context) {
        final LineageMeta lineageMeta;
        try {
            lineageMeta = new JdbcLineageMeta(context.options());
        } catch (Exception cause) {
            throw new RuntimeException("Initialize jdbc lineage meta failed", cause);
        }
        return lineageMeta;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.lineage;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Table names, row type and primary keys shared by the table lineage meta tables. */
public class LineageMetaUtils {
    private static final int MAX_VARCHAR_LENGTH = 10240;

    public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";

    public static final String SINK_TABLE_LINEAGE = "sink_table_lineage";

    /**
     * Returns the row type of a table lineage table: three varchar columns ({@code
     * database_name}, {@code table_name}, {@code job_name}) followed by a {@code create_time}
     * timestamp column.
     */
    public static RowType tableLineageRowType() {
        List<DataField> columns = new ArrayList<>();
        columns.add(varcharField(0, "database_name"));
        columns.add(varcharField(1, "table_name"));
        columns.add(varcharField(2, "job_name"));
        columns.add(new DataField(3, "create_time", new TimestampType()));
        return new RowType(columns);
    }

    /** Returns the primary key column names of a table lineage table. */
    public static List<String> tableLineagePrimaryKeys() {
        String[] keyColumns = {"database_name", "table_name", "job_name"};
        return Arrays.asList(keyColumns);
    }

    /** Creates a varchar field of length {@link #MAX_VARCHAR_LENGTH} with the given id and name. */
    private static DataField varcharField(int id, String name) {
        return new DataField(id, name, new VarCharType(MAX_VARCHAR_LENGTH));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,31 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

/** Option {@code jdbc.url}: the jdbc url, a string with no default value. */
public static final ConfigOption<String> JDBC_URL =
key("jdbc.url")
.stringType()
.noDefaultValue()
.withDescription(
"Jdbc url for Paimon to store some information such as lineage and meta.");

/** Option {@code jdbc.user}: the jdbc user name, a string with no default value. */
public static final ConfigOption<String> JDBC_USER =
key("jdbc.user")
.stringType()
.noDefaultValue()
.withDescription(
"Jdbc user name for Paimon to store some information such as lineage and meta.");

/** Option {@code jdbc.password}: the jdbc password, a string with no default value. */
public static final ConfigOption<String> JDBC_PASSWORD =
key("jdbc.password")
.stringType()
.noDefaultValue()
.withDescription(
"Jdbc password for Paimon to store some information such as lineage and meta.");

/**
 * Option {@code jdbc.auto-ddl}: whether tables are created automatically through jdbc; a boolean
 * that defaults to {@code false}.
 */
public static final ConfigOption<Boolean> JDBC_AUTO_DDL =
key("jdbc.auto-ddl")
.booleanType()
.defaultValue(false)
.withDescription("If true, jdbc will create tables automatically.");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.lineage;

import org.apache.paimon.options.Options;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import static org.apache.paimon.lineage.LineageMetaUtils.SINK_TABLE_LINEAGE;
import static org.apache.paimon.lineage.LineageMetaUtils.SOURCE_TABLE_LINEAGE;
import static org.apache.paimon.options.CatalogOptions.JDBC_AUTO_DDL;
import static org.apache.paimon.options.CatalogOptions.JDBC_URL;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for jdbc lineage meta. */
class JdbcLineageMetaTest {
private static final String DB_NAME = "lineage_meta";
private static final String URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";

@BeforeAll
static void setUp() throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
}

@Test
void testLineageMetaDDL() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(LINEAGE_META.key(), "jdbc");
config.put(JDBC_URL.key(), URL);
config.put(JDBC_AUTO_DDL.key(), "true");

JdbcLineageMetaFactory factory = new JdbcLineageMetaFactory();
try (JdbcLineageMeta lineageMeta =
(JdbcLineageMeta) factory.create(() -> Options.fromMap(config))) {
Statement statement = lineageMeta.statement();
assertThatThrownBy(
() ->
statement.executeQuery(
String.format("SELECT * FROM %s", "not_exist_table")))
.hasMessage("Table/View 'NOT_EXIST_TABLE' does not exist.");

// Validate source and sink tables are existing.
try (ResultSet resultSet =
statement.executeQuery(
String.format("SELECT * FROM %s", SOURCE_TABLE_LINEAGE))) {
assertThat(resultSet.next()).isFalse();
}
try (ResultSet resultSet =
statement.executeQuery(String.format("SELECT * FROM %s", SINK_TABLE_LINEAGE))) {
assertThat(resultSet.next()).isFalse();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.util.Map;

import static org.apache.paimon.lineage.LineageMetaUtils.SINK_TABLE_LINEAGE;

/**
* This is a system table to display all the sink table lineages.
*
Expand All @@ -40,8 +42,6 @@
*/
public class SinkTableLineageTable extends TableLineageTable {

public static final String SINK_TABLE_LINEAGE = "sink_table_lineage";

public SinkTableLineageTable(
LineageMetaFactory lineageMetaFactory, Map<String, String> options) {
super(lineageMetaFactory, options);
Expand Down
Loading

0 comments on commit de38dc2

Please sign in to comment.