diff --git a/cup/net/__init__.py b/cup/net/__init__.py index 2608811..8c53ba4 100644 --- a/cup/net/__init__.py +++ b/cup/net/__init__.py @@ -199,7 +199,7 @@ def port_listened(host, port, is_ipv6=False): listened = False try: result = sock.connect_ex((host, port)) - # 35 means eagain + # 35 means EAGIN if result == 0 or result == 35: listened = True # pylint: disable=W0703 diff --git a/cup/net/async/conn.py b/cup/net/async/conn.py index 75fd0f6..51c483c 100644 --- a/cup/net/async/conn.py +++ b/cup/net/async/conn.py @@ -209,7 +209,6 @@ def push_msg2sendqueue(self, msg): self._peer2context[peer] = context self._fileno2context[fileno] = context self._context2fileno_peer[context] = (fileno, peer) - log.info('created context for the new sock') ret = 0 try: self._epoll.register( @@ -267,7 +266,6 @@ def connect(self, peer): sock.close() return None self._set_sock_nonblocking(sock) - log.info('connect peer success') return sock except socket.error as error: log.warn( @@ -344,7 +342,7 @@ def _cleanup_context(send_queue, peerinfo): del self._fileno2context[fileno_peer[0]] del self._peer2context[fileno_peer[1]] del self._context2fileno_peer[context] - log.info('socket closed') + log.info('socket {0} closed successfully'.format(peerinfo)) except Exception as error: pass finally: @@ -750,7 +748,7 @@ def _check_needack_queue(self): del self._needack_context_dict[msg_key] self._executor.queue_exec( last_msg.get_callback_function(), - executor.URGENCY_NORMAL, + executor.URGENCY_HIGH, last_msg, True ) else: @@ -759,8 +757,9 @@ def _check_needack_queue(self): ) continue # not ack_success + not in context_dict - if msg_key not in self._needack_context_dict: - self._needack_context_dict[msg_key] = msg_item + else: + if msg_key not in self._needack_context_dict: + self._needack_context_dict[msg_key] = msg_item time_out_list = [] for key in self._needack_context_dict.keys(): msg_item = self._needack_context_dict[key] diff --git a/cup/net/async/context.py b/cup/net/async/context.py index 7f7358d..fa1ee2c 100644 --- a/cup/net/async/context.py +++ b/cup/net/async/context.py @@ -8,9 +8,6 @@ Connection Context for each socket """ import copy -# import socket -# import select -# import errno import time import threading import traceback @@ -21,10 +18,7 @@ import cup from cup import log -# from cup import err as cuperr from cup.util import misc -# from cup.util import threadpool -# from cup.services import executor from cup.net.async import msg as async_msg @@ -74,8 +68,7 @@ def to_destroy(self): else: msg = 'context with socket: {0}, peer:{1}'.format( self._sock, self.get_peerinfo()) - log.info('({0}) is to be destroyed'.format(msg)) - # self._conn.cleanup_error_context(self) + log.debug('({0}) is to be destroyed'.format(msg)) self._lock.release() def is_detroying(self): diff --git a/cup/services/serializer.py b/cup/services/serializer.py index 49d5070..9864410 100644 --- a/cup/services/serializer.py +++ b/cup/services/serializer.py @@ -8,6 +8,8 @@ """ import os import time +import traceback +import threading import collections from cup import log @@ -74,7 +76,6 @@ def convert_bytes2uint(cls, str_data): return num - MsgPostRevision = collections.namedtuple( 'MsgPostRevision', ['hostkey', 'revision_id', 'is_post'] ) @@ -83,7 +84,8 @@ def convert_bytes2uint(cls, str_data): class LocalFileSerilizer(BaseSerilizer): """ local file serilizer""" def __init__( - self, storage_dir, skip_badlog=False, max_logfile_size=65536 + self, storage_dir, skip_badlog=False, max_logfile_size=512 * 1024, + persist_after_sec=10 * 60 ): """ @@ -91,6 +93,8 @@ def __init__( Attention. Plz use this parameter very carefully. It will skip bad records which means you have high chance losing data!!! + :param persist_after_sec: + """ BaseSerilizer.__init__(self) self._skip_badlog = skip_badlog @@ -102,6 +106,8 @@ def __init__( self._load_stream = None self._current_loadfile = None self._buffer_files = None + self._persist_sec = persist_after_sec + self._record_lenbytes = 4 self._loglist_stream = None self._logfile_switching = False @@ -109,6 +115,12 @@ def __init__( self._logfile_listnew = '{0}.new'.format(self._logfile_list) self._loglist_switch = '{0}.switch'.format(self._logfile_list) self._loglist_switched = '{0}.switched'.format(self._logfile_list) + self._mlock = threading.Lock() + self._name = self.__class__ + + def set_name(self, name): + """set a name of str for the serializer""" + self._name = name @classmethod def __cmp_logfile_id(cls, first, second): @@ -224,7 +236,7 @@ def _stream_close(self): """close the current stream""" self._writestream.close() - def _stream_open(self, fname): + def _stream_wbopen(self, fname): """open new stream""" ret = False try: @@ -248,14 +260,14 @@ def _stream_open(self, fname): ) return ret - def is_stream_open(self): + def is_stream_wbopen(self): """is stream open""" if self._writestream is None: return False else: return True - def get_next_logfile(self, logid): + def _get_next_logfile(self, logid): """get current logfile""" folder = self._get_storage_dir(logid=logid) fname = '{0}/writing.{1}'.format(folder, logid) @@ -268,15 +280,24 @@ def get_subdir(self, log_id=-1): def _get_ordered_logfiles(self, folder): """get log files in order""" files = sorted(os.listdir(folder), cmp=self.__cmp_logfile_id) - return files + retfiles = [] + for item in files: + if any([ + len(item.split('.')) != 2, + item.find('done.') < 0 and item.find('writing.') < 0 + ]): + log.info('file name {0} invalid, will skip'.format(item)) + continue + retfiles.append(item) + return retfiles def set_current_logid(self, logid): """reset current log id""" if logid < 0: raise ValueError('cannot setup logid less than 0') self._logid = logid - fname = self.get_next_logfile(self._logid) - if not self._stream_open(fname): + fname = self._get_next_logfile(self._logid) + if not self._stream_wbopen(fname): log.error('failed to open stream, return False') return False log.info('reset current log id to {0}'.format(logid)) @@ -344,43 +365,63 @@ def _check_need_new_logfile(self): self._loglist_stream.flush() os.fsync(self._loglist_stream.fileno()) self._current_filesize = 0 - # log.info('next logid:{0}'.format(self._logid)) - fname = self.get_next_logfile(self._logid) - if not self._stream_open(fname): + fname = self._get_next_logfile(self._logid) + if not self._stream_wbopen(fname): return False return True - def add_log(self, log_type, log_mode, log_binary): - """ add log into the local file""" - if not self.is_stream_open(): - fname = self.get_next_logfile(self._logid) - if not self._stream_open(fname): - return False - # binary := - # 32bit len | 128bit logid | log_type 16bit | log_mode 16bit| binary - bin_logid = self.asign_uint2byte_bybits(self._logid, 128) - bin_type = self.asign_uint2byte_bybits(log_type, 16) - bin_mode = self.asign_uint2byte_bybits(log_mode, 16) - data = '{0}{1}{2}{3}'.format(bin_logid, bin_type, bin_mode, log_binary) - data_len = len(data) - str_data_len = self.asign_uint2byte_bybits(data_len, 32) - log.debug('{0} add_log, log_type {1} log_mode {2}'.format( - self.__class__, log_type, log_mode) - ) - write_data = '{0}{1}'.format(str_data_len, data) - log.info('to add data, logid:{0}'.format(self._logid)) - if self._write_data(write_data): - log.debug('add_log, write success') - self._current_filesize += (data_len + 4) - if not self._check_need_new_logfile(): - return False + def is_empty(self): + """return if there is no log""" + folder = self._get_storage_dir() + files = self._get_ordered_logfiles(folder) + if len(files) < 1: return True else: - log.warn('{0} failed to add_log, log_type {1} log_mode {2}'.format( - self.__class__, log_type, log_mode) - ) return False + def add_log(self, log_type, log_mode, log_binary): + """add log into the local file + + :return: + a tuple (result_True_or_False, logid_or_None) + """ + self._mlock.acquire() + ret = (True, None) + if not self.is_stream_wbopen(): + fname = self._get_next_logfile(self._logid) + if not self._stream_wbopen(fname): + ret = (False, None) + if ret[0]: + # binary := + # 32bit len | 128bit logid | log_type 16bit | log_mode 16bit| bin + bin_logid = self.asign_uint2byte_bybits(self._logid, 128) + bin_type = self.asign_uint2byte_bybits(log_type, 16) + bin_mode = self.asign_uint2byte_bybits(log_mode, 16) + data = '{0}{1}{2}{3}'.format( + bin_logid, bin_type, bin_mode, log_binary + ) + data_len = len(data) + str_data_len = self.asign_uint2byte_bybits( + data_len, self._record_lenbytes * 8) + write_data = '{0}{1}'.format(str_data_len, data) + log.info('{0} add_log, type {1} mode {2}, logid {3}, ' + 'datelen:{4}'.format( + self._name, log_type, log_mode, self._logid, data_len) + ) + if self._write_data(write_data): + self._current_filesize += (data_len + len(str_data_len)) + if not self._check_need_new_logfile(): + ret = (False, None) + else: + ret = (True, self._logid) + else: + log.error('failed to add_log(type:{} mode {}'.format( + log_type, log_mode) + ) + ret = (False, None) + self._mlock.release() + return ret + def purge_data(self, before_logid): """ log files which contains log (less than before_logid) will be purged. @@ -482,9 +523,9 @@ def open4write(self, truncate_last_failure=True): log.error('cannot create loglist, raise IOError') raise IOError('cannot create loglist, {0}'.format(err)) log.info( - 'try to recover from last ' - 'write if there is any need, truncate_last_failure:{0}'.format( - truncate_last_failure) + '{0} try to recover from last ' + 'write if there is any need, truncate_last_failure:{1}'.format( + self._name, truncate_last_failure) ) self._recover_from_lastwriting(truncate_last_failure) @@ -501,15 +542,15 @@ def _try_read_one_log(self, stream): """ if stream is None: - return LOGFILE_EOF + return (LOGFILE_EOF, None) str_datalen = datalen = str_data = None try: - str_datalen = stream.read(4) + str_datalen = stream.read(self._record_lenbytes) if len(str_datalen) == 0: return (LOGFILE_EOF, None) - if len(str_datalen) < 4: + if len(str_datalen) < self._record_lenbytes: log.warn('got a bad log from stream:{0}'.format(stream.name)) - return LOGFILE_BAD_RECORD + return (LOGFILE_BAD_RECORD, None) datalen = self.convert_bytes2uint(str_datalen) str_data = stream.read(datalen) if len(str_data) < datalen: @@ -518,10 +559,10 @@ def _try_read_one_log(self, stream): stream.name) ) return (LOGFILE_BAD_RECORD, None) - log_id = self.convert_bytes2uint(str_data[0:16]) - log_type = self.convert_bytes2uint(str_data[16:2]) - log_mode = self.convert_bytes2uint(str_data[18:2]) - log_binary = self.convert_bytes2uint(str_data[20:]) + log_id = self.convert_bytes2uint(str_data[0: 16]) + log_type = self.convert_bytes2uint(str_data[16: 16 + 2]) + log_mode = self.convert_bytes2uint(str_data[18: 18 + 2]) + log_binary = str_data[20:] return ( LOGFILE_GOOD, LogRecord( datalen, log_id, log_type, log_mode, log_binary @@ -529,6 +570,7 @@ def _try_read_one_log(self, stream): ) except Exception as err: log.error('failed to parse log record:{0}'.format(err)) + log.error(traceback.format_exc()) return LOGFILE_BAD_RECORD def _move2next_load_fname(self): @@ -576,7 +618,7 @@ def read(self, record_num=128): and continue reading :return: - a. return a list of "record_num" of LogRecords. + a. return a list of "record_num" of LogRecord. b. If the count number of list is less than record_num, it means the stream encounter EOF, plz read again afterwards. @@ -613,7 +655,7 @@ def read(self, record_num=128): move2nextstream = False ret = self._move2next_load_fname() if LOGFILE_EOF == ret: - log.debug('does not have more log edits to read, plz retry') + log.debug('no more log edits to read, plz retry') break elif LOGFILE_GOOD == ret: log.debug('moved to next log edit file, to read new log') diff --git a/cup/storage/obj.py b/cup/storage/obj.py index 81a872a..ee5140f 100644 --- a/cup/storage/obj.py +++ b/cup/storage/obj.py @@ -8,10 +8,11 @@ """ import os -import sys import abc +import time import shutil import ftplib +import traceback import logging from cup import log @@ -478,7 +479,8 @@ def delete_bucket(self, bucket_name, forcely=False): class FTPObjectSystem(ObjectInterface): """ - ftp object system + ftp object system, Plz notice all methods of FTPObjectSystem is NOT + thread-safe! Be careful when you use it in a service of concurrency. """ def __init__(self, config): """ @@ -494,12 +496,12 @@ def __init__(self, config): cup.err.ConfigError if there's any config item missing """ ObjectInterface.__init__(self, config) - required_keys = ['uri', 'user', 'password'] + required_keys = ['uri', 'user', 'passwords'] if not self._validate_config(self._config, required_keys): raise err.ConfigError(str(required_keys)) self._uri = self._config['uri'] self._user = self._config['user'] - self._passwd = self._config['password'] + self._passwd = self._config['passwords'] self._extra = self._config['extra'] self._dufault_timeout = 30 if self._extra is not None and isinstance(self._config['extra'], int): @@ -512,15 +514,42 @@ def __init__(self, config): self.port = int(self._uri.split(':')[2]) self._ftp_con.connect(self._host, self._port, self._dufault_timeout) self._ftp_con.login(self._user, self._passwd) + self._last_optime = time.time() + self._timeout = 15 # idle time for ftp def __del__(self): """release connect""" - self._ftp_con.quit() + try: + self._ftp_con.quit() + except: + pass - def put(self, dest, localfile): - """ - :param dest: - ftp path + def _check_timeout(self): + """check if we need to reconnect""" + if time.time() - self._last_optime > self._timeout: + try: + self._ftp_con.quit() + except: + pass + self._ftp_con.connect( + self._host, self._port, self._dufault_timeout + ) + self._ftp_con.login(self._user, self._passwd) + self._last_optime = time.time() + + def _get_relative_path(self, path, cwd): + """get relative path for real actions""" + cwd = os.path.normpath(cwd) + path = os.path.normpath(path) + if path.find(cwd) >= 0 and path.startswith('/'): + path = path[len(cwd):] + path = path.lstrip('/') + return path + + def put(self, destfile, localfile): + """ + :param destfile: + ftp path for the localfile :param localfile: localfile """ @@ -528,19 +557,32 @@ def put(self, dest, localfile): 'returncode': 0, 'msg': 'success' } - src_path = self._ftp_con.pwd() - file_name = localfile - if "/" in localfile: - file_name = localfile.split('/')[-1] + log.info('to put localfile {0} to ftp {1}'.format(localfile, destfile)) + self._check_timeout() + cwd = self._ftp_con.pwd() + destdir = None + destfile = os.path.normpath(destfile) + destfile = self._get_relative_path(destfile, cwd) + rindex = destfile.rfind('/') + if rindex < 0: + destdir = cwd + file_name = destfile + elif rindex >= (len(destfile) - 1): + raise ValueError('value error, destfile {0}'.format( + destfile)) + else: + destdir = destfile[:rindex] + file_name = destfile.split('/')[-1] + log.info('put localfile {0} into ftp {1}'.format(localfile, destfile)) with open(localfile, 'rb') as fhandle: try: - self._ftp_con.cwd(dest) - ftp_cmd = 'STOR ' + file_name + self._ftp_con.cwd(destdir) + ftp_cmd = 'STOR {0}'.format(file_name) self._ftp_con.storbinary(ftp_cmd, fhandle) except Exception as error: ret['returncode'] = -1 - ret['msg'] = 'failed to put:{}'.format(error) - self._ftp_con.cwd(src_path) + ret['msg'] = 'failed to put, err:{0}'.format(error) + self._ftp_con.cwd(cwd) return ret def delete(self, path): @@ -549,6 +591,10 @@ def delete(self, path): 'returncode': 0, 'msg': 'success' } + log.info('to delete ftp file: {0}'.format(path)) + self._check_timeout() + cwd = os.path.normpath(self._ftp_con.pwd()) + path = self._get_relative_path(path, cwd) try: self._ftp_con.delete(path) except Exception as error: @@ -564,16 +610,21 @@ def get(self, path, localpath): 'returncode': 0, 'msg': 'success' } + log.info('to get ftp file {0} to {1}'.format(path, localpath)) + self._check_timeout() + cwd = self._ftp_con.pwd() + path = self._get_relative_path(path, cwd) if localpath.endswith('/'): localpath += path.split('/')[-1] + log.info('to get ftp {0} to local {1}'.format(path, localpath)) try: with open(localpath, 'w+') as fhandle: ftp_cmd = 'RETR {0}'.format(path) resp = self._ftp_con.retrbinary(ftp_cmd, fhandle.write) except Exception as error: + log.error(traceback.format_exc()) ret['returncode'] = -1 ret['msg'] = str(error) - return ret def head(self, path): @@ -593,6 +644,9 @@ def head(self, path): 'returncode': -1, 'msg': 'failed to get objectinfo' } + self._check_timeout() + cwd = self._ftp_con.pwd() + path = self._get_relative_path(path, cwd) res_info = [] f_flag = False if self.is_file(path): @@ -621,7 +675,9 @@ def mkdir(self, path, recursive=True): 'returncode': 0, 'msg': 'success' } - src_path = self._ftp_con.pwd() + self._check_timeout() + cwd = self._ftp_con.pwd() + path = self._get_relative_path(path, cwd) try: if not recursive: self._ftp_con.mkd(path) @@ -635,8 +691,8 @@ def mkdir(self, path, recursive=True): self._ftp_con.cwd(subdir) except Exception as error: ret['returncode'] = -1 - ret['msg'] = 'failed to mkdir, err:{}'.format(error) - self._ftp_con.cwd(src_path) + ret['msg'] = 'failed to mkdir, err:{0}'.format(error) + self._ftp_con.cwd(cwd) return ret def rmdir(self, path, recursive=True): @@ -647,12 +703,14 @@ def rmdir(self, path, recursive=True): 'returncode': 0, 'msg': 'success' } - src_path = self._ftp_con.pwd() + self._check_timeout() + cwd = self._ftp_con.pwd() + path = self._get_relative_path(path, cwd) try: if not recursive: self._ftp_con.rmd(path) else: - src_path = self._ftp_con.pwd() + cwd = self._ftp_con.pwd() self._ftp_con.cwd(path) allItems = self._ftp_con.nlst() for item in allItems: @@ -660,25 +718,26 @@ def rmdir(self, path, recursive=True): self._ftp_con.delete(item) else: self.rmdir(item) - self._ftp_con.cwd(src_path) + self._ftp_con.cwd(cwd) self._ftp_con.rmd(path) except Exception as error: ret['returncode'] = -1 - ret['msg'] = 'failed to rmdir, err:{}'.format(error) - self._ftp_con.cwd(src_path) + ret['msg'] = 'failed to rmdir, err:{0}'.format(error) + self._ftp_con.cwd(cwd) return ret def is_file(self, path): """path is file or not""" res = False - src_path = self._ftp_con.pwd() - path = path.rstrip('/') + self._check_timeout() + cwd = self._ftp_con.pwd() + path = self._get_relative_path(path, cwd) res_info = [] def _call_back(arg): res_info.append(arg) try: self._ftp_con.cwd(path) - self._ftp_con.cwd(src_path) + self._ftp_con.cwd(cwd) return res except Exception as e: pass @@ -689,9 +748,9 @@ def _call_back(arg): self._ftp_con.retrlines('MLSD', _call_back) for item in res_info: if item.split(';')[-1].strip() == file_name and 'type=file' in item: - self._ftp_con.cwd(src_path) + self._ftp_con.cwd(cwd) return True - self._ftp_con.cwd(src_path) + self._ftp_con.cwd(cwd) return False @@ -723,7 +782,7 @@ def put(self, dest, localfile): # pylint: disable=W0703 except Exception as error: ret['returncode'] = -1 - ret['msg'] = 'failed to put:{}'.format(error) + ret['msg'] = 'failed to put:{0}'.format(error) return ret def delete(self, path): @@ -737,7 +796,9 @@ def delete(self, path): # pylint: disable=W0703 except Exception as error: ret['returncode'] = -1 - ret['msg'] = 'failed to unlink file:{}, err:{}'.format(path, error) + ret['msg'] = 'failed to unlink file:{0}, err:{1}'.format( + path, error + ) return ret def get(self, path, localpath): @@ -784,7 +845,7 @@ def mkdir(self, path, recursive=True): func(path) except IOError as error: ret['returncode'] = -1 - ret['msg'] = 'failed to mkdir, err:{}'.format(error) + ret['msg'] = 'failed to mkdir, err:{0}'.format(error) return ret def rmdir(self, path, recursive=True): @@ -802,7 +863,7 @@ def rmdir(self, path, recursive=True): func(path) except IOError as error: ret['returncode'] = -1 - ret['msg'] = 'failed to rmdir, err:{}'.format(error) + ret['msg'] = 'failed to rmdir, err:{0}'.format(error) return ret # vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/cup/thread.py b/cup/thread.py index cf3312d..fc05723 100644 --- a/cup/thread.py +++ b/cup/thread.py @@ -29,7 +29,7 @@ class CupThread(threading.Thread): """ CupThread is a sub-class inherited from threading.Thread; - CupThread has 3 more features: + CupThread has 3 more methods: 1. raise_exc, to send a raise-exception signal to the thread, TRY to let the thread raise an exception. diff --git a/cup/version.py b/cup/version.py index baa94f4..e49c28a 100644 --- a/cup/version.py +++ b/cup/version.py @@ -13,7 +13,7 @@ ] -VERSION = '1.7.0' +VERSION = '2.0.0' AUTHOR = 'CUP-DEV Team' # vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/docs/package-lock.json b/docs/package-lock.json new file mode 100644 index 0000000..48e341a --- /dev/null +++ b/docs/package-lock.json @@ -0,0 +1,3 @@ +{ + "lockfileVersion": 1 +}