Skip to content

Commit

Permalink
Added adaptive import that will turn on/off parallel import based on …
Browse files Browse the repository at this point in the history
…the document count.
  • Loading branch information
Davis Puciriuss committed Jun 17, 2024
1 parent 0752f15 commit 9b8dd75
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 6 deletions.
14 changes: 10 additions & 4 deletions src/Console/Commands/ImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final class ImportCommand extends Command
/**
* {@inheritdoc}
*/
protected $signature = 'scout:import {searchable?* : The name of the searchable} {--P|parallel : Index items in parallel}';
protected $signature = 'scout:import {searchable?* : The name of the searchable} {--P|parallel : Index items in parallel} {--A|adaptive : Index items in parallel if it is beneficial}';

/**
* {@inheritdoc}
Expand All @@ -30,14 +30,19 @@ final class ImportCommand extends Command
public function handle(): void
{
// Entry point of the command: reads the --parallel and --adaptive options
// and runs an import for every searchable resolved from the "searchable" argument.
// Both modes stay off unless their option is passed.
$parallel = false;
$adaptive = false;

if ($this->option('parallel')) {
$parallel = true;
}

if ($this->option('adaptive')) {
$adaptive = true;
}

// NOTE(review): this is a rendered diff without +/- markers; the two
// `->each(...)` openings below appear to be the removed and the added
// version of the same call (the latter also forwards $adaptive) — confirm
// against the actual file.
$this->searchableList((array) $this->argument('searchable'))
->each(function (string $searchable) use ($parallel) {
$this->import($searchable, $parallel);
->each(function (string $searchable) use ($parallel, $adaptive) {
$this->import($searchable, $parallel, $adaptive);
});
}

Expand All @@ -59,7 +64,7 @@ private function searchableList(array $argument): Collection
* @param bool $parallel
* @param bool $adaptive
* @return void
*/
private function import(string $searchable, bool $parallel): void
private function import(string $searchable, bool $parallel, bool $adaptive): void
{
$sourceFactory = app(ImportSourceFactory::class);
$source = $sourceFactory::from($searchable);
Expand All @@ -70,6 +75,7 @@ private function import(string $searchable, bool $parallel): void
$job->timeout = (int) $queueTimeout;
}
$job->parallel = $parallel;
$job->adaptive = $adaptive;

if (config('scout.queue')) {
$job = (new QueueableJob())->chain([$job]);
Expand Down
6 changes: 5 additions & 1 deletion src/Jobs/Import.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ final class Import
* @var bool
*/
public $parallel = false;
/**
* @var bool
*/
public $adaptive = false;

public ?int $timeout = null;

Expand Down Expand Up @@ -69,6 +73,6 @@ public function handle(Client $elasticsearch): void
*/
private function stages(): Collection
{
// Builds the stage collection for this job's source by delegating to
// ImportStages::fromSource with the job's flags.
// NOTE(review): rendered diff without +/- markers; the two returns below
// look like the pre-change and post-change versions of one statement (the
// second also forwards $this->adaptive) — confirm against the actual file.
return ImportStages::fromSource($this->source, $this->parallel);
return ImportStages::fromSource($this->source, $this->parallel, $this->adaptive);
}
}
18 changes: 17 additions & 1 deletion src/Jobs/ImportStages.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Matchish\ScoutElasticSearch\Jobs;

use Illuminate\Queue\Queue;
use Illuminate\Support\Collection;
use Matchish\ScoutElasticSearch\ElasticSearch\Index;
use Matchish\ScoutElasticSearch\Jobs\Stages\CleanUp;
Expand All @@ -25,10 +26,25 @@ class ImportStages extends Collection
* @param bool $parallel
* @param bool $adaptive
* @return Collection<int, StageInterface>
*/
public static function fromSource(ImportSource $source, bool $parallel = false)
public static function fromSource(ImportSource $source, bool $parallel = false, bool $adaptive = false)
{
$index = Index::fromSource($source);

if ($adaptive) {
$source = $source->chunked();

if ($source === null) {
return collect();
}

// Performance starts to increase at 75k records for parallel indexing.
if ($source->getChunkSize() * $source->getTotalChunks() <= 75000) {
$parallel = false;
} else {
$parallel = true;
}
}

if ($parallel && class_exists(\Junges\TrackableJobs\Providers\TrackableJobsServiceProvider::class)) {
return (new self([
new StopTrackedJobs($source),
Expand Down
26 changes: 26 additions & 0 deletions tests/Feature/ImportCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,32 @@ public function test_import_all_pages_in_parallel(): void
$this->assertEquals($productsAmount, $response['hits']['total']['value']);
}

/**
 * Runs `scout:import --adaptive` and checks that the index reports as many
 * hits as products were created.
 */
public function test_import_all_pages_using_adaptive(): void
{
    $productsAmount = 10;

    // Create the products while the model event dispatcher is detached, then
    // restore it (presumably so creation itself does not index anything before
    // the command runs — verify against the Scout model observers).
    $eventDispatcher = Product::getEventDispatcher();
    Product::unsetEventDispatcher();
    factory(Product::class, $productsAmount)->create();
    Product::setEventDispatcher($eventDispatcher);

    Artisan::call('scout:import', ['--adaptive' => true]);

    // Query the product index for every document it holds.
    $matchAllQuery = ['match_all' => new stdClass()];
    $response = $this->elasticsearch->search([
        'index' => (new Product())->searchableAs(),
        'body' => ['query' => $matchAllQuery],
    ]);

    $this->assertEquals($productsAmount, $response['hits']['total']['value']);
}

public function test_import_with_custom_key_all_pages(): void
{
$this->app['config']['scout.key'] = 'title';
Expand Down

0 comments on commit 9b8dd75

Please sign in to comment.