diff --git a/cup/ChangeLog b/cup/ChangeLog index 57ed17b..bb162f6 100644 --- a/cup/ChangeLog +++ b/cup/ChangeLog @@ -1,4 +1,21 @@ +Version 1.6.1 - starting from 2018.2.5 ~ 2018.6.1 + * [New] cup.shell.is_proc_alive - Add optional to abandon vim|less|vi|tail|cat|more or custom filter + * [Bug] cup.shell.get_pid - Fix grep to surely abandon vim|less|vi|tail|cat|more + * [New] cup.log - Add support for stack manipulation, which can pop out function calls. + * [New] cup.err - Add UnImplemented exception class. + * [New] cup.exfile - Support temp files which will be removed immediately after the variable life ends. + * [Enhancement] cup.util.conf - support $ in a conf key + * [Doc] cup.shell - Fix doc bug. + * [New] cup.shell - Add grep support string with space + * [New] cup.storage.obj - Support common object storage apis including ftp, s3 + * [Bug] cup.res.linux - Getting cpuinfo has bugs (new kernel 3.10) + * [Enhancement] - cup.util.threadpool, add daemon_threads as the parameter + that you can use to let the threadpool threads behave like daemon-thread + (when the main thread exits, it exits as well) + * [Enhancement] - cup.util.conf - support conf line "[.test] # comments" + Version 1.6.0 - starting from 2017.9.6 ~ 2017.12.29 + * [New] cup.bidu.icafe - interact with baidu icafe. * [New] MsgBroker - Add a broker for handling system failures * [New] cup. * [Bug] Linux Resource Query Bug - related to data columuns diff --git a/cup/cache.py b/cup/cache.py index 52adc7a..018aa93 100644 --- a/cup/cache.py +++ b/cup/cache.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -* # ############################################################################# # -# Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved +# Copyright (c) Baidu.com, Inc. 
All Rights Reserved # # ############################################################################# """ diff --git a/cup/err.py b/cup/err.py index 37e6091..e9feb62 100644 --- a/cup/err.py +++ b/cup/err.py @@ -16,7 +16,8 @@ __all__ = [ 'BaseCupException', 'DecoratorException', 'LoggerException', 'ResException', 'NoSuchProcess', 'AccessDenied', 'NetException', - 'AsyncMsgError', 'ThreadTermException', 'LockFileError' + 'AsyncMsgError', 'ThreadTermException', 'LockFileError', + 'NotImplementedYet' ] @@ -154,4 +155,23 @@ def __init__(self, expect, got): msg = 'expect failure, expect {0}, got {1}'.format(expect, got) BaseCupException.__init__(self, msg) + +class NotImplementedYet(BaseCupException): + """ + Not implemented yet + """ + def __init__(self, msg=''): + msg = 'The functionality is not implemented yet, {0}'.format(msg) + BaseCupException.__init__(self, msg) + + +class ConfigError(BaseCupException): + """ + ConfigError + """ + def __init__(self, msg=''): + msg = 'Configuration Error: {0}'.format(msg) + BaseCupException.__init__(self, msg) + + # vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/cup/exfile.py b/cup/exfile.py index 544937b..1ae9414 100644 --- a/cup/exfile.py +++ b/cup/exfile.py @@ -17,10 +17,14 @@ from cup import err from cup import decorators +from cup import platforms + +if platforms.is_linux(): + import tempfile __all__ = [ 'LockFile', 'FILELOCK_SHARED', 'FILELOCK_EXCLUSIVE', - 'FILELOCK_NONBLOCKING', 'FILELOCK_UNLOCK' + 'FILELOCK_NONBLOCKING', 'FILELOCK_UNLOCK', 'TempFile' ] @@ -116,9 +120,51 @@ def unlock(self): raise err.LockFileError(error) -# TODO tempfile -# class TempFile(object): -# """ -# tempfile -# """ +class TempFile(object): + """ + tempfile, the temp file will be deleted immediately after the lifetime. + + You can use TempFile like the original Python File Object. 
+ + :: + + tmp = TempFile('./') + tmp.write / read /seek / etc + tmp.close() + """ + def __init__(self, filedir, prefix='', suffix=''): + """ + :param filedir: + temp file dir which contains the temp file + + :prefix: + prefix of the temp filename + + :suffix: + suffix of the file name. e.g. '.tmp'. + """ + self._fdir = filedir + self._fp = None + self._prefix = prefix + self._suffix = suffix + + def __enter__(self): + """enter""" + if platforms.is_linux(): + self._fp = tempfile.TemporaryFile( + dir=self._fdir, + prefix=self._prefix, + suffix=self._suffix + ) + return self._fp + else: + raise err.NotImplementedYet('TemporaryFile') + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + exit + """ + if platforms.is_linux(): + self._fp.close() + # vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/cup/log.py b/cup/log.py index ba0eec7..943ae13 100644 --- a/cup/log.py +++ b/cup/log.py @@ -17,7 +17,9 @@ 'debug', 'info', 'warn', 'critical', 'init_comlog', 'setloglevel', 'ROTATION', 'INFINITE', - 'reinit_comlog', 'get_inited_loggername', 'parse' + 'reinit_comlog', 'get_inited_loggername', 'parse', + 'backtrace_info', 'backtrace_debug', 'backtrace_error', + 'backtrace_critical' ] @@ -336,78 +338,78 @@ def _fail_handle(msg, e): print '%s\nerror:%s' % (msg, e) -# def info(msg, back_trace_len=0): -# """ -# logging.INFO的日志打印 -# """ -# try: -# msg = _log_file_func_info(msg, back_trace_len) -# loggerman = _LoggerMan() -# loggerman._getlogger().info(msg) -# except err.LoggerException: -# return -# except Exception as e: -# _fail_handle(msg, e) -# -# -# def debug(msg, back_trace_len=0): -# """ -# :param msg: -# logging.DEBUG级别的日志打印。 -# :param back_trace_len: -# 为扩展预留的参数, 正常使用可忽略。 -# -# """ -# try: -# msg = _log_file_func_info(msg, back_trace_len) -# loggerman = _LoggerMan() -# loggerman._getlogger().debug(msg) -# except err.LoggerException: -# return -# except Exception as e: -# _fail_handle(msg, e) -# -# -# def warn(msg, back_trace_len=0): -# """ -# logging.WARN级别的日志打印 -# 
""" -# try: -# msg = _log_file_func_info(msg, back_trace_len) -# loggerman = _LoggerMan() -# loggerman._getlogger().warn(msg) -# except err.LoggerException: -# return -# except Exception as e: -# _fail_handle(msg, e) -# -# -# def error(msg, back_trace_len=0): -# """ -# logging.ERROR级别的日志打印 -# """ -# try: -# msg = _log_file_func_info(msg, back_trace_len) -# loggerman = _LoggerMan() -# loggerman._getlogger().error(msg) -# except err.LoggerException: -# return -# except Exception as error: -# _fail_handle(msg, error) -# -# -# def critical(msg, back_trace_len=0): -# """ -# logging.CRITICAL级别的日志打印 -# """ -# try: -# msg = _log_file_func_info(msg, back_trace_len) -# loggerman = _LoggerMan() -# loggerman._getlogger().critical(msg) -# except err.LoggerException: -# return -# except Exception as e: -# _fail_handle(msg, e) +def backtrace_info(msg, back_trace_len=0): + """ + logging.INFO的日志打印 + """ + try: + msg = _log_file_func_info(msg, back_trace_len) + loggerman = _LoggerMan() + loggerman._getlogger().info(msg) + except err.LoggerException: + return + except Exception as e: + _fail_handle(msg, e) + + +def backtrace_debug(msg, back_trace_len=0): + """ + :param msg: + logging.DEBUG级别的日志打印。 + :param back_trace_len: + 为扩展预留的参数, 正常使用可忽略。 + + """ + try: + msg = _log_file_func_info(msg, back_trace_len) + loggerman = _LoggerMan() + loggerman._getlogger().debug(msg) + except err.LoggerException: + return + except Exception as e: + _fail_handle(msg, e) + + +def backtrace_warn(msg, back_trace_len=0): + """ + logging.WARN级别的日志打印 + """ + try: + msg = _log_file_func_info(msg, back_trace_len) + loggerman = _LoggerMan() + loggerman._getlogger().warn(msg) + except err.LoggerException: + return + except Exception as e: + _fail_handle(msg, e) + + +def backtrace_error(msg, back_trace_len=0): + """ + logging.ERROR级别的日志打印 + """ + try: + msg = _log_file_func_info(msg, back_trace_len) + loggerman = _LoggerMan() + loggerman._getlogger().error(msg) + except err.LoggerException: + return + except 
Exception as error: + _fail_handle(msg, error) + + +def backtrace_critical(msg, back_trace_len=0): + """ + logging.CRITICAL级别的日志打印 + """ + try: + msg = _log_file_func_info(msg, back_trace_len) + loggerman = _LoggerMan() + loggerman._getlogger().critical(msg) + except err.LoggerException: + return + except Exception as e: + _fail_handle(msg, e) def setloglevel(logginglevel): diff --git a/cup/net/async/conn.py b/cup/net/async/conn.py index a1b76d1..e764173 100644 --- a/cup/net/async/conn.py +++ b/cup/net/async/conn.py @@ -35,6 +35,7 @@ import cup from cup import log +from cup import err as cuperr from cup.util import misc from cup.util import threadpool from cup.services import executor @@ -917,7 +918,7 @@ def _do_write(self, context): succ_len = sock.send(data) log.debug('succeed to send length:%d' % succ_len) msg.seek_write(succ_len) - except err.AsyncMsgError as error: + except cuperr.AsyncMsgError as error: log.debug('has seek out of msg len, continue') except socket.error as error: err = error.args[0] @@ -981,7 +982,7 @@ def add_write_job(self, context): log.info('failed to get peerinfo, return') return if not context.try_writelock(): - log.info( + log.debug( 'Another thread is writing the context, return. 
' 'Peerinfo:%s:%s' % (peerinfo[0], peerinfo[1]) diff --git a/cup/res/linux.py b/cup/res/linux.py index 4e39990..cb043c8 100644 --- a/cup/res/linux.py +++ b/cup/res/linux.py @@ -15,15 +15,16 @@ """ import os -import errno -import sys import re -import collections -import warnings +import sys import time +import errno import socket import base64 import struct +import threading +import warnings +import collections import cup @@ -81,11 +82,8 @@ def get_kernel_version(): e.g.('2', '6', '32'): """ - reg = re.compile(r'\d{1}\.\d{1,2}\.\d{1,3}') - cmd = '/bin/uname -a' - versions = cup.shell.execshell_withpipe_str(cmd, False) - version = reg.findall(versions)[0] - + versions = os.uname()[2] + version = versions[0: versions.find('_')] return tuple([int(info) for info in version.split('.')]) @@ -193,10 +191,17 @@ class MemInfo(collections.namedtuple('vmem', ' '.join([ ] -def _get_cpu_columns(): - version = get_kernel_version() - if version >= (2, 6, 33): - _CPU_COLUMNS.append('guest_nice') +# def _get_cpu_columns(): +# version = get_kernel_version() +# if version >= (2, 6, 33): +# _CPU_COLUMNS.append('guest_nice') + +_COLUMN_LOCK = threading.Lock() + +_COLUMN_LOCK.acquire() +if get_kernel_version() >= (2, 6, 33) and _CPU_COLUMNS.count('guest_nice') <= 0: + _CPU_COLUMNS.append('guest_nice') +_COLUMN_LOCK.release() class CPUInfo(collections.namedtuple('CPUInfo', _CPU_COLUMNS)): diff --git a/cup/services/heartbeat.py b/cup/services/heartbeat.py index 114c37e..c0f221d 100644 --- a/cup/services/heartbeat.py +++ b/cup/services/heartbeat.py @@ -92,7 +92,7 @@ class LinuxHost(Device): """ a linux host resource """ - def __init__(self, name, init_this_host=False, iface='eth0'): + def __init__(self, name, init_this_host=False, iface='eth0', port=0): """ :param name: name of the LinuxHost @@ -109,6 +109,7 @@ def __init__(self, name, init_this_host=False, iface='eth0'): self._dict_info = { 'iface': iface, 'ipaddr': '0.0.0.0', + 'port': 0, 'hostname': net.get_local_hostname(), 'cpu_idle': 
-1, 'mem_inuse': -1, # MB @@ -119,6 +120,7 @@ def __init__(self, name, init_this_host=False, iface='eth0'): if init_this_host: self._dict_info['ipaddr'] = net.get_hostip() + self._dict_info['port'] = port cpuinfo = linux.get_cpu_usage(1) meminfo = linux.get_meminfo() self._dict_info['net_in'] = linux.get_net_recv_speed( @@ -168,6 +170,12 @@ def get_ip(self): """ return self._dict_info['ipaddr'] + def get_ip_port(self): + """ + return ip:port + """ + return self._dict_info['ipaddr'] + ':' + str(self._dict_info['port']) + def set_cpu_idle(self, idle_rate): """ set cpu idle rate diff --git a/cup/shell/__init__.py b/cup/shell/__init__.py index d8e752f..f3181b0 100644 --- a/cup/shell/__init__.py +++ b/cup/shell/__init__.py @@ -2,16 +2,12 @@ # -*- coding: utf-8 -* # ############################################################################# # -# Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved +# Copyright (c) Baidu.com, Inc. All Rights Reserved # # ############################################################################# """ :author: Guannan Ma -:create_date: - 2014 -:last_date: - 2017/01/24 11:42:03 :descrition: shell related module """ @@ -74,7 +70,8 @@ def __init__(self): class ShellExec(object): # pylint: disable=R0903 """ 用来执行shell的类。 用法如下: - shellexec = cup.shell.ShellExec() + from cup import shell + shellexec = shell.ShellExec() # timeout=None, 一直等待直到命令执行完 shellexec.run('/bin/ls', timeout=None) # timeout>=0, 等待固定时间,如超时未结束terminate这个shell命令。 @@ -339,16 +336,29 @@ def execshell_withpipe_exwitherr(cmd, b_printcmd=True): return lines -def is_proc_alive(procname, is_whole_word=False): +def is_proc_alive(procname, is_whole_word=False, is_server_tag=False, filters=False): """ 通过ps -ef|grep -w procname$ |grep -v grep|wc -l 判断进程是否存在 相关函数有: cup.oper.is_proc_exist(path, name) """ # print procName if is_whole_word: - cmd = 'ps -ef|grep -w %s$ |grep -v grep|wc -l' % procname + cmd = "ps -ef|grep -w '%s'$ |grep -v grep" % procname else: - cmd = 'ps -ef|grep -w %s 
|grep -v grep|wc -l' % procname + cmd = "ps -ef|grep -w '%s' |grep -v grep" % procname + + if is_server_tag: + cmd += '|grep -vwE "vim |less |vi |tail |cat |more "' + + if filters: + if type(filters) == str: + cmd += "|grep -v '%s'" % filters + elif type(filters) == list: + for i, task in enumerate(filters): + cmd += "|grep -v '%s'" % task + + cmd += '|wc -l' + # print cmd rev = execshell_withpipe_str(cmd, False) if int(rev) > 0: @@ -480,7 +490,7 @@ def get_pid(process_path, grep_string): """ cmd = ( - 'ps -ef|grep %s|grep -v grep|grep -vE "^[vim|less|vi|tail|cat|more] "' + 'ps -ef|grep \'%s\'|grep -v grep|grep -vwE "vim |less |vi |tail |cat |more "' '|awk \'{print $2}\'' ) % (grep_string) ret = cup.shell.ShellExec().run(cmd, 10) diff --git a/cup/storage/__init__.py b/cup/storage/__init__.py new file mode 100644 index 0000000..75654d5 --- /dev/null +++ b/cup/storage/__init__.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +# ############################################################# +# +# Copyright (c) Baidu.com, Inc. All Rights Reserved +# +# ############################################################# +""" +:authors: + Guannan Ma maguannan @mythmgn +:description: + +""" + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/cup/storage/obj.py b/cup/storage/obj.py new file mode 100644 index 0000000..de54bca --- /dev/null +++ b/cup/storage/obj.py @@ -0,0 +1,471 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +# ############################################################# +# +# Copyright (c) Baidu.com, Inc. All Rights Reserved +# +# ############################################################# +""" +:authors: + Guannan Ma maguannan @mythmgn +:description: + + """ + +import abc +import logging + +from cup import log +from cup import err + +class ObjectInterface(object): + """ + object interface, abstract class. 
Should not be used directly + """ + __metaclass__ = abc.ABCMeta + def __init__(self, config): + """ + :param config: + be complied with cup.util.conf.Configure2Dict().get_dict(). + Shoule be dict like object + """ + self._config = config + + def _validate_config(self, config, keys): + """validate config if there's any missing items""" + ret = True + for key in keys: + if not key in config: + ret = False + + return ret + + @abc.abstractmethod + def put(self, dest, localfile): + """ + :param dest: + system path + :param localfile: + localfile + + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + """ + + @abc.abstractmethod + def delete(self, path): + """ + delete a file + + :param path: + object path + + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + """ + + + @abc.abstractmethod + def get(self, path, localpath): + """ + get the object into localpath + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + + """ + + @abc.abstractmethod + def head(self, path): + """ + get the object info + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + 'objectinfo': { + size: 1024, + ....... + } + + } + """ + + @abc.abstractmethod + def mkdir(self, path): + """ + mkdir dir of a path + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + 'objectinfo': { + size: 1024, + ....... + } + + } + """ + + @abc.abstractmethod + def rmdir(self, path): + """rmdir of a path""" + + +class AFSObjectSystem(ObjectInterface): + """ + afs object + """ + def __init__(self, config): + """ + :param config: + be complied with cup.util.conf.Configure2Dict().get_dict(). 
+ Shoule be dict like object + """ + ObjectInterface.__init__(self, config) + + def put(self, dest, localfile): + """ + :param dest: + system path + :param localfile: + localfile + + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + """ + + def delete(self, path): + """ + delete a file + + :param path: + object path + + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + """ + + + def get(self, path, localpath): + """ + get the object into localpath + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + + """ + + def head(self, path): + """ + get the object info + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + 'objectinfo': { + size: 1024, + ....... + } + + } + """ + + def mkdir(self, path): + """ + mkdir dir of a path + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + 'objectinfo': { + size: 1024, + ....... + } + + } + """ + + def rmdir(self, path): + """rmdir of a path""" + + +# pylint: disable=R0902 +# need to have so many +class S3ObjectSystem(ObjectInterface): + """ + s3 object system + """ + def __init__(self, config): + """ + :param config: + be complied with cup.util.conf.Configure2Dict().get_dict(). 
+ Shoule be dict like object + + :raise: + cup.err.ConfigError if there's any config item missing + """ + ObjectInterface.__init__(self, config) + required_keys = ['ak', 'sk', 'endpoint', 'bucket'] + if not self._validate_config(config, required_keys): + raise err.ConfigError(str(required_keys)) + self._config = config + self._ak = self._config['ak'] + self._sk = self._config['sk'] + self._endpoint = self._config['endpoint'] + self._bucket = self._config['bucket'] + import boto3 + from botocore import exceptions + from botocore import client as coreclient + self._s3_config = coreclient.Config( + signature_version='s3v4', + s3={'addressing_style': 'path'} + ) + logging.getLogger('boto3').setLevel(logging.INFO) + logging.getLogger('botocore').setLevel(logging.INFO) + logging.getLogger('s3transfer').setLevel(logging.INFO) + log.info('to connect to boto3') + self.__s3conn = boto3.client( + 's3', + aws_access_key_id=self._ak, + aws_secret_access_key=self._sk, + endpoint_url=self._endpoint, + # region_name=conf_dict['region_name'], + config=self._s3_config + ) + self._exception = exceptions.ClientError + + def put(self, dest, localfile): + """ + :param dest: + system path + :param localfile: + localfile + + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + """ + ret = { + 'returncode': -1, + 'msg': 'failed to put object' + } + with open(localfile, 'r') as fhandle: + try: + self.__s3conn.put_object( + Key='{0}'.format(dest), + Bucket=self._bucket, + Body=fhandle + ) + ret['returncode'] = 0 + ret['msg'] = 'success' + except self._exception as error: + ret['returncode'] = -1 + ret['msg'] = str(error) + return ret + + def delete(self, path): + """ + delete a file + + :param path: + object path + + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + """ + ret = { + 'returncode': -1, + 'msg': 'failed to put object' + } + try: + self.__s3conn.delete_object( + Key='{0}'.format(path), + 
Bucket=self._bucket + ) + except self._exception as error: + ret['returncode'] = -1 + ret['msg'] = str(error) + return ret + + def get(self, path, localpath): + """ + get the object into localpath + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + + } + + """ + ret = { + 'returncode': -1, + 'msg': 'failed to put object' + } + try: + with open(localpath, 'w+') as fhandle: + resp = self.__s3conn.get_object( + Key='{0}'.format(path), + Bucket=self._bucket + ) + fhandle.write(resp['Body'].read()) + except Exception as error: + ret['returncode'] = -1 + ret['msg'] = str(error) + return ret + + def head(self, path): + """ + get the object info + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + 'objectinfo': { + size: 1024, + ....... + } + + } + """ + ret = { + 'returncode': -1, + 'msg': 'failed to put object' + } + try: + resp = self.__s3conn.head_object( + Key='{0}'.format(path), + Bucket=self._bucket + ) + ret['objectinfo'] = resp + ret['returncode'] = 0 + ret['msg'] = 'success' + except self._exception as error: + ret['returncode'] = -1 + ret['msg'] = str(error) + return ret + + def mkdir(self, path): + """ + mkdir dir of a path + :return: + { + 'returncode': 0 for success, others for failure, + 'msg': 'if any' + 'objectinfo': { + size: 1024, + ....... 
+ } + } + """ + raise err.NotImplementedYet('mkdir not supported for S3ObjectSystem') + + def rmdir(self, path): + """rmdir of a path""" + raise err.NotImplementedYet('rmdir not supported for S3ObjectSystem') + + def create_bucket(self, bucket_name): + """create bucket""" + ret = { + 'returncode': -1, + 'msg': 'failed to create bucket' + } + try: + resp = self.__s3conn.create_bucket( + Bucket=bucket_name + ) + ret['returncode'] = 0 + ret['msg'] = 'success' + except self._exception as error: + ret['returncode'] = -1 + ret['msg'] = str(error) + return ret + + def head_bucket(self, bucket_name): + """create bucket""" + ret = { + 'returncode': -1, + 'msg': 'failed to create bucket', + 'bucket_info': None + } + try: + resp = self.__s3conn.head_bucket( + Bucket=bucket_name + ) + ret['returncode'] = 0 + ret['msg'] = 'success' + ret['bucket_info'] = resp + except self._exception as error: + ret['returncode'] = -1 + ret['msg'] = str(error) + return ret + + def delete_bucket(self, bucket_name, forcely=False): + """delete bucket + + :param forcely: + if forcely is True, the bucket will be delete no matter it has + objects inside. 
Otherwise, you have to delete items inside, + then delete the bucket + + """ + ret = { + 'returncode': -1, + 'msg': 'failed to create bucket' + } + try: + if forcely: + resp = self.head_bucket(bucket_name) + res = self.__s3conn.list_objects(Bucket=bucket_name) + if 'Contents' in res: + for obj in res['Contents']: + try: + self.__s3conn.delete_object( + Bucket=bucket_name, + Key=obj['Key'] + ) + except Exception as error: + ret['msg'] = 'faield to delete obj in bucket' + return ret + resp = self.__s3conn.delete_bucket( + Bucket=bucket_name + ) + ret['returncode'] = 0 + ret['msg'] = 'success' + except self._exception as error: + ret['returncode'] = -1 + ret['msg'] = str(error) + return ret + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/cup/unittest.py b/cup/unittest.py index 1898554..bb97b74 100644 --- a/cup/unittest.py +++ b/cup/unittest.py @@ -17,7 +17,7 @@ import traceback import logging -import cup +from cup import log from cup import err __all__ = [ @@ -41,7 +41,7 @@ def _assert_bool(val, exp, errmsg=''): if (val is not exp): msg = 'got %s, expect %s\nUser ErrMsg: %s' % (val, exp, errmsg) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 2) # pylint: disable=W0703 except Exception: pass @@ -79,7 +79,7 @@ def assert_eq(val, exp, errmsg=''): if (val != exp): msg = 'got %s, expect %s\nUser ErrMsg: %s' % (val, exp, errmsg) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) # pylint: disable=W0703 except Exception: pass @@ -95,7 +95,7 @@ def assert_not_eq(val, exp, errmsg=''): val, errmsg ) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) # pylint: disable=W0703 except Exception: pass @@ -118,7 +118,7 @@ def assert_eq_one(val, array, errmsg=''): val, str_arr, errmsg ) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) # pylint: disable=W0703 except Exception: pass @@ -149,7 +149,7 @@ def assert_lt(val, exp, errmsg=''): val, exp, errmsg ) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) # pylint: 
disable=W0703 except Exception: pass @@ -166,7 +166,7 @@ def assert_gt(val, exp, errmsg=''): val, exp, errmsg ) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) except Exception: pass assert False, msg @@ -180,7 +180,7 @@ def assert_ge(val, exp, errmsg=''): msg = 'got %s, expect greater than (or equal to) %s\n User ErrMsg:%s'\ % (val, exp, errmsg) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) # pylint: disable=W0703 except Exception: pass @@ -196,7 +196,7 @@ def assert_le(val, exp, errmsg=''): val, exp, errmsg ) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) # pylint: disable=W0703 except Exception: pass @@ -211,7 +211,7 @@ def assert_ne(val, exp, errmsg=''): msg = 'Expect non-equal, got two equal values %s:%s\nUser Errmsg: %s' \ % (val, exp, errmsg) try: - cup.log.critical(msg) + log.backtrace_critical(msg, 1) # pylint: disable=W0703 except Exception: pass @@ -301,9 +301,9 @@ def __init__(self, logfile='./test.log', b_logstd=False, b_debug=False): else: debuglevel = logging.INFO - cup.log.init_comlog( + log.init_comlog( 'test_case', debuglevel, - logfile, cup.log.ROTATION, 5242880, b_logstd + logfile, log.ROTATION, 5242880, b_logstd ) def setup(self): @@ -467,6 +467,7 @@ def expect_raise(function, exception, *argc, **kwargs): """expect raise exception""" try: function(*argc, **kwargs) + raise err.ExpectFailure(exception, None) except exception: pass else: diff --git a/cup/util/conf.py b/cup/util/conf.py index 35bfc79..053351e 100644 --- a/cup/util/conf.py +++ b/cup/util/conf.py @@ -8,8 +8,6 @@ """ :author: Liu.Jia Guannan Ma -:create_date: - 2014 :descrition: Complex conf support """ @@ -742,7 +740,7 @@ def _check_key_valid(self, key): # pylint: disable=R0201 key = key[1:] for char in key: if not char.isalnum() and char != '_' \ - and char != '-' and char != '.': + and char != '-' and char != '.' 
and char != '$': raise KeyFormatError(key) # Check the [GROUP] key format @@ -806,6 +804,8 @@ def _get_input_lines(self, ignore_error): # pylint: disable=R0912,R0915 continue # if it's a section if line.startswith('['): + if line.find('#') > 0: + line = line[:line.find('#')].strip() if not line.endswith(']'): raise LineFormatError('Parse line error, line:\n' + line) line = line[1:-1] diff --git a/cup/util/threadpool.py b/cup/util/threadpool.py index f3f883e..cdd5e32 100644 --- a/cup/util/threadpool.py +++ b/cup/util/threadpool.py @@ -1,356 +1,364 @@ -#!/usr/bin/python -# -*- coding: utf-8 -* -# ############################################################################# -# -# Copyright (c) Baidu.com, Inc. All Rights Reserved -# -# ############################################################################# -""" -:author: - Guannan Ma -:create_date: - 2014 -:last_date: - 2014 -:descrition: - Guannan ported threadpool from twisted.python. - Mit License applied for twisted. - http://www.opensource.org/licenses/mit-license.php - if any concern, plz contact Guannan at mythmgn@gmail.com -""" - -try: - import Queue as queue -except ImportError: - # pylint: disable=F0401 - import queue -import sys -import copy -import time -import contextlib -import threading - -import cup -from cup import log -from cup.util import context -from cup.util import thread - -_CONTEXT_TRACKER = context.ContextTracker4Thread() - - -class ThreadPool(object): - """ - Threadpool class - """ - - # _THREAD_FACTORY = threading.Thread - _THREAD_FACTORY = thread.CupThread - _CURRENT_THREAD = staticmethod(threading.current_thread) - _WORKER_STOP_SIGN = object() - - def __init__(self, minthreads=5, maxthreads=20, name=None): - """ - 创建一个线程池。 - :param minthreads: - 最少多少个线程在工作。 - :param maxthreads: - 最多多少个线程在工作 - """ - assert minthreads > 0, 'minimum must be >= 0 ' - assert minthreads <= maxthreads, 'minimum is greater than maximum' - - self._min = 5 - self._max = 20 - self._joined = False - self._started 
= False - self._workers = 0 - self._name = None - # Queue is a thread-safe queue - self._jobqueue = queue.Queue(0) - self._min = minthreads - self._max = maxthreads - self._name = name - self._waiters = [] - self._threads = [] - self._working = [] - - def start(self): - """ - 启动线程池 - """ - self._joined = False - self._started = True - # Start some threads. - self.adjust_poolsize() - - def start1worker(self): - """ - 为线程池增加一个线程。 - """ - self._workers += 1 - name = "PoolThread-%s-%s" % (self._name or id(self), self._workers) - new_thd = self._THREAD_FACTORY(target=self._worker, name=name) - self._threads.append(new_thd) - new_thd.start() - - def stop1worker(self): - """ - 为线程池减少一个线程。 - """ - self._jobqueue.put(self._WORKER_STOP_SIGN) - self._workers -= 1 - - def __setstate__(self, state): - """ - For pickling an instance from a serilized string - """ - self.__dict__ = state - self.__class__.__init__(self, self._min, self._max) - - def __getstate__(self): - state = {} - state['min'] = self._min - state['max'] = self._max - return state - - def _start_decent_workers(self): - need_size = self._jobqueue.qsize() + len(self._working) - # Create enough, but not too many - while self._workers < min(self._max, need_size): - self.start1worker() - - def add_1job(self, func, *args, **kwargs): - """ - :param func: - 会被线程池调度的函数 - - :param *args: - func函数需要的参数 - - :param **kw: - func函数需要的kwargs参数 - """ - # log.info('add 1job[{0}]'.format(func)) - self.add_1job_with_callback(None, func, *args, **kwargs) - - def add_1job_with_callback(self, result_callback, func, *args, **kwargs): - """ - :param result_callback: - func作业处理函数被线程池调用后,无论成功与否都会 - 执行result_callback. 
- - result_callback函数需要有两个参数 - (ret_in_bool, result), 成功的话为(True, result), 失败的话 - 为(False, result) - - 如果func raise exception, result_callback会收到(False, failure) - - :param func: - 同add_1job, 被调度的作业函数 - - :param *args: - 同add_1job, func的参数 - - :param **kwargs: - 同add_1job, func的kwargs参数 - """ - if self._joined: - return - # pylint: disable=W0621 - context = _CONTEXT_TRACKER.current_context().contexts[-1] - job = (context, func, args, kwargs, result_callback) - self._jobqueue.put(job) - if self._started: - self._start_decent_workers() - - @contextlib.contextmanager - def _worker_state(self, state_list, worker_thread): - state_list.append(worker_thread) - try: - yield - finally: - state_list.remove(worker_thread) - - def _log_err_context(self, context): - log.warn( - 'Seems a call with context failed. See the context info' - ) - log.warn(str(context)) - - def _worker(self): - """ - worker func to handle jobs - """ - current_thd = self._CURRENT_THREAD() - with self._worker_state(self._waiters, current_thd): - job = self._jobqueue.get() - - while job is not self._WORKER_STOP_SIGN: - with self._worker_state(self._working, current_thd): - # pylint: disable=W0621 - context, function, args, kwargs, result_callback = job - del job - - try: - # pylint: disable=W0142 - result = _CONTEXT_TRACKER.call_with_context( - context, function, *args, **kwargs - ) - success = True - except Exception as error: - success = False - log.warn( - 'Func failed, func:%s, error_msg: %s' % - (str(function), str(error)) - ) - if result_callback is None: - log.warn('This func does not have callback.') - _CONTEXT_TRACKER.call_with_context( - context, self._log_err_context, context - ) - result = None - else: - result = error - - del function, args, kwargs - # when out of "with scope", - # the self._working will remove the thread from - # its self._working list - - if result_callback is not None: - try: - _CONTEXT_TRACKER.call_with_context( - context, result_callback, success, result - ) - except 
Exception as e: - # traceback.print_exc(file=sys.stderr) - log.warn( - 'result_callback func failed, callback func:%s,' - 'err_msg:%s' % (str(result_callback), str(e)) - ) - _CONTEXT_TRACKER.call_with_context( - context, self._log_err_context, context - ) - - del context, result_callback, result - - with self._worker_state(self._waiters, current_thd): - job = self._jobqueue.get() - # after with statements, self._waiters will remove current_thd - - # remove this thread from the list - self._threads.remove(current_thd) - - def stop(self, force_stop=False): - """ - 停止线程池, 该操作是同步操作, 会夯住一直等到线程池所有线程退出。 - - :force_stop: - if force_stop is True, try to stop the threads in the pool - immediately (and this may do damage to the logic) - """ - if not force_stop: - self._joined = True - threads = copy.copy(self._threads) - while self._workers: - self._jobqueue.put(self._WORKER_STOP_SIGN) - self._workers -= 1 - - # and let's just make sure - # FIXME: threads that have died before calling stop() are not joined. 
- for thread in threads: - thread.join() - else: - for thd in self._threads: - thd.terminate() - retry = False - times = 0 - while (not retry and (times <= 100)): - for thd in self._threads: - if thd.isAlive(): - thd.terminate() - retry = True - time.sleep(0.1) - times += 1 - - def try_stop(self, check_interval=0.1): - """ - 发送停止线程池命令, 并尝试查看是否stop了。 如果没停止,返回False - - try_stop不会夯住, 会回返。 属于nonblocking模式下 - """ - self._joined = True - threads = copy.copy(self._threads) - while self._workers: - self._jobqueue.put(self._WORKER_STOP_SIGN) - self._workers -= 1 - - for thread in threads: - thread.join(check_interval) - - for thread in threads: - if thread.isAlive(): - return False - - return True - - def adjust_poolsize(self, minthreads=None, maxthreads=None): - """ - 调整线程池的线程最少和最多运行线程个数 - """ - if minthreads is None: - minthreads = self._min - if maxthreads is None: - maxthreads = self._max - - assert minthreads >= 0, 'minimum is negative' - assert minthreads <= maxthreads, 'minimum is greater than maximum' - - self._min = minthreads - self._max = maxthreads - if not self._started: - return - - # Kill of some threads if we have too many. - while self._workers > self._max: - self.stop1worker() - # Start some threads if we have too few. - while self._workers < self._min: - self.start1worker() - # Start some threads if there is a need. - self._start_decent_workers() - - def get_stats(self): - """ - 回返当前threadpool的状态信息. 
#!/usr/bin/python
# -*- coding: utf-8 -*-
# #############################################################################
#
# Copyright (c) Baidu.com, Inc. All Rights Reserved
#
# #############################################################################
"""
:author:
    Guannan Ma
:description:
    Thread pool back-ported from twisted.python.
    MIT License applied for twisted.
    http://www.opensource.org/licenses/mit-license.php
    if any concern, plz contact Guannan (mythmgn@gmail.com)
"""

try:
    import Queue as queue  # Python 2
except ImportError:
    # pylint: disable=F0401
    import queue           # Python 3
import copy
import time
import contextlib
import threading

from cup import log
from cup.util import context
from cup.util import thread

# Tracks per-thread call contexts so a failing job can be traced back
# to the context it was submitted from.
_CONTEXT_TRACKER = context.ContextTracker4Thread()


# pylint: disable=R0902
class ThreadPool(object):
    """
    Thread pool that schedules callables onto a bounded set of worker
    threads.

    Jobs are pushed into a thread-safe queue and picked up by workers;
    the number of workers grows on demand between ``minthreads`` and
    ``maxthreads``.
    """

    # _THREAD_FACTORY = threading.Thread
    _THREAD_FACTORY = thread.CupThread
    _CURRENT_THREAD = staticmethod(threading.current_thread)
    # Unique sentinel; a worker thread exits when it pops this from the queue.
    _WORKER_STOP_SIGN = object()

    def __init__(
        self, minthreads=5, maxthreads=20, name=None,
        daemon_threads=False
    ):
        """
        Create a thread pool.

        :param minthreads:
            minimum number of worker threads, must be > 0
        :param maxthreads:
            maximum number of worker threads
        :param name:
            optional pool name, used in worker-thread names and log lines
        :param daemon_threads:
            whether pool threads are daemon threads (default False).
            If True, pool threads exit together with the main thread.
            Make sure you fully understand daemon-thread semantics
            before enabling this.
        """
        # fixed: message used to say '>= 0' while the check is '> 0'
        assert minthreads > 0, 'minimum must be > 0'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'
        self._min = minthreads
        self._max = maxthreads
        self._name = name
        self._joined = False
        self._started = False
        self._workers = 0
        self._daemon_thread = daemon_threads
        # Queue.Queue is thread-safe; maxsize 0 means unbounded
        self._jobqueue = queue.Queue(0)
        self._waiters = []
        self._threads = []
        self._working = []

    def start(self):
        """
        Start the thread pool.
        """
        self._joined = False
        self._started = True
        # Start a proper number of threads.
        self.adjust_poolsize()

    def start1worker(self):
        """
        Add one worker thread to the pool.
        """
        self._workers += 1
        name = "PoolThread-%s-%s" % (self._name or id(self), self._workers)
        new_thd = self._THREAD_FACTORY(target=self._worker, name=name)
        if self._daemon_thread:
            new_thd.daemon = True
        self._threads.append(new_thd)
        new_thd.start()

    def stop1worker(self):
        """
        Remove one worker thread from the pool.
        """
        self._jobqueue.put(self._WORKER_STOP_SIGN)
        self._workers -= 1

    def __setstate__(self, state):
        """
        For unpickling an instance from a serialized string.
        """
        # pylint: disable=W0201
        # set up state for it
        self.__dict__ = state
        self.__class__.__init__(self, self._min, self._max)

    def __getstate__(self):
        # Only pool-size bounds are pickled; queues/threads are not portable.
        state = {}
        state['min'] = self._min
        state['max'] = self._max
        return state

    def _start_decent_workers(self):
        """Start a decent/proper number of thread workers."""
        need_size = self._jobqueue.qsize() + len(self._working)
        # Create enough, but not too many
        while self._workers < min(self._max, need_size):
            self.start1worker()

    def add_1job(self, func, *args, **kwargs):
        """
        Schedule one job without a result callback.

        :param func:
            callable to be scheduled by the pool
        :param args:
            positional arguments for ``func``
        :param kwargs:
            keyword arguments for ``func``
        """
        # log.info('add 1job[{0}]'.format(func))
        self.add_1job_with_callback(None, func, *args, **kwargs)

    def add_1job_with_callback(self, result_callback, func, *args, **kwargs):
        """
        Schedule one job; ``result_callback`` is invoked after ``func``
        runs, whether it succeeded or not.

        :param result_callback:
            callable taking two parameters (ret_in_bool, result):
            (True, result) on success, (False, result) on failure.
            If ``func`` raises, result_callback receives (False, error).
        :param func:
            callable to be scheduled, same as in :meth:`add_1job`
        :param args:
            positional arguments for ``func``
        :param kwargs:
            keyword arguments for ``func``
        """
        if self._joined:
            # pool is shutting down; silently drop new jobs
            return
        call_ctx = _CONTEXT_TRACKER.current_context().contexts[-1]
        job = (call_ctx, func, args, kwargs, result_callback)
        self._jobqueue.put(job)
        if self._started:
            self._start_decent_workers()

    @contextlib.contextmanager
    def _worker_state(self, state_list, worker_thread):
        # Track worker_thread in state_list for the duration of the block.
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

    def _log_err_context(self, call_ctx):
        log.warn(
            'Seems a call with context failed. See the context info'
        )
        log.warn(str(call_ctx))

    def _worker(self):
        """
        Worker loop: pop jobs off the queue and run them until the
        stop sign is received.
        """
        current_thd = self._CURRENT_THREAD()
        with self._worker_state(self._waiters, current_thd):
            job = self._jobqueue.get()

        while job is not self._WORKER_STOP_SIGN:
            with self._worker_state(self._working, current_thd):
                call_ctx, function, args, kwargs, result_callback = job
                del job

                try:
                    # pylint: disable=W0142
                    result = _CONTEXT_TRACKER.call_with_context(
                        call_ctx, function, *args, **kwargs
                    )
                    success = True
                except Exception as error:
                    success = False
                    log.warn(
                        'Func failed, func:%s, error_msg: %s' %
                        (str(function), str(error))
                    )
                    if result_callback is None:
                        log.warn('This func does not have callback.')
                        _CONTEXT_TRACKER.call_with_context(
                            call_ctx, self._log_err_context, call_ctx
                        )
                        result = None
                    else:
                        result = error

                del function, args, kwargs
                # when out of "with scope",
                # the self._working will remove the thread from
                # its self._working list

                if result_callback is not None:
                    try:
                        _CONTEXT_TRACKER.call_with_context(
                            call_ctx, result_callback, success, result
                        )
                    except Exception as e:
                        # traceback.print_exc(file=sys.stderr)
                        log.warn(
                            'result_callback func failed, callback func:%s,'
                            'err_msg:%s' % (str(result_callback), str(e))
                        )
                        _CONTEXT_TRACKER.call_with_context(
                            call_ctx, self._log_err_context, call_ctx
                        )

                del call_ctx, result_callback, result

            with self._worker_state(self._waiters, current_thd):
                job = self._jobqueue.get()
            # after with statements, self._waiters will remove current_thd

        # remove this thread from the list
        self._threads.remove(current_thd)

    def stop(self, force_stop=False):
        """
        Stop the thread pool.  This is a synchronous call that blocks
        until all pool threads have exited.

        :param force_stop:
            if True, try to stop the threads in the pool immediately
            (and this may do damage to the logic of running jobs)
        """
        if not force_stop:
            self._joined = True
            stop_threads = copy.copy(self._threads)
            while self._workers:
                self._jobqueue.put(self._WORKER_STOP_SIGN)
                self._workers -= 1

            # and let's just make sure
            # FIXME: threads that have died before calling stop()
            # are not joined.
            for thd in stop_threads:
                thd.join()
        else:
            # fixed: the old loop condition ("while not retry") exited after
            # the FIRST round that found a live thread, so at most one retry
            # ever happened.  Now keep terminating until no thread is alive
            # or 100 attempts have been made.
            times = 0
            while times <= 100:
                any_alive = False
                for thd in self._threads:
                    # is_alive(): isAlive() was removed in Python 3.9
                    if thd.is_alive():
                        thd.terminate()
                        any_alive = True
                if not any_alive:
                    break
                time.sleep(0.1)
                times += 1

    def try_stop(self, check_interval=0.1):
        """
        Send the stop command to the pool and check whether it stopped.
        Return False if it did not stop within ``check_interval``
        seconds per thread.

        Unlike :meth:`stop`, try_stop does not block indefinitely; it
        is the non-blocking variant.
        """
        self._joined = True
        stop_threads = copy.copy(self._threads)
        while self._workers:
            self._jobqueue.put(self._WORKER_STOP_SIGN)
            self._workers -= 1

        for thd in stop_threads:
            thd.join(check_interval)

        for thd in stop_threads:
            # is_alive(): isAlive() was removed in Python 3.9
            if thd.is_alive():
                return False

        return True

    def adjust_poolsize(self, minthreads=None, maxthreads=None):
        """
        Adjust the minimum and maximum number of running pool threads.
        Omitted bounds keep their current value.
        """
        if minthreads is None:
            minthreads = self._min
        if maxthreads is None:
            maxthreads = self._max

        assert minthreads >= 0, 'minimum is negative'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'

        self._min = minthreads
        self._max = maxthreads
        if not self._started:
            return

        # Kill off some threads if we have too many.
        while self._workers > self._max:
            self.stop1worker()
        # Start some threads if we have too few.
        while self._workers < self._min:
            self.start1worker()
        # Start some threads if there is a need.
        self._start_decent_workers()

    def get_stats(self):
        """
        Return current stats of the threadpool as a dict::

            stat = {}
            stat['queue_len'] = self._jobqueue.qsize()   # queued jobs
            stat['waiters_num'] = len(self._waiters)     # idle threads
            stat['working_num'] = len(self._working)     # busy threads
            stat['thread_num'] = len(self._threads)      # total threads
        """
        stat = {}
        stat['queue_len'] = self._jobqueue.qsize()
        stat['waiters_num'] = len(self._waiters)
        stat['working_num'] = len(self._working)
        stat['thread_num'] = len(self._threads)
        return stat

    def dump_stats(self, print_stdout=False):
        """
        Dump the threadpool status (from :meth:`get_stats`) to the log,
        and optionally to stdout.
        """
        stat = self.get_stats()
        if print_stdout:
            # print(): function-call form works on both py2 (single arg)
            # and py3; the old "print stat" statement is a py3 SyntaxError
            print(stat)
        log.info('ThreadPool Stat %s: %s' % (self._name, stat))
        log.debug('queue: %s' % self._jobqueue.queue)
        log.debug('waiters: %s' % self._waiters)
        log.debug('workers: %s' % self._working)
        log.debug('total: %s' % self._threads)
        return stat