From ff1d5ede0b7c69d868fa2a39622757f65f1d49c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20E=C3=9Fl?= Date: Wed, 7 Aug 2024 16:22:51 +0200 Subject: [PATCH] [TASK] Speed up the file processing queue with bulkInsert Relates: #6 --- .../Resource/ProcessedFileQueueRepository.php | 76 ++++++++++++------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/Classes/Resource/ProcessedFileQueueRepository.php b/Classes/Resource/ProcessedFileQueueRepository.php index 07bb45c..7f081d5 100644 --- a/Classes/Resource/ProcessedFileQueueRepository.php +++ b/Classes/Resource/ProcessedFileQueueRepository.php @@ -15,26 +15,23 @@ class ProcessedFileQueueRepository implements SingletonInterface { private const TABLE = 'sys_file_processedfile_queue'; + private array $insertQueue = []; + + public function __destruct() + { + $this->persistQueue(); + } public function enqueue(TaskInterface $task): void { - $connection = GeneralUtility::makeInstance(ConnectionPool::class)->getConnectionForTable(self::TABLE); - $connection->insert( - self::TABLE, - [ - 'public_url' => $task->getTargetFile()->getPublicUrl(), - 'storage' => $task->getSourceFile()->getStorage()->getUid(), - 'original' => $task->getSourceFile()->getUid(), - 'task_type' => $task->getType() . '.' . $task->getName(), - 'configuration' => (new ConfigurationService())->serialize($task->getConfiguration()), - 'checksum' => $task->getConfigurationChecksum() - ], - [ - 'storage' => Connection::PARAM_INT, - 'original' => Connection::PARAM_INT, - 'configuration' => Connection::PARAM_LOB, - ] - ); + $this->insertQueue[$task->getConfigurationChecksum()] = [ + 'public_url' => $task->getTargetFile()->getPublicUrl(), + 'storage' => $task->getSourceFile()->getStorage()->getUid(), + 'original' => $task->getSourceFile()->getUid(), + 'task_type' => $task->getType() . '.' . $task->getName(), + 'configuration' => (new ConfigurationService())->serialize($task->getConfiguration()), + 'checksum' => $task->getConfigurationChecksum() + ]; } public function dequeue(int $uid): void @@ -49,16 +46,17 @@ public function dequeue(int $uid): void public function isEnqueued(TaskInterface $task): bool { $connection = GeneralUtility::makeInstance(ConnectionPool::class)->getConnectionForTable(self::TABLE); - return (bool)$connection->count( - '*', - self::TABLE, - [ - 'storage' => $task->getSourceFile()->getStorage()->getUid(), - 'original' => $task->getSourceFile()->getUid(), - 'task_type' => $task->getType() . '.' . $task->getName(), - 'checksum' => $task->getConfigurationChecksum() - ], - ); + + return isset($this->insertQueue[$task->getConfigurationChecksum()]) || $connection->count( + '*', + self::TABLE, + [ + 'storage' => $task->getSourceFile()->getStorage()->getUid(), + 'original' => $task->getSourceFile()->getUid(), + 'task_type' => $task->getType() . '.' . $task->getName(), + 'checksum' => $task->getConfigurationChecksum() + ], + ); } /** @@ -89,4 +87,28 @@ public function findByPublicUrl(string $publicUrl): QueuedTask|bool $result['checksum'] ); } + + private function persistQueue(): void + { + if (count($this->insertQueue) > 0) { + $connection = GeneralUtility::makeInstance(ConnectionPool::class)->getConnectionForTable(self::TABLE); + $connection->bulkInsert( + self::TABLE, + $this->insertQueue, + [ + 'public_url', + 'storage', + 'original', + 'task_type', + 'configuration', + 'checksum', + ], + [ + 'storage' => Connection::PARAM_INT, + 'original' => Connection::PARAM_INT, + 'configuration' => Connection::PARAM_LOB, + ] + ); + } + } }