Skip to content

Commit

Permalink
v0.28: fixed error when dash in dataset name
Browse files Browse the repository at this point in the history
  • Loading branch information
unytics committed Jul 26, 2024
1 parent 78574b1 commit 1425356
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
10 changes: 5 additions & 5 deletions airbyte_serverless/destinations.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __init__(self, dataset='', **kwargs):
import google.cloud.bigquery
assert dataset, 'dataset argument must be defined'
assert len(dataset.split('.')) == 2, '`BigQueryDestination.dataset` must be like `project.dataset`'
self.dataset = dataset
self.dataset = dataset.replace('`', '').strip()
self.project, _ = self.dataset.split('.')
self.bigquery = google.cloud.bigquery.Client(project=self.project)
self.created_tables = []
Expand All @@ -132,7 +132,7 @@ def get_state(self):
json_value(_airbyte_data, '$.stream.stream_descriptor.name') as stream,
_airbyte_data as state,
_airbyte_loaded_at,
from {self.dataset}._airbyte_states
from `{self.dataset}._airbyte_states`
where json_value(_airbyte_data, '$.type') = 'STREAM'
qualify row_number() over (partition by stream order by _airbyte_loaded_at desc) = 1
Expand All @@ -143,7 +143,7 @@ def get_state(self):
select
_airbyte_data as state,
_airbyte_loaded_at,
from {self.dataset}._airbyte_states
from `{self.dataset}._airbyte_states`
where json_value(_airbyte_data, '$.type') = 'GLOBAL'
order by _airbyte_loaded_at desc
limit 1
Expand All @@ -155,7 +155,7 @@ def get_state(self):
select
json_extract(_airbyte_data, '$.data') as state,
_airbyte_loaded_at,
from {self.dataset}._airbyte_states
from `{self.dataset}._airbyte_states`
where json_extract(_airbyte_data, '$.data') is not null
order by _airbyte_loaded_at desc
limit 1
Expand Down Expand Up @@ -205,7 +205,7 @@ def _create_table_if_needed(self, table):
for column, type, description in self.destination_columns
])
self.bigquery.query(f'''
create table if not exists {self.dataset}.{table} (
create table if not exists `{self.dataset}.{table}` (
{columns_definitions}
)
partition by date(_airbyte_loaded_at)
Expand Down
2 changes: 1 addition & 1 deletion airbyte_serverless/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '0.27'
VERSION = '0.28'

0 comments on commit 1425356

Please sign in to comment.