-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix concurrent dispatcher #338
Fix concurrent dispatcher #338
Conversation
Hey there! 👋🏼 We require pull request titles to follow the Conventional Commits specification and it looks like your proposed title needs to be adjusted.
Details:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! But let's rename that variable and add some comments for better clarity.
|
||
let next_expected_batch = Arc::new(Mutex::new(None)); | ||
|
||
let stop_receiver_clone = stop_receiver.clone(); | ||
let pool_clone = self.pool.clone(); | ||
let config_clone = self.config.clone(); | ||
let next_expected_batch_clone = next_expected_batch.clone(); | ||
let pending_blobs_reader = tokio::spawn(async move { | ||
let mut pending_blobs = JoinSet::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variable name is not descriptive, I would rename it to something like dispatcher_task_set
or disptacher_tasks
.
|
||
let next_expected_batch = Arc::new(Mutex::new(None)); | ||
|
||
let stop_receiver_clone = stop_receiver.clone(); | ||
let pool_clone = self.pool.clone(); | ||
let config_clone = self.config.clone(); | ||
let next_expected_batch_clone = next_expected_batch.clone(); | ||
let pending_blobs_reader = tokio::spawn(async move { | ||
let mut pending_blobs = JoinSet::new(); | ||
pending_blobs.spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering that we now don't have a _reader
variable, add a comment above this line stating that this task reads pending blobs from de database.
@@ -120,99 +127,138 @@ impl DataAvailabilityDispatcher { | |||
let client = self.client.clone(); | |||
let request_semaphore = self.request_semaphore.clone(); | |||
let notifier = Arc::new(Notify::new()); | |||
let pending_blobs_sender = tokio::spawn(async move { | |||
let mut spawned_requests = vec![]; | |||
pending_blobs.spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the other comment, but clarifying that this task is in charge sending blobs to the dispatcher.
598d6e0
into
eigen-client-m0-grafana-metrics
* initial commit * initial commit * fix ambiguous job name & add docker restart command * fix integration test command * update readme, remove fetching lambda repo * reorganize readme instructions * Fix concurrent dispatcher (#338) * Add shutdown to dispatch batches * Add JoinSet * Format code * Fix unbounded channel breaking authenticated dispersal * Fix pr comments * feat(eigen-client-m0-implementation): optimize concurrent dispatcher (#345) * initial commit * optimize dispatch_batches fn * remove commented code * remove needless variables * optimize inclusion_poller fn * break loop if dispatch fail * remove client_lock variable * switch to retriable err * replace arbitrary value with config --------- Co-authored-by: Gianbelinche <[email protected]>
What ❔
This PR fixes a bug with concurrent dispatcher, where the loop was never stopped so the pending blobs sender thread never stopped, so no error was shown when a dispatched failed.
Why ❔
Checklist
zkstack dev fmt
andzkstack dev lint
.