Skip to content

Commit

Permalink
修正了一些列BUG,增加了对Task的处理
Browse files Browse the repository at this point in the history
  • Loading branch information
dongasai committed Jul 5, 2018
1 parent 5d4bc26 commit 5deeb7d
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 53 deletions.
1 change: 1 addition & 0 deletions src/App.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private function receive($server, $fd, $reactor_id, $string)
$dispatcher->setActionSuffix('');
$dispatcher->setTaskSuffix('');
$dispatcher->setConnect($connect);
$dispatcher->setServer($server);
$dispatcher->setEventsManager($this->eventsManager);
output([
'n' => $router->getNamespaceName(),
Expand Down
5 changes: 3 additions & 2 deletions src/Controller.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* @property \pms\bear\Counnect $connect
* @property \pms\Session $session
* @property \Phalcon\Config $dConfig
* @property \Swoole\Server $swoole_server;
* @package pms
*/
class Controller extends \Phalcon\Di\Injectable
Expand All @@ -18,9 +19,9 @@ class Controller extends \Phalcon\Di\Injectable
public $connect;
public $session;

public function __construct()
final public function __construct(\Swoole\Server $swoole_server)
{

$this->swoole_server=$swoole_server;
}


Expand Down
13 changes: 11 additions & 2 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public function setConnect(bear\Counnect $connect)
$this->connect = $connect;
}


/**
* 设置连接对象
* @param Counnect $connect
*/
public function setServer(\swoole\server $swoole_server)
{
$this->swoole_server = $swoole_server;
}

/**
* Process the results of the router by calling into the appropriate controller action(s)
* including any routing data or injected parameters.
Expand Down Expand Up @@ -139,11 +149,10 @@ public function dispatch()
}


$handler = new $handlerClass();
$handler = new $handlerClass($this->swoole_server);
if ($session_init) {
$handler->session = $session;
}

$wasFresh = true;

// Handlers must be only objects
Expand Down
59 changes: 26 additions & 33 deletions src/Register.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,32 @@ public function __construct(\Swoole\Server $server)
$this->register_client->start();
}

/**
* 配置更新
*/
public function ping()
{

$data = [
'name' => strtolower(SERVICE_NAME),
'host' => APP_HOST_IP,
'port' => APP_HOST_PORT,
'k' => $this->get_key()
];
Output::info('ping', 'ping');
if ($this->reg_status) {
# 注册完毕进行ping
$re = $this->register_client->send_ask('/service/ping', $data);
} else {
# 没有注册完毕,先注册
$re = $this->register_client->send_ask('/service/reg', $data);
}
if ($re === false) {
$this->register_client->start();
}
output($re, "ping_re");

}

/**
* 获取通讯key
Expand All @@ -51,7 +77,6 @@ private function get_key()
return md5(md5(get_env('REGISTER_SECRET_KEY')) . md5(strtolower(SERVICE_NAME)));
}


/**
* 发送数据
* @param $data
Expand All @@ -62,7 +87,6 @@ public function send($router, $data)
return $this->register_client->send_ask($router, $data);
}


/**
* 链接成功
* @param \swoole_client $cli
Expand All @@ -73,7 +97,6 @@ public function connect(Event $event, Client $Client)
$this->ping();
}


/**
* 收到返回值
* @param Event $event
Expand All @@ -83,7 +106,6 @@ public function connect(Event $event, Client $Client)
*/
public function receive(Event $event, bear\Client $Client, $data)
{
Output::debug($data, 'receive_reg');
$error = $data['e'] ?? 0;
if (!$error) {
#没有错误 config_init config_md5 config_data
Expand All @@ -101,40 +123,11 @@ public function receive(Event $event, bear\Client $Client, $data)
private function save($data)
{
$type = $data['t'];
Output::debug($data, 'reg_save');
if ($type == '/service/reg') {
$this->reg_status = 1;
}

}


/**
* 配置更新
*/
public function ping()
{

$data = [
'name' => strtolower(SERVICE_NAME),
'host' => APP_HOST_IP,
'port' => APP_HOST_PORT,
'k' => $this->get_key()
];
Output::info('ping', 'ping');
if ($this->reg_status) {
# 注册完毕进行ping
$re = $this->register_client->send_ask('/service/ping', $data);
} else {
# 没有注册完毕,先注册
$re = $this->register_client->send_ask('/service/reg', $data);
}
if ($re === false) {
$this->register_client->start();
}
output($re, "ping_re");

}


}
2 changes: 1 addition & 1 deletion src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public function onWorkerStart(\Swoole\Server $server, int $worker_id)
}

if (!$this->cache->get('WKINIT') && !$server->taskworker) {
output(133);
output('init');
$this->cache->save('WKINIT', 1);
# 热更新
if (get_envbl('APP_CODEUPDATE', true)) {
Expand Down
17 changes: 14 additions & 3 deletions src/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
class Task extends Base
{
protected $name='Task';
protected $name = 'Task';

/**
* 在task_worker进程内被调用
Expand All @@ -26,6 +26,17 @@ public function onTask(\Swoole\Server $server, int $task_id, int $src_worker_id,
if ($data == 'codeUpdata') {
$this->codeUpdata();
}
if (is_array($data)) {
//数组的数据是要进行任务类调用
$name = $data['name'] ? $data['name'] : $data[0];
$class_name = 'app\\task\\' . ucfirst($name);
$handel = new $class_name($server,$data);
$handel->setTaskId($task_id);
$handel->setWorkId($src_worker_id);
return $handel->execute();
}


}

/**
Expand Down Expand Up @@ -81,7 +92,7 @@ private function codeUpdateCall($dir)
public function onPipeMessage(\Swoole\Server $server, int $src_worker_id, mixed $message)
{
output('onPipeMessage in task:');
$this->eventsManager->fire($this->name.':onPipeMessage', $this, [$src_worker_id, $message]);
$this->eventsManager->fire($this->name . ':onPipeMessage', $this, [$src_worker_id, $message]);

}

Expand All @@ -94,7 +105,7 @@ public function onPipeMessage(\Swoole\Server $server, int $src_worker_id, mixed
public function onWorkerStart(\Swoole\Server $server, int $worker_id)
{
output($worker_id, 'onWorkerStart in task');
$this->eventsManager->fire($this->name.':onWorkerStart', $this, $worker_id);
$this->eventsManager->fire($this->name . ':onWorkerStart', $this, $worker_id);
}

/**
Expand Down
73 changes: 73 additions & 0 deletions src/Task/Task.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace pms\Task;
/**
* 任务基类
* Class Task
* @package pms\Task
*/
class Task
{
protected $swoole_server;
protected $trueData;
protected $data;
protected $task_id;
protected $src_worker_id;


public function __construct($swoole_server, $data)
{
$this->swoole_server = $swoole_server;
$this->trueData = $data;
$this->data = $data['data'] ? $data['data'] : $data[0];
}

/**
* 设置任务进程id
* @param $task_id
*/
public function setTaskId($task_id)
{
$this->task_id = $task_id;
}

/**
* 设置任务进程id
* @param $task_id
*/
public function setWorkId($src_worker_id)
{
$this->src_worker_id = $src_worker_id;
}


/**
* 执行方法
* @return mixed
*/
final public function execute()
{
$startTime = microtime(true);
$re = $this->run();
$endTime = microtime(true);

$data = $this->trueData;
$data['re'] = $re;
$data['task_id'] = $this->task_id;
$data['time'] = $endTime - $startTime;
return $data;
}

final public function finish()
{
$startFinishTime = microtime(true);
$re = $this->end();
$endFinishTime = microtime(true);
$data = $this->trueData;
$data['re'] = $re;
$data['task_id'] = $this->task_id;
$data['time'] = $endFinishTime - $startFinishTime;
return $data;
}

}
12 changes: 12 additions & 0 deletions src/Task/TaskInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace pms\Task;

interface TaskInterface
{
public function run();

public function end();


}
30 changes: 20 additions & 10 deletions src/Work.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,27 @@
class Work extends Base
{

protected $name='Work';
protected $name = 'Work';

/**
* task_worker中完成时,触发
* task_worker进程 在task完成时,触发
* @param swoole_server $serv
* @param int $task_id
* @param string $data
* @param mixed $data
*/
public function onFinish(\Swoole\Server $server, int $task_id, string $data)
public function onFinish(\Swoole\Server $server, int $task_id, $data)
{
output([$task_id, $data], 'onFinish');
$this->eventsManager->fire($this->name.':onFinish', $this, [$task_id, $data]);
$this->eventsManager->fire($this->name . ':onFinish', $this, [$task_id, $data]);
if (is_array($data)) {
//数组的数据是要进行任务类调用
$name = $data['name'] ? $data['name'] : $data[0];
$class_name = 'app\\task\\' . ucfirst($name);
$handel = new $class_name($server, $data);
$handel->setTaskId($task_id);
return $handel->finish();
}

}

/**
Expand All @@ -31,8 +41,8 @@ public function onFinish(\Swoole\Server $server, int $task_id, string $data)
*/
public function onPipeMessage(\Swoole\Server $server, int $src_worker_id, mixed $message)
{
output([$src_worker_id, $message], 'onFinish');
$this->eventsManager->fire($this->name.':onPipeMessage', $this, [$src_worker_id, $message]);
output([$src_worker_id, $message], 'onPipeMessage');
$this->eventsManager->fire($this->name . ':onPipeMessage', $this, [$src_worker_id, $message]);
}


Expand All @@ -45,7 +55,7 @@ public function onPipeMessage(\Swoole\Server $server, int $src_worker_id, mixed
public function onWorkerStart(\Swoole\Server $server, int $worker_id)
{
output($worker_id, 'onWorkerStart in work');
$this->eventsManager->fire($this->name.':onWorkerStart', $this, $worker_id);
$this->eventsManager->fire($this->name . ':onWorkerStart', $this, $worker_id);
}

/**
Expand All @@ -55,7 +65,7 @@ public function onWorkerStart(\Swoole\Server $server, int $worker_id)
*/
public function onWorkerStop(\Swoole\Server $server, int $worker_id)
{
$this->eventsManager->fire($this->name.':onWorkerStop', $this, $worker_id);
$this->eventsManager->fire($this->name . ':onWorkerStop', $this, $worker_id);
}

/**
Expand All @@ -79,7 +89,7 @@ public function onWorkerError(\Swoole\Server $server, int $worker_id, int $worke
*/
public function onWorkerExit(\Swoole\Server $server, int $worker_id)
{
$this->eventsManager->fire($this->name.':onWorkerExit', $this, $worker_id);
$this->eventsManager->fire($this->name . ':onWorkerExit', $this, $worker_id);
}


Expand Down
Loading

0 comments on commit 5deeb7d

Please sign in to comment.