diff --git a/core/src/main/resources/db/flink/migration/V1__init.sql b/core/src/main/resources/db/flink/migration/V1__init.sql index 14d78e5..6dd0171 100644 --- a/core/src/main/resources/db/flink/migration/V1__init.sql +++ b/core/src/main/resources/db/flink/migration/V1__init.sql @@ -22,7 +22,7 @@ create table if not exists `sharp_etl`.job_log create table if not exists `sharp_etl`.quality_check_log ( - id bigint, + id string, job_id string not null, job_name string not null comment 'job name(workflow_name + period)', `column` string not null comment 'issue column name', diff --git a/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheck.scala b/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheck.scala index 25e679d..8c58fa1 100644 --- a/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheck.scala +++ b/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheck.scala @@ -159,105 +159,74 @@ trait QualityCheck[DataFrame] extends Serializable { def dropView(tempViewName: String): Unit def dropUnusedCols(df: DataFrame, cols: String): DataFrame -} -// scalastyle:on - -object QualityCheck { - val DELIMITER = "__" - val DEFAULT_TOP_N = 1000 - def checkSql(tempViewName: String, resultView: String, dataQualityCheckMapping: Seq[DataQualityConfig], idColumn: String): String = { + def windowByPkSql(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = { s""" - |CREATE TEMPORARY VIEW $resultView - |AS SELECT ${joinIdColumns(idColumn)} as id, - | flatten(ARRAY(${generateWarnCases(dataQualityCheckMapping)} - | )) as warn_result, - | flatten(ARRAY(${generateErrorCases(dataQualityCheckMapping)} - | )) as error_result - |FROM `$tempViewName` - """.stripMargin - } - - def emptyArrayIfMissing(query: String): String = { - if (query.trim == "") { - "array()" - } else { - query - } + |SELECT * + |FROM (SELECT *, ROW_NUMBER() + | OVER (PARTITION BY $idColumns + | ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num + | FROM $tempViewName + |) WHERE __row_num = 1""".stripMargin } - def generateWarnCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = { - emptyArrayIfMissing(dataQualityCheckMapping - .filter(_.errorType == ErrorType.warn) - .map(it => s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END""") - .mkString(",\n\t\t\t\t") - ) - } - def generateErrorCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = { - emptyArrayIfMissing(dataQualityCheckMapping - .filter(_.errorType == ErrorType.error) - .map(it => s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END""") - .mkString(",\n\t\t\t\t") - ) + def windowByPkSqlErrors(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = { + s""" + |SELECT ${joinIdColumns(idColumns)} as id, + | ARRAY('Duplicated PK check$DELIMITER$idColumns') as error_result + |FROM (SELECT *, ROW_NUMBER() + | OVER (PARTITION BY $idColumns + | ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num + | FROM $tempViewName + |) WHERE __row_num > 1""".stripMargin } - def generateWarnUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = { + def generateErrorUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = { 
dataQualityCheckMapping - .filter(_.errorType == ErrorType.warn) + .filter(_.errorType == ErrorType.error) .map(it => s"""(SELECT | "${it.column}" as column, | "${it.dataCheckType}" as dataCheckType, | arrayJoin(top(collect_list(string(id)), $topN), ',') as ids, | "${it.errorType}" as errorType, - | count(*) as warnCount, - | 0 as errorCount + | 0 as warnCount, + | count(*) as errorCount |FROM `$view` - |WHERE array_contains(warn_result, "${it.dataCheckType}${DELIMITER}${it.column}") + |WHERE array_contains(error_result, "${it.dataCheckType}${DELIMITER}${it.column}") |)""".stripMargin ) .mkString("\nUNION ALL\n") } - def generateErrorUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = { + def generateWarnUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = { dataQualityCheckMapping - .filter(_.errorType == ErrorType.error) + .filter(_.errorType == ErrorType.warn) .map(it => s"""(SELECT | "${it.column}" as column, | "${it.dataCheckType}" as dataCheckType, | arrayJoin(top(collect_list(string(id)), $topN), ',') as ids, | "${it.errorType}" as errorType, - | 0 as warnCount, - | count(*) as errorCount + | count(*) as warnCount, + | 0 as errorCount |FROM `$view` - |WHERE array_contains(error_result, "${it.dataCheckType}${DELIMITER}${it.column}") + |WHERE array_contains(warn_result, "${it.dataCheckType}${DELIMITER}${it.column}") |)""".stripMargin ) .mkString("\nUNION ALL\n") } - def joinIdColumns(idColumn: String, prefix: String = ""): String = { - val realPrefix = if (isNullOrEmpty(prefix)) "" else s"`$prefix`." - if (idColumn.contains(",")) { - idColumn.split(",").map(it => s"ifnull($realPrefix`${it.trim}`, 'NULL')").mkString("CONCAT(", s", '$DELIMITER', ", ")") - } else { - s"$realPrefix`$idColumn`" - } - } - - def joinOnConditions(resultView: String, tempViewName: String, idColumn: String): String = { - idColumn.split(",").map(_.trim).zipWithIndex.map { case (column: String, idx: Int) => - s"""`$tempViewName`.`$column` = split(`$resultView`.id, '$DELIMITER')[$idx]""" - }.mkString(" AND \n\t") - } - - def antiJoinSql(idColumn: String, tempViewName: String, resultView: String): String = { - s"""|LEFT ANTI JOIN ( - | SELECT id FROM `$resultView` - | WHERE size(error_result) > 0 - |) bad_ids ON bad_ids.id = ${joinIdColumns(idColumn, tempViewName)} + def checkSql(tempViewName: String, resultView: String, dataQualityCheckMapping: Seq[DataQualityConfig], idColumn: String): String = { + s""" + |CREATE TEMPORARY VIEW $resultView + |AS SELECT ${joinIdColumns(idColumn)} as id, + | flatten(ARRAY(${generateWarnCases(dataQualityCheckMapping)} + | )) as warn_result, + | flatten(ARRAY(${generateErrorCases(dataQualityCheckMapping)} + | )) as error_result + |FROM `$tempViewName` """.stripMargin } @@ -267,16 +236,16 @@ object QualityCheck { StringUtil.EMPTY } else { udrWithViews.map { case (udr, viewName) => - s""" - |(SELECT "${udr.column}" as column, - | "${udr.dataCheckType}" as dataCheckType, - | arrayJoin(top(collect_list(string(id)), $topN), ',') as ids, - | "${udr.errorType}" as errorType, - | count(*) as warnCount, - | 0 as errorCount - |FROM $viewName) - |""".stripMargin - } + s""" + |(SELECT "${udr.column}" as column, + | "${udr.dataCheckType}" as dataCheckType, + | arrayJoin(top(collect_list(string(id)), $topN), ',') as ids, + | "${udr.errorType}" as errorType, + | count(*) as warnCount, + | 0 as errorCount + |FROM $viewName) + |""".stripMargin + } .mkString("\nUNION ALL\n") } } @@ -287,20 +256,29 @@ object QualityCheck { 
StringUtil.EMPTY } else { udrWithViews.map { case (udr, viewName) => - s""" - |(SELECT "${udr.column}" as column, - | "${udr.dataCheckType}" as dataCheckType, - | arrayJoin(top(collect_list(string(id)), $topN), ',') as ids, - | "${udr.errorType}" as errorType, - | 0 as warnCount, - | count(*) as errorCount - |FROM $viewName) - |""".stripMargin - } + s""" + |(SELECT "${udr.column}" as column, + | "${udr.dataCheckType}" as dataCheckType, + | arrayJoin(top(collect_list(string(id)), $topN), ',') as ids, + | "${udr.errorType}" as errorType, + | 0 as warnCount, + | count(*) as errorCount + |FROM $viewName) + |""".stripMargin + } .mkString("\nUNION ALL\n") } } + + def antiJoinSql(idColumn: String, tempViewName: String, resultView: String): String = { + s"""|LEFT ANTI JOIN ( + | SELECT id FROM `$resultView` + | WHERE size(error_result) > 0 + |) bad_ids ON bad_ids.id = ${joinIdColumns(idColumn, tempViewName)} + """.stripMargin + } + def udrAntiJoinSql(idColumn: String, tempViewName: String, viewNames: Seq[String]): String = { if (viewNames.isEmpty) { StringUtil.EMPTY @@ -311,6 +289,51 @@ object QualityCheck { |""".stripMargin } } +} +// scalastyle:on + +object QualityCheck { + val DELIMITER = "__" + val DEFAULT_TOP_N = 1000 + + def emptyArrayIfMissing(query: String): String = { + if (query.trim == "") { + "array()" + } else { + query + } + } + + def generateWarnCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = { + emptyArrayIfMissing(dataQualityCheckMapping + .filter(_.errorType == ErrorType.warn) + .map(it => s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END""") + .mkString(",\n\t\t\t\t") + ) + } + + def generateErrorCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = { + emptyArrayIfMissing(dataQualityCheckMapping + .filter(_.errorType == ErrorType.error) + .map(it => s"""CASE WHEN ${it.rule} THEN array("${it.dataCheckType}${DELIMITER}${it.column}") ELSE array() END""") + .mkString(",\n\t\t\t\t") + ) + } + + def joinIdColumns(idColumn: String, prefix: String = ""): String = { + val realPrefix = if (isNullOrEmpty(prefix)) "" else s"`$prefix`." 
+ if (idColumn.contains(",")) { + idColumn.split(",").map(it => s"ifnull($realPrefix`${it.trim}`, 'NULL')").mkString("CONCAT(", s", '$DELIMITER', ", ")") + } else { + s"$realPrefix`$idColumn`" + } + } + + def joinOnConditions(resultView: String, tempViewName: String, idColumn: String): String = { + idColumn.split(",").map(_.trim).zipWithIndex.map { case (column: String, idx: Int) => + s"""`$tempViewName`.`$column` = split(`$resultView`.id, '$DELIMITER')[$idx]""" + }.mkString(" AND \n\t") + } def generateAntiJoinSql(sql: String, udrSql: String, tempViewName: String): String = { if (isNullOrEmpty(sql) && isNullOrEmpty(udrSql)) { @@ -322,25 +345,4 @@ object QualityCheck { """.stripMargin } } - - def windowByPkSql(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = { - s""" - |SELECT * - |FROM (SELECT *, ROW_NUMBER() - | OVER (PARTITION BY $idColumns - | ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num - | FROM $tempViewName - |) WHERE __row_num = 1""".stripMargin - } - - def windowByPkSqlErrors(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = { - s""" - |SELECT ${joinIdColumns(idColumns)} as id, - | ARRAY('Duplicated PK check$DELIMITER$idColumns') as error_result - |FROM (SELECT *, ROW_NUMBER() - | OVER (PARTITION BY $idColumns - | ORDER BY ${if (isNullOrEmpty(sortColumns)) "1" else sortColumns} ${if (desc) "DESC" else "ASC"}) as __row_num - | FROM $tempViewName - |) WHERE __row_num > 1""".stripMargin - } } diff --git a/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheckRule.scala b/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheckRule.scala index fa27117..bc10a41 100644 --- a/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheckRule.scala +++ b/core/src/main/scala/com/github/sharpdata/sharpetl/core/quality/QualityCheckRule.scala @@ -3,7 +3,7 @@ package com.github.sharpdata.sharpetl.core.quality final case class QualityCheckRule(dataCheckType: String, rule: String, errorType: String) { def withColumn(column: String): DataQualityConfig = { if (rule.contains("$")) { - DataQualityConfig(column, dataCheckType, rule.replace("$column", column), errorType) + DataQualityConfig(column, dataCheckType, rule.replace("$column", s"`$column`"), errorType) } else { DataQualityConfig(column, dataCheckType, rule, errorType) } diff --git a/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/mapper/flink/QualityCheckLogMapper.scala b/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/mapper/flink/QualityCheckLogMapper.scala index 03bbaa3..7b7acaa 100644 --- a/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/mapper/flink/QualityCheckLogMapper.scala +++ b/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/mapper/flink/QualityCheckLogMapper.scala @@ -5,10 +5,10 @@ import org.apache.ibatis.annotations.Insert trait QualityCheckLogMapper { @Insert(Array( - "insert into sharp_etl.quality_check_log(job_id, job_name, `column`, data_check_type, ids, error_type, warn_count, " + + "insert into sharp_etl.quality_check_log(id, job_id, job_name, `column`, data_check_type, ids, error_type, warn_count, " + "error_count, create_time, last_update_time)", "values ", - "(#{jobId}, #{jobName}, #{column}, #{dataCheckType}, #{ids}, #{errorType}, #{warnCount}, #{errorCount}, #{createTime}, #{lastUpdateTime})" + "(#{id}, #{jobId}, #{jobName}, 
#{column}, #{dataCheckType}, #{ids}, #{errorType}, CAST(#{warnCount} as INT), CAST(#{errorCount} as INT), NOW(), NOW())" )) def create(jobError: QualityCheckLog): Unit } diff --git a/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/model/QualityCheckLog.scala b/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/model/QualityCheckLog.scala index da1933d..e9c7f95 100644 --- a/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/model/QualityCheckLog.scala +++ b/core/src/main/scala/com/github/sharpdata/sharpetl/core/repository/model/QualityCheckLog.scala @@ -1,8 +1,11 @@ package com.github.sharpdata.sharpetl.core.repository.model +import com.github.sharpdata.sharpetl.core.util.StringUtil.uuid + import java.time.LocalDateTime import scala.beans.BeanProperty +//noinspection ScalaStyle case class QualityCheckLog( @BeanProperty var jobId: String, @@ -23,5 +26,7 @@ case class QualityCheckLog( @BeanProperty var createTime: LocalDateTime = LocalDateTime.now(), @BeanProperty - var lastUpdateTime: LocalDateTime = LocalDateTime.now() + var lastUpdateTime: LocalDateTime = LocalDateTime.now(), + @BeanProperty + var id: String = uuid ) diff --git a/flink/build.gradle b/flink/build.gradle index 8979711..3d6ee89 100644 --- a/flink/build.gradle +++ b/flink/build.gradle @@ -72,6 +72,7 @@ dependencies { implementation 'org.apache.paimon:paimon-oss:0.6.0-incubating' implementation 'org.apache.flink:flink-connector-jdbc:3.1.1-1.17' + runtimeOnly 'org.apache.hadoop:hadoop-hdfs:2.7.2' // runtimeOnly 'org.apache.flink:flink-shaded-hadoop-2-uber:2.4.1-10.0' // runtimeOnly 'org.apache.hadoop:hadoop-common:2.4.1' diff --git a/flink/src/main/resources/quality-check.yaml b/flink/src/main/resources/quality-check.yaml index 94c707d..82c730c 100644 --- a/flink/src/main/resources/quality-check.yaml +++ b/flink/src/main/resources/quality-check.yaml @@ -1,5 +1,5 @@ - dataCheckType: power null check - rule: powerNullCheck($column) + rule: $column is NULL or $column = 'NULL' or $column = 'null' or $column = '' errorType: error - dataCheckType: null check rule: $column IS NULL @@ -7,6 +7,6 @@ - dataCheckType: duplicated check rule: UDR.com.github.sharpdata.sharpetl.core.quality.udr.DuplicatedCheck errorType: warn -- dataCheckType: mismatch dim check +- dataCheckType: negative check rule: $column = '-1' errorType: warn \ No newline at end of file diff --git a/flink/src/main/resources/tasks/quality_check.sql b/flink/src/main/resources/tasks/quality_check.sql new file mode 100644 index 0000000..936ba30 --- /dev/null +++ b/flink/src/main/resources/tasks/quality_check.sql @@ -0,0 +1,47 @@ +-- workflow=test_dwd_with_quality_check +-- period=1440 +-- loadType=incremental +-- logDrivenType=timewindow + + +-- step=var setup +-- source=temp +-- target=variables +select DATE_FORMAT(TO_TIMESTAMP('${DATA_RANGE_END}' , 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as `DATE_END`, + extract(hour from TO_TIMESTAMP('${DATA_RANGE_START}' , 'yyyy-MM-dd HH:mm:ss')) as `HOUR_END`, + TO_TIMESTAMP('${DATA_RANGE_START}' , 'yyyy-MM-dd HH:mm:ss') as `EFFECTIVE_START_TIME`; + +-- step=read temp data +-- source=temp +-- options +-- idColumn=order_id +-- sortColumn=order_id +-- column.phone.qualityCheckRules=power null check +-- column.value.qualityCheckRules=negative check +-- target=console +select 1212121242 as `order_id`, + '11' as `phone`, + '-1' as `value`, + '${JOB_ID}' as `job_id`, + '${EFFECTIVE_START_TIME}' as effective_start_time, + '9999-01-01 00:00:00' as effective_end_time, + '1' as is_active, + '1' as 
is_latest, + '${DATA_RANGE_START}' as idempotent_key, + '${DATE_END}' as dw_insert_date + +UNION ALL select 1212121243 as `order_id`, + 'null' as `phone`, + '1' as `value`, + '${JOB_ID}' as `job_id`, + '${EFFECTIVE_START_TIME}' as effective_start_time, + '9999-01-01 00:00:00' as effective_end_time, + '1' as is_active, + '1' as is_latest, + '${DATA_RANGE_START}' as idempotent_key, + '${DATE_END}' as dw_insert_date; + +-- step=read check result +-- source=built_in +-- target=console +select * from `paimon`.`sharp_etl`.`quality_check_log` where job_id='${JOB_ID}'; diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcPreparedStatement.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcPreparedStatement.scala index 6d090d7..c7ea499 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcPreparedStatement.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcPreparedStatement.scala @@ -23,6 +23,7 @@ class FlinkJdbcPreparedStatement(val sql: String) extends PreparedStatement { .foldLeft(sql) { case (accSql, (_, value)) => value match { + case _ if !accSql.contains("?") => accSql case _: Int | _: Boolean => accSql.replaceFirst("\\?", value.toString) case _: String => accSql.replaceFirst("\\?", s"\'${escape(value.toString)}\'") case time: LocalDateTime => accSql.replaceFirst("\\?", s"\'${time.format(L_YYYY_MM_DD_HH_MM_SS)}\'") diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcStatement.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcStatement.scala index 2ac43a5..7dc33e8 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcStatement.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/extra/driver/FlinkJdbcStatement.scala @@ -1,7 +1,8 @@ package com.github.sharpdata.sharpetl.flink.extra.driver +import com.github.sharpdata.sharpetl.flink.extra.driver.FlinkJdbcStatement.fixedResult import com.github.sharpdata.sharpetl.flink.util.ETLFlinkSession -import org.apache.flink.table.api.DataTypes +import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.api.Expressions.row import java.sql.{Connection, ResultSet, SQLWarning, Statement} @@ -49,13 +50,6 @@ class FlinkJdbcStatement extends Statement { override def execute(sql: String): Boolean = { ETLFlinkSession.batchEnv.executeSql(sql).print() - val fixedResult = ETLFlinkSession.batchEnv.fromValues( - DataTypes.ROW( - DataTypes.FIELD("result", DataTypes.STRING()) - ), - row("SUCCESS") - ) - this.resultSet = new FlinkJdbcResultSet(fixedResult, this) false } @@ -119,4 +113,13 @@ class FlinkJdbcStatement extends Statement { override def isWrapperFor(iface: Class[_]): Boolean = false } +object FlinkJdbcStatement { + val fixedResult: Table = ETLFlinkSession.batchEnv.fromValues( + DataTypes.ROW( + DataTypes.FIELD("result", DataTypes.STRING()) + ), + row("SUCCESS") + ) +} + // scalastyle:on diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/job/FlinkWorkflowInterpreter.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/job/FlinkWorkflowInterpreter.scala index 784615e..ec186a6 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/job/FlinkWorkflowInterpreter.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/job/FlinkWorkflowInterpreter.scala @@ -29,13 +29,6 @@ class 
FlinkWorkflowInterpreter(override val tEnv: TableEnvironment, override def evalSteps(steps: List[WorkflowStep], jobLog: JobLog, variables: Variables, start: String, end: String): Unit = { super.evalSteps(steps, jobLog, variables, start, end) - cleanUpTempTableFromMemory() - } - - private def cleanUpTempTableFromMemory(): Unit = { - if (Environment.CURRENT != "test") { - tEnv.listTemporaryTables().foreach(it => tEnv.dropTemporaryTable(it)) - } } // deprecated method diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala index 5c7b9dd..e1a27d9 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala @@ -2,13 +2,18 @@ package com.github.sharpdata.sharpetl.flink.quality import com.github.sharpdata.sharpetl.core.annotation.Annotations.Stable import com.github.sharpdata.sharpetl.core.quality.QualityCheck._ -import com.github.sharpdata.sharpetl.core.quality.{DataQualityCheckResult, QualityCheck, QualityCheckRule} +import com.github.sharpdata.sharpetl.core.quality.{DataQualityCheckResult, DataQualityConfig, ErrorType, QualityCheck, QualityCheckRule} import com.github.sharpdata.sharpetl.core.repository.QualityCheckAccessor -import com.github.sharpdata.sharpetl.core.util.ETLLogger +import com.github.sharpdata.sharpetl.core.util.{ETLLogger, StringUtil} +import com.github.sharpdata.sharpetl.flink.extra.driver.FlinkJdbcStatement.fixedResult import com.github.sharpdata.sharpetl.flink.job.Types.DataFrame import org.apache.flink.table.api.Expressions._ -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.internal.TableEnvironmentImpl +import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation} +import java.util +import java.util.List import scala.jdk.CollectionConverters.asScalaIteratorConverter @Stable(since = "1.0.0") @@ -38,7 +43,21 @@ class FlinkQualityCheck(val tEnv: TableEnvironment, override def execute(sql: String): DataFrame = { ETLLogger.info(s"Execution sql: \n $sql") - tEnv.sqlQuery(sql) + val impl = tEnv.asInstanceOf[TableEnvironmentImpl] + val operations: util.List[Operation] = impl.getParser.parse(sql) + if (operations.size != 1) { + throw new ValidationException("Unsupported SQL query! 
sqlQuery() only accepts a single SQL query.") + } + else { + val operation: Operation = operations.get(0) + operation match { + case op: QueryOperation if !operation.isInstanceOf[ModifyOperation] => + impl.createTable(op) + case _ => + tEnv.executeSql(sql) + fixedResult + } + } } override def createView(df: DataFrame, tempViewName: String): Unit = { @@ -54,4 +73,156 @@ class FlinkQualityCheck(val tEnv: TableEnvironment, override def dropUnusedCols(df: DataFrame, cols: String): DataFrame = { df.dropColumns(cols.split(",").map(col => $(col.trim)).toArray: _*) } + + override def windowByPkSql(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = { + s""" + |SELECT *, 1 as __row_num + |FROM (SELECT *, MAX($sortColumns) + | OVER (PARTITION BY $idColumns) as __max__ + | FROM `$tempViewName` + |) WHERE $idColumns = __max__""".stripMargin + } + + override def windowByPkSqlErrors(tempViewName: String, idColumns: String, sortColumns: String = "", desc: Boolean = true): String = { + s""" + |SELECT ${joinIdColumns(idColumns)} as id, + | ARRAY['Duplicated PK check$DELIMITER$idColumns'] as error_result, + | 1 as __row_num + |FROM (SELECT *, MAX($sortColumns) + | OVER (PARTITION BY $idColumns) as __max__ + | FROM `$tempViewName` + |) WHERE $idColumns = __max__""".stripMargin + } + + override def generateErrorUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = { + dataQualityCheckMapping + .filter(_.errorType == ErrorType.error) + .map(it => + s"""(SELECT + | '${it.column}' as `column`, + | '${it.dataCheckType}' as dataCheckType, + | LISTAGG(CAST(id as STRING)) as ids, + | '${it.errorType}' as errorType, + | 0 as warnCount, + | count(*) as errorCount + |FROM `$view` + |WHERE ARRAY_CONTAINS(error_result, '${it.dataCheckType}${DELIMITER}${it.column}') + |)""".stripMargin + ) + .mkString("\nUNION ALL\n") + } + + override def generateWarnUnions(dataQualityCheckMapping: Seq[DataQualityConfig], topN: Int, view: String): String = { + dataQualityCheckMapping + .filter(_.errorType == ErrorType.warn) + .map(it => + s"""(SELECT + | '${it.column}' as `column`, + | '${it.dataCheckType}' as dataCheckType, + | LISTAGG(CAST(id as STRING)) as ids, + | '${it.errorType}' as errorType, + | count(*) as warnCount, + | 0 as errorCount + |FROM `$view` + |WHERE ARRAY_CONTAINS(warn_result, '${it.dataCheckType}${DELIMITER}${it.column}') + |)""".stripMargin + ) + .mkString("\nUNION ALL\n") + } + + override def checkSql(tempViewName: String, resultView: String, dataQualityCheckMapping: Seq[DataQualityConfig], idColumn: String): String = { + s""" + |CREATE TEMPORARY VIEW `$resultView` + |AS SELECT ${joinIdColumns(idColumn)} as id, + | ARRAY[${generateWarnCases(dataQualityCheckMapping)} + | ] as warn_result, + | ARRAY[${generateErrorCases(dataQualityCheckMapping)} + | ] as error_result + |FROM `$tempViewName` + """.stripMargin + } + + override def udrWarnSql(topN: Int, udrWithViews: Seq[(DataQualityConfig, String)]) + : String = { + if (udrWithViews.isEmpty) { + StringUtil.EMPTY + } else { + udrWithViews.map { case (udr, viewName) => + s""" + |(SELECT '${udr.column}' as column, + | '${udr.dataCheckType}' as dataCheckType, + | LISTAGG(CAST(id as STRING)) as ids, + | '${udr.errorType}' as errorType, + | count(*) as warnCount, + | 0 as errorCount + |FROM `$viewName`) + |""".stripMargin + } + .mkString("\nUNION ALL\n") + } + } + + override def udrErrorSql(topN: Int, udrWithViews: Seq[(DataQualityConfig, String)]) + : String = { + if (udrWithViews.isEmpty) { 
+ StringUtil.EMPTY + } else { + udrWithViews.map { case (udr, viewName) => + s""" + |(SELECT '${udr.column}' as column, + | '${udr.dataCheckType}' as dataCheckType, + | LISTAGG(CAST(id as STRING)) as ids, + | '${udr.errorType}' as errorType, + | 0 as warnCount, + | count(*) as errorCount + |FROM `$viewName`) + |""".stripMargin + } + .mkString("\nUNION ALL\n") + } + } + + override def antiJoinSql(idColumn: String, tempViewName: String, resultView: String): String = { + s"""|WHERE `$tempViewName`.`$idColumn` NOT IN ( + | SELECT id FROM `$resultView` + | WHERE CARDINALITY(error_result) > 0 + |) + """.stripMargin + } + + override def udrAntiJoinSql(idColumn: String, tempViewName: String, viewNames: Seq[String]): String = { + if (viewNames.isEmpty) { + StringUtil.EMPTY + } else { + s"""|WHERE `$tempViewName`.`$idColumn` NOT IN ( + | SELECT id FROM (${viewNames.map(view => s"SELECT id FROM $view").mkString("\nUNION ALL\n")}) + | WHERE CARDINALITY(error_result) > 0 + |) + |""".stripMargin + } + } + + def emptyArrayIfMissing(query: String): String = { + if (query.trim == "") { + "array[]" + } else { + query + } + } + + def generateWarnCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = { + emptyArrayIfMissing(dataQualityCheckMapping + .filter(_.errorType == ErrorType.warn) + .map(it => s"""CASE WHEN ${it.rule} THEN '${it.dataCheckType}${DELIMITER}${it.column}' ELSE '' END""") + .mkString(",\n\t\t\t\t") + ) + } + + def generateErrorCases(dataQualityCheckMapping: Seq[DataQualityConfig]): String = { + emptyArrayIfMissing(dataQualityCheckMapping + .filter(_.errorType == ErrorType.error) + .map(it => s"""CASE WHEN ${it.rule} THEN '${it.dataCheckType}${DELIMITER}${it.column}' ELSE '' END""") + .mkString(",\n\t\t\t\t") + ) + } } diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/udf/CollectWsUDF.java b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/udf/CollectWsUDF.java new file mode 100644 index 0000000..90901c1 --- /dev/null +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/udf/CollectWsUDF.java @@ -0,0 +1,23 @@ +package com.github.sharpdata.sharpetl.flink.udf; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.types.inference.TypeInference; + +import java.util.Map; +import java.util.Optional; + +public class CollectWsUDF extends ScalarFunction { + + public String eval(@DataTypeHint("MAP") Map multiset) { + return String.join(",", multiset.keySet()); + } + + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + return TypeInference.newBuilder().outputTypeStrategy(callContext -> Optional.of(DataTypes.STRING())).build(); + } + +} \ No newline at end of file diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala index 5501d4a..acf2fb1 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala @@ -5,6 +5,7 @@ import com.github.sharpdata.sharpetl.core.repository.QualityCheckAccessor import com.github.sharpdata.sharpetl.core.util.Constants.ETLDatabaseType.FLINK_SHARP_ETL import com.github.sharpdata.sharpetl.core.util.{ETLConfig, ETLLogger} import 
com.github.sharpdata.sharpetl.flink.job.FlinkWorkflowInterpreter +import com.github.sharpdata.sharpetl.flink.udf.CollectWsUDF import org.apache.flink.configuration.Configuration import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} @@ -33,6 +34,10 @@ object ETLFlinkSession { conf } + def initUdf(session: TableEnvironment): Unit = { + session.createTemporarySystemFunction("collect_ws", classOf[CollectWsUDF]) + } + def getFlinkInterpreter(local: Boolean, wfName: String, autoCloseSession: Boolean, @@ -43,7 +48,7 @@ object ETLFlinkSession { ETLFlinkSession.wfName = wfName ETLFlinkSession.autoCloseSession = autoCloseSession val session = ETLFlinkSession.batchEnv - //UdfInitializer.init(session) + initUdf(session) createCatalogIfNeed(etlDatabaseType, session) new FlinkWorkflowInterpreter(session, dataQualityCheckRules, QualityCheckAccessor.getInstance(etlDatabaseType)) } diff --git a/spark/src/test/scala/com/github/sharpdata/sharpetl/spark/quality/DataQualityCheckRuleSpec.scala b/spark/src/test/scala/com/github/sharpdata/sharpetl/spark/quality/DataQualityCheckRuleSpec.scala index b904171..3a99052 100644 --- a/spark/src/test/scala/com/github/sharpdata/sharpetl/spark/quality/DataQualityCheckRuleSpec.scala +++ b/spark/src/test/scala/com/github/sharpdata/sharpetl/spark/quality/DataQualityCheckRuleSpec.scala @@ -16,13 +16,13 @@ class DataQualityCheckRuleSpec extends AnyFlatSpec with should.Matchers with Spa it should "replace column placeholder with actual name" in { BUILT_IN_QUALITY_CHECK_RULES.head.withColumn("name") should be( - DataQualityConfig("name", "null check", "powerNullCheck(name)", ErrorType.error) + DataQualityConfig("name", "null check", "powerNullCheck(`name`)", ErrorType.error) ) } it should "make no effects with custom filter" in { BUILT_IN_QUALITY_CHECK_RULES.tail.head.withColumn("name") should be( - DataQualityConfig("name", "custom check for name and address", "powerNullCheck(name) AND powerNullCheck(address)", ErrorType.error) + DataQualityConfig("name", "custom check for name and address", "powerNullCheck(`name`) AND powerNullCheck(address)", ErrorType.error) ) } } \ No newline at end of file
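For reference, the statement that the new Flink-dialect FlinkQualityCheck.checkSql would emit for the sample step in flink/src/main/resources/tasks/quality_check.sql (idColumn=order_id, "power null check" as error on `phone`, "negative check" as warn on `value`, per the updated quality-check.yaml and QualityCheckRule.withColumn backtick-quoting) looks roughly like the sketch below. The view names `source_view` and `check_result` are placeholders, since the real temporary-view names are generated at runtime:

CREATE TEMPORARY VIEW `check_result`
AS SELECT `order_id` as id,
       ARRAY[CASE WHEN `value` = '-1' THEN 'negative check__value' ELSE '' END
       ] as warn_result,
       ARRAY[CASE WHEN `phone` is NULL or `phone` = 'NULL' or `phone` = 'null' or `phone` = '' THEN 'power null check__phone' ELSE '' END
       ] as error_result
FROM `source_view`

Rows whose error_result contains a matching entry are then aggregated by the overridden generateErrorUnions/generateWarnUnions (ARRAY_CONTAINS plus LISTAGG over the ids) and persisted to `sharp_etl`.quality_check_log, which is the table the final "read check result" step of the sample workflow selects from.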