From 9b8dd75308dd3510886d129f98bfcebcfb1b98b7 Mon Sep 17 00:00:00 2001 From: Davis Puciriuss Date: Mon, 17 Jun 2024 15:42:54 +0300 Subject: [PATCH] Added adaptive import that will turn on/off parallel import based on the document count. --- src/Console/Commands/ImportCommand.php | 14 ++++++++++---- src/Jobs/Import.php | 6 +++++- src/Jobs/ImportStages.php | 18 +++++++++++++++++- tests/Feature/ImportCommandTest.php | 26 ++++++++++++++++++++++++++ 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/src/Console/Commands/ImportCommand.php b/src/Console/Commands/ImportCommand.php index 9e09b84..71a31ce 100644 --- a/src/Console/Commands/ImportCommand.php +++ b/src/Console/Commands/ImportCommand.php @@ -17,7 +17,7 @@ final class ImportCommand extends Command /** * {@inheritdoc} */ - protected $signature = 'scout:import {searchable?* : The name of the searchable} {--P|parallel : Index items in parallel}'; + protected $signature = 'scout:import {searchable?* : The name of the searchable} {--P|parallel : Index items in parallel} {--A|adaptive : Index items in parallel if it is beneficial}'; /** * {@inheritdoc} @@ -30,14 +30,19 @@ final class ImportCommand extends Command public function handle(): void { $parallel = false; + $adaptive = false; if ($this->option('parallel')) { $parallel = true; } + if ($this->option('adaptive')) { + $adaptive = true; + } + $this->searchableList((array) $this->argument('searchable')) - ->each(function (string $searchable) use ($parallel) { - $this->import($searchable, $parallel); + ->each(function (string $searchable) use ($parallel, $adaptive) { + $this->import($searchable, $parallel, $adaptive); }); } @@ -59,7 +64,7 @@ private function searchableList(array $argument): Collection * @param bool $parallel * @return void */ - private function import(string $searchable, bool $parallel): void + private function import(string $searchable, bool $parallel, bool $adaptive): void { $sourceFactory = app(ImportSourceFactory::class); $source = $sourceFactory::from($searchable); @@ -70,6 +75,7 @@ private function import(string $searchable, bool $parallel): void $job->timeout = (int) $queueTimeout; } $job->parallel = $parallel; + $job->adaptive = $adaptive; if (config('scout.queue')) { $job = (new QueueableJob())->chain([$job]); diff --git a/src/Jobs/Import.php b/src/Jobs/Import.php index 65985ae..12b61ce 100644 --- a/src/Jobs/Import.php +++ b/src/Jobs/Import.php @@ -25,6 +25,10 @@ final class Import * @var bool */ public $parallel = false; + /** + * @var bool + */ + public $adaptive = false; public ?int $timeout = null; @@ -69,6 +73,6 @@ public function handle(Client $elasticsearch): void */ private function stages(): Collection { - return ImportStages::fromSource($this->source, $this->parallel); + return ImportStages::fromSource($this->source, $this->parallel, $this->adaptive); } } diff --git a/src/Jobs/ImportStages.php b/src/Jobs/ImportStages.php index 9cbc3e6..7d8be9b 100644 --- a/src/Jobs/ImportStages.php +++ b/src/Jobs/ImportStages.php @@ -2,6 +2,7 @@ namespace Matchish\ScoutElasticSearch\Jobs; +use Illuminate\Queue\Queue; use Illuminate\Support\Collection; use Matchish\ScoutElasticSearch\ElasticSearch\Index; use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUp; @@ -25,10 +26,25 @@ class ImportStages extends Collection * @param bool $parallel * @return Collection */ - public static function fromSource(ImportSource $source, bool $parallel = false) + public static function fromSource(ImportSource $source, bool $parallel = false, bool $adaptive = false) { $index = Index::fromSource($source); + if ($adaptive) { + $source = $source->chunked(); + + if ($source === null) { + return collect(); + } + + // Performance starts to increase at 75k records for parallel indexing. + if ($source->getChunkSize() * $source->getTotalChunks() <= 75000) { + $parallel = false; + } else { + $parallel = true; + } + } + if ($parallel && class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) { return (new self([ new StopTrackedJobs($source), diff --git a/tests/Feature/ImportCommandTest.php b/tests/Feature/ImportCommandTest.php index f90285f..d58b36d 100644 --- a/tests/Feature/ImportCommandTest.php +++ b/tests/Feature/ImportCommandTest.php @@ -149,6 +149,32 @@ public function test_import_all_pages_in_parallel(): void $this->assertEquals($productsAmount, $response['hits']['total']['value']); } + public function test_import_all_pages_using_adaptive(): void + { + $dispatcher = Product::getEventDispatcher(); + Product::unsetEventDispatcher(); + + $productsAmount = 10; + + factory(Product::class, $productsAmount)->create(); + + Product::setEventDispatcher($dispatcher); + + Artisan::call('scout:import', [ + '--adaptive' => true, + ]); + $params = [ + 'index' => (new Product())->searchableAs(), + 'body' => [ + 'query' => [ + 'match_all' => new stdClass(), + ], + ], + ]; + $response = $this->elasticsearch->search($params); + $this->assertEquals($productsAmount, $response['hits']['total']['value']); + } + public function test_import_with_custom_key_all_pages(): void { $this->app['config']['scout.key'] = 'title';