diff --git a/pydbapi/api/trino.py b/pydbapi/api/trino.py index df3dc10..946bcdb 100644 --- a/pydbapi/api/trino.py +++ b/pydbapi/api/trino.py @@ -2,7 +2,7 @@ # @Author: longfengpili # @Date: 2022-11-14 14:17:02 # @Last Modified by: longfengpili -# @Last Modified time: 2022-11-15 16:42:32 +# @Last Modified time: 2022-11-16 10:38:30 import re @@ -56,7 +56,7 @@ def create(self, columns, partition=None): class TrinoDB(DBMixin, DBFileExec): _instance_lock = threading.Lock() - def __init__(self, host, user, password, database, isolation_level=None, catalog='hive', port=9090, safe_rule=True): + def __init__(self, host, user, password, database, isolation_level=2, catalog='hive', port=9090, safe_rule=True): '''[summary] [init] @@ -115,7 +115,7 @@ def get_conn(self): raise conn = connect(schema=self.database, user=self.user, password=self.password, host=self.host, port=self.port, catalog=self.catalog, - # isolation_level=self.isolation_level # 如果使用事务模式,则不能(drop、select、create)混合使用 + isolation_level=self.isolation_level # 如果使用事务模式,则不能(drop、select、create)混合使用 ) if not conn: self.get_conn() @@ -175,8 +175,12 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0): else: pass - self._execute_step(cur, sql, ehandling=ehandling) - results = self.cur_results(cur, count) + try: + self._execute_step(cur, sql, ehandling=ehandling) + results = self.cur_results(cur, count) + except Exception: + conn.rollback() + break if (action == 'SELECT' and (verbose or idx == sqls_length)) \ or (action == 'WITH' and idx == sqls_length): diff --git a/pydbapi/db/base.py b/pydbapi/db/base.py index bfd92e9..cc71eb7 100644 --- a/pydbapi/db/base.py +++ b/pydbapi/db/base.py @@ -1,7 +1,7 @@ # @Author: chunyang.xu # @Email: 398745129@qq.com # @Date: 2020-06-02 18:46:58 -# @Last Modified time: 2022-11-14 19:22:45 +# @Last Modified time: 2022-11-16 10:36:26 # @github: https://github.com/longfengpili # !/usr/bin/env python3 @@ -113,7 +113,11 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0): else: pass - self._execute_step(cur, sql, ehandling=ehandling) + try: + self._execute_step(cur, sql, ehandling=ehandling) + except Exception: + conn.rollback() + break if (action == 'SELECT' and (verbose or idx == sqls_length)) \ or (action == 'WITH' and idx == sqls_length): diff --git a/setup.py b/setup.py index 0103a24..da62494 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ # @Author: chunyang.xu # @Email: 398745129@qq.com # @Date: 2020-06-09 16:46:54 -# @Last Modified time: 2022-11-15 16:43:03 +# @Last Modified time: 2022-11-16 10:37:39 # @github: https://github.com/longfengpili # !/usr/bin/env python3 @@ -12,7 +12,7 @@ import shutil import setuptools -VERSION = '0.0.94' +VERSION = '0.0.95' PROJECT_NAME = 'pydbapi' with open('README.md', 'r', encoding='utf-8') as f: diff --git a/tests/trino/test_trino.py b/tests/trino/test_trino.py index f89b057..34f18c4 100644 --- a/tests/trino/test_trino.py +++ b/tests/trino/test_trino.py @@ -2,7 +2,7 @@ # @Author: longfengpili # @Date: 2022-11-14 14:25:01 # @Last Modified by: longfengpili -# @Last Modified time: 2022-11-15 16:40:29 +# @Last Modified time: 2022-11-16 10:34:42 import sys @@ -42,9 +42,19 @@ def test_get_instance(self): print(self.trinodb) print(dir(self.trinodb)) - def test_create_by_sql(self): + def test_create_by_sql1(self): + sql = ''' + create table report_20000073_11.test_xu + (time varchar, + adid varchar, + dt varchar) + with (partitioned_by = ARRAY['dt']); + ''' + rows, action, result = self.trinodb.execute(sql, verbose=1) + print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}") + + def test_create_by_sql2(self): sql = ''' - drop table if exists report_20000073_11.test_xu; create table if not exists report_20000073_11.test_xu as with test as (select * @@ -62,6 +72,38 @@ def test_create_by_sql(self): rows, action, result = self.trinodb.execute(sql, verbose=1) print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}") + def test_insert_by_sql(self): + sql = ''' + delete from report_20000073_11.test_xu; + with test as + (select * + from logs_thirdparty.adjust_callback + limit 10), + + test1 as + (select time, adid, substring(time, 1, 10) as dt + from test + ) + + select * from test1 + ; + insert into report_20000073_11.test_xu + with test as + (select * + from logs_thirdparty.adjust_callback + limit 10), + + test1 as + (select time, adid, substring(time, 1, 10) as dt + from test + ) + + select * from test1 + ; + ''' + rows, action, result = self.trinodb.execute(sql, verbose=1) + print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}") + def test_drop_by_sql(self): sql = ''' drop table report_20000073_11.test_xu