Skip to content

Commit

Permalink
[优化]trino事务模式
Browse files Browse the repository at this point in the history
  • Loading branch information
longfengpili committed Nov 15, 2022
1 parent 15b7ce5 commit 3ac163e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 12 deletions.
33 changes: 29 additions & 4 deletions pydbapi/api/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# @Author: longfengpili
# @Date: 2022-11-14 14:17:02
# @Last Modified by: longfengpili
# @Last Modified time: 2022-11-14 19:31:14
# @Last Modified time: 2022-11-15 16:42:32


import re
Expand Down Expand Up @@ -56,11 +56,33 @@ def create(self, columns, partition=None):
class TrinoDB(DBMixin, DBFileExec):
_instance_lock = threading.Lock()

def __init__(self, host, user, password, database, catalog='hive', port=9090, safe_rule=True):
def __init__(self, host, user, password, database, isolation_level=None, catalog='hive', port=9090, safe_rule=True):
'''[summary]
[init]
Args:
host ([str]): [host]
user ([str]): [username]
password ([str]): [password]
database ([str]): [database]
isolation_level (number): [isolation_level] (default: `2`)
AUTOCOMMIT = 0 # 每个事务单独执行
READ_UNCOMMITTED = 1 # 脏读(dirty read),一个事务可以读取到另一个事务未提交的事务记录
READ_COMMITTED = 2 # 不可重复读(non-repeatable read),一个事务只能读取到已经提交的记录,不能读取到未提交的记录
REPEATABLE_READ = 3 # 幻读(phantom read),一个事务可以多次从数据库读取某条记录,而且多次读取的那条记录都是一致的,相同的
SERIALIZABLE = 4 # 事务执行时,会在所有级别上加锁,比如read和write时都会加锁,仿佛事务是以串行的方式进行的,而不是一起发生的。这会防止脏读、不可重复读和幻读的出现,但是,会带来性能的下降
数据库默认的隔离级别:mysql为可重复读,oracle为提交后读
catalog (str): [cataglog] (default: `'hive'`)
port (number): [port] (default: `9090`)
safe_rule (bool): [safe rule] (default: `True`)
'''

self.host = host
self.port = port
self.user = user
self.password = password
self.isolation_level = isolation_level
self.catalog = catalog
self.database = database
super(TrinoDB, self).__init__()
Expand Down Expand Up @@ -92,7 +114,9 @@ def get_conn(self):
mytrinologger.error(f"please add [trino] path in sys.path, error: {e}")
raise
conn = connect(schema=self.database, user=self.user, password=self.password,
host=self.host, port=self.port, catalog=self.catalog)
host=self.host, port=self.port, catalog=self.catalog,
# isolation_level=self.isolation_level # 如果使用事务模式,则不能(drop、select、create)混合使用
)
if not conn:
self.get_conn()
return conn
Expand Down Expand Up @@ -157,6 +181,7 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0):
if (action == 'SELECT' and (verbose or idx == sqls_length)) \
or (action == 'WITH' and idx == sqls_length):
# columns, results = cur_getresults(cur, count)
# results = self.cur_results(cur, count)
desc, columns = self.cur_columns(cur)
if verbose and columns:
mytrinologger.info(f"\n{pd.DataFrame(results, columns=columns)}")
Expand All @@ -174,7 +199,7 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0):
mytrinologger.error(e)
conn.rollback()

rows = len(results)
rows = len(results) if results else rows
conn.close()
return rows, action, results

Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# @Author: chunyang.xu
# @Email: [email protected]
# @Date: 2020-06-09 16:46:54
# @Last Modified time: 2022-11-14 14:16:24
# @Last Modified time: 2022-11-15 16:43:03
# @github: https://github.com/longfengpili

# !/usr/bin/env python3
Expand All @@ -12,7 +12,7 @@
import shutil
import setuptools

VERSION = '0.0.93'
VERSION = '0.0.94'
PROJECT_NAME = 'pydbapi'

with open('README.md', 'r', encoding='utf-8') as f:
Expand Down
13 changes: 7 additions & 6 deletions tests/trino/test_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# @Author: longfengpili
# @Date: 2022-11-14 14:25:01
# @Last Modified by: longfengpili
# @Last Modified time: 2022-11-15 09:51:24
# @Last Modified time: 2022-11-15 16:40:29


import sys
Expand All @@ -27,7 +27,7 @@ def setup_method(self, method):
GAME = os.environ.get('NEWGAME').lower()
self.game = json.loads(GAME.replace("'", '"'))
self.trinodb = TrinoDB(**self.game, safe_rule=False)
self.tablename = 'report_20000073_11.test_xu'
self.tablename = 'report_20000073_11.test_friuts'
self.id = ColumnModel('id', 'integer')
self.name = ColumnModel('name', 'varchar(1024)')
self.address = ColumnModel('address', 'varchar(1024)')
Expand All @@ -44,19 +44,20 @@ def test_get_instance(self):

def test_create_by_sql(self):
sql = '''
drop table report_20000073_11.test_xu;
create table report_20000073_11.test_xu as
drop table if exists report_20000073_11.test_xu;
create table if not exists report_20000073_11.test_xu as
with test as
(select *
from logs_thirdparty.adjust_callback
limit 10),
test1 as
(select time, adid
(select time, adid, substring(time, 1, 10) as dt
from test
)
select * from test1
;
'''
rows, action, result = self.trinodb.execute(sql, verbose=1)
print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}")
Expand All @@ -76,7 +77,7 @@ def test_select_by_sql(self):
limit 10),
test1 as
(select time, adid
(select time, adid, substring(time, 1, 10) as dt
from test
)
Expand Down

0 comments on commit 3ac163e

Please sign in to comment.