Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread pool with promises to limit concurrent sideloading #472

Open
wants to merge 55 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
32a4cff
Revert "fix: Remove thread pool executor logic until we get a better …
MattFenelon Mar 27, 2024
d6fce34
Add thread pool and concurrency_max_threads configuration option
MattFenelon Mar 27, 2024
2f2b941
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
jkeen Mar 28, 2024
2a462ee
Add overall wait on the same thread pool
MattFenelon Mar 28, 2024
464295e
Add possible test case for deadlock
MattFenelon Mar 28, 2024
6829616
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
MattFenelon Apr 11, 2024
e4de93e
Add a failing test case to show why deadlocks are occurring
MattFenelon Apr 12, 2024
557b029
Fix linting
MattFenelon Apr 12, 2024
9d5e7f2
rough working version of promise code
MattFenelon Apr 26, 2024
251fb2b
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
MattFenelon Apr 26, 2024
80179ab
Add some todos and drop unnecessary code
MattFenelon Apr 26, 2024
dfef16b
Add broken test
MattFenelon Apr 26, 2024
b791e4d
Add integration test for threads
MattFenelon Apr 26, 2024
3db3856
Add code to make the test pass
MattFenelon Apr 26, 2024
7694ee4
Add adapter close on the same thread
MattFenelon May 10, 2024
7607d2a
Add temporary synchronous thread pool to help debug test failures
MattFenelon May 16, 2024
fe6321b
Add promise around resolved
MattFenelon May 20, 2024
719b14a
Drop promise around resolve
MattFenelon May 21, 2024
ea85d83
Add capture results in the context of the sideload promise
MattFenelon May 21, 2024
309b980
Make a gemfile that works locally without appraisal
MattFenelon May 22, 2024
70848c4
Fix gemfile temporarily to get tests passing locally
MattFenelon May 24, 2024
3e050d0
Fix resource_proxy data|to_a|resolve_data to understand promises
MattFenelon May 24, 2024
a6a1c4d
Reset Gemfile and spec_helper
MattFenelon May 24, 2024
419fa2e
Reset gemspec
MattFenelon May 24, 2024
4160473
Reset formatting
MattFenelon May 24, 2024
0971be7
Fix lint
MattFenelon May 24, 2024
c5680c3
Add single threaded mode back in
MattFenelon May 24, 2024
792b015
Bump concurrent-ruby from 1.0 to 1.2+
MattFenelon Jun 13, 2024
339cc7e
Refactor future using methods into their own methods
MattFenelon Jun 13, 2024
b7b5201
Make standardrb happy
MattFenelon Jun 13, 2024
ec3a976
Add Sideload#load back
MattFenelon Jun 13, 2024
16ccfde
Add adapter closing on concurrently loaded sideloads
MattFenelon Jun 14, 2024
5ca2ac5
Fix adapter closing to stick on the same thread
MattFenelon Jun 21, 2024
42e5544
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
jkeen Jul 10, 2024
2860987
Refactor to a modern API
MattFenelon Sep 9, 2024
fd79f8d
Add copying of parent thread's locals to the sideloading thread
MattFenelon Sep 11, 2024
301d4aa
Refactor the code to be a little simpler
MattFenelon Sep 11, 2024
e654801
Refactor to a modern API for Concurrent::Delay
MattFenelon Sep 12, 2024
36c6dbe
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
MattFenelon Sep 12, 2024
604d2dc
Add copying of parent fiber's locals to the sideloading thread
MattFenelon Sep 12, 2024
9394ef4
Add only copy thread and fiber locals when the thread has changed
MattFenelon Sep 12, 2024
8a8bdae
Add clearing of thread/fiber locals
MattFenelon Sep 12, 2024
d8c3b4b
Fix the tests for ruby versions that don't have fiber locals
MattFenelon Sep 12, 2024
1b201b1
Fix tests to do with Fiber.current.storage.keys NoMethodError
MattFenelon Sep 12, 2024
8f3bd82
Add a test for #flat in Scope#future_resolve_sideloads
MattFenelon Sep 14, 2024
4cf866b
Drop byebug line
MattFenelon Sep 14, 2024
092f2f6
Add thread pool stats broadcast for observability
MattFenelon Sep 14, 2024
78ec4f8
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
MattFenelon Sep 17, 2024
ca9be32
Add handling of multiple errors from promises
MattFenelon Sep 17, 2024
c356cc0
Optimise stat keys to symbols
MattFenelon Sep 18, 2024
dcb437f
Fix rescue to run on the thread pool and handle non-exceptions
MattFenelon Sep 18, 2024
e42b9f8
Add synchronous error handling for asynchronously loaded sideloads
MattFenelon Sep 19, 2024
a653122
Refactor if/else statement
MattFenelon Sep 19, 2024
cd5b215
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
jkeen Nov 6, 2024
0040429
Merge branch 'master' into Add-thread-pool-and-concurrency_max_thread…
jkeen Nov 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion graphiti.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "jsonapi-renderer", "~> 0.2", ">= 0.2.2"
spec.add_dependency "dry-types", ">= 0.15.0", "< 2.0"
spec.add_dependency "graphiti_errors", "~> 1.1.0"
spec.add_dependency "concurrent-ruby", "~> 1.0"
spec.add_dependency "concurrent-ruby", ">= 1.2", "< 2.0"
spec.add_dependency "activesupport", ">= 5.2"

spec.add_development_dependency "faraday", "~> 0.15"
Expand Down
15 changes: 15 additions & 0 deletions lib/graphiti/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@ class Configuration
# Defaults to false OR if classes are cached (Rails-only)
attr_accessor :concurrency

# This number must be considered in accordance with the database
# connection pool size configured in `database.yml`. The connection
# pool should be large enough to accommodate both the foreground
# threads (ie. web server or job worker threads) and background
# threads. For each process, Graphiti will create one global
# executor that uses this many threads to sideload resources
# asynchronously. Thus, the pool size should be at least
# `thread_count + concurrency_max_threads + 1`. For example, if your
# web server has a maximum of 3 threads, and
# `concurrency_max_threads` is set to 4, then your pool size should
# be at least 8.
# @return [Integer] Maximum number of threads to use when fetching sideloads concurrently
attr_accessor :concurrency_max_threads

attr_accessor :respond_to
attr_accessor :context_for_endpoint
attr_accessor :links_on_demand
Expand All @@ -27,6 +41,7 @@ class Configuration
def initialize
@raise_on_missing_sideload = true
@concurrency = false
@concurrency_max_threads = 4
@respond_to = [:json, :jsonapi, :xml]
@links_on_demand = false
@pagination_links_on_demand = false
Expand Down
14 changes: 11 additions & 3 deletions lib/graphiti/resource_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,24 @@ def as_graphql(options = {})
def data
@data ||= begin
records = @scope.resolve
if records.empty? && raise_on_missing?
raise Graphiti::Errors::RecordNotFound
end
raise Graphiti::Errors::RecordNotFound if records.empty? && raise_on_missing?

records = records[0] if single?
records
end
end
alias_method :to_a, :data
alias_method :resolve_data, :data

def future_resolve_data
@scope.future_resolve.then do |records|
raise Graphiti::Errors::RecordNotFound if records.empty? && raise_on_missing?

records = records[0] if single?
@data = records
end
end

def meta
@meta ||= data.respond_to?(:meta) ? data.meta : {}
end
Expand Down
151 changes: 107 additions & 44 deletions lib/graphiti/scope.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,35 @@ module Graphiti
class Scope
attr_accessor :object, :unpaginated_object
attr_reader :pagination

GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS = %i[
length max_length queue_length max_queue completed_task_count largest_length scheduled_task_count synchronous
]
GLOBAL_THREAD_POOL_EXECUTOR = Concurrent::Promises.delay do
if Graphiti.config.concurrency
concurrency = Graphiti.config.concurrency_max_threads || 4
Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: concurrency,
max_queue: concurrency * 4,
fallback_policy: :caller_runs
)
else
Concurrent::ThreadPoolExecutor.new(max_threads: 0, synchronous: true, fallback_policy: :caller_runs)
end
end
private_constant :GLOBAL_THREAD_POOL_EXECUTOR, :GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS

def self.global_thread_pool_executor
GLOBAL_THREAD_POOL_EXECUTOR.value!
end

def self.global_thread_pool_stats
GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS.each_with_object({}) do |key, memo|
memo[key] = global_thread_pool_executor.send(key)
end
end

def initialize(object, resource, query, opts = {})
@object = object
@resource = resource
Expand All @@ -14,57 +43,33 @@ def initialize(object, resource, query, opts = {})
end

def resolve
if @query.zero_results?
[]
else
resolved = broadcast_data { |payload|
@object = @resource.before_resolve(@object, @query)
payload[:results] = @resource.resolve(@object)
payload[:results]
}
resolved.compact!
assign_serializer(resolved)
yield resolved if block_given?
@opts[:after_resolve]&.call(resolved)
resolve_sideloads(resolved) unless @query.sideloads.empty?
resolved
end
future_resolve.value!
end

def resolve_sideloads(results)
return if results == []
future_resolve_sideloads(results).value!
end

concurrent = Graphiti.config.concurrency
promises = []
def future_resolve
return Concurrent::Promises.fulfilled_future([], self.class.global_thread_pool_executor) if @query.zero_results?

@query.sideloads.each_pair do |name, q|
sideload = @resource.class.sideload(name)
next if sideload.nil? || sideload.shared_remote?
parent_resource = @resource
graphiti_context = Graphiti.context
resolve_sideload = -> {
Graphiti.config.before_sideload&.call(graphiti_context)
Graphiti.context = graphiti_context
sideload.resolve(results, q, parent_resource)
@resource.adapter.close if concurrent
}
if concurrent
promises << Concurrent::Promise.execute(&resolve_sideload)
else
resolve_sideload.call
end
resolved = broadcast_data { |payload|
@object = @resource.before_resolve(@object, @query)
payload[:results] = @resource.resolve(@object)
payload[:results]
}
resolved.compact!
assign_serializer(resolved)
yield resolved if block_given?
@opts[:after_resolve]&.call(resolved)
sideloaded = @query.parents.any?
close_adapter = Graphiti.config.concurrency && sideloaded
if close_adapter
@resource.adapter.close
end

if concurrent
# Wait for all promises to finish
sleep 0.01 until promises.all? { |p| p.fulfilled? || p.rejected? }
# Re-raise the error with correct stacktrace
# OPTION** to avoid failing here?? if so need serializable patch
# to avoid loading data when association not loaded
if (rejected = promises.find(&:rejected?))
raise rejected.reason
end
end
future_resolve_sideloads(resolved)
.then_on(self.class.global_thread_pool_executor, resolved) { resolved }
end

def parent_resource
Expand Down Expand Up @@ -108,6 +113,64 @@ def updated_at

private

def future_resolve_sideloads(results)
return Concurrent::Promises.fulfilled_future(nil, self.class.global_thread_pool_executor) if results == []

sideload_promises = @query.sideloads.filter_map do |name, q|
sideload = @resource.class.sideload(name)
next if sideload.nil? || sideload.shared_remote?

p = future_with_context(results, q, @resource) do |parent_results, sideload_query, parent_resource|
Graphiti.config.before_sideload&.call(Graphiti.context)
sideload.future_resolve(parent_results, sideload_query, parent_resource)
end
p.flat
end

Concurrent::Promises.zip_futures_on(self.class.global_thread_pool_executor, *sideload_promises)
.rescue_on(self.class.global_thread_pool_executor) do |*reasons|
first_error = reasons.find { |r| r.is_a?(Exception) }
raise first_error
end
end

def future_with_context(*args)
thread_storage = Thread.current.keys.each_with_object({}) do |key, memo|
memo[key] = Thread.current[key]
end
fiber_storage =
if Fiber.current.respond_to?(:storage)
Fiber.current&.storage&.keys&.each_with_object({}) do |key, memo|
memo[key] = Fiber[key]
end
end

Concurrent::Promises.future_on(
self.class.global_thread_pool_executor, Thread.current.object_id, thread_storage, fiber_storage, *args
) do |thread_id, thread_storage, fiber_storage, *args|
execution_context_changed = thread_id != Thread.current.object_id
if execution_context_changed
thread_storage&.keys&.each_with_object(Thread.current) do |key, thread_current|
thread_current[key] = thread_storage[key]
end
fiber_storage&.keys&.each_with_object(Fiber) do |key, fiber_current|
fiber_current[key] = fiber_storage[key]
end
end

result = Graphiti.broadcast(:global_thread_pool_task_run, self.class.global_thread_pool_stats) do
yield(*args)
end

if execution_context_changed
thread_storage&.keys&.each { |key| Thread.current[key] = nil }
fiber_storage&.keys&.each { |key| Fiber[key] = nil }
end

result
end
end

def sideload_resource_proxies
@sideload_resource_proxies ||= begin
@object = @resource.before_resolve(@object, @query)
Expand Down
13 changes: 9 additions & 4 deletions lib/graphiti/sideload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def build_resource_proxy(parents, query, graph_parent)
end

def load(parents, query, graph_parent)
build_resource_proxy(parents, query, graph_parent).to_a
future_load(parents, query, graph_parent).value!
end

# Override in subclass
Expand Down Expand Up @@ -285,7 +285,7 @@ def assign(parents, children)
children.replace(associated) if track_associated
end

def resolve(parents, query, graph_parent)
def future_resolve(parents, query, graph_parent)
if single? && parents.length > 1
raise Errors::SingularSideload.new(self, parents.length)
end
Expand All @@ -299,11 +299,11 @@ def resolve(parents, query, graph_parent)
sideload: self,
sideload_parent_length: parents.length,
default_paginate: false
sideload_scope.resolve do |sideload_results|
sideload_scope.future_resolve do |sideload_results|
fire_assign(parents, sideload_results)
end
else
load(parents, query, graph_parent)
future_load(parents, query, graph_parent)
end
end

Expand Down Expand Up @@ -367,6 +367,11 @@ def resource_class_loaded?

private

def future_load(parents, query, graph_parent)
proxy = build_resource_proxy(parents, query, graph_parent)
proxy.respond_to?(:future_resolve_data) ? proxy.future_resolve_data : Concurrent::Promises.fulfilled_future(proxy)
end

def blank_query?(params)
if (filter = params[:filter])
if filter.values == [""]
Expand Down
9 changes: 7 additions & 2 deletions lib/graphiti/sideload/polymorphic_belongs_to.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,23 @@ def child_for_type!(type)
end

def resolve(parents, query, graph_parent)
parents.group_by(&grouper.field_name).each_pair do |group_name, group|
future_resolve(parents, query, graph_parent).value!
end

def future_resolve(parents, query, graph_parent)
promises = parents.group_by(&grouper.field_name).filter_map do |(group_name, group)|
next if group_name.nil? || grouper.ignore?(group_name)

match = ->(c) { c.group_name == group_name.to_sym }
if (sideload = children.values.find(&match))
duped = remove_invalid_sideloads(sideload.resource, query)
sideload.resolve(group, duped, graph_parent)
sideload.future_resolve(group, duped, graph_parent)
else
err = ::Graphiti::Errors::PolymorphicSideloadChildNotFound
raise err.new(self, group_name)
end
end
Concurrent::Promises.zip(*promises)
end

private
Expand Down
15 changes: 15 additions & 0 deletions spec/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@
end
end

describe "#concurrency_max_threads" do
include_context "with config", :concurrency_max_threads

it "defaults" do
expect(Graphiti.config.concurrency_max_threads).to eq(4)
end

it "is overridable" do
Graphiti.configure do |c|
c.concurrency_max_threads = 1
end
expect(Graphiti.config.concurrency_max_threads).to eq(1)
end
end

describe "#raise_on_missing_sideload" do
include_context "with config", :raise_on_missing_sideload

Expand Down
Loading
Loading