From 696db89b38e8892ed93b12b4c04068047515307a Mon Sep 17 00:00:00 2001 From: longfengpili <398745129@qq.com> Date: Wed, 29 Nov 2023 14:07:14 +0800 Subject: [PATCH] =?UTF-8?q?[=E4=BC=98=E5=8C=96]log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pydbapi/db/base.py | 502 ++++++++++++++++++++--------------------- pydbapi/db/fileexec.py | 2 +- 2 files changed, 252 insertions(+), 252 deletions(-) diff --git a/pydbapi/db/base.py b/pydbapi/db/base.py index 1020226..a186c2b 100644 --- a/pydbapi/db/base.py +++ b/pydbapi/db/base.py @@ -1,251 +1,251 @@ -# -*- coding: utf-8 -*- -# @Author: longfengpili -# @Date: 2023-06-02 15:27:41 -# @Last Modified by: longfengpili -# @Last Modified time: 2023-11-14 18:37:59 -# @github: https://github.com/longfengpili - - -import re -import sys -import pandas as pd - -from tqdm.contrib.logging import logging_redirect_tqdm - -from pydbapi.sql import SqlParse, SqlCompile -from pydbapi.conf import AUTO_RULES - -import logging -dblogger = logging.getLogger(__name__) - - -class DBbase(object): - - def __init__(self, *args, **kwargs): - pass - - def get_conn(self): - pass - - def prepare_sql_statements(self, sql, verbose): - if any("jupyter" in arg for arg in sys.argv): - from tqdm.notebook import tqdm - else: - from tqdm import tqdm - - sqls = SqlParse.split_sqls(sql) - # print(sqls) - sqls = [sql.strip() for sql in sqls if sql] - bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}] {postfix[0]}' - sqls = sqls if verbose <= 1 else tqdm(sqls, postfix=['START'], bar_format=bar_format) # 如果verbose>=2则显示进度条 - return sqls - - def _execute_step(self, cursor, sql, ehandling='raise'): - '''[summary] - - [description] - 在conn中执行单步sql - Arguments: - cursor {[cursor]} -- [游标] - sql {[str]} -- [sql] - - Raises: - ValueError -- [sql执行错误原因及SQL] - ''' - sql = re.sub(r'\s{2,}', '\n', sql) - try: - cursor.execute(sql) - except Exception as e: - dblogger.error(e) - if ehandling == 'raise': - raise ValueError(f"【Error】:{e}【Sql】:{sql}") - - def cur_results(self, cursor, count): - results = cursor.fetchmany(count) if count else cursor.fetchall() - results = list(results) if results else [] - return results - - def cur_columns(self, cursor): - desc = cursor.description - columns = tuple(map(lambda x: x[0].lower(), desc)) if desc else None # 列名 - - return desc, columns - - def fetch_query_results(self, cur, action, count, verbose): - desc, columns = self.cur_columns(cur) - results = self.cur_results(cur, count) if action in ['SELECT', 'WITH'] else None - - if (verbose == 1 or verbose >= 3) and results: - dblogger.info(f"\n{pd.DataFrame(results, columns=columns)}") - elif verbose and not columns: - dblogger.warning(f"Not Columns, cursor description is {desc}") - - if results: - results.insert(0, columns) - - return columns, results - - def handle_progress_logging(self, step, verbose, sqls): - if verbose == 1: - dblogger.info(step) - elif verbose >= 2: - sqls.postfix[0] = step - if verbose >= 3: - dblogger.info(step) - - def execute(self, sql, count=None, ehandling='raise', verbose=0): - '''[summary] - - [description] - 执行sql - Arguments: - sql {[str]} -- [sql] - - Keyword Arguments: - count {[int]} -- [返回的结果数量] (default: {None}) - ehandling {[str]} -- [错误处理] (raise: 错误弹出异常) - verbose {[int]} -- [打印进程状态] (0:不打印, 1:文字进度, 2:进度条) - - Returns: - rows {[int]} -- [影响的行数] - results {[list]} -- [返回的结果] - ''' - - conn = self.get_conn() - cur = conn.cursor() - sqls = self.prepare_sql_statements(sql, verbose) - try: - with logging_redirect_tqdm(): - for idx, _sql in enumerate(sqls): - parser = SqlParse(_sql) - comment, sql, action, tablename = parser.comment, parser.sql, parser.action, parser.tablename - if not sql: - # dblogger.info(f'【{idx:0>2d}_PROGRESS】 no run !!!\n{_sql}') - continue - - step = f"【{idx:0>2d}_PROGRESS】({action}){tablename}::{comment}" - self.handle_progress_logging(step, verbose, sqls) - self._execute_step(cur, sql, ehandling=ehandling) - - if idx + 1 == len(sqls) or action in ['SELECT', 'WITH']: - columns, results = self.fetch_query_results(cur, action, count, verbose) - - conn.commit() - except Exception as e: - dblogger.error(e) - conn.rollback() - raise e - finally: - cur.close() - conn.close() - - rows = cur.rowcount - rows = len(results[1:]) if rows == -1 and results else rows - return rows, action, results - - -class DBMixin(DBbase): - - def __init__(self): - self.auto_rules = AUTO_RULES - super(DBMixin, self).__init__() - - def _check_isauto(self, tablename): - '''[summary] - - [description] - 通过tablename控制是否可以通过python代码处理 - Arguments: - tablename {[str]} -- [表名] - ''' - if not self.auto_rules: - return True - for rule in self.auto_rules: - if rule in tablename: - return True - else: - raise Exception(f"【drop】 please drop [{tablename}] on workbench! Or add rule into auto_rules !") - return False - - def drop(self, tablename, verbose=0): - if self._check_isauto(tablename): - sqlcompile = SqlCompile(tablename) - sql_for_drop = sqlcompile.drop() - rows, action, result = self.execute(sql_for_drop, verbose=verbose) - dblogger.info(f'【{action}】{tablename} drop succeed !') - return rows, action, result - - def delete(self, tablename, condition, verbose=0): - if self._check_isauto(tablename): - sqlcompile = SqlCompile(tablename) - sql_for_delete = sqlcompile.delete(condition) - rows, action, result = self.execute(sql_for_delete, verbose=verbose) - dblogger.info(f'【{action}】{tablename} delete {rows} rows succeed !') - return rows, action, result - - def insert(self, tablename, columns, inserttype='value', values=None, chunksize=1000, - fromtable=None, condition=None, verbose=0): - if values: - vlength = len(values) - - if self._check_isauto(tablename): - sqlcompile = SqlCompile(tablename) - sql_for_insert = sqlcompile.insert(columns, inserttype=inserttype, values=values, - chunksize=chunksize, fromtable=fromtable, condition=condition) - rows, action, result = self.execute(sql_for_insert, verbose=verbose) - - if values and rows != (vlength % chunksize or chunksize): - raise Exception('Insert Error !!!') - - rows = vlength if values else rows - dblogger.info(f'【{action}】{tablename} insert {rows} rows succeed !') - return rows, action, result - - def get_columns(self, tablename, verbose=0): - sql = f"select * from {tablename} limit 1;" - rows, action, result = self.execute(sql, verbose=verbose) - columns = result[0] - columns = [c.lower() for c in columns] - return columns - - def select(self, tablename, columns, condition=None, verbose=0): - '''[summary] - - [description] - 执行select - Arguments: - tablename {[str]} -- [表名] - columns {[dict]} -- [列的信息] - - Keyword Arguments: - condition {[str]} -- [where中的表达式] (default: {None}) - - Returns: - rows[int] -- [影响的数量] - action[str] -- [sql表达式DML] - result[list] -- [结果, 第一个元素是列名] - ''' - sqlcompile = SqlCompile(tablename) - sql_for_select = sqlcompile.select_base(columns, condition=condition) - rows, action, result = self.execute(sql_for_select, verbose=verbose) - return rows, action, result - - def add_columns(self, tablename, columns, verbose=0): - old_columns = self.get_columns(tablename) - old_columns = set(old_columns) - new_columns = columns.new_cols - new_columns = set([col.strip() for col in new_columns.split(',')]) - # dblogger.info(f'{old_columns}, {new_columns}') - - if old_columns == new_columns: - dblogger.info(f'【{tablename}】columns not changed !') - if old_columns - new_columns: - raise Exception(f"【{tablename}】columns【{old_columns - new_columns}】 not set, should exists !") - if new_columns - old_columns: - sqlcompile = SqlCompile(tablename) - add_columns = new_columns - old_columns - for col_name in add_columns: - column = columns.get_column_by_name(col_name) - sql = sqlcompile.add_column(column.newname, column.coltype) - self.execute(sql, verbose=0) - dblogger.info(f'【{tablename}】add columns succeeded !【{new_columns - old_columns}】') +# -*- coding: utf-8 -*- +# @Author: longfengpili +# @Date: 2023-06-02 15:27:41 +# @Last Modified by: longfengpili +# @Last Modified time: 2023-11-29 13:58:55 +# @github: https://github.com/longfengpili + + +import re +import sys +import pandas as pd + +from tqdm.contrib.logging import logging_redirect_tqdm + +from pydbapi.sql import SqlParse, SqlCompile +from pydbapi.conf import AUTO_RULES + +import logging +dblogger = logging.getLogger(__name__) + + +class DBbase(object): + + def __init__(self, *args, **kwargs): + pass + + def get_conn(self): + pass + + def prepare_sql_statements(self, sql, verbose): + if any("jupyter" in arg for arg in sys.argv): + from tqdm.notebook import tqdm + else: + from tqdm import tqdm + + sqls = SqlParse.split_sqls(sql) + # print(sqls) + sqls = [sql.strip() for sql in sqls if sql] + bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}] {postfix[0]}' + sqls = sqls if verbose <= 1 else tqdm(sqls, postfix=['START'], bar_format=bar_format) # 如果verbose>=2则显示进度条 + return sqls + + def _execute_step(self, cursor, sql, ehandling='raise'): + '''[summary] + + [description] + 在conn中执行单步sql + Arguments: + cursor {[cursor]} -- [游标] + sql {[str]} -- [sql] + + Raises: + ValueError -- [sql执行错误原因及SQL] + ''' + sql = re.sub(r'\s{2,}', '\n', sql) + try: + cursor.execute(sql) + except Exception as e: + dblogger.error(e) + if ehandling == 'raise': + raise ValueError(f"【Error】:{e}【Sql】:{sql}") + + def cur_results(self, cursor, count): + results = cursor.fetchmany(count) if count else cursor.fetchall() + results = list(results) if results else [] + return results + + def cur_columns(self, cursor): + desc = cursor.description + columns = tuple(map(lambda x: x[0].lower(), desc)) if desc else None # 列名 + + return desc, columns + + def fetch_query_results(self, cur, action, count, verbose): + desc, columns = self.cur_columns(cur) + results = self.cur_results(cur, count) if action in ['SELECT', 'WITH'] else None + + if (verbose == 1 or verbose >= 3) and results: + dblogger.info(f"\n{pd.DataFrame(results, columns=columns)}") + elif verbose and not columns: + dblogger.warning(f"Not Columns, cursor description is {desc}") + + if results: + results.insert(0, columns) + + return columns, results + + def handle_progress_logging(self, step, verbose, sqls): + if verbose == 1: + dblogger.info(step) + elif verbose >= 2: + sqls.postfix[0] = step + if verbose >= 3: + dblogger.info(step) + + def execute(self, sql, count=None, ehandling='raise', verbose=0): + '''[summary] + + [description] + 执行sql + Arguments: + sql {[str]} -- [sql] + + Keyword Arguments: + count {[int]} -- [返回的结果数量] (default: {None}) + ehandling {[str]} -- [错误处理] (raise: 错误弹出异常) + verbose {[int]} -- [打印进程状态] (0:不打印, 1:文字进度, 2:进度条) + + Returns: + rows {[int]} -- [影响的行数] + results {[list]} -- [返回的结果] + ''' + + conn = self.get_conn() + cur = conn.cursor() + sqls = self.prepare_sql_statements(sql, verbose) + try: + with logging_redirect_tqdm(): + for idx, _sql in enumerate(sqls): + parser = SqlParse(_sql) + comment, sql, action, tablename = parser.comment, parser.sql, parser.action, parser.tablename + if not sql: + # dblogger.info(f'【{idx:0>2d}_PROGRESS】 no run !!!\n{_sql}') + continue + + step = f"【{idx:0>2d}_PROGRESS】({action}){tablename}::{comment}" + self.handle_progress_logging(step, verbose, sqls) + self._execute_step(cur, sql, ehandling=ehandling) + + if idx + 1 == len(sqls) or action in ['SELECT', 'WITH']: + columns, results = self.fetch_query_results(cur, action, count, verbose) + + conn.commit() + except Exception as e: + dblogger.error(e) + conn.rollback() + raise e + finally: + cur.close() + conn.close() + + rows = cur.rowcount + rows = len(results[1:]) if rows == -1 and results else rows + return rows, action, results + + +class DBMixin(DBbase): + + def __init__(self): + self.auto_rules = AUTO_RULES + super(DBMixin, self).__init__() + + def _check_isauto(self, tablename): + '''[summary] + + [description] + 通过tablename控制是否可以通过python代码处理 + Arguments: + tablename {[str]} -- [表名] + ''' + if not self.auto_rules: + return True + for rule in self.auto_rules: + if rule in tablename: + return True + else: + raise Exception(f"【drop】 please drop [{tablename}] on workbench! Or add rule into auto_rules !") + return False + + def drop(self, tablename, verbose=0): + if self._check_isauto(tablename): + sqlcompile = SqlCompile(tablename) + sql_for_drop = sqlcompile.drop() + rows, action, result = self.execute(sql_for_drop, verbose=verbose) + dblogger.info(f'【{action}】{tablename} drop succeed !') + return rows, action, result + + def delete(self, tablename, condition, verbose=0): + if self._check_isauto(tablename): + sqlcompile = SqlCompile(tablename) + sql_for_delete = sqlcompile.delete(condition) + rows, action, result = self.execute(sql_for_delete, verbose=verbose) + dblogger.info(f'【{action}】{tablename} delete {rows} rows succeed !') + return rows, action, result + + def insert(self, tablename, columns, inserttype='value', values=None, chunksize=1000, + fromtable=None, condition=None, verbose=0): + if values: + vlength = len(values) + + if self._check_isauto(tablename): + sqlcompile = SqlCompile(tablename) + sql_for_insert = sqlcompile.insert(columns, inserttype=inserttype, values=values, + chunksize=chunksize, fromtable=fromtable, condition=condition) + rows, action, result = self.execute(sql_for_insert, verbose=verbose) + + if values and rows != (vlength % chunksize or chunksize): + raise Exception('Insert Error !!!') + + rows = vlength if values else rows + dblogger.info(f'【{action}】{tablename} insert {rows} rows succeed !') + return rows, action, result + + def get_columns(self, tablename, verbose=0): + sql = f"select * from {tablename} limit 1;" + rows, action, result = self.execute(sql, verbose=verbose) + columns = result[0] + columns = [c.lower() for c in columns] + return columns + + def select(self, tablename, columns, condition=None, verbose=0): + '''[summary] + + [description] + 执行select + Arguments: + tablename {[str]} -- [表名] + columns {[dict]} -- [列的信息] + + Keyword Arguments: + condition {[str]} -- [where中的表达式] (default: {None}) + + Returns: + rows[int] -- [影响的数量] + action[str] -- [sql表达式DML] + result[list] -- [结果, 第一个元素是列名] + ''' + sqlcompile = SqlCompile(tablename) + sql_for_select = sqlcompile.select_base(columns, condition=condition) + rows, action, result = self.execute(sql_for_select, verbose=verbose) + return rows, action, result + + def add_columns(self, tablename, columns, verbose=0): + old_columns = self.get_columns(tablename) + old_columns = set(old_columns) + new_columns = columns.new_cols + new_columns = set([col.strip() for col in new_columns.split(',')]) + # dblogger.info(f'{old_columns}, {new_columns}') + + if old_columns == new_columns: + dblogger.info(f'【{tablename}】columns not changed !') + if old_columns - new_columns: + raise Exception(f"【{tablename}】columns【{old_columns - new_columns}】 not set, should exists !") + if new_columns - old_columns: + sqlcompile = SqlCompile(tablename) + add_columns = new_columns - old_columns + for col_name in add_columns: + column = columns.get_column_by_name(col_name) + sql = sqlcompile.add_column(column.newname, column.coltype) + self.execute(sql, verbose=0) + dblogger.info(f'【{tablename}】add columns succeeded !【{new_columns - old_columns}】') diff --git a/pydbapi/db/fileexec.py b/pydbapi/db/fileexec.py index b85902f..5528b33 100644 --- a/pydbapi/db/fileexec.py +++ b/pydbapi/db/fileexec.py @@ -2,7 +2,7 @@ # @Author: longfengpili # @Date: 2023-06-02 15:27:41 # @Last Modified by: longfengpili -# @Last Modified time: 2023-07-27 15:34:45 +# @Last Modified time: 2023-11-29 14:01:56 # @github: https://github.com/longfengpili