From 116045966e2a0e30b38597d2edb8add48c97bf40 Mon Sep 17 00:00:00 2001 From: Guannan Ma Date: Sun, 3 Jul 2016 22:44:35 +0800 Subject: [PATCH] ci example for cup.net.async --- examples/arrow-runtime/arrow/__init__.py | 0 .../arrow-runtime/arrow/agent/__init__.py | 0 .../arrow-runtime/arrow/agent/arrow_agent.py | 142 ++++++++++++++++ .../arrow-runtime/arrow/agent/conf/agent.conf | 12 ++ examples/arrow-runtime/arrow/agent/control.py | 139 +++++++++++++++ .../arrow-runtime/arrow/agent/unique_id.py | 26 +++ .../arrow-runtime/arrow/common/__init__.py | 0 .../arrow-runtime/arrow/common/resource.py | 45 +++++ .../arrow-runtime/arrow/common/service.py | 47 +++++ .../arrow-runtime/arrow/common/settings.py | 95 +++++++++++ .../arrow-runtime/arrow/master/__init__.py | 0 .../arrow/master/arrow_master.py | 160 ++++++++++++++++++ .../arrow/master/conf/master.conf | 5 + .../arrow-runtime/arrow/master/control.py | 132 +++++++++++++++ .../arrow-runtime/arrow/master/heartbeat.py | 24 +++ .../arrow-runtime/arrow/master/master.pid | 1 + examples/arrow-runtime/arrow/master/nohup.out | 4 + examples/arrow-runtime/bin/start-agent.sh | 6 + examples/arrow-runtime/bin/start-master.sh | 7 + .../arrow-runtime/bin/start-pypy-agent.sh | 6 + .../arrow-runtime/bin/start-pypy-master.sh | 7 + 21 files changed, 858 insertions(+) create mode 100644 examples/arrow-runtime/arrow/__init__.py create mode 100644 examples/arrow-runtime/arrow/agent/__init__.py create mode 100644 examples/arrow-runtime/arrow/agent/arrow_agent.py create mode 100644 examples/arrow-runtime/arrow/agent/conf/agent.conf create mode 100644 examples/arrow-runtime/arrow/agent/control.py create mode 100644 examples/arrow-runtime/arrow/agent/unique_id.py create mode 100644 examples/arrow-runtime/arrow/common/__init__.py create mode 100644 examples/arrow-runtime/arrow/common/resource.py create mode 100644 examples/arrow-runtime/arrow/common/service.py create mode 100644 examples/arrow-runtime/arrow/common/settings.py create mode 100644 examples/arrow-runtime/arrow/master/__init__.py create mode 100644 examples/arrow-runtime/arrow/master/arrow_master.py create mode 100644 examples/arrow-runtime/arrow/master/conf/master.conf create mode 100644 examples/arrow-runtime/arrow/master/control.py create mode 100644 examples/arrow-runtime/arrow/master/heartbeat.py create mode 100644 examples/arrow-runtime/arrow/master/master.pid create mode 100644 examples/arrow-runtime/arrow/master/nohup.out create mode 100644 examples/arrow-runtime/bin/start-agent.sh create mode 100644 examples/arrow-runtime/bin/start-master.sh create mode 100644 examples/arrow-runtime/bin/start-pypy-agent.sh create mode 100644 examples/arrow-runtime/bin/start-pypy-master.sh diff --git a/examples/arrow-runtime/arrow/__init__.py b/examples/arrow-runtime/arrow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/arrow-runtime/arrow/agent/__init__.py b/examples/arrow-runtime/arrow/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/arrow-runtime/arrow/agent/arrow_agent.py b/examples/arrow-runtime/arrow/agent/arrow_agent.py new file mode 100644 index 0000000..343dcd6 --- /dev/null +++ b/examples/arrow-runtime/arrow/agent/arrow_agent.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +""" +:authors: + Guannan Ma @mythmgn +:create_date: + 2016/05/05 +:modify_date: + +:description: + +""" +import os +import sys +import signal + +from cup import decorators +from cup import net +from cup.util import conf +from cup import log + +from arrow.agent import control +from arrow.common import settings + + +_NOW_PATH = os.path.dirname(os.path.abspath(__file__)) + '/' +_TOP_PATH = os.path.abspath(_NOW_PATH + '/../../') + + +@decorators.Singleton +class Agent(object): + """ + Class of Agent. + """ + _heartbeat_sender = None + _control_service = None + _conf_dict = None + def __init__(self, conf_file): + # load conf + self._load_conf(conf_file) + # control service + ipaddr = net.get_hostip() + port = int(self._conf_dict['control']['port']) + # control service which control msg sending and receiving. + self._control_service = control.ControlService( + ipaddr, port, self._conf_dict + ) + log.info('ip:{0}, port:{1}'.format(ipaddr, port)) + self._stop_heart_beat = False + + def _should_stop_heartbeat(self): + return self._stop_heart_beat + + def _load_conf(self, conf_file): + """load agent conf""" + if not os.path.exists(conf_file): + raise IOError('conf file not found:%s' % conf_file) + self._conf_dict = {} + user_confdict = conf.Configure2Dict(conf_file).get_dict() + default = settings.ARROW_MASTER_DEFAULT_PARAMS + control_conf = {} + log_conf = {} + control_conf['heartbeat_interval'] = settings.check_and_load_existence( + user_confdict, default, '["control"]["heartbeat_interval"]' + ) + control_conf['master_ip'] = settings.check_and_load_existence( + user_confdict, default, '["control"]["master_ip"]' + ) + control_conf['master_port'] = settings.check_and_load_existence( + user_confdict, default, '["control"]["master_port"]' + ) + control_conf['queue_exec_thdnum'] = settings.check_and_load_existence( + user_confdict, default, '["control"]["queue_exec_thdnum"]' + ) + control_conf['queue_delay_exe_thdnum'] = settings.check_and_load_existence( + user_confdict, default, '["control"]["queue_delay_exe_thdnum"]' + ) + control_conf['port'] = settings.check_and_load_existence( + user_confdict, default, '["control"]["port"]' + ) + control_conf['interface'] = settings.check_and_load_existence( + user_confdict, default, '["control"]["interface"]' + ) + + + log_conf = settings.check_and_load_existence( + user_confdict, default, '["log"]["path"]' + ) + self._conf_dict['control'] = control_conf + self._conf_dict['log'] = log_conf + + # pylint: disable=R0201 + def setup(self): + """setup the master""" + log.info('agent setup') + + def stop(self): + """stop the master""" + + def loop(self): + """ + run into loop until function stop is called. + """ + self._control_service.loop() + + def signal_handler(self): + """handle signals""" + self._control_service.stop() + + +def signal_handler(sig, _): + """ + signal handler for master. + When this process receive SIGTERM signal, it will start stopping process. + """ + if sig == signal.SIGTERM: + log.info('get SIGTERM, to stop arrow master') + agent = Agent(None) + agent.signal_handler() + + +def _main(argv): + """main function""" + log.init_comlog('arrow_master', log.INFO, + _TOP_PATH + '/log/arrow_agent.log', + log.ROTATION, + 1024000000, + False + ) + signal.signal(signal.SIGTERM, signal_handler) + if len(argv) < 2: + sys.stderr.write('should specify conf path') + sys.exit(-1) + print argv[1] + agent = Agent(argv[1]) + agent.loop() + + +if __name__ == '__main__': + _main(sys.argv) + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/agent/conf/agent.conf b/examples/arrow-runtime/arrow/agent/conf/agent.conf new file mode 100644 index 0000000..68a2121 --- /dev/null +++ b/examples/arrow-runtime/arrow/agent/conf/agent.conf @@ -0,0 +1,12 @@ +[control] + port : 51112 + master_ip : yq01-judy-fileserver41.yq01.baidu.com + master_port : 51100 + heartbeat_interval : 10 + interface : xgbe0 + queue_exec_thdnum : 5 + + +[log] + path : ./log/agent.log + split_size : 102400 diff --git a/examples/arrow-runtime/arrow/agent/control.py b/examples/arrow-runtime/arrow/agent/control.py new file mode 100644 index 0000000..c9f08b9 --- /dev/null +++ b/examples/arrow-runtime/arrow/agent/control.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +""" +:authors: + Guannan Ma @mythmgn +:create_date: + +""" +# import gc +import time + +# import pympler +# from pympler import summary +# from pympler import muppy + +from cup import log +from cup.services import heartbeat as cuphb +from cup.services import executor +from cup import net +from cup.net.async import msgcenter +from cup.net.async import msg + +from arrow.common import settings +from arrow.common import service + +class ControlService(msgcenter.IMessageCenter): + """ + control service for agent + """ + def __init__(self, ip, port, confdict): + msgcenter.IMessageCenter.__init__(self, ip, port) + # service.BaseService.__init__(self) + # status, 0 inited, 1 running 2 stopping, 3 stopped + self._confdict = confdict + self._status = service.ServiceStatus() + self._status.set_status(self._status.INITED) + self._type_man = msg.CMsgType() + self._type_man.register_types(settings.MSG_TYPE2NUM) + self._executor = executor.ExecutionService( + int(self._confdict['control']['queue_exec_thdnum']), + int(self._confdict['control']['queue_delay_exe_thdnum']) + ) + self._agent_ipport = (ip, port) + self._master_ipport = ( + net.get_hostip(self._confdict['control']['master_ip']), + int(self._confdict['control']['master_port']) + ) + self._last_heartbeat = -1 + + def _send_heartbeat_loop(self): + if self._status.get_status() != self._status.RUNNING: + log.warn('control service will stop. stop sending heartbeat') + return + hostinfo = cuphb.LinuxHost( + str(self._agent_ipport), True, + self._confdict['control']['interface'] + ) + log.info('to create msg and send msg') + netmsg = msg.CNetMsg(is_postmsg=True) + netmsg.set_from_addr(self._agent_ipport, (1, 1)) + netmsg.set_to_addr(self._master_ipport, (1, 1)) + netmsg.set_flag(1) + netmsg.set_msg_type(self._type_man.getnumber_bytype('HEART_BEAT')) + netmsg.set_uniq_id(1) + netmsg.set_body(hostinfo.serilize()) + self.post_msg(netmsg) + log.info('finish queue sending heartbeat to {0}'.format(self._master_ipport)) + self._executor.delay_exec( + int(self._confdict['control']['heartbeat_interval']) - 3, + self._send_heartbeat_loop, + urgency=executor.URGENCY_HIGH + ) + + def test_abc(self): + """test network speed of cup.net.async""" + if self._status.get_status() != self._status.RUNNING: + log.warn('control service is not running, stop heartbeat') + return + netmsg = None + hostinfo = 'a' * 128 * 1024 + while self._status.get_status() == self._status.RUNNING: + # hostinfo = cuphb.LinuxHost( + # str(self._agent_ipport), True, + # self._confdict['control']['interface'] + # ) + netmsg = msg.CNetMsg(is_postmsg=True) + netmsg.set_from_addr(self._agent_ipport, (1, 1)) + netmsg.set_to_addr(self._master_ipport, (1, 1)) + netmsg.set_flag(self._last_heartbeat) + netmsg.set_msg_type(self._type_man.getnumber_bytype('HEART_BEAT')) + netmsg.set_uniq_id(1) + netmsg.set_body(hostinfo) + self.post_msg(netmsg) + # log.info('finish sending heartbeat to {0}'.format(self._master_ipport)) + + def _on_recv_heartbeat_ack(self, netmsg): + """on recv ack msg""" + if netmsg.get_flag() == self._last_heartbeat: + log.info( + 'got heartbeat from master:{0}'.format(netmsg.get_from_addr()) + ) + + def handle(self, netmsg): + """ + handle netmsg + """ + log.debug('to handle msg in the child class') + msg_type = netmsg.get_msg_type() + src_peer, stub_future = netmsg.get_from_addr() + log.debug('got msg from: %s stub_future:%s' % (src_peer, stub_future)) + if msg_type == self._type_man.getnumber_bytype('ACK_HEART_BEAT'): + self._executor.queue_exec( + self._on_recv_heartbeat_ack, executor.URGENCY_HIGH, + netmsg + ) + else: + self.default_handle(msg) + + def stop(self): + """ + stop the service + """ + log.info('to stop the arrow agent') + self._status.set_status(self._status.STOPPING) + self._executor.stop() + msgcenter.IMessageCenter.stop(self) + self._status.set_status(self._status.STOPPED) + + def loop(self): + """run loop""" + self._status.set_status(self._status.RUNNING) + self._executor.run() + self._send_heartbeat_loop() + if not msgcenter.IMessageCenter.run(self): + log.error('message center error happened') + self.stop() + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/agent/unique_id.py b/examples/arrow-runtime/arrow/agent/unique_id.py new file mode 100644 index 0000000..393942b --- /dev/null +++ b/examples/arrow-runtime/arrow/agent/unique_id.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +""" +:authors: + Guannan Ma maguannan@baidu.com @mythmgn +:create_date: + 2016.5.10 +:description: + generate unique id for services +""" + +from cup import decorators + +@decorators.Singleton +class UniqueID(object): + """ + generate unique id + """ + def __init__(self, low, high, increment): + pass + + def next(self): + """get next unique id""" + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/common/__init__.py b/examples/arrow-runtime/arrow/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/arrow-runtime/arrow/common/resource.py b/examples/arrow-runtime/arrow/common/resource.py new file mode 100644 index 0000000..6ebb9d3 --- /dev/null +++ b/examples/arrow-runtime/arrow/common/resource.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +# ############################################################# +# +# Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved +# +# ############################################################# +""" +:authors: + Guannan Ma maguannan@baidu.com @mythmgn +:create_date: + 2016/04/05 17:23:06 +:modify_date: + +:description: + +""" + +# from cup.net.async import msg +from cup.services import heartbeat + + +class BufferSerilizer(object): + """ + buffer serializer + """ + def __init__(self, buff): + pass + + def serialize(self): + """serialize the buffer""" + + def deserialize(self, buff): + """deserialize the buffer""" + + +class AgentResource(heartbeat.LinuxHost): + """ + resource + """ + def __init__(self, init_this_host=False, iface='eth0'): + super(self.__class__).__init__(self, init_this_host, iface) + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/common/service.py b/examples/arrow-runtime/arrow/common/service.py new file mode 100644 index 0000000..2d1e423 --- /dev/null +++ b/examples/arrow-runtime/arrow/common/service.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +# ############################################################# +# +# Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved +# +# ############################################################# +""" +:authors: + Guannan Ma maguannan@baidu.com @mythmgn +:create_date: + [% date('%c') %] +:modify_date: + +:description: + +""" + +class ServiceStatus(object): + """ + BaseService for arrow + """ + INITED = 0 + RUNNING = 1 + STOPPING = 2 + STOPPED = 3 + + def __init__(self): + self._statuslist = [ + self.INITED, self.RUNNING, self.STOPPING, self.STOPPED + ] + self._status = self.INITED + + def set_status(self, status): + """set status, return true if set successfully""" + if status not in self._statuslist: + return False + else: + self._status = status + return True + + def get_status(self): + """get status""" + return self._status + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/common/settings.py b/examples/arrow-runtime/arrow/common/settings.py new file mode 100644 index 0000000..df5235f --- /dev/null +++ b/examples/arrow-runtime/arrow/common/settings.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +# ############################################################# +# +# Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved +# +# ############################################################# +""" +:authors: + Guannan Ma maguannan@baidu.com @mythmgn +:create_date: + 2014/04/05 17:23:06 +:modify_date: + +:description: + +""" +from cup import log + +ARROW_MASTER_DEFAULT_PARAMS = { + 'control': { + # internal + 'queue_delay_exe_thdnum': 4, + 'queue_exec_thdnum': 3, + 'local_datadir': './data/', + + # default values which can be configure from outside + 'check_heartbeat_interval': 10, + 'judge_agent_dead_in_sec': 30, + 'keep_lost': 1, + }, +} + +ARROW_AGENT_DEFAULT_PARAMS = { + 'control': { + 'heartbeat_interval': 10, + 'master_ip': '127.0.0.1', + 'master_port': '51100', + 'interface': 'eth1' + }, + 'log': { + 'path': './log/agent.log' + } +} + + +MSG_TYPE2NUM = { + 'HEART_BEAT': 1, + 'RESOURCE_ACQUIRE': 2, + 'RESOURCE_RELEASE': 3, + 'ACK_OK': 4, + 'ACK_FAILURE': 5, + 'ACK_HEART_BEAT': 6 +} + + +class ConfItemError(Exception): + """conf item error""" + def __init__(self, msg): + """ + """ + + + def repr(self): + """ repr the error msg """ + return self._msg + + +def check_and_load_existence(user_confdict, default_dict, key, required=False): + """ + check if the conf item is required to be existent. + Use default if it's not required and does not exist. + Raise ConfItemError if it's required and does not exists + """ + confitem = None + try: + # try user conf dict + confitem = eval('user_confdict{0}'.format(key)) + except KeyError: + log.debug('user conf does not have {0} in user_confdict'.format(key)) + + if confitem is None: + try: + # try user conf dict + confitem = eval('default_dict{0}'.format(key)) + log.info('{0} will use default value:{1}'.format( + key, confitem) + ) + except KeyError: + log.warn('default conf does not have {0}'.format(key)) + if confitem is None and required: + raise ConfItemError('{0} should exist'.format(key)) + return confitem + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/master/__init__.py b/examples/arrow-runtime/arrow/master/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/arrow-runtime/arrow/master/arrow_master.py b/examples/arrow-runtime/arrow/master/arrow_master.py new file mode 100644 index 0000000..b15755b --- /dev/null +++ b/examples/arrow-runtime/arrow/master/arrow_master.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +""" +:authors: + Guannan Ma @mythmgn +:create_date: + 2016/05/05 +:description: + TODO: + 1. Heartbeat with resource + 2. Serialize base for all network msg + 3. Heartbeat network msg + +""" +import os +import sys +import signal + +_NOW_PATH = os.path.dirname(os.path.abspath(__file__)) + '/' +_TOP_PATH = os.path.abspath(_NOW_PATH + '/../../') + +from cup import decorators +from cup import log +from cup import net +from cup.util import conf + +from arrow.master import control +from arrow.common import settings + + +@decorators.Singleton +class Master(object): + """ + Master class + """ + _heartbeat_service = None + _control_service = None + _conf_dict = None + def __init__(self, conf_file): + # load conf + self._load_conf(conf_file) + # control service + self._control_service = control.ControlService( + net.get_hostip(), int(self._conf_dict['control']['port']), + self._conf_dict + ) + + def _load_conf(self, conf_file): + """ + load conf into memory. + If user conf does not have the conf item, arrow will use + default values in arrow.common.settings(.py) + """ + if not os.path.exists(conf_file): + raise IOError('conf file not found:%s' % conf_file) + self._conf_dict = {} + user_confdict = conf.Configure2Dict(conf_file).get_dict() + default = settings.ARROW_MASTER_DEFAULT_PARAMS + self._conf_dict['control'] = {} + settings.check_and_load_existence( + user_confdict, default, '["control"]' + ) + self._conf_dict['control']['queue_exec_thdnum'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["queue_exec_thdnum"]' + ) + self._conf_dict['control']['queue_delay_exe_thdnum'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["queue_delay_exe_thdnum"]' + ) + self._conf_dict['control']['queue_exec_thdnum'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["queue_exec_thdnum"]' + ) + self._conf_dict['control']['local_datadir'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["local_datadir"]' + ) + + self._conf_dict['control']['check_heartbeat_interval'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["check_heartbeat_interval"]' + ) + self._conf_dict['control']['judge_agent_dead_in_sec'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["judge_agent_dead_in_sec"]' + ) + self._conf_dict['control']['keep_lost'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["keep_lost"]' + ) + self._conf_dict['control']['port'] = \ + settings.check_and_load_existence( + user_confdict, default, '["control"]["port"]' + ) + + # pylint: disable=R0201 + def setup(self): + """setup the master""" + log.info('master setup') + + def run(self): + """run the master service""" + self.setup() + + def stop(self): + """stop the master""" + + log.info('to stop the arrow master') + log.info('to stop control service') + self._control_service.stop() + log.info('arrow master stopped') + + def loop(self): + """ + run into loop until function stop is called + """ + pid = os.getpid() + with open('master.pid', 'w+') as fhandle: + fhandle.write('{0}'.format(pid)) + self._control_service.run() + + def signal_handler(self): + """handle signals""" + log.info('to stop control service') + self._control_service.stop() + + +def signal_handler(sig, _): + """ + signal handler for master. + When this process receive SIGTERM signal, it will start stopping process. + """ + if sig == signal.SIGTERM: + log.info('get SIGTERM, to stop arrow master') + master = Master(None) + master.signal_handler() + + +def _main(argv): + """main function""" + log.init_comlog('arrow_master', log.INFO, + _TOP_PATH + '/log/arrow_master.log', + log.ROTATION, + 1024000000, + False + ) + signal.signal(signal.SIGTERM, signal_handler) + if len(argv) < 2: + sys.stderr.write('should specify conf path') + sys.exit(-1) + print argv[1] + master = Master(argv[1]) + master.loop() + + +if __name__ == '__main__': + _main(sys.argv) + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/master/conf/master.conf b/examples/arrow-runtime/arrow/master/conf/master.conf new file mode 100644 index 0000000..b02d0df --- /dev/null +++ b/examples/arrow-runtime/arrow/master/conf/master.conf @@ -0,0 +1,5 @@ +[control] + check_heartbeat_interval : 10 + judge_agent_dead_in_sec : 60 + keep_lost_agent : 1 + port : 51100 diff --git a/examples/arrow-runtime/arrow/master/control.py b/examples/arrow-runtime/arrow/master/control.py new file mode 100644 index 0000000..f446c8a --- /dev/null +++ b/examples/arrow-runtime/arrow/master/control.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +""" +:authors: + Guannan Ma @mythmgn +:create_date: + +""" +# import os +# import sys +# import copy +# import objgraph + +# from cup import decorators +from cup import log +# from cup import net +from cup.services import executor +from cup.net.async import msgcenter +from cup.net.async import msg +from cup.services import heartbeat as hb_service +# from cup.util import conf + +# from arrow.master import heartbeat +from arrow.common import settings +# from arrow.common import service + +class ControlService(msgcenter.IMessageCenter): + def __init__(self, ip, port, confdict): + """control service of arrow master""" + # status, 0 inited, 1 running 2 stopping, 3 stopped + msgcenter.IMessageCenter.__init__(self, ip, port) + self._master_ipport = (ip, port) + self._confdict = confdict + self._status = 0 + self._type_man = msg.CMsgType() + self._type_man.register_types(settings.MSG_TYPE2NUM) + self._executor = executor.ExecutionService( + self._confdict['control']['queue_exec_thdnum'], + self._confdict['control']['queue_delay_exe_thdnum'] + ) + self._heartbeat_service = hb_service.HeartbeatService( + self._confdict['control']['judge_agent_dead_in_sec'], + self._confdict['control']['keep_lost'] + ) + self._msg_recv = 0 + + def _add_new_agent(self, ipaddr, port, resource=None): + key = '%s:%s' % (ipaddr, port) + # refresh heart for agent(str: 'ip:port') + self._heartbeat_service.refresh(key, resource) + + def _on_heartbeat(self, netmsg): + ip_port, _ = netmsg.get_from_addr() + log.info( + 'receive heartbeat, msg_len:%d, msg_flag:%d, msg_src:%s, ' + 'uniqid:%d' % + ( + netmsg.get_msg_len(), + netmsg.get_flag(), + str(ip_port), + netmsg.get_uniq_id() + ) + ) + ack_msg = msg.CNetMsg(is_postmsg=True) + ack_msg.set_from_addr(self._master_ipport, (1, 1)) + ipaddr, stub_future = netmsg.get_from_addr() + ack_msg.set_to_addr(ipaddr, stub_future) + ack_msg.set_flag(netmsg.get_flag()) + ack_msg.set_msg_type(self._type_man.getnumber_bytype('ACK_HEART_BEAT')) + ack_msg.set_uniq_id(netmsg.get_uniq_id() + 1) + ack_msg.set_body('ACK_HEART_BEAT') + resource = hb_service.LinuxHost(name=str(self._master_ipport)) + resource.deserilize(netmsg.get_body()) + self._heartbeat_service.refresh( + '%s:%s' % (ip_port[0], ip_port[1]), resource + ) + self.post_msg(ack_msg) + return + + def _do_heartbeat(self, msg): + pass + + def _do_check_dead_agent(self): + lost = self._heartbeat_service.get_lost() + # schedule next handle dead_agent + # status 2 == stopping + if self._status != 2: + self._executor.queue_exec( + settings.ARROW_MASTER_DEFAULT_PARAMS['check_heartbeat_interval'], + self._do_heartbeat, + 1, + None + ) + else: + log.info( + 'ControlService is stopping. Check dead agent service' + 'exited' + ) + + def run(self): + """run control service""" + self._executor.run() + # call CUP message center to run + msgcenter.IMessageCenter.run(self) + + def stop(self): + """stop control service""" + msgcenter.IMessageCenter.stop(self) + self._executor.stop() + + def handle(self, msg): + """ + handle msg + """ + log.debug('to handle msg in the child class') + msg_type = msg.get_msg_type() + src_peer, stub_future = msg.get_from_addr() + # log.debug('got msg from: %s stub_future:%s' % (src_peer, stub_future)) + # log.debug('type of msg_type:{0}, settings msg_type:{1}'.format( + # type(msg_type), type(self._type_man.getnumber_bytype('HEART_BEAT')) + # )) + if msg_type == self._type_man.getnumber_bytype('HEART_BEAT'): + self._executor.queue_exec( + self._on_heartbeat, + 1, + msg + ) + else: + self.default_handle(msg) + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/master/heartbeat.py b/examples/arrow-runtime/arrow/master/heartbeat.py new file mode 100644 index 0000000..1a5ada1 --- /dev/null +++ b/examples/arrow-runtime/arrow/master/heartbeat.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -* +""" +:authors: + Guannan Ma @mythmgn +:create_date: + 2016/06/07 +:description: + heartbeat service +""" + +from cup.services import heartbeat + +class HeartbeatService(heartbeat.HeartbeatService): + """ + heartbeat service. not in use yet + """ + def __init__(self, judge_lost_in_sec, keep_lost=False): + heartbeat.HeartbeatService.__init__(self, judge_lost_in_sec, keep_lost) + self._judge_lost_in_sec = judge_lost_in_sec + self._keep_lost = keep_lost + + +# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent diff --git a/examples/arrow-runtime/arrow/master/master.pid b/examples/arrow-runtime/arrow/master/master.pid new file mode 100644 index 0000000..a64a8f2 --- /dev/null +++ b/examples/arrow-runtime/arrow/master/master.pid @@ -0,0 +1 @@ +4165 \ No newline at end of file diff --git a/examples/arrow-runtime/arrow/master/nohup.out b/examples/arrow-runtime/arrow/master/nohup.out new file mode 100644 index 0000000..ba34fcd --- /dev/null +++ b/examples/arrow-runtime/arrow/master/nohup.out @@ -0,0 +1,4 @@ +Traceback (most recent call last): + File "arrow_master.py", line 37, in + from arrow.master import heartbeat +ImportError: No module named arrow.master diff --git a/examples/arrow-runtime/bin/start-agent.sh b/examples/arrow-runtime/bin/start-agent.sh new file mode 100644 index 0000000..284c3e3 --- /dev/null +++ b/examples/arrow-runtime/bin/start-agent.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +export PYTHONPATH=`pwd`/:${PYTHONPATH} + +cd ./arrow/agent/; nohup python ./arrow_agent.py ./conf/agent.conf ../../log/agent.stdout.stderr & + diff --git a/examples/arrow-runtime/bin/start-master.sh b/examples/arrow-runtime/bin/start-master.sh new file mode 100644 index 0000000..96cc51f --- /dev/null +++ b/examples/arrow-runtime/bin/start-master.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +export PYTHONPATH=`pwd`/:${PYTHONPATH} + +cd ./arrow/master/; nohup python ./arrow_master.py ./conf/master.conf >../../log/master.stdout.stderr & +# cd ./arrow/master/; nohup pypy ./arrow_master.py ./conf/master.conf >../../log/master.stdout.stderr & + diff --git a/examples/arrow-runtime/bin/start-pypy-agent.sh b/examples/arrow-runtime/bin/start-pypy-agent.sh new file mode 100644 index 0000000..f8e55ad --- /dev/null +++ b/examples/arrow-runtime/bin/start-pypy-agent.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +export PYTHONPATH=`pwd`/:${PYTHONPATH} + +cd ./arrow/agent/; nohup pypy ./arrow_agent.py ./conf/agent.conf >../../log/agent.stdout.stderr & + diff --git a/examples/arrow-runtime/bin/start-pypy-master.sh b/examples/arrow-runtime/bin/start-pypy-master.sh new file mode 100644 index 0000000..af1612b --- /dev/null +++ b/examples/arrow-runtime/bin/start-pypy-master.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +export PYTHONPATH=`pwd`/:${PYTHONPATH} + +# cd ./arrow/master/; nohup python ./arrow_master.py ./conf/master.conf >../../log/master.stdout.stderr & +cd ./arrow/master/; nohup pypy ./arrow_master.py ./conf/master.conf >../../log/master.stdout.stderr & +