Skip to content

Commit

Permalink
Use json for state value and update code to fix sharding timeout exce…
Browse files Browse the repository at this point in the history
…ed (#430)
  • Loading branch information
donhardman authored Jan 6, 2025
1 parent a489ccb commit 81a6484
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
12 changes: 7 additions & 5 deletions src/Plugin/Sharding/CreateHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,21 @@ protected static function getShardingFn(): Closure {
$timeout = $payload->getShardingTimeout();
while (true) {
// TODO: think about the way to refactor it and remove duplication
$q = "select `value` from _sharding_state where `key` = 'table:{$payload->table}'";
$q = "select value[0] as value from _sharding_state where `key` = 'table:{$payload->table}'";
$resp = $client->sendRequest($q);
$result = $resp->getResult();
/** @var array{0:array{data?:array{0:array{value:string}}}} $result */
if (isset($result[0]['data'][0]['value'])) {
$value = json_decode($result[0]['data'][0]['value'], true);
}
$value = json_decode($result[0]['data'][0]['value'] ?? '[]', true);

/** @var array{result:string,status?:string,type?:string} $value */
$type = $value['type'] ?? 'unknown';
$status = $value['status'] ?? 'processing';
if ($type === 'create' && $status !== 'processing') {
return TaskResult::fromResponse(Response::fromBody($value['result']));
// We have multiple encode decode cuz manticore does not support json inside json and says
// TOK_IDENT we should fix it when possible and review
/** @var string */
$body = json_encode($value['result']);
return TaskResult::fromResponse(Response::fromBody($body));
}
if ((time() - $ts) > $timeout) {
break;
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/Sharding/DropHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected function validate(): ?Task {
*/
protected function getTableState(string $table): array {
// TODO: think about the way to refactor it and remove duplication
$q = "select `value` from _sharding_state where `key` = 'table:{$table}'";
$q = "select value[0] as value from _sharding_state where `key` = 'table:{$table}'";
$resp = $this->manticoreClient->sendRequest($q);

/** @var array{0:array{data?:array{0:array{value:string}}}} $result */
Expand Down
5 changes: 3 additions & 2 deletions src/Plugin/Sharding/Operator.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Manticoresearch\Buddy\Base\Plugin\Sharding;

use Manticoresearch\Buddy\Core\ManticoreSearch\Client;
use Manticoresearch\Buddy\Core\Network\Struct;
use Manticoresearch\Buddy\Core\Task\TaskResult;
use Manticoresearch\Buddy\Core\Tool\Buddy;
use RuntimeException;
Expand Down Expand Up @@ -288,8 +289,8 @@ public function checkTableStatus(string $table): bool {
if ($isProcessed) {
$result['status'] = 'done';
$result['result'] = getenv('DEBUG') && $result['type'] === 'create'
? $this->client->sendRequest("SHOW CREATE TABLE {$table}")->getBody()
: TaskResult::none()->toString();
? Struct::fromJson($this->client->sendRequest("SHOW CREATE TABLE {$table}")->getBody())
: TaskResult::none()->getStruct();
$this->state->set($stateKey, $result);
}

Expand Down
22 changes: 15 additions & 7 deletions src/Plugin/Sharding/State.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,19 @@ public function set(string $key, mixed $value): static {
: $this->table
;
$now = time();
$encodedValue = addcslashes(json_encode($value) ?: '', "'");
$encodedValue = addcslashes(json_encode($value) ?: 'null', "'");

$query = match ($this->fetch($key)) {
null => "
INSERT INTO {$table}
(`key`, `value`, `updated_at`)
VALUES
('{$key}', '{$encodedValue}', {$now})
('{$key}', '[{$encodedValue}]', {$now})
",
default => "
UPDATE {$table} SET
`updated_at` = {$now},
`value` = '{$encodedValue}'
`value` = '[{$encodedValue}]'
WHERE `key` = '{$key}'
",
};
Expand Down Expand Up @@ -100,7 +101,7 @@ public function listRegex(string $regex): Vector {
/** @var array{0:array{data?:array{0?:array{key:string,value:string}}}} $res */
$res = $this->client
->sendRequest(
"SELECT `key`, `value` FROM {$this->table} WHERE REGEX(`key`, '{$regex}')"
"SELECT `key`, value[0] AS value FROM {$this->table} WHERE REGEX(`key`, '{$regex}')"
)
->getResult();

Expand All @@ -124,11 +125,18 @@ public function listRegex(string $regex): Vector {
protected function fetch(string $key): mixed {
$res = $this->client
->sendRequest(
"SELECT value FROM {$this->table} WHERE key = '$key'"
"SELECT value[0] as value FROM {$this->table} WHERE key = '$key'"
)
->getResult();
/** @var array{0:array{data:array{0?:array{value:string}}}} $res */
$value = isset($res[0]['data'][0]) ? $res[0]['data'][0]['value'] : null;
$value = $res[0]['data'][0]['value'] ?? null;
if (is_string($value)) {
$value = trim($value);
// Manticore returns unquoted string for empty string
if (!str_starts_with($value, '{') && !str_ends_with($value, '}')) {
return $value;
}
}
return isset($value) ? json_decode($value, true) : null;
}

Expand All @@ -145,7 +153,7 @@ public function setup(): void {
}
$query = "CREATE TABLE `{$this->table}` (
`key` string,
`value` string,
`value` json,
`updated_at` timestamp
)";
$this->client->sendRequest($query);
Expand Down

0 comments on commit 81a6484

Please sign in to comment.