Skip to content

Commit

Permalink
[BigQuery] Better TOAST Handling (#1077)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 12, 2024
1 parent d93bf20 commit 29ddf90
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 14 deletions.
6 changes: 1 addition & 5 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool {

func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, bd)
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`COALESCE(TO_JSON_STRING(%s) != '{"key":"%s"}', true)`,
colName, constants.ToastUnavailableValuePlaceholder)
}
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
return fmt.Sprintf(`TO_JSON_STRING(%s) NOT LIKE '%s'`, colName, "%"+constants.ToastUnavailableValuePlaceholder+"%")
}

func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
Expand Down
4 changes: 2 additions & 2 deletions clients/bigquery/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func TestBigQueryDialect_BuildAddColumnQuery(t *testing.T) {

func TestBigQueryDialect_BuildIsNotToastValueExpression(t *testing.T) {
assert.Equal(t,
"COALESCE(tbl.`bar` != '__debezium_unavailable_value', true)",
"TO_JSON_STRING(tbl.`bar`) NOT LIKE '%__debezium_unavailable_value%'",
BigQueryDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)),
)
assert.Equal(t,
"COALESCE(TO_JSON_STRING(tbl.`foo`) != '{\"key\":\"__debezium_unavailable_value\"}', true)",
"TO_JSON_STRING(tbl.`foo`) NOT LIKE '%__debezium_unavailable_value%'",
BigQueryDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
}
Expand Down
11 changes: 4 additions & 7 deletions lib/sql/tests/columns_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tests

import (
"fmt"
"testing"

bigqueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect"
Expand Down Expand Up @@ -96,7 +95,6 @@ func TestBuildColumnsUpdateFragment_BigQuery(t *testing.T) {

lastCaseEscapeTypes = append(lastCaseEscapeTypes, columns.NewColumn(constants.DeleteColumnMarker, typing.Boolean))

key := `{"key":"__debezium_unavailable_value"}`
testCases := []struct {
name string
columns []columns.Column
Expand All @@ -105,13 +103,12 @@ func TestBuildColumnsUpdateFragment_BigQuery(t *testing.T) {
{
name: "struct, string and toast string (bigquery)",
columns: lastCaseColTypes,
expectedString: "`a1`= CASE WHEN COALESCE(TO_JSON_STRING(stg.`a1`) != '{\"key\":\"__debezium_unavailable_value\"}', true) THEN stg.`a1` ELSE tgt.`a1` END,`b2`= CASE WHEN COALESCE(stg.`b2` != '__debezium_unavailable_value', true) THEN stg.`b2` ELSE tgt.`b2` END,`c3`=stg.`c3`",
expectedString: "`a1`= CASE WHEN TO_JSON_STRING(stg.`a1`) NOT LIKE '%__debezium_unavailable_value%' THEN stg.`a1` ELSE tgt.`a1` END,`b2`= CASE WHEN TO_JSON_STRING(stg.`b2`) NOT LIKE '%__debezium_unavailable_value%' THEN stg.`b2` ELSE tgt.`b2` END,`c3`=stg.`c3`",
},
{
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
expectedString: fmt.Sprintf("`a1`= CASE WHEN COALESCE(TO_JSON_STRING(stg.`a1`) != '%s', true) THEN stg.`a1` ELSE tgt.`a1` END,`b2`= CASE WHEN COALESCE(stg.`b2` != '__debezium_unavailable_value', true) THEN stg.`b2` ELSE tgt.`b2` END,`c3`=stg.`c3`,%s,%s",
key, fmt.Sprintf("`start`= CASE WHEN COALESCE(TO_JSON_STRING(stg.`start`) != '%s', true) THEN stg.`start` ELSE tgt.`start` END", key), "`select`=stg.`select`,`__artie_delete`=stg.`__artie_delete`"),
name: "struct, string and toast string (bigquery) w/ reserved keywords",
columns: lastCaseEscapeTypes,
expectedString: "`a1`= CASE WHEN TO_JSON_STRING(stg.`a1`) NOT LIKE '%__debezium_unavailable_value%' THEN stg.`a1` ELSE tgt.`a1` END,`b2`= CASE WHEN TO_JSON_STRING(stg.`b2`) NOT LIKE '%__debezium_unavailable_value%' THEN stg.`b2` ELSE tgt.`b2` END,`c3`=stg.`c3`,`start`= CASE WHEN TO_JSON_STRING(stg.`start`) NOT LIKE '%__debezium_unavailable_value%' THEN stg.`start` ELSE tgt.`start` END,`select`=stg.`select`,`__artie_delete`=stg.`__artie_delete`",
},
}

Expand Down

0 comments on commit 29ddf90

Please sign in to comment.