Commit

sync-up to cup 2.0.0
To release cup 2.0.0
baidu authored and baidu committed Feb 12, 2019
1 parent 1e291c7 commit d1d4d55
Showing 8 changed files with 201 additions and 103 deletions.
2 changes: 1 addition & 1 deletion cup/net/__init__.py
@@ -199,7 +199,7 @@ def port_listened(host, port, is_ipv6=False):
listened = False
try:
result = sock.connect_ex((host, port))
# 35 means eagain
# 35 means EAGAIN
if result == 0 or result == 35:
listened = True
# pylint: disable=W0703
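For context on the hunk above: socket.connect_ex returns an errno value instead of raising, so 0 means the connection succeeded, while errno 35 is EAGAIN on BSD/macOS, which a connect on a non-blocking socket can return while still in progress. A minimal self-contained sketch of the same check (the timeout and function shape are assumptions, not the library's exact code):

import errno
import socket

def port_listened(host, port):
    """Sketch: report whether host:port accepts TCP connections."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    listened = False
    try:
        result = sock.connect_ex((host, port))
        # 0 => connected; errno.EAGAIN (35 on BSD/macOS) => connect still
        # in progress on a non-blocking socket, treated as "listened"
        if result in (0, errno.EAGAIN):
            listened = True
    finally:
        sock.close()
    return listened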
11 changes: 5 additions & 6 deletions cup/net/async/conn.py
@@ -209,7 +209,6 @@ def push_msg2sendqueue(self, msg):
self._peer2context[peer] = context
self._fileno2context[fileno] = context
self._context2fileno_peer[context] = (fileno, peer)
log.info('created context for the new sock')
ret = 0
try:
self._epoll.register(
@@ -267,7 +266,6 @@ def connect(self, peer):
sock.close()
return None
self._set_sock_nonblocking(sock)
log.info('connect peer success')
return sock
except socket.error as error:
log.warn(
@@ -344,7 +342,7 @@ def _cleanup_context(send_queue, peerinfo):
del self._fileno2context[fileno_peer[0]]
del self._peer2context[fileno_peer[1]]
del self._context2fileno_peer[context]
log.info('socket closed')
log.info('socket {0} closed successfully'.format(peerinfo))
except Exception as error:
pass
finally:
@@ -750,7 +748,7 @@ def _check_needack_queue(self):
del self._needack_context_dict[msg_key]
self._executor.queue_exec(
last_msg.get_callback_function(),
executor.URGENCY_NORMAL,
executor.URGENCY_HIGH,
last_msg, True
)
else:
@@ -759,8 +757,9 @@
)
continue
# not ack_success + not in context_dict
if msg_key not in self._needack_context_dict:
self._needack_context_dict[msg_key] = msg_item
else:
if msg_key not in self._needack_context_dict:
self._needack_context_dict[msg_key] = msg_item
time_out_list = []
for key in self._needack_context_dict.keys():
msg_item = self._needack_context_dict[key]
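The two _check_needack_queue hunks above tighten the ack bookkeeping: a message whose ack arrived now fires its callback at executor.URGENCY_HIGH rather than URGENCY_NORMAL, and an unacknowledged message is only added to the tracking dict when its key is not already present. A hedged sketch of that per-message pass (the dict and executor wiring are assumed from the hunk, not copied from the library):

URGENCY_HIGH = 1  # stand-in for cup.services.executor.URGENCY_HIGH

def check_one_needack_msg(needack_context_dict, executor_service,
                          msg_key, msg_item, ack_success):
    """Sketch of one pass over a message awaiting acknowledgement."""
    if ack_success:
        last_msg = needack_context_dict.pop(msg_key, None)
        if last_msg is not None:
            # ack received: schedule the user callback at high urgency
            executor_service.queue_exec(
                last_msg.get_callback_function(),
                URGENCY_HIGH, last_msg, True
            )
    elif msg_key not in needack_context_dict:
        # not acked and not tracked yet: track it for timeout checks
        needack_context_dict[msg_key] = msg_item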
9 changes: 1 addition & 8 deletions cup/net/async/context.py
@@ -8,9 +8,6 @@
Connection Context for each socket
"""
import copy
# import socket
# import select
# import errno
import time
import threading
import traceback
@@ -21,10 +18,7 @@

import cup
from cup import log
# from cup import err as cuperr
from cup.util import misc
# from cup.util import threadpool
# from cup.services import executor
from cup.net.async import msg as async_msg


@@ -74,8 +68,7 @@ def to_destroy(self):
else:
msg = 'context with socket: {0}, peer:{1}'.format(
self._sock, self.get_peerinfo())
log.info('({0}) is to be destroyed'.format(msg))
# self._conn.cleanup_error_context(self)
log.debug('({0}) is to be destroyed'.format(msg))
self._lock.release()

def is_detroying(self):
144 changes: 93 additions & 51 deletions cup/services/serializer.py
@@ -8,6 +8,8 @@
"""
import os
import time
import traceback
import threading
import collections

from cup import log
@@ -74,7 +76,6 @@ def convert_bytes2uint(cls, str_data):
return num



MsgPostRevision = collections.namedtuple(
'MsgPostRevision', ['hostkey', 'revision_id', 'is_post']
)
@@ -83,14 +84,17 @@ def convert_bytes2uint(cls, str_data):
class LocalFileSerilizer(BaseSerilizer):
""" local file serilizer"""
def __init__(
self, storage_dir, skip_badlog=False, max_logfile_size=65536
self, storage_dir, skip_badlog=False, max_logfile_size=512 * 1024,
persist_after_sec=10 * 60
):
"""
:param skip_badlog:
Attention. Plz use this parameter very carefully.
It will skip bad records which means you have high chance
losing data!!!
:param persist_after_sec:
"""
BaseSerilizer.__init__(self)
self._skip_badlog = skip_badlog
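Two defaults change in the constructor above: log files now roll over at 512 KB (was 64 KB), and a new persist_after_sec knob defaults to ten minutes. A hedged usage sketch (the storage path and name are made up for illustration):

store = LocalFileSerilizer(
    '/tmp/cup_store', skip_badlog=False,
    max_logfile_size=512 * 1024, persist_after_sec=10 * 60
)
store.set_name('demo-serializer')  # set_name is introduced just below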
@@ -102,13 +106,21 @@ def __init__(
self._load_stream = None
self._current_loadfile = None
self._buffer_files = None
self._persist_sec = persist_after_sec
self._record_lenbytes = 4

self._loglist_stream = None
self._logfile_switching = False
self._logfile_list = '{0}/logfile.list'.format(storage_dir)
self._logfile_listnew = '{0}.new'.format(self._logfile_list)
self._loglist_switch = '{0}.switch'.format(self._logfile_list)
self._loglist_switched = '{0}.switched'.format(self._logfile_list)
self._mlock = threading.Lock()
self._name = self.__class__

def set_name(self, name):
"""set a name of str for the serializer"""
self._name = name

@classmethod
def __cmp_logfile_id(cls, first, second):
@@ -224,7 +236,7 @@ def _stream_close(self):
"""close the current stream"""
self._writestream.close()

def _stream_open(self, fname):
def _stream_wbopen(self, fname):
"""open new stream"""
ret = False
try:
@@ -248,14 +260,14 @@
)
return ret

def is_stream_open(self):
def is_stream_wbopen(self):
"""is stream open"""
if self._writestream is None:
return False
else:
return True

def get_next_logfile(self, logid):
def _get_next_logfile(self, logid):
"""get current logfile"""
folder = self._get_storage_dir(logid=logid)
fname = '{0}/writing.{1}'.format(folder, logid)
@@ -268,15 +280,24 @@ def get_subdir(self, log_id=-1):
def _get_ordered_logfiles(self, folder):
"""get log files in order"""
files = sorted(os.listdir(folder), cmp=self.__cmp_logfile_id)
return files
retfiles = []
for item in files:
if any([
len(item.split('.')) != 2,
item.find('done.') < 0 and item.find('writing.') < 0
]):
log.info('file name {0} invalid, will skip'.format(item))
continue
retfiles.append(item)
return retfiles

def set_current_logid(self, logid):
"""reset current log id"""
if logid < 0:
raise ValueError('cannot setup logid less than 0')
self._logid = logid
fname = self.get_next_logfile(self._logid)
if not self._stream_open(fname):
fname = self._get_next_logfile(self._logid)
if not self._stream_wbopen(fname):
log.error('failed to open stream, return False')
return False
log.info('reset current log id to {0}'.format(logid))
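_get_ordered_logfiles above no longer returns the raw sorted listing; names that are not exactly two dot-separated parts starting with 'writing.' or 'done.' are skipped. A slightly stricter, self-contained sketch of that filter:

def filter_logfiles(names):
    """Sketch: keep only 'writing.<logid>' / 'done.<logid>' names."""
    kept = []
    for item in names:
        parts = item.split('.')
        if len(parts) == 2 and parts[0] in ('writing', 'done'):
            kept.append(item)
    return kept

# filter_logfiles(['writing.7', 'done.3', 'tmp', 'a.b.c'])
# returns ['writing.7', 'done.3']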
@@ -344,43 +365,63 @@ def _check_need_new_logfile(self):
self._loglist_stream.flush()
os.fsync(self._loglist_stream.fileno())
self._current_filesize = 0
# log.info('next logid:{0}'.format(self._logid))
fname = self.get_next_logfile(self._logid)
if not self._stream_open(fname):
fname = self._get_next_logfile(self._logid)
if not self._stream_wbopen(fname):
return False
return True

def add_log(self, log_type, log_mode, log_binary):
""" add log into the local file"""
if not self.is_stream_open():
fname = self.get_next_logfile(self._logid)
if not self._stream_open(fname):
return False
# binary :=
# 32bit len | 128bit logid | log_type 16bit | log_mode 16bit| binary
bin_logid = self.asign_uint2byte_bybits(self._logid, 128)
bin_type = self.asign_uint2byte_bybits(log_type, 16)
bin_mode = self.asign_uint2byte_bybits(log_mode, 16)
data = '{0}{1}{2}{3}'.format(bin_logid, bin_type, bin_mode, log_binary)
data_len = len(data)
str_data_len = self.asign_uint2byte_bybits(data_len, 32)
log.debug('{0} add_log, log_type {1} log_mode {2}'.format(
self.__class__, log_type, log_mode)
)
write_data = '{0}{1}'.format(str_data_len, data)
log.info('to add data, logid:{0}'.format(self._logid))
if self._write_data(write_data):
log.debug('add_log, write success')
self._current_filesize += (data_len + 4)
if not self._check_need_new_logfile():
return False
def is_empty(self):
"""return if there is no log"""
folder = self._get_storage_dir()
files = self._get_ordered_logfiles(folder)
if len(files) < 1:
return True
else:
log.warn('{0} failed to add_log, log_type {1} log_mode {2}'.format(
self.__class__, log_type, log_mode)
)
return False

def add_log(self, log_type, log_mode, log_binary):
"""add log into the local file
:return:
a tuple (result_True_or_False, logid_or_None)
"""
self._mlock.acquire()
ret = (True, None)
if not self.is_stream_wbopen():
fname = self._get_next_logfile(self._logid)
if not self._stream_wbopen(fname):
ret = (False, None)
if ret[0]:
# binary :=
# 32bit len | 128bit logid | log_type 16bit | log_mode 16bit| bin
bin_logid = self.asign_uint2byte_bybits(self._logid, 128)
bin_type = self.asign_uint2byte_bybits(log_type, 16)
bin_mode = self.asign_uint2byte_bybits(log_mode, 16)
data = '{0}{1}{2}{3}'.format(
bin_logid, bin_type, bin_mode, log_binary
)
data_len = len(data)
str_data_len = self.asign_uint2byte_bybits(
data_len, self._record_lenbytes * 8)
write_data = '{0}{1}'.format(str_data_len, data)
log.info('{0} add_log, type {1} mode {2}, logid {3}, '
'datalen:{4}'.format(
self._name, log_type, log_mode, self._logid, data_len)
)
if self._write_data(write_data):
self._current_filesize += (data_len + len(str_data_len))
if not self._check_need_new_logfile():
ret = (False, None)
else:
ret = (True, self._logid)
else:
log.error('failed to add_log (type:{0} mode:{1})'.format(
log_type, log_mode)
)
ret = (False, None)
self._mlock.release()
return ret

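The rewritten add_log above frames every record as a length prefix (self._record_lenbytes, 4 bytes) followed by a 128-bit logid, 16-bit type, 16-bit mode, and the raw payload, all under self._mlock, and it now returns a (success, logid_or_None) tuple instead of a bare boolean. A hedged Python 3 sketch of that framing; big-endian order is an assumption, and pack_record/unpack_record merely stand in for the library's asign_uint2byte_bybits/convert_bytes2uint helpers:

import struct

def pack_record(logid, log_type, log_mode, payload):
    """Sketch: 32-bit len | 128-bit logid | 16-bit type | 16-bit mode | payload."""
    hi, lo = divmod(logid, 1 << 64)  # 128-bit logid as two 64-bit halves
    body = struct.pack('>QQHH', hi, lo, log_type, log_mode) + payload
    return struct.pack('>I', len(body)) + body

def unpack_record(blob):
    """Sketch: inverse of pack_record."""
    (datalen,) = struct.unpack_from('>I', blob, 0)
    hi, lo, log_type, log_mode = struct.unpack_from('>QQHH', blob, 4)
    logid = (hi << 64) | lo
    return datalen, logid, log_type, log_mode, blob[24:4 + datalen]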
def purge_data(self, before_logid):
"""
log files which contains log (less than before_logid) will be purged.
@@ -482,9 +523,9 @@ def open4write(self, truncate_last_failure=True):
log.error('cannot create loglist, raise IOError')
raise IOError('cannot create loglist, {0}'.format(err))
log.info(
'try to recover from last '
'write if there is any need, truncate_last_failure:{0}'.format(
truncate_last_failure)
'{0} try to recover from last '
'write if there is any need, truncate_last_failure:{1}'.format(
self._name, truncate_last_failure)
)
self._recover_from_lastwriting(truncate_last_failure)

@@ -501,15 +542,15 @@ def _try_read_one_log(self, stream):
"""
if stream is None:
return LOGFILE_EOF
return (LOGFILE_EOF, None)
str_datalen = datalen = str_data = None
try:
str_datalen = stream.read(4)
str_datalen = stream.read(self._record_lenbytes)
if len(str_datalen) == 0:
return (LOGFILE_EOF, None)
if len(str_datalen) < 4:
if len(str_datalen) < self._record_lenbytes:
log.warn('got a bad log from stream:{0}'.format(stream.name))
return LOGFILE_BAD_RECORD
return (LOGFILE_BAD_RECORD, None)
datalen = self.convert_bytes2uint(str_datalen)
str_data = stream.read(datalen)
if len(str_data) < datalen:
Expand All @@ -518,17 +559,18 @@ def _try_read_one_log(self, stream):
stream.name)
)
return (LOGFILE_BAD_RECORD, None)
log_id = self.convert_bytes2uint(str_data[0:16])
log_type = self.convert_bytes2uint(str_data[16:2])
log_mode = self.convert_bytes2uint(str_data[18:2])
log_binary = self.convert_bytes2uint(str_data[20:])
log_id = self.convert_bytes2uint(str_data[0: 16])
log_type = self.convert_bytes2uint(str_data[16: 16 + 2])
log_mode = self.convert_bytes2uint(str_data[18: 18 + 2])
log_binary = str_data[20:]
return (
LOGFILE_GOOD, LogRecord(
datalen, log_id, log_type, log_mode, log_binary
)
)
except Exception as err:
log.error('failed to parse log record:{0}'.format(err))
log.error(traceback.format_exc())
return (LOGFILE_BAD_RECORD, None)

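_try_read_one_log above now reads the length prefix with the configurable self._record_lenbytes, returns (LOGFILE_EOF, None) for a clean end of file versus (LOGFILE_BAD_RECORD, None) for a short read, and fixes the old slicing bugs (str_data[16:2] and str_data[18:2] selected nothing; the new code slices [16:18] and [18:20], and keeps the payload as raw bytes). A minimal sketch of that framing-aware read; the constant values are stand-ins for the module's:

LOGFILE_EOF, LOGFILE_BAD_RECORD, LOGFILE_GOOD = 'eof', 'bad', 'good'

def try_read_one(stream, lenbytes=4):
    """Sketch: read one length-prefixed record from a binary stream."""
    head = stream.read(lenbytes)
    if len(head) == 0:
        return (LOGFILE_EOF, None)         # clean end of file
    if len(head) < lenbytes:
        return (LOGFILE_BAD_RECORD, None)  # truncated length prefix
    datalen = int.from_bytes(head, 'big')  # big-endian is an assumption
    data = stream.read(datalen)
    if len(data) < datalen:
        return (LOGFILE_BAD_RECORD, None)  # truncated record body
    return (LOGFILE_GOOD, data)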
def _move2next_load_fname(self):
@@ -576,7 +618,7 @@ def read(self, record_num=128):
and continue reading
:return:
a. return a list of "record_num" of LogRecords.
a. return a list of "record_num" of LogRecord.
b. If the count number of list is less than record_num,
it means the stream encounter EOF, plz read again afterwards.
@@ -613,7 +655,7 @@ def read(self, record_num=128):
move2nextstream = False
ret = self._move2next_load_fname()
if LOGFILE_EOF == ret:
log.debug('does not have more log edits to read, plz retry')
log.debug('no more log edits to read, plz retry')
break
elif LOGFILE_GOOD == ret:
log.debug('moved to next log edit file, to read new log')