Skip to content

Commit

Permalink
[新增]TrinoDB事务
Browse files Browse the repository at this point in the history
  • Loading branch information
longfengpili committed Nov 16, 2022
1 parent 3ac163e commit 19bec30
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 12 deletions.
14 changes: 9 additions & 5 deletions pydbapi/api/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# @Author: longfengpili
# @Date: 2022-11-14 14:17:02
# @Last Modified by: longfengpili
# @Last Modified time: 2022-11-15 16:42:32
# @Last Modified time: 2022-11-16 10:38:30


import re
Expand Down Expand Up @@ -56,7 +56,7 @@ def create(self, columns, partition=None):
class TrinoDB(DBMixin, DBFileExec):
_instance_lock = threading.Lock()

def __init__(self, host, user, password, database, isolation_level=None, catalog='hive', port=9090, safe_rule=True):
def __init__(self, host, user, password, database, isolation_level=2, catalog='hive', port=9090, safe_rule=True):
'''[summary]
[init]
Expand Down Expand Up @@ -115,7 +115,7 @@ def get_conn(self):
raise
conn = connect(schema=self.database, user=self.user, password=self.password,
host=self.host, port=self.port, catalog=self.catalog,
# isolation_level=self.isolation_level # 如果使用事务模式,则不能(drop、select、create)混合使用
isolation_level=self.isolation_level # 如果使用事务模式,则不能(drop、select、create)混合使用
)
if not conn:
self.get_conn()
Expand Down Expand Up @@ -175,8 +175,12 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0):
else:
pass

self._execute_step(cur, sql, ehandling=ehandling)
results = self.cur_results(cur, count)
try:
self._execute_step(cur, sql, ehandling=ehandling)
results = self.cur_results(cur, count)
except Exception:
conn.rollback()
break

if (action == 'SELECT' and (verbose or idx == sqls_length)) \
or (action == 'WITH' and idx == sqls_length):
Expand Down
8 changes: 6 additions & 2 deletions pydbapi/db/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# @Author: chunyang.xu
# @Email: [email protected]
# @Date: 2020-06-02 18:46:58
# @Last Modified time: 2022-11-14 19:22:45
# @Last Modified time: 2022-11-16 10:36:26
# @github: https://github.com/longfengpili

# !/usr/bin/env python3
Expand Down Expand Up @@ -113,7 +113,11 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0):
else:
pass

self._execute_step(cur, sql, ehandling=ehandling)
try:
self._execute_step(cur, sql, ehandling=ehandling)
except Exception:
conn.rollback()
break

if (action == 'SELECT' and (verbose or idx == sqls_length)) \
or (action == 'WITH' and idx == sqls_length):
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# @Author: chunyang.xu
# @Email: [email protected]
# @Date: 2020-06-09 16:46:54
# @Last Modified time: 2022-11-15 16:43:03
# @Last Modified time: 2022-11-16 10:37:39
# @github: https://github.com/longfengpili

# !/usr/bin/env python3
Expand All @@ -12,7 +12,7 @@
import shutil
import setuptools

VERSION = '0.0.94'
VERSION = '0.0.95'
PROJECT_NAME = 'pydbapi'

with open('README.md', 'r', encoding='utf-8') as f:
Expand Down
48 changes: 45 additions & 3 deletions tests/trino/test_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# @Author: longfengpili
# @Date: 2022-11-14 14:25:01
# @Last Modified by: longfengpili
# @Last Modified time: 2022-11-15 16:40:29
# @Last Modified time: 2022-11-16 10:34:42


import sys
Expand Down Expand Up @@ -42,9 +42,19 @@ def test_get_instance(self):
print(self.trinodb)
print(dir(self.trinodb))

def test_create_by_sql(self):
def test_create_by_sql1(self):
sql = '''
create table report_20000073_11.test_xu
(time varchar,
adid varchar,
dt varchar)
with (partitioned_by = ARRAY['dt']);
'''
rows, action, result = self.trinodb.execute(sql, verbose=1)
print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}")

def test_create_by_sql2(self):
sql = '''
drop table if exists report_20000073_11.test_xu;
create table if not exists report_20000073_11.test_xu as
with test as
(select *
Expand All @@ -62,6 +72,38 @@ def test_create_by_sql(self):
rows, action, result = self.trinodb.execute(sql, verbose=1)
print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}")

def test_insert_by_sql(self):
sql = '''
delete from report_20000073_11.test_xu;
with test as
(select *
from logs_thirdparty.adjust_callback
limit 10),
test1 as
(select time, adid, substring(time, 1, 10) as dt
from test
)
select * from test1
;
insert into report_20000073_11.test_xu
with test as
(select *
from logs_thirdparty.adjust_callback
limit 10),
test1 as
(select time, adid, substring(time, 1, 10) as dt
from test
)
select * from test1
;
'''
rows, action, result = self.trinodb.execute(sql, verbose=1)
print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}")

def test_drop_by_sql(self):
sql = '''
drop table report_20000073_11.test_xu
Expand Down

0 comments on commit 19bec30

Please sign in to comment.