Skip to content

Commit

Permalink
init commit for flink sql quality check
Browse files Browse the repository at this point in the history
  • Loading branch information
izhangzhihao committed Dec 24, 2023
1 parent f6df093 commit 78799e4
Show file tree
Hide file tree
Showing 15 changed files with 387 additions and 136 deletions.
2 changes: 1 addition & 1 deletion core/src/main/resources/db/flink/migration/V1__init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ create table if not exists `sharp_etl`.job_log

create table if not exists `sharp_etl`.quality_check_log
(
id bigint,
id string,
job_id string not null,
job_name string not null comment 'job name(workflow_name + period)',
`column` string not null comment 'issue column name',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,105 +159,74 @@ trait QualityCheck[DataFrame] extends Serializable {
def dropView(tempViewName: String): Unit

def dropUnusedCols(df: DataFrame, cols: String): DataFrame
}
// scalastyle:on

object QualityCheck {
val DELIMITER = "__"
val DEFAULT_TOP_N = 1000

def checkSql(tempViewName: String, resultView: String, dataQualityCheckMapping: Seq[DataQualityConfig], idColumn: String): String = {
def windowByPkSql(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = {
s"""
|CREATE TEMPORARY VIEW $resultView
|AS SELECT ${joinIdColumns(idColumn)} as id,
| flatten(ARRAY(${generateWarnCases(dataQualityCheckMapping)}
| )) as warn_result,
| flatten(ARRAY(${generateErrorCases(dataQualityCheckMapping)}
| )) as error_result
|FROM `$tempViewName`
""".stripMargin
}

def emptyArrayIfMissing(query: String): String = {
if (query.trim == "") {
"array()"
} else {
query
}
|SELECT *
|FROM (SELECT *, ROW_NUMBER()
| OVER (PARTITION BY $idColumns
| ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num
| FROM $tempViewName
|) WHERE __row_num = 1""".stripMargin
}

def generateWarnCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = {
emptyArrayIfMissing(dataQualityCheckMapping
.filter(_.errorType == ErrorType.warn)
.map(it => s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END""")
.mkString(",\n\t\t\t\t")
)
}

def generateErrorCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = {
emptyArrayIfMissing(dataQualityCheckMapping
.filter(_.errorType == ErrorType.error)
.map(it => s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END""")
.mkString(",\n\t\t\t\t")
)
def windowByPkSqlErrors(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = {
s"""
|SELECT ${joinIdColumns(idColumns)} as id,
| ARRAY('Duplicated PK check$DELIMITER$idColumns') as error_result
|FROM (SELECT *, ROW_NUMBER()
| OVER (PARTITION BY $idColumns
| ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num
| FROM $tempViewName
|) WHERE __row_num > 1""".stripMargin
}

def generateWarnUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = {
def generateErrorUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = {
dataQualityCheckMapping
.filter(_.errorType == ErrorType.warn)
.filter(_.errorType == ErrorType.error)
.map(it =>
s"""(SELECT
| "${it.column}" as column,
| "${it.dataCheckType}" as dataCheckType,
| arrayJoin(top(collect_list(string(id)), $topN), ',') as ids,
| "${it.errorType}" as errorType,
| count(*) as warnCount,
| 0 as errorCount
| 0 as warnCount,
| count(*) as errorCount
|FROM `$view`
|WHERE array_contains(warn_result, "${it.dataCheckType}${DELIMITER}${it.column}")
|WHERE array_contains(error_result, "${it.dataCheckType}${DELIMITER}${it.column}")
|)""".stripMargin
)
.mkString("\nUNION ALL\n")
}

def generateErrorUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = {
def generateWarnUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = {
dataQualityCheckMapping
.filter(_.errorType == ErrorType.error)
.filter(_.errorType == ErrorType.warn)
.map(it =>
s"""(SELECT
| "${it.column}" as column,
| "${it.dataCheckType}" as dataCheckType,
| arrayJoin(top(collect_list(string(id)), $topN), ',') as ids,
| "${it.errorType}" as errorType,
| 0 as warnCount,
| count(*) as errorCount
| count(*) as warnCount,
| 0 as errorCount
|FROM `$view`
|WHERE array_contains(error_result, "${it.dataCheckType}${DELIMITER}${it.column}")
|WHERE array_contains(warn_result, "${it.dataCheckType}${DELIMITER}${it.column}")
|)""".stripMargin
)
.mkString("\nUNION ALL\n")
}

def joinIdColumns(idColumn: String, prefix: String = ""): String = {
val realPrefix = if (isNullOrEmpty(prefix)) "" else s"`$prefix`."
if (idColumn.contains(",")) {
idColumn.split(",").map(it => s"ifnull($realPrefix`${it.trim}`, 'NULL')").mkString("CONCAT(", s", '$DELIMITER', ", ")")
} else {
s"$realPrefix`$idColumn`"
}
}

def joinOnConditions(resultView: String, tempViewName: String, idColumn: String): String = {
idColumn.split(",").map(_.trim).zipWithIndex.map { case (column: String, idx: Int) =>
s"""`$tempViewName`.`$column` = split(`$resultView`.id, '$DELIMITER')[$idx]"""
}.mkString(" AND \n\t")
}

def antiJoinSql(idColumn: String, tempViewName: String, resultView: String): String = {
s"""|LEFT ANTI JOIN (
| SELECT id FROM `$resultView`
| WHERE size(error_result) > 0
|) bad_ids ON bad_ids.id = ${joinIdColumns(idColumn, tempViewName)}
def checkSql(tempViewName: String, resultView: String, dataQualityCheckMapping: Seq[DataQualityConfig], idColumn: String): String = {
s"""
|CREATE TEMPORARY VIEW $resultView
|AS SELECT ${joinIdColumns(idColumn)} as id,
| flatten(ARRAY(${generateWarnCases(dataQualityCheckMapping)}
| )) as warn_result,
| flatten(ARRAY(${generateErrorCases(dataQualityCheckMapping)}
| )) as error_result
|FROM `$tempViewName`
""".stripMargin
}

Expand All @@ -267,16 +236,16 @@ object QualityCheck {
StringUtil.EMPTY
} else {
udrWithViews.map { case (udr, viewName) =>
s"""
|(SELECT "${udr.column}" as column,
| "${udr.dataCheckType}" as dataCheckType,
| arrayJoin(top(collect_list(string(id)), $topN), ',') as ids,
| "${udr.errorType}" as errorType,
| count(*) as warnCount,
| 0 as errorCount
|FROM $viewName)
|""".stripMargin
}
s"""
|(SELECT "${udr.column}" as column,
| "${udr.dataCheckType}" as dataCheckType,
| arrayJoin(top(collect_list(string(id)), $topN), ',') as ids,
| "${udr.errorType}" as errorType,
| count(*) as warnCount,
| 0 as errorCount
|FROM $viewName)
|""".stripMargin
}
.mkString("\nUNION ALL\n")
}
}
Expand All @@ -287,20 +256,29 @@ object QualityCheck {
StringUtil.EMPTY
} else {
udrWithViews.map { case (udr, viewName) =>
s"""
|(SELECT "${udr.column}" as column,
| "${udr.dataCheckType}" as dataCheckType,
| arrayJoin(top(collect_list(string(id)), $topN), ',') as ids,
| "${udr.errorType}" as errorType,
| 0 as warnCount,
| count(*) as errorCount
|FROM $viewName)
|""".stripMargin
}
s"""
|(SELECT "${udr.column}" as column,
| "${udr.dataCheckType}" as dataCheckType,
| arrayJoin(top(collect_list(string(id)), $topN), ',') as ids,
| "${udr.errorType}" as errorType,
| 0 as warnCount,
| count(*) as errorCount
|FROM $viewName)
|""".stripMargin
}
.mkString("\nUNION ALL\n")
}
}


def antiJoinSql(idColumn: String, tempViewName: String, resultView: String): String = {
s"""|LEFT ANTI JOIN (
| SELECT id FROM `$resultView`
| WHERE size(error_result) > 0
|) bad_ids ON bad_ids.id = ${joinIdColumns(idColumn, tempViewName)}
""".stripMargin
}

def udrAntiJoinSql(idColumn: String, tempViewName: String, viewNames: Seq[String]): String = {
if (viewNames.isEmpty) {
StringUtil.EMPTY
Expand All @@ -311,6 +289,51 @@ object QualityCheck {
|""".stripMargin
}
}
}
// scalastyle:on

object QualityCheck {
val DELIMITER = "__"
val DEFAULT_TOP_N = 1000

// Fallback for generated SQL fragments: a blank or whitespace-only fragment
// becomes the literal "array()" so it can still be spliced into an
// ARRAY(...)/flatten(...) expression without producing invalid SQL.
def emptyArrayIfMissing(query: String): String =
  if (query.trim.isEmpty) "array()" else query

// Renders one CASE/WHEN fragment per warn-level rule: a matching row yields a
// single-element array tagged "<dataCheckType>__<column>", otherwise an empty
// array. Falls back to "array()" when no warn rules are configured.
def generateWarnCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = {
  val warnCases = dataQualityCheckMapping.collect {
    case it if it.errorType == ErrorType.warn =>
      s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END"""
  }
  emptyArrayIfMissing(warnCases.mkString(",\n\t\t\t\t"))
}

// Renders one CASE/WHEN fragment per error-level rule: a matching row yields a
// single-element array tagged "<dataCheckType>__<column>", otherwise an empty
// array. Falls back to "array()" when no error rules are configured.
def generateErrorCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = {
  val errorCases = dataQualityCheckMapping.collect {
    case it if it.errorType == ErrorType.error =>
      s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END"""
  }
  emptyArrayIfMissing(errorCases.mkString(",\n\t\t\t\t"))
}

// Builds the primary-key expression used as the synthetic `id` column.
// A single column becomes a plain backtick-quoted reference; a comma-separated
// list becomes CONCAT(ifnull(`a`, 'NULL'), '__', ifnull(`b`, 'NULL'), ...) so
// NULL components still produce a stable composite key. `prefix`, when given,
// qualifies every column as `prefix`.`col`.
def joinIdColumns(idColumn: String, prefix: String = ""): String = {
  val qualifier = if (isNullOrEmpty(prefix)) "" else s"`$prefix`."
  if (!idColumn.contains(",")) {
    s"$qualifier`$idColumn`"
  } else {
    idColumn
      .split(",")
      .map(col => s"ifnull($qualifier`${col.trim}`, 'NULL')")
      .mkString("CONCAT(", s", '$DELIMITER', ", ")")
  }
}

def joinOnConditions(resultView: String, tempViewName: String, idColumn: String): String = {
idColumn.split(",").map(_.trim).zipWithIndex.map { case (column: String, idx: Int) =>
s"""`$tempViewName`.`$column` = split(`$resultView`.id, '$DELIMITER')[$idx]"""
}.mkString(" AND \n\t")
}

def generateAntiJoinSql(sql: String, udrSql: String, tempViewName: String): String = {
if (isNullOrEmpty(sql) && isNullOrEmpty(udrSql)) {
Expand All @@ -322,25 +345,4 @@ object QualityCheck {
""".stripMargin
}
}

/** SQL that deduplicates `tempViewName` by primary key: ROW_NUMBER() over a
  * partition on `idColumns` keeps exactly one row per key (`__row_num = 1`).
  * When `sortColumns` is blank, ordering falls back to the constant `1`
  * (arbitrary survivor); otherwise the first row by `sortColumns`
  * (DESC by default) survives.
  *
  * @param tempViewName view/table to deduplicate
  * @param idColumns    comma-separated primary-key columns for PARTITION BY
  * @param sortColumns  ORDER BY expression; blank means no meaningful ordering
  * @param desc         direction used to pick the surviving row
  */
def windowByPkSql(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = {
s"""
|SELECT *
|FROM (SELECT *, ROW_NUMBER()
|      OVER (PARTITION BY $idColumns
|      ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num
|  FROM $tempViewName
|) WHERE __row_num = 1""".stripMargin
}

/** SQL that surfaces duplicate-PK rows as quality-check errors: every row
  * beyond the first in each `idColumns` partition (`__row_num > 1`) is emitted
  * with its composite id (via joinIdColumns) and a
  * 'Duplicated PK check__<idColumns>' error tag — the complement of
  * [[windowByPkSql]], which keeps the survivors.
  *
  * @param tempViewName view/table to scan for duplicates
  * @param idColumns    comma-separated primary-key columns for PARTITION BY
  * @param sortColumns  ORDER BY expression; blank means no meaningful ordering
  * @param desc         direction deciding which duplicate rows are reported
  */
def windowByPkSqlErrors(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = {
s"""
|SELECT ${joinIdColumns(idColumns)} as id,
|       ARRAY('Duplicated PK check$DELIMITER$idColumns') as error_result
|FROM (SELECT *, ROW_NUMBER()
|      OVER (PARTITION BY $idColumns
|      ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num
|  FROM $tempViewName
|) WHERE __row_num > 1""".stripMargin
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.github.sharpdata.sharpetl.core.quality
final case class QualityCheckRule(dataCheckType: String, rule: String, errorType: String) {
def withColumn(column: String): DataQualityConfig = {
if (rule.contains("$")) {
DataQualityConfig(column, dataCheckType, rule.replace("$column", column), errorType)
DataQualityConfig(column, dataCheckType, rule.replace("$column", s"`$column`"), errorType)
} else {
DataQualityConfig(column, dataCheckType, rule, errorType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import org.apache.ibatis.annotations.Insert

// MyBatis mapper that persists one quality-check result row per
// (column, data_check_type) aggregate.
// NOTE: SOURCE is a diff rendering that left both the pre- and post-change
// insert strings interleaved (which would concatenate into invalid SQL);
// this is the post-change version, which includes the `id` column, casts the
// counts to INT, and takes both timestamps from the database via NOW().
trait QualityCheckLogMapper {
  @Insert(Array(
    "insert into sharp_etl.quality_check_log(id, job_id, job_name, `column`, data_check_type, ids, error_type, warn_count, " +
      "error_count, create_time, last_update_time)",
    "values ",
    "(#{id}, #{jobId}, #{jobName}, #{column}, #{dataCheckType}, #{ids}, #{errorType}, CAST(#{warnCount} as INT), CAST(#{errorCount} as INT), NOW(), NOW())"
  ))
  def create(jobError: QualityCheckLog): Unit
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.github.sharpdata.sharpetl.core.repository.model

import com.github.sharpdata.sharpetl.core.util.StringUtil.uuid

import java.time.LocalDateTime
import scala.beans.BeanProperty

//noinspection ScalaStyle
case class QualityCheckLog(
@BeanProperty
var jobId: String,
Expand All @@ -23,5 +26,7 @@ case class QualityCheckLog(
@BeanProperty
var createTime: LocalDateTime = LocalDateTime.now(),
@BeanProperty
var lastUpdateTime: LocalDateTime = LocalDateTime.now()
var lastUpdateTime: LocalDateTime = LocalDateTime.now(),
@BeanProperty
var id: String = uuid
)
1 change: 1 addition & 0 deletions flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ dependencies {
implementation 'org.apache.paimon:paimon-oss:0.6.0-incubating'
implementation 'org.apache.flink:flink-connector-jdbc:3.1.1-1.17'

runtimeOnly 'org.apache.hadoop:hadoop-hdfs:2.7.2'

// runtimeOnly 'org.apache.flink:flink-shaded-hadoop-2-uber:2.4.1-10.0'
// runtimeOnly 'org.apache.hadoop:hadoop-common:2.4.1'
Expand Down
4 changes: 2 additions & 2 deletions flink/src/main/resources/quality-check.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Built-in data-quality rules. `$column` is substituted with the column under
# check (see QualityCheckRule.withColumn). `errorType: error` rows are excluded
# from the result set; `warn` rows are only logged.
# NOTE: SOURCE is a diff rendering with pre- and post-change lines interleaved;
# this is the reconstructed post-change file (12 lines, per the hunk header).
- dataCheckType: power null check
  rule: $column is NULL or $column = 'NULL' or $column = 'null' or $column = ''
  errorType: error
- dataCheckType: null check
  rule: $column IS NULL
  errorType: error
- dataCheckType: duplicated check
  rule: UDR.com.github.sharpdata.sharpetl.core.quality.udr.DuplicatedCheck
  errorType: warn
- dataCheckType: negative check
  rule: $column = '-1'
  errorType: warn
Loading

0 comments on commit 78799e4

Please sign in to comment.