diff --git a/pgq/cascade/worker.py b/pgq/cascade/worker.py index 9070a7e..bd30d4f 100644 --- a/pgq/cascade/worker.py +++ b/pgq/cascade/worker.py @@ -182,7 +182,10 @@ def is_batch_done(self, state: DictRow, batch_info: BatchInfo, dst_db: Connectio # on combined-branch the target can get several batches ahead if self._worker_state and self._worker_state.wait_behind: self.wait_for_tick(dst_db, batch_info['tick_id']) - state = self._consumer_state + + # refresh state + if self._consumer_state: + state = self._consumer_state # handle combined_queue type change (branch->root) if self._worker_state and self.was_wait_behind: