Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Apr 9, 2024
1 parent 9b5d3d9 commit 703fb70
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 56 deletions.
31 changes: 7 additions & 24 deletions src/HookServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,6 @@ public static function instance(): HookServer
return self::$_instance;
}

/**
* @param string $secret
* @param string $method
* @param array $query
* @param string $body
* @return string
*/
public static function sign(string $secret, string $method, array $query, string $body): string
{
ksort($query);
return hash_hmac('sha256',
$method . PHP_EOL . \parse_url(self::getConfig('webhook_url'), \PHP_URL_PATH) . PHP_EOL . http_build_query($query) . PHP_EOL . $body,
$secret,
false
);
}

/**
* 发布消息
*
Expand Down Expand Up @@ -118,18 +101,18 @@ public function publish(string $event, array $data, ?string $republishCountKey =
}
return boolval(self::getStorage()->xAdd($queue,'*', $value));
} catch (RedisException $exception) {
Log::channel('plugin.workbunny.webman-push-server.notice')->warning('Redis server error. ', [
'message' => $exception->getMessage(), 'code' => $exception->getCode(),
'queue' => $queue, 'value' => $value
Log::channel('plugin.workbunny.webman-push-server.warning')->warning('Redis server error. ', [
'message' => $exception->getMessage(), 'code' => $exception->getCode(), 'queue' => $queue,
// 本地储存
'id' => $this->_tempInsert($queue, $value)
]);
$this->_tempInsert($queue, $value);
return false;
} catch (RuntimeException $exception) {
Log::channel('plugin.workbunny.webman-push-server.notice')->notice('Publish failed. ', [
'message' => $exception->getMessage(), 'code' => $exception->getCode(),
'queue' => $queue, 'value' => $value
'message' => $exception->getMessage(), 'code' => $exception->getCode(), 'queue' => $queue,
// 本地储存
'id' => $this->_tempInsert($queue, $value)
]);
$this->_tempInsert($queue, $value);
return null;
}
}
Expand Down
78 changes: 48 additions & 30 deletions src/WebhookHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,44 +26,62 @@ public static function instance(): WebhookHandler
return self::$_instance;
}

/**
* @param string $secret
* @param string $method
* @param array $query
* @param string $body
* @return string
*/
public static function sign(string $secret, string $method, array $query, string $body): string
{
ksort($query);
return hash_hmac('sha256',
$method . PHP_EOL . \parse_url(HookServer::getConfig('webhook_url'), \PHP_URL_PATH) . PHP_EOL . http_build_query($query) . PHP_EOL . $body,
$secret,
false
);
}

/** @inheritdoc */
public function run(string $queue, string $group, array $dataArray)
public function run(string $queue, string $group, array $dataArray): void
{
$idArray = array_keys($dataArray);
$messageArray = array_values($dataArray);
// 如果没有配置webhook地址,直接ack
if (!HookServer::getConfig('webhook_url')) {
HookServer::instance()->ack($queue, $group, $idArray);
}
// http发送
$this->_request($method = 'POST', [
'header' => [
'sign' => HookServer::sign(HookServer::getConfig('webhook_secret'), $method, $query = ['id' => uuid()], $body = json_encode([
'time_ms' => microtime(true),
'events' => $messageArray,
]))
],
'query' => $query,
'data' => $body,
], function (Response $response) use ($queue, $group, $idArray, $dataArray) {
// 数据ack
if (HookServer::instance()->ack($queue, $group, $idArray)) {
// 失败数据重入队尾
if($response->getStatusCode() !== 200) {
foreach ($dataArray as $value) {
HookServer::instance()->publish($queue, $value, 'failed_count');
if (HookServer::getConfig('webhook_url')) {
// http发送
$this->_request($method = 'POST', [
'header' => [
'sign' => self::sign(HookServer::getConfig('webhook_secret'), $method, $query = ['id' => uuid()], $body = json_encode([
'time_ms' => microtime(true),
'events' => $messageArray,
]))
],
'query' => $query,
'data' => $body,
], function (Response $response) use ($queue, $group, $idArray, $dataArray) {
// 数据ack
if (HookServer::instance()->ack($queue, $group, $idArray)) {
// 失败数据重入队尾
if($response->getStatusCode() !== 200) {
foreach ($dataArray as $value) {
HookServer::instance()->publish($queue, $value, 'failed_count');
}
}
}
}
}, function (Throwable $throwable) use ($queue, $group, $idArray, $dataArray) {
// 数据ack
if (HookServer::instance()->ack($queue, $group, $idArray)) {
// 重入队尾
foreach ($dataArray as $value) {
HookServer::instance()->publish($queue, $value, 'error_count');
}, function (Throwable $throwable) use ($queue, $group, $idArray, $dataArray) {
// 数据ack
if (HookServer::instance()->ack($queue, $group, $idArray)) {
// 重入队尾
foreach ($dataArray as $value) {
HookServer::instance()->publish($queue, $value, 'error_count');
}
}
}
});
});
return;
}
HookServer::instance()->ack($queue, $group, $idArray);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/config/plugin/workbunny/webman-push-server/route.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
declare(strict_types=1);

use Workbunny\WebmanPushServer\ApiClient;
use Workbunny\WebmanPushServer\HookServer;
use Workbunny\WebmanPushServer\WebhookHandler;
use Workerman\Protocols\Http\Request;
use support\Response;
use Workbunny\WebmanPushServer\ApiRoute;
Expand Down Expand Up @@ -42,7 +42,7 @@
*/
ApiRoute::post('/webhook', function (Server $server, Request $request) {
parse_str($request->queryString(), $query);
$sign = HookServer::sign(
$sign = WebhookHandler::sign(
'YOUR_WEBHOOK_SECRET',
'POST',
$query,
Expand Down

0 comments on commit 703fb70

Please sign in to comment.