Skip to content

Commit

Permalink
ci example for cup.net.async
Browse files Browse the repository at this point in the history
  • Loading branch information
mythmgn committed Jul 3, 2016
1 parent 8dd8657 commit 1160459
Show file tree
Hide file tree
Showing 21 changed files with 858 additions and 0 deletions.
Empty file.
Empty file.
142 changes: 142 additions & 0 deletions examples/arrow-runtime/arrow/agent/arrow_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*
"""
:authors:
Guannan Ma @mythmgn
:create_date:
2016/05/05
:modify_date:
:description:
"""
import os
import sys
import signal

from cup import decorators
from cup import net
from cup.util import conf
from cup import log

from arrow.agent import control
from arrow.common import settings


_NOW_PATH = os.path.dirname(os.path.abspath(__file__)) + '/'
_TOP_PATH = os.path.abspath(_NOW_PATH + '/../../')


@decorators.Singleton
class Agent(object):
"""
Class of Agent.
"""
_heartbeat_sender = None
_control_service = None
_conf_dict = None
def __init__(self, conf_file):
# load conf
self._load_conf(conf_file)
# control service
ipaddr = net.get_hostip()
port = int(self._conf_dict['control']['port'])
# control service which control msg sending and receiving.
self._control_service = control.ControlService(
ipaddr, port, self._conf_dict
)
log.info('ip:{0}, port:{1}'.format(ipaddr, port))
self._stop_heart_beat = False

def _should_stop_heartbeat(self):
return self._stop_heart_beat

def _load_conf(self, conf_file):
"""load agent conf"""
if not os.path.exists(conf_file):
raise IOError('conf file not found:%s' % conf_file)
self._conf_dict = {}
user_confdict = conf.Configure2Dict(conf_file).get_dict()
default = settings.ARROW_MASTER_DEFAULT_PARAMS
control_conf = {}
log_conf = {}
control_conf['heartbeat_interval'] = settings.check_and_load_existence(
user_confdict, default, '["control"]["heartbeat_interval"]'
)
control_conf['master_ip'] = settings.check_and_load_existence(
user_confdict, default, '["control"]["master_ip"]'
)
control_conf['master_port'] = settings.check_and_load_existence(
user_confdict, default, '["control"]["master_port"]'
)
control_conf['queue_exec_thdnum'] = settings.check_and_load_existence(
user_confdict, default, '["control"]["queue_exec_thdnum"]'
)
control_conf['queue_delay_exe_thdnum'] = settings.check_and_load_existence(
user_confdict, default, '["control"]["queue_delay_exe_thdnum"]'
)
control_conf['port'] = settings.check_and_load_existence(
user_confdict, default, '["control"]["port"]'
)
control_conf['interface'] = settings.check_and_load_existence(
user_confdict, default, '["control"]["interface"]'
)


log_conf = settings.check_and_load_existence(
user_confdict, default, '["log"]["path"]'
)
self._conf_dict['control'] = control_conf
self._conf_dict['log'] = log_conf

# pylint: disable=R0201
def setup(self):
"""setup the master"""
log.info('agent setup')

def stop(self):
"""stop the master"""

def loop(self):
"""
run into loop until function stop is called.
"""
self._control_service.loop()

def signal_handler(self):
"""handle signals"""
self._control_service.stop()


def signal_handler(sig, _):
"""
signal handler for master.
When this process receive SIGTERM signal, it will start stopping process.
"""
if sig == signal.SIGTERM:
log.info('get SIGTERM, to stop arrow master')
agent = Agent(None)
agent.signal_handler()


def _main(argv):
"""main function"""
log.init_comlog('arrow_master', log.INFO,
_TOP_PATH + '/log/arrow_agent.log',
log.ROTATION,
1024000000,
False
)
signal.signal(signal.SIGTERM, signal_handler)
if len(argv) < 2:
sys.stderr.write('should specify conf path')
sys.exit(-1)
print argv[1]
agent = Agent(argv[1])
agent.loop()


if __name__ == '__main__':
_main(sys.argv)

# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent
12 changes: 12 additions & 0 deletions examples/arrow-runtime/arrow/agent/conf/agent.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[control]
port : 51112
master_ip : yq01-judy-fileserver41.yq01.baidu.com
master_port : 51100
heartbeat_interval : 10
interface : xgbe0
queue_exec_thdnum : 5


[log]
path : ./log/agent.log
split_size : 102400
139 changes: 139 additions & 0 deletions examples/arrow-runtime/arrow/agent/control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*
"""
:authors:
Guannan Ma @mythmgn
:create_date:
"""
# import gc
import time

# import pympler
# from pympler import summary
# from pympler import muppy

from cup import log
from cup.services import heartbeat as cuphb
from cup.services import executor
from cup import net
from cup.net.async import msgcenter
from cup.net.async import msg

from arrow.common import settings
from arrow.common import service

class ControlService(msgcenter.IMessageCenter):
"""
control service for agent
"""
def __init__(self, ip, port, confdict):
msgcenter.IMessageCenter.__init__(self, ip, port)
# service.BaseService.__init__(self)
# status, 0 inited, 1 running 2 stopping, 3 stopped
self._confdict = confdict
self._status = service.ServiceStatus()
self._status.set_status(self._status.INITED)
self._type_man = msg.CMsgType()
self._type_man.register_types(settings.MSG_TYPE2NUM)
self._executor = executor.ExecutionService(
int(self._confdict['control']['queue_exec_thdnum']),
int(self._confdict['control']['queue_delay_exe_thdnum'])
)
self._agent_ipport = (ip, port)
self._master_ipport = (
net.get_hostip(self._confdict['control']['master_ip']),
int(self._confdict['control']['master_port'])
)
self._last_heartbeat = -1

def _send_heartbeat_loop(self):
if self._status.get_status() != self._status.RUNNING:
log.warn('control service will stop. stop sending heartbeat')
return
hostinfo = cuphb.LinuxHost(
str(self._agent_ipport), True,
self._confdict['control']['interface']
)
log.info('to create msg and send msg')
netmsg = msg.CNetMsg(is_postmsg=True)
netmsg.set_from_addr(self._agent_ipport, (1, 1))
netmsg.set_to_addr(self._master_ipport, (1, 1))
netmsg.set_flag(1)
netmsg.set_msg_type(self._type_man.getnumber_bytype('HEART_BEAT'))
netmsg.set_uniq_id(1)
netmsg.set_body(hostinfo.serilize())
self.post_msg(netmsg)
log.info('finish queue sending heartbeat to {0}'.format(self._master_ipport))
self._executor.delay_exec(
int(self._confdict['control']['heartbeat_interval']) - 3,
self._send_heartbeat_loop,
urgency=executor.URGENCY_HIGH
)

def test_abc(self):
"""test network speed of cup.net.async"""
if self._status.get_status() != self._status.RUNNING:
log.warn('control service is not running, stop heartbeat')
return
netmsg = None
hostinfo = 'a' * 128 * 1024
while self._status.get_status() == self._status.RUNNING:
# hostinfo = cuphb.LinuxHost(
# str(self._agent_ipport), True,
# self._confdict['control']['interface']
# )
netmsg = msg.CNetMsg(is_postmsg=True)
netmsg.set_from_addr(self._agent_ipport, (1, 1))
netmsg.set_to_addr(self._master_ipport, (1, 1))
netmsg.set_flag(self._last_heartbeat)
netmsg.set_msg_type(self._type_man.getnumber_bytype('HEART_BEAT'))
netmsg.set_uniq_id(1)
netmsg.set_body(hostinfo)
self.post_msg(netmsg)
# log.info('finish sending heartbeat to {0}'.format(self._master_ipport))

def _on_recv_heartbeat_ack(self, netmsg):
"""on recv ack msg"""
if netmsg.get_flag() == self._last_heartbeat:
log.info(
'got heartbeat from master:{0}'.format(netmsg.get_from_addr())
)

def handle(self, netmsg):
"""
handle netmsg
"""
log.debug('to handle msg in the child class')
msg_type = netmsg.get_msg_type()
src_peer, stub_future = netmsg.get_from_addr()
log.debug('got msg from: %s stub_future:%s' % (src_peer, stub_future))
if msg_type == self._type_man.getnumber_bytype('ACK_HEART_BEAT'):
self._executor.queue_exec(
self._on_recv_heartbeat_ack, executor.URGENCY_HIGH,
netmsg
)
else:
self.default_handle(msg)

def stop(self):
"""
stop the service
"""
log.info('to stop the arrow agent')
self._status.set_status(self._status.STOPPING)
self._executor.stop()
msgcenter.IMessageCenter.stop(self)
self._status.set_status(self._status.STOPPED)

def loop(self):
"""run loop"""
self._status.set_status(self._status.RUNNING)
self._executor.run()
self._send_heartbeat_loop()
if not msgcenter.IMessageCenter.run(self):
log.error('message center error happened')
self.stop()


# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent
26 changes: 26 additions & 0 deletions examples/arrow-runtime/arrow/agent/unique_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*
"""
:authors:
Guannan Ma [email protected] @mythmgn
:create_date:
2016.5.10
:description:
generate unique id for services
"""

from cup import decorators

@decorators.Singleton
class UniqueID(object):
"""
generate unique id
"""
def __init__(self, low, high, increment):
pass

def next(self):
"""get next unique id"""


# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent
Empty file.
45 changes: 45 additions & 0 deletions examples/arrow-runtime/arrow/common/resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*
# #############################################################
#
# Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved
#
# #############################################################
"""
:authors:
Guannan Ma [email protected] @mythmgn
:create_date:
2016/04/05 17:23:06
:modify_date:
:description:
"""

# from cup.net.async import msg
from cup.services import heartbeat


class BufferSerilizer(object):
"""
buffer serializer
"""
def __init__(self, buff):
pass

def serialize(self):
"""serialize the buffer"""

def deserialize(self, buff):
"""deserialize the buffer"""


class AgentResource(heartbeat.LinuxHost):
"""
resource
"""
def __init__(self, init_this_host=False, iface='eth0'):
super(self.__class__).__init__(self, init_this_host, iface)


# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent
Loading

0 comments on commit 1160459

Please sign in to comment.