diff --git a/src/amplitude_experiment/cohort/cohort_loader.py b/src/amplitude_experiment/cohort/cohort_loader.py index e951cd9..e5d6639 100644 --- a/src/amplitude_experiment/cohort/cohort_loader.py +++ b/src/amplitude_experiment/cohort/cohort_loader.py @@ -39,16 +39,20 @@ def download_cohort(self, cohort_id: str) -> Cohort: def update_stored_cohorts(self) -> Future: def update_task(): errors = [] - try: - futures = [self.executor.submit(self.__load_cohort_internal, cohort_id) for cohort_id in self.cohort_storage.get_cohort_ids()] + cohort_ids = self.cohort_storage.get_cohort_ids() - for future, cohort_id in zip(as_completed(futures), self.cohort_storage.get_cohort_ids()): - try: - future.result() - except Exception as e: - errors.append((cohort_id, e)) - except Exception as e: - errors.append(e) + futures = [] + with self.lock_jobs: + for cohort_id in cohort_ids: + future = self.load_cohort(cohort_id) + futures.append(future) + + for future in as_completed(futures): + cohort_id = next(c_id for c_id, f in self.jobs.items() if f == future) + try: + future.result() + except Exception as e: + errors.append((cohort_id, e)) if errors: raise CohortUpdateException(errors)