The concurrent crawling was being done wrong:

1. Far too many tasks were being spawned (one per URL in the queue).
2. poolboy was only being used to throttle network requests.
3. There was a lot of machinery for what is really just a concurrent map over URLs.

TL;DR: swapped the broken pooling implementation for `Task.Supervisor.async_stream/3` (see the sketch below). The crawling was blocking anyway, so there was no real benefit in all the supervisors/workers being spun up.
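A minimal sketch of the pattern the commit adopts, assuming only a bare `Task.Supervisor` (the module and URL names are illustrative, not the project's):

```elixir
# Start an unnamed Task.Supervisor; in an application this would sit
# in a supervision tree instead.
{:ok, sup} = Task.Supervisor.start_link()

urls = ["https://example.com/a", "https://example.com/b"]

# One bounded, supervised task per URL instead of a manually pooled
# worker per network request; :max_concurrency defaults to
# System.schedulers_online().
Task.Supervisor.async_stream(
  sup,
  urls,
  fn url ->
    # Stand-in for the real (blocking) crawl of one page.
    Process.sleep(100)
    {url, :crawled}
  end,
  timeout: 10_000,
  on_timeout: :kill_task
)
|> Stream.run()
```

Back-pressure comes from the stream itself, so there is nothing left to pool or throttle by hand.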
Showing 12 changed files with 131 additions and 177 deletions.
.tool-versions

```diff
@@ -1 +1,2 @@
-elixir 1.11.2-otp-22
+elixir 1.11.4-otp-23
+erlang 23.3.1
```
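This is the asdf `.tool-versions` format, so (assuming asdf is the version manager in use) `asdf install` in the project root fetches the pins; the Elixir build is bumped to one compiled against OTP 23, matching the newly pinned Erlang 23.3.1.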
.vscode/tasks.json (new file)

```json
{
  "version": "2.0.0",
  "tasks": [
    {
      "label": "Test Function at Cursor",
      "command": "mix test ${relativeFile}:${lineNumber}",
      "group": "test",
      "type": "shell",
      "problemMatcher": [
        "$mixCompileError",
        "$mixCompileWarning",
        "$mixTestFailure"
      ],
      "presentation": {
        "echo": true,
        "reveal": "always",
        "focus": false,
        "panel": "shared"
      }
    },
    {
      "label": "Test Current File",
      "command": "mix test ${relativeFile}",
      "group": "test",
      "type": "shell",
      "problemMatcher": [
        "$mixCompileError",
        "$mixCompileWarning",
        "$mixTestFailure"
      ],
      "presentation": {
        "echo": true,
        "reveal": "always",
        "focus": false,
        "panel": "shared"
      }
    },
    {
      "label": "Run All Tests",
      "command": "mix test",
      "type": "shell",
      "group": "test",
      "problemMatcher": [
        "$mixCompileError",
        "$mixCompileWarning",
        "$mixTestFailure"
      ],
      "presentation": {
        "echo": true,
        "reveal": "always",
        "focus": false,
        "panel": "shared"
      }
    }
  ]
}
```
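These tasks simply shell out to `mix test`: `${relativeFile}` and `${lineNumber}` are built-in VS Code variables, so "Test Function at Cursor" expands to mix's standard `path/to/file_test.exs:42` form for running only the test at that line, while the other two tasks run the current file and the full suite.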
lib/spidey/crawler.ex

```diff
@@ -1,46 +1,66 @@
 defmodule Spidey.Crawler do
   alias Spidey.Logger
-  alias Spidey.Crawler.{PoolManager, UrlStore, Queue, Worker}
-
-  @worker_timeout 60_000
+  alias Spidey.Filter
+  alias Spidey.Crawler.{UrlStore, Queue, Content}

   def crawl(seed, pool_name, opts) do
+    filter = Keyword.get(opts, :filter, Spidey.Filter.DefaultFilter)
+
+    Logger.log("starting crawler supervision tree #{pool_name}")
+    {:ok, pid} = Spidey.Crawler.Supervisor.start_link(pool_name, [])
+
     try do
-      Logger.log("starting pool and ETS table #{pool_name}")
-      PoolManager.start_child(pool_name, opts)
+      Logger.log("starting ETS table #{pool_name}")
       UrlStore.init!(seed, pool_name)

       Queue.push(seed, pool_name)
-      crawl_queue(pool_name, seed)
+      crawl_queue(pool_name, seed, filter)
     after
-      Logger.log("terminating pool and ETS table #{pool_name}")
-      PoolManager.terminate_child(pool_name)
+      Logger.log("terminating crawler supervision tree #{pool_name}")
+      Process.exit(pid, :normal)
+
+      Logger.log("terminating ETS table #{pool_name}")
       UrlStore.teardown(pool_name)
     end
   end

-  defp crawl_queue(pool_name, seed) do
+  defp crawl_queue(pool_name, seed, filter) do
     queue_length = Queue.length(pool_name)

     if queue_length == 0 do
       Logger.log("no urls remaining in queue. Returning all urls")
       UrlStore.retrieve_all(pool_name)
     else
-      queue_length
-      |> Queue.take(pool_name)
-      |> Enum.map(&run_in_pool(&1, pool_name, seed))
-      |> Task.await_many(@worker_timeout)
+      max_concurrency = System.schedulers_online()
+
+      Logger.log(
+        "attempting to crawl #{queue_length} urls at a concurrent rate of #{max_concurrency}"
+      )
+
+      urls = Queue.take(queue_length, pool_name)

-      crawl_queue(pool_name, seed)
+      Task.Supervisor.async_stream(
+        :"#{pool_name}TaskSupervisor",
+        urls,
+        fn url ->
+          url
+          |> Content.scan()
+          |> Filter.filter_urls(filter, seed: seed)
+          |> Stream.reject(&UrlStore.exists?(&1, pool_name))
+          |> Stream.each(&push_to_stores(&1, pool_name))
+          |> Stream.run()
+        end,
+        timeout: 10_000,
+        on_timeout: :kill_task
+      )
+      |> Stream.run()
+
+      crawl_queue(pool_name, seed, filter)
     end
   end

-  defp run_in_pool(url, pool_name, seed) do
-    Task.async(fn ->
-      :poolboy.transaction(
-        pool_name,
-        fn pid -> Worker.crawl(pid, url, pool_name, seed, timeout: @worker_timeout - 5000) end,
-        @worker_timeout
-      )
-    end)
+  defp push_to_stores(url, pool_name) do
+    Queue.push(url, pool_name)
+    UrlStore.add(url, pool_name)
   end
 end
```
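Two details of the new `crawl_queue/3` are worth noting: `Task.Supervisor.async_stream/3` already caps concurrency at `System.schedulers_online()` by default, which appears to be why the log line reports that value even though no `:max_concurrency` option is passed; and `on_timeout: :kill_task` means a URL that exceeds the 10s timeout is simply dropped instead of crashing the caller. The commit discards the per-task results with `Stream.run/1`, but for illustration, here is a hypothetical sketch (not part of the commit) of the tagged tuples the stream yields:

```elixir
{:ok, sup} = Task.Supervisor.start_link()

stream =
  Task.Supervisor.async_stream(
    sup,
    ["https://example.com/fast", "https://example.com/slow"],
    fn url ->
      # Pretend the second URL hangs on the network.
      if String.ends_with?(url, "slow"), do: Process.sleep(:infinity)
      url
    end,
    timeout: 100,
    on_timeout: :kill_task
  )

Enum.each(stream, fn
  {:ok, url} -> IO.puts("crawled #{url}")
  # With on_timeout: :kill_task the slow task is killed and surfaces
  # here as {:exit, :timeout} instead of taking the caller down.
  {:exit, :timeout} -> IO.puts("gave up on a slow url")
end)
```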
Two files were deleted (their contents are not shown); given the commit message, these are likely the poolboy-based PoolManager and Worker modules that disappear from the aliases above.
lib/spidey/crawler/supervisor.ex (new file)

```elixir
defmodule Spidey.Crawler.Supervisor do
  use Supervisor

  alias Spidey.Crawler.Queue

  def start_link(pool_name, _opts) do
    Supervisor.start_link(__MODULE__, %{pool_name: pool_name}, name: :"#{pool_name}Supervisor")
  end

  @impl true
  def init(%{pool_name: pool_name}) do
    children = [
      {Task.Supervisor, name: :"#{pool_name}TaskSupervisor"},
      Queue.child_spec(pool_name)
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end
end
```
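For context on the names: `start_link/2` derives both process names from the pool name, so each crawl gets its own tree, and `:one_for_all` restarts the queue and the task supervisor together (presumably so in-flight tasks never outlive the queue they feed). A hypothetical session, assuming the Spidey application code is loaded:

```elixir
# One supervision tree per crawl, keyed by the pool name.
{:ok, _pid} = Spidey.Crawler.Supervisor.start_link(:my_pool, [])

# Both children are registered under pool-derived names; the task
# supervisor is the name Spidey.Crawler passes to async_stream/3.
true = is_pid(Process.whereis(:my_poolSupervisor))
true = is_pid(Process.whereis(:my_poolTaskSupervisor))
```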
One more file was deleted (contents not shown); the diff of the remaining changed file did not load.