From 3ac163e0dcc0ac846356b0422922fa16b00fc0ac Mon Sep 17 00:00:00 2001 From: longfengpili <398745129@qq.com> Date: Tue, 15 Nov 2022 16:43:36 +0800 Subject: [PATCH] =?UTF-8?q?[=E4=BC=98=E5=8C=96]trino=E4=BA=8B=E5=8A=A1?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pydbapi/api/trino.py | 33 +++++++++++++++++++++++++++++---- setup.py | 4 ++-- tests/trino/test_trino.py | 13 +++++++------ 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/pydbapi/api/trino.py b/pydbapi/api/trino.py index 25f4759..df3dc10 100644 --- a/pydbapi/api/trino.py +++ b/pydbapi/api/trino.py @@ -2,7 +2,7 @@ # @Author: longfengpili # @Date: 2022-11-14 14:17:02 # @Last Modified by: longfengpili -# @Last Modified time: 2022-11-14 19:31:14 +# @Last Modified time: 2022-11-15 16:42:32 import re @@ -56,11 +56,33 @@ def create(self, columns, partition=None): class TrinoDB(DBMixin, DBFileExec): _instance_lock = threading.Lock() - def __init__(self, host, user, password, database, catalog='hive', port=9090, safe_rule=True): + def __init__(self, host, user, password, database, isolation_level=None, catalog='hive', port=9090, safe_rule=True): + '''[summary] + + [init] + + Args: + host ([str]): [host] + user ([str]): [username] + password ([str]): [password] + database ([str]): [database] + isolation_level (number): [isolation_level] (default: `2`) + AUTOCOMMIT = 0 # 每个事务单独执行 + READ_UNCOMMITTED = 1 # 脏读(dirty read),一个事务可以读取到另一个事务未提交的事务记录 + READ_COMMITTED = 2 # 不可重复读(non-repeatable read),一个事务只能读取到已经提交的记录,不能读取到未提交的记录 + REPEATABLE_READ = 3 # 幻读(phantom read),一个事务可以多次从数据库读取某条记录,而且多次读取的那条记录都是一致的,相同的 + SERIALIZABLE = 4 # 事务执行时,会在所有级别上加锁,比如read和write时都会加锁,仿佛事务是以串行的方式进行的,而不是一起发生的。这会防止脏读、不可重复读和幻读的出现,但是,会带来性能的下降 + 数据库默认的隔离级别:mysql为可重复读,oracle为提交后读 + catalog (str): [cataglog] (default: `'hive'`) + port (number): [port] (default: `9090`) + safe_rule (bool): [safe rule] (default: `True`) + ''' + self.host = host self.port = port self.user = user self.password = password + self.isolation_level = isolation_level self.catalog = catalog self.database = database super(TrinoDB, self).__init__() @@ -92,7 +114,9 @@ def get_conn(self): mytrinologger.error(f"please add [trino] path in sys.path, error: {e}") raise conn = connect(schema=self.database, user=self.user, password=self.password, - host=self.host, port=self.port, catalog=self.catalog) + host=self.host, port=self.port, catalog=self.catalog, + # isolation_level=self.isolation_level # 如果使用事务模式,则不能(drop、select、create)混合使用 + ) if not conn: self.get_conn() return conn @@ -157,6 +181,7 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0): if (action == 'SELECT' and (verbose or idx == sqls_length)) \ or (action == 'WITH' and idx == sqls_length): # columns, results = cur_getresults(cur, count) + # results = self.cur_results(cur, count) desc, columns = self.cur_columns(cur) if verbose and columns: mytrinologger.info(f"\n{pd.DataFrame(results, columns=columns)}") @@ -174,7 +199,7 @@ def execute(self, sql, count=None, ehandling='raise', verbose=0): mytrinologger.error(e) conn.rollback() - rows = len(results) + rows = len(results) if results else rows conn.close() return rows, action, results diff --git a/setup.py b/setup.py index 1e75844..0103a24 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ # @Author: chunyang.xu # @Email: 398745129@qq.com # @Date: 2020-06-09 16:46:54 -# @Last Modified time: 2022-11-14 14:16:24 +# @Last Modified time: 2022-11-15 16:43:03 # @github: https://github.com/longfengpili # !/usr/bin/env python3 @@ -12,7 +12,7 @@ import shutil import setuptools -VERSION = '0.0.93' +VERSION = '0.0.94' PROJECT_NAME = 'pydbapi' with open('README.md', 'r', encoding='utf-8') as f: diff --git a/tests/trino/test_trino.py b/tests/trino/test_trino.py index 37384d2..f89b057 100644 --- a/tests/trino/test_trino.py +++ b/tests/trino/test_trino.py @@ -2,7 +2,7 @@ # @Author: longfengpili # @Date: 2022-11-14 14:25:01 # @Last Modified by: longfengpili -# @Last Modified time: 2022-11-15 09:51:24 +# @Last Modified time: 2022-11-15 16:40:29 import sys @@ -27,7 +27,7 @@ def setup_method(self, method): GAME = os.environ.get('NEWGAME').lower() self.game = json.loads(GAME.replace("'", '"')) self.trinodb = TrinoDB(**self.game, safe_rule=False) - self.tablename = 'report_20000073_11.test_xu' + self.tablename = 'report_20000073_11.test_friuts' self.id = ColumnModel('id', 'integer') self.name = ColumnModel('name', 'varchar(1024)') self.address = ColumnModel('address', 'varchar(1024)') @@ -44,19 +44,20 @@ def test_get_instance(self): def test_create_by_sql(self): sql = ''' - drop table report_20000073_11.test_xu; - create table report_20000073_11.test_xu as + drop table if exists report_20000073_11.test_xu; + create table if not exists report_20000073_11.test_xu as with test as (select * from logs_thirdparty.adjust_callback limit 10), test1 as - (select time, adid + (select time, adid, substring(time, 1, 10) as dt from test ) select * from test1 + ; ''' rows, action, result = self.trinodb.execute(sql, verbose=1) print(f"【rows】: {rows}, 【action】: {action}, 【result】: {result}") @@ -76,7 +77,7 @@ def test_select_by_sql(self): limit 10), test1 as - (select time, adid + (select time, adid, substring(time, 1, 10) as dt from test )