diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index db8bbe93189..d1dcabb0cbc 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -33,6 +33,8 @@ func NewAsyncUploader(uploader Uploader, metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, + // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, + // but it is not acceptable to skip uploading an execution result queue: make(chan *execution.ComputationResult, 100), } builder := component.NewComponentManagerBuilder() diff --git a/engine/execution/ingestion/uploader/uploader_test.go b/engine/execution/ingestion/uploader/uploader_test.go index f4ee13883a4..73a72b91507 100644 --- a/engine/execution/ingestion/uploader/uploader_test.go +++ b/engine/execution/ingestion/uploader/uploader_test.go @@ -206,7 +206,7 @@ func Test_AsyncUploader(t *testing.T) { err := async.Upload(computationResult) require.NoError(t, err) - wgUploadCalleded.Wait() + unittest.AssertReturnsBefore(t, wgUploadCalleded.Wait, time.Second) cancel() unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time")