diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 9756e4d43bce..b765ed1c2247 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -111,7 +111,7 @@ func main() { // We sometimes startup faster than we can reach kube-api. Poll on failure to prevent us terminating var err error - if perr := wait.PollImmediate(time.Second, 60*time.Second, func() (bool, error) { + if perr := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(context.Context) (bool, error) { if err = version.CheckMinimumVersion(kubeClient.Discovery()); err != nil { log.Print("Failed to get k8s version ", err) } diff --git a/cmd/default-domain/main.go b/cmd/default-domain/main.go index 48c164a81910..94efc88c827c 100644 --- a/cmd/default-domain/main.go +++ b/cmd/default-domain/main.go @@ -124,7 +124,7 @@ func findGatewayAddress(ctx context.Context, kubeclient kubernetes.Interface, cl defer client.NetworkingV1alpha1().Ingresses(system.Namespace()).Delete(ctx, ing.Name, metav1.DeleteOptions{}) // Wait for the Ingress to be Ready. - if err := wait.PollImmediate(pollInterval, waitTimeout, func() (done bool, err error) { + if err := wait.PollUntilContextTimeout(ctx, pollInterval, waitTimeout, true, func(context.Context) (done bool, err error) { ing, err = client.NetworkingV1alpha1().Ingresses(system.Namespace()).Get( ctx, ing.Name, metav1.GetOptions{}) if err != nil { @@ -149,7 +149,7 @@ func findGatewayAddress(ctx context.Context, kubeclient kubernetes.Interface, cl // Wait for the Ingress Service to have an external IP. var svc *corev1.Service - if err := wait.PollImmediate(pollInterval, waitTimeout, func() (done bool, err error) { + if err := wait.PollUntilContextTimeout(ctx, pollInterval, waitTimeout, true, func(context.Context) (done bool, err error) { svc, err = kubeclient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return true, err diff --git a/pkg/activator/certificate/cache_test.go b/pkg/activator/certificate/cache_test.go index dce999db213c..091800168f73 100644 --- a/pkg/activator/certificate/cache_test.go +++ b/pkg/activator/certificate/cache_test.go @@ -92,7 +92,7 @@ func TestReconcile(t *testing.T) { fakesecretinformer.Get(ctx).Informer().GetIndexer().Add(secret) // Wait for the resources to be created and the handler is called. - if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { // To access cert.Certificate, take a lock. cr.certificatesMux.RLock() defer cr.certificatesMux.RUnlock() @@ -108,7 +108,7 @@ func TestReconcile(t *testing.T) { newCert, _ := tls.X509KeyPair(newTLSCrt, newTLSKey) fakekubeclient.Get(ctx).CoreV1().Secrets(system.Namespace()).Update(ctx, secret, metav1.UpdateOptions{}) - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { // To access cert.Certificate, take a lock. cr.certificatesMux.RLock() defer cr.certificatesMux.RUnlock() @@ -132,7 +132,7 @@ func TestReconcile(t *testing.T) { pool.AddCert(ca) fakekubeclient.Get(ctx).CoreV1().Secrets(system.Namespace()).Update(ctx, secret, metav1.UpdateOptions{}) - if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) { // To access cr.TLSConf.RootCAs, take a lock. cr.certificatesMux.RLock() defer cr.certificatesMux.RUnlock() diff --git a/pkg/activator/net/throttler_test.go b/pkg/activator/net/throttler_test.go index 8bb76dbb3969..5a28aa5c3c99 100644 --- a/pkg/activator/net/throttler_test.go +++ b/pkg/activator/net/throttler_test.go @@ -329,7 +329,7 @@ func TestThrottlerErrorNoRevision(t *testing.T) { // Eventually it should now fail. var lastError error - wait.PollInfinite(10*time.Millisecond, func() (bool, error) { + wait.PollUntilContextCancel(ctx, 10*time.Millisecond, false, func(context.Context) (bool, error) { lastError = throttler.Try(ctx, revID, func(string) error { return nil }) return lastError != nil, nil }) @@ -555,7 +555,7 @@ func TestThrottlerSuccesses(t *testing.T) { if *cc != 0 { wantCapacity = dests * int(*cc) } - if err := wait.PollImmediate(10*time.Millisecond, 3*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) { rt.mux.RLock() defer rt.mux.RUnlock() if *cc != 0 { @@ -770,7 +770,7 @@ func TestActivatorsIndexUpdate(t *testing.T) { // Verify capacity gets updated. This is the very last thing we update // so we now know that the rest is set statically. - if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) { // Capacity doesn't exceed 1 in this test. return rt.breaker.Capacity() == 1, nil }); err != nil { @@ -795,7 +795,7 @@ func TestActivatorsIndexUpdate(t *testing.T) { endpoints.Informer().GetIndexer().Update(publicEp) // Verify the index was computed. - if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) { return rt.numActivators.Load() == 1 && rt.activatorIndex.Load() == 0, nil }); err != nil { @@ -867,7 +867,7 @@ func TestMultipleActivators(t *testing.T) { // Verify capacity gets updated. This is the very last thing we update // so we now know that we got and processed both the activator endpoints // and the application endpoints. - if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) { return rt.breaker.Capacity() == 1, nil }); err != nil { t.Fatal("Timed out waiting for the capacity to be updated") diff --git a/pkg/autoscaler/metrics/collector_test.go b/pkg/autoscaler/metrics/collector_test.go index 50458764dba9..734db7eccbb4 100644 --- a/pkg/autoscaler/metrics/collector_test.go +++ b/pkg/autoscaler/metrics/collector_test.go @@ -17,6 +17,7 @@ limitations under the License. package metrics import ( + "context" "errors" "math" "testing" @@ -185,7 +186,7 @@ func TestMetricCollectorScraperMovingTime(t *testing.T) { } var gotRPS, gotConcurrency, panicRPS, panicConcurrency float64 // Poll to see that the async loop completed. - wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { + wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now) gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now) return gotConcurrency == wantConcurrency && @@ -256,7 +257,7 @@ func TestMetricCollectorScraper(t *testing.T) { } var gotRPS, gotConcurrency, panicRPS, panicConcurrency float64 // Poll to see that the async loop completed. - wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { + wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now) gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now) return gotConcurrency == wantConcurrency && @@ -289,7 +290,7 @@ func TestMetricCollectorScraper(t *testing.T) { mtp.Channel <- now // Wait for async loop to finish. - if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { gotConcurrency, _, _ = coll.StableAndPanicConcurrency(metricKey, now.Add(defaultMetric.Spec.StableWindow).Add(-5*time.Second)) gotRPS, _, _ = coll.StableAndPanicRPS(metricKey, now.Add(defaultMetric.Spec.StableWindow).Add(-5*time.Second)) return gotConcurrency == reportConcurrency*5 && gotRPS == reportRPS*5, nil diff --git a/pkg/autoscaler/statforwarder/forwarder_test.go b/pkg/autoscaler/statforwarder/forwarder_test.go index 521dc905a589..0b44a7972c2a 100644 --- a/pkg/autoscaler/statforwarder/forwarder_test.go +++ b/pkg/autoscaler/statforwarder/forwarder_test.go @@ -17,6 +17,7 @@ limitations under the License. package statforwarder import ( + "context" "errors" "fmt" "os" @@ -117,7 +118,7 @@ func TestForwarderReconcile(t *testing.T) { var lastErr error // Wait for the resources to be created. - if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { _, lastErr = service.Lister().Services(testNs).Get(bucket1) return lastErr == nil, nil }); err != nil { @@ -137,7 +138,7 @@ func TestForwarderReconcile(t *testing.T) { // Check the endpoints got updated. el := endpoints.Lister().Endpoints(testNs) - if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { got, err := el.Get(bucket1) if err != nil { lastErr = err @@ -161,7 +162,7 @@ func TestForwarderReconcile(t *testing.T) { // Check that the endpoints got updated. wantSubsets[0].Addresses[0].IP = testIP2 - if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) { // Check the endpoints get updated. got, err := el.Get(bucket1) if err != nil { @@ -445,7 +446,7 @@ func TestProcess(t *testing.T) { kubeClient.CoordinationV1().Leases(testNs).Create(ctx, anotherLease, metav1.CreateOptions{}) lease.Informer().GetIndexer().Add(anotherLease) - if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) { _, p1owned := f.getProcessor(bucket1).(*localProcessor) _, p2notowned := f.getProcessor(bucket2).(*remoteProcessor) return p1owned && p2notowned, nil diff --git a/pkg/autoscaler/statforwarder/leases.go b/pkg/autoscaler/statforwarder/leases.go index 58229ec876d0..7c27a0ca9974 100644 --- a/pkg/autoscaler/statforwarder/leases.go +++ b/pkg/autoscaler/statforwarder/leases.go @@ -202,7 +202,7 @@ func (f *leaseTracker) leaseUpdated(obj interface{}) { func (f *leaseTracker) createService(ctx context.Context, ns, n string) error { var lastErr error - if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) { _, lastErr = f.kc.CoreV1().Services(ns).Create(ctx, &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: n, @@ -247,7 +247,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string exists := true var lastErr error - if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) { e, err := f.endpointsLister.Endpoints(ns).Get(n) if apierrs.IsNotFound(err) { exists = false @@ -281,7 +281,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string return nil } - if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) { _, lastErr = f.kc.CoreV1().Endpoints(ns).Create(ctx, &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: n, diff --git a/pkg/queue/certificate/watcher_test.go b/pkg/queue/certificate/watcher_test.go index 1fe6416d7f8b..f5fea36e0381 100644 --- a/pkg/queue/certificate/watcher_test.go +++ b/pkg/queue/certificate/watcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package certificate import ( + "context" "crypto/tls" "crypto/x509" "os" @@ -70,7 +71,7 @@ func TestCertificateRotation(t *testing.T) { // CertWatcher should return the new certificate // Give CertWatcher some time to update the certificate - if err := wait.Poll(1*time.Second, 60*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 60*time.Second, true, func(context.Context) (bool, error) { c, err = cw.GetCertificate(nil) if err != nil { return false, err diff --git a/pkg/queue/readiness/probe.go b/pkg/queue/readiness/probe.go index f2c5dad15182..8684e7cb6a22 100644 --- a/pkg/queue/readiness/probe.go +++ b/pkg/queue/readiness/probe.go @@ -17,6 +17,7 @@ limitations under the License. package readiness import ( + "context" "fmt" "io" "os" @@ -163,7 +164,7 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error { var failCount int var lastProbeErr error - pollErr := wait.PollImmediate(retryInterval, p.pollTimeout, func() (bool, error) { + pollErr := wait.PollUntilContextTimeout(context.Background(), retryInterval, p.pollTimeout, true, func(context.Context) (bool, error) { if err := probe(aggressiveProbeTimeout); err != nil { // Reset count of consecutive successes to zero. p.count = 0 diff --git a/pkg/reconciler/accessor/networking/certificate_test.go b/pkg/reconciler/accessor/networking/certificate_test.go index 72c5a27f0511..fe4439e5b384 100644 --- a/pkg/reconciler/accessor/networking/certificate_test.go +++ b/pkg/reconciler/accessor/networking/certificate_test.go @@ -98,7 +98,7 @@ func TestReconcileCertificateCreate(t *testing.T) { ReconcileCertificate(ctx, ownerObj, desired, accessor) lister := fakecertinformer.Get(ctx).Lister() - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { cert, err := lister.Certificates(desired.Namespace).Get(desired.Name) if errors.IsNotFound(err) { return false, nil @@ -121,7 +121,7 @@ func TestReconcileCertificateUpdate(t *testing.T) { ReconcileCertificate(ctx, ownerObj, desired, accessor) lister := fakecertinformer.Get(ctx).Lister() - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { cert, err := lister.Certificates(desired.Namespace).Get(desired.Name) if errors.IsNotFound(err) { return false, nil diff --git a/pkg/reconciler/autoscaling/kpa/kpa_test.go b/pkg/reconciler/autoscaling/kpa/kpa_test.go index 735f1d1cf7cd..c45d96449c6c 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa_test.go +++ b/pkg/reconciler/autoscaling/kpa/kpa_test.go @@ -1377,7 +1377,7 @@ func TestReconcileDeciderCreatesAndDeletes(t *testing.T) { fakenetworkingclient.Get(ctx).NetworkingV1alpha1().ServerlessServices(testNamespace).Create(ctx, sks, metav1.CreateOptions{}) fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{}) - wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { _, err := fakepainformer.Get(ctx).Lister().PodAutoscalers(testNamespace).Get(kpa.Name) if err != nil && apierrors.IsNotFound(err) { return false, nil @@ -1399,7 +1399,7 @@ func TestReconcileDeciderCreatesAndDeletes(t *testing.T) { // The ReconcileKind call hasn't finished yet at the point where the Decider is created, // so give it more time to finish before checking the PA for IsReady(). - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { newKPA, err := fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(kpa.Namespace).Get( ctx, kpa.Name, metav1.GetOptions{}) if err != nil && apierrors.IsNotFound(err) { @@ -1665,7 +1665,7 @@ func TestScaleFailure(t *testing.T) { } func pollDeciders(deciders *testDeciders, namespace, name string, cond func(*scaling.Decider) bool) (decider *scaling.Decider, err error) { - wait.PollImmediate(10*time.Millisecond, 3*time.Second, func() (bool, error) { + wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) { decider, err = deciders.Get(context.Background(), namespace, name) if err != nil { return false, nil diff --git a/pkg/reconciler/metric/metric_test.go b/pkg/reconciler/metric/metric_test.go index c62087c71640..9f920931728a 100644 --- a/pkg/reconciler/metric/metric_test.go +++ b/pkg/reconciler/metric/metric_test.go @@ -211,14 +211,14 @@ func TestReconcileWithCollector(t *testing.T) { scs.AutoscalingV1alpha1().Metrics(m.Namespace).Create(ctx, m, metav1.CreateOptions{}) - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { return collector.createOrUpdateCalls.Load() > 0, nil }); err != nil { t.Fatal("CreateOrUpdate() called 0 times, want non-zero times") } scs.AutoscalingV1alpha1().Metrics(m.Namespace).Delete(ctx, m.Name, metav1.DeleteOptions{}) - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { return collector.deleteCalls.Load() > 0, nil }); err != nil { t.Fatal("Delete() called 0 times, want non-zero times") diff --git a/pkg/reconciler/revision/revision_test.go b/pkg/reconciler/revision/revision_test.go index fb6a1743906d..462cff4e8581 100644 --- a/pkg/reconciler/revision/revision_test.go +++ b/pkg/reconciler/revision/revision_test.go @@ -502,7 +502,7 @@ func TestGlobalResyncOnDefaultCMChange(t *testing.T) { revClient.Create(ctx, rev, metav1.CreateOptions{}) revL := fakerevisioninformer.Get(ctx).Lister() - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { // The only error we're getting in the test reasonably is NotFound. r, _ := revL.Revisions(rev.Namespace).Get(rev.Name) return r != nil && r.Status.ObservedGeneration == r.Generation, nil @@ -513,7 +513,7 @@ func TestGlobalResyncOnDefaultCMChange(t *testing.T) { // Ensure initial PA is in the informers. paL := fakepainformer.Get(ctx).Lister().PodAutoscalers(rev.Namespace) - if ierr := wait.PollImmediate(50*time.Millisecond, 6*time.Second, func() (bool, error) { + if ierr := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 6*time.Second, true, func(context.Context) (bool, error) { _, err = paL.Get(rev.Name) return err == nil, nil }); ierr != nil { @@ -549,7 +549,7 @@ func TestGlobalResyncOnDefaultCMChange(t *testing.T) { pa, err := paL.Get(rev.Name) t.Logf("Initial PA: %#v GetErr: %v", pa, err) - if ierr := wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (bool, error) { + if ierr := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { pa, err = paL.Get(rev.Name) return pa != nil && pa.Spec.ContainerConcurrency == pos, nil }); ierr == nil { // err==nil! @@ -586,7 +586,7 @@ func TestGlobalResyncOnConfigMapUpdateRevision(t *testing.T) { revClient.Create(ctx, rev, metav1.CreateOptions{}) revL := fakerevisioninformer.Get(ctx).Lister() - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { // The only error we're getting in the test reasonably is NotFound. r, _ := revL.Revisions(rev.Namespace).Get(rev.Name) // We only create a single revision, but make sure it is reconciled. @@ -608,7 +608,7 @@ func TestGlobalResyncOnConfigMapUpdateRevision(t *testing.T) { }) want := "http://new-logging.test.com?filter=" + string(rev.UID) - if ierr := wait.PollImmediate(50*time.Millisecond, 5*time.Second, func() (bool, error) { + if ierr := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { r, _ := revL.Revisions(rev.Namespace).Get(rev.Name) return r != nil && r.Status.LogURL == want, nil }); ierr != nil { @@ -664,7 +664,7 @@ func TestGlobalResyncOnConfigMapUpdateDeployment(t *testing.T) { revClient.Create(ctx, rev, metav1.CreateOptions{}) revL := fakerevisioninformer.Get(ctx).Lister().Revisions(rev.Namespace) - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { // The only error we're getting in the test reasonably is NotFound. r, _ := revL.Get(rev.Name) // We only create a single revision, but make sure it is reconciled. @@ -677,7 +677,7 @@ func TestGlobalResyncOnConfigMapUpdateDeployment(t *testing.T) { watcher.OnChange(configMapToUpdate) depL := fakedeploymentinformer.Get(ctx).Lister().Deployments(rev.Namespace) - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { dep, _ := depL.Get(names.Deployment(rev)) return dep != nil && checkF(dep), nil }); err != nil { @@ -713,7 +713,7 @@ func TestNewRevisionCallsSyncHandler(t *testing.T) { } // Poll to see PA object to be created. - if err := wait.PollImmediate(25*time.Millisecond, 3*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) { pa, _ := servingClient.AutoscalingV1alpha1().PodAutoscalers(rev.Namespace).Get( ctx, rev.Name, metav1.GetOptions{}) return pa != nil, nil @@ -723,7 +723,7 @@ func TestNewRevisionCallsSyncHandler(t *testing.T) { // Poll to see if the deployment is created. This should _already_ be there. depL := fakedeploymentinformer.Get(ctx).Lister().Deployments(rev.Namespace) - if err := wait.PollImmediate(10*time.Millisecond, 1*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Second, true, func(context.Context) (bool, error) { dep, _ := depL.Get(names.Deployment(rev)) return dep != nil, nil }); err != nil { diff --git a/pkg/reconciler/route/route_test.go b/pkg/reconciler/route/route_test.go index 74d565e23a6a..d90b9c9facef 100644 --- a/pkg/reconciler/route/route_test.go +++ b/pkg/reconciler/route/route_test.go @@ -1335,7 +1335,7 @@ func TestUpdateDomainConfigMap(t *testing.T) { // Wait initial reconcile to finish. rl := fakerouteinformer.Get(ctx).Lister().Routes(route.Namespace) - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { r, err := rl.Get(route.Name) if err != nil { return false, err @@ -1369,7 +1369,7 @@ func TestUpdateDomainConfigMap(t *testing.T) { } // Ensure we have the proper version in the informers. - if err := wait.PollImmediate(10*time.Millisecond, 3*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) { r, err := rl.Get(route.Name) return r != nil && r.Generation == route.Generation, err }); err != nil { @@ -1383,7 +1383,7 @@ func TestUpdateDomainConfigMap(t *testing.T) { } var gotDomain string - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { r, err := routeClient.Get(ctx, route.Name, metav1.GetOptions{}) if err != nil { return false, err @@ -1498,7 +1498,7 @@ func TestGlobalResyncOnUpdateDomainConfigMap(t *testing.T) { } rl := routeInformer.Lister() - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { r, err := rl.Routes(route.Namespace).Get(route.Name) if err != nil && errors.IsNotFound(err) { return false, nil @@ -1514,7 +1514,7 @@ func TestGlobalResyncOnUpdateDomainConfigMap(t *testing.T) { test.doThings(watcher) expectedDomain := fmt.Sprintf("%s.%s.%s", route.Name, route.Namespace, test.expectedDomainSuffix) - if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { r, err := rl.Routes(route.Namespace).Get(route.Name) if err != nil { return false, err diff --git a/pkg/reconciler/serverlessservice/global_resync_test.go b/pkg/reconciler/serverlessservice/global_resync_test.go index cedd9f0a8f8d..ad0f0c0b1218 100644 --- a/pkg/reconciler/serverlessservice/global_resync_test.go +++ b/pkg/reconciler/serverlessservice/global_resync_test.go @@ -125,7 +125,7 @@ func TestGlobalResyncOnActivatorChange(t *testing.T) { } // Actively wait for the endpoints to change their value. - if err := wait.PollImmediate(25*time.Millisecond, 5*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, 25*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) { ep, err := epsInformer.Lister().Endpoints(ns1).Get(sks1) if err != nil && apierrors.IsNotFound(err) { return false, nil @@ -142,7 +142,7 @@ func TestGlobalResyncOnActivatorChange(t *testing.T) { } func waitForObservedGen(ctx context.Context, client networkingv1alpha1.NetworkingV1alpha1Interface, ns, name string, generation int64) error { - return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + return wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) { sks, err := client.ServerlessServices(ns).Get(ctx, name, metav1.GetOptions{}) if err != nil && apierrors.IsNotFound(err) { diff --git a/test/adding_tests.md b/test/adding_tests.md index fbbf720c45d1..0043ba4e7a73 100644 --- a/test/adding_tests.md +++ b/test/adding_tests.md @@ -176,7 +176,7 @@ reach the desired state. The `WaitFor*` functions use the kubernetes [`wait` package](https://godoc.org/k8s.io/apimachinery/pkg/util/wait). To poll they use -[`PollImmediate`](https://godoc.org/k8s.io/apimachinery/pkg/util/wait#PollImmediate) +[`PollUntilContextTimeout`](https://pkg.go.dev/k8s.io/apimachinery/pkg/util/wait#PollUntilContextTimeout) and the return values of the function you provide behave the same as [`ConditionFunc`](https://godoc.org/k8s.io/apimachinery/pkg/util/wait#ConditionFunc): a `bool` to indicate if the function should stop or continue polling, and an diff --git a/test/conformance/api/v1beta1/domain_mapping_test.go b/test/conformance/api/v1beta1/domain_mapping_test.go index 00df41f75e32..95e66cb9bed2 100644 --- a/test/conformance/api/v1beta1/domain_mapping_test.go +++ b/test/conformance/api/v1beta1/domain_mapping_test.go @@ -94,7 +94,7 @@ func TestDomainMapping(t *testing.T) { }) // Wait for DomainMapping to go Ready. - waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr := wait.PollUntilContextTimeout(ctx, test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { var err error dm, err = clients.ServingBetaClient.DomainMappings.Get(context.Background(), dm.Name, metav1.GetOptions{}) if err != nil { @@ -162,7 +162,7 @@ func TestDomainMapping(t *testing.T) { }) // Second domain mapping should go to DomainMappingConditionDomainClaimed=false state. - waitErr = wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr = wait.PollUntilContextTimeout(ctx, test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { state, err := altClients.ServingBetaClient.DomainMappings.Get(context.Background(), dm.Name, metav1.GetOptions{}) if err != nil { return true, err @@ -196,7 +196,7 @@ func TestDomainMapping(t *testing.T) { } // The second DomainMapping should now be able to claim the domain. - waitErr = wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr = wait.PollUntilContextTimeout(ctx, test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { var err error altDm, err = altClients.ServingBetaClient.DomainMappings.Get(context.Background(), altDm.Name, metav1.GetOptions{}) if err != nil { diff --git a/test/conformance/runtime/readiness_probe_test.go b/test/conformance/runtime/readiness_probe_test.go index 80bb0d15cf02..9102eaabec7c 100644 --- a/test/conformance/runtime/readiness_probe_test.go +++ b/test/conformance/runtime/readiness_probe_test.go @@ -174,7 +174,7 @@ func TestProbeRuntimeAfterStartup(t *testing.T) { test.EnsureTearDown(t, clients, &names) url, client := waitReadyThenStartFailing(t, clients, names, period) - if err := wait.PollImmediate(1*time.Second, readinessPropagationTime, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, readinessPropagationTime, true, func(context.Context) (bool, error) { startFailing, err := http.NewRequest(http.MethodGet, url.String(), nil) if err != nil { return false, err diff --git a/test/e2e/autoscale_hpa_test.go b/test/e2e/autoscale_hpa_test.go index 88302f1e8937..4a380eb5d2c3 100644 --- a/test/e2e/autoscale_hpa_test.go +++ b/test/e2e/autoscale_hpa_test.go @@ -247,7 +247,7 @@ func waitForScaleToOne(t *testing.T, deploymentName string, clients *test.Client } func waitForHPAState(t *testing.T, name, namespace string, clients *test.Clients) error { - return wait.PollImmediate(time.Second, 15*time.Minute, func() (bool, error) { + return wait.PollUntilContextTimeout(context.Background(), time.Second, 15*time.Minute, true, func(context.Context) (bool, error) { hpa, err := clients.KubeClient.AutoscalingV2().HorizontalPodAutoscalers(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { return false, err diff --git a/test/e2e/autoscale_test.go b/test/e2e/autoscale_test.go index ee6da40b332d..dc0d81d2cc1d 100644 --- a/test/e2e/autoscale_test.go +++ b/test/e2e/autoscale_test.go @@ -206,7 +206,7 @@ func TestTargetBurstCapacity(t *testing.T) { // Wait for two stable pods. obsScale := 0.0 - if err := wait.Poll(250*time.Millisecond, 2*cfg.StableWindow, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, 2*cfg.StableWindow, true, func(context.Context) (bool, error) { obsScale, _, err = numberOfReadyPods(ctx) if err != nil { return false, err @@ -220,7 +220,7 @@ func TestTargetBurstCapacity(t *testing.T) { // Now read the service endpoints and make sure there are 2 endpoints there. // We poll, since network programming takes times, but the timeout is set for // uniformness with one above. - if err := wait.Poll(250*time.Millisecond, 2*cfg.StableWindow, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, 2*cfg.StableWindow, true, func(context.Context) (bool, error) { svcEps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get( context.Background(), ctx.resources.Revision.Name, /* revision service name is equal to revision name*/ metav1.GetOptions{}) @@ -338,7 +338,7 @@ func TestFastScaleToZero(t *testing.T) { // test allows for up to a minute). The 15s delay is based upon maximum // of 20 runs (11s) + 4s of buffer for reliability. st := time.Now() - if err := wait.PollImmediate(1*time.Second, cfg.ScaleToZeroGracePeriod+15*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, cfg.ScaleToZeroGracePeriod+15*time.Second, true, func(context.Context) (bool, error) { eps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get(context.Background(), epsN, metav1.GetOptions{}) if err != nil { return false, err @@ -375,14 +375,14 @@ func TestActivationScale(t *testing.T) { } // initial scale of revision - if err := wait.Poll(1*time.Second, 5*time.Minute, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 5*time.Minute, true, func(context.Context) (bool, error) { return *resources.Revision.Status.ActualReplicas > 0, nil }); err != nil { t.Errorf("error: revision never had active pods") } // scale to zero - if err := wait.Poll(1*time.Second, 5*time.Minute, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 5*time.Minute, true, func(context.Context) (bool, error) { resources, _ = testv1.GetResourceObjects(clients, *ctx.names) return *resources.Revision.Status.ActualReplicas == 0, nil }); err != nil { @@ -406,7 +406,7 @@ func TestActivationScale(t *testing.T) { } // wait for revision desired replicas to equal activation scale - if err := wait.Poll(1*time.Second, 5*time.Minute, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 5*time.Minute, true, func(context.Context) (bool, error) { resources, _ = testv1.GetResourceObjects(clients, *ctx.names) return *resources.Revision.Status.DesiredReplicas == activationScale, nil }); err != nil { diff --git a/test/e2e/destroypod_test.go b/test/e2e/destroypod_test.go index 56facf41cfbd..2434649a2813 100644 --- a/test/e2e/destroypod_test.go +++ b/test/e2e/destroypod_test.go @@ -202,7 +202,7 @@ func TestDestroyPodTimely(t *testing.T) { clients.KubeClient.CoreV1().Pods(test.ServingFlags.TestNamespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{}) var latestPodState *corev1.Pod - if err := wait.PollImmediate(1*time.Second, revisionTimeout, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, revisionTimeout, true, func(context.Context) (bool, error) { pod, err := clients.KubeClient.CoreV1().Pods(test.ServingFlags.TestNamespace).Get(context.Background(), podToDelete, metav1.GetOptions{}) if apierrs.IsNotFound(err) { // The podToDelete must be deleted. diff --git a/test/e2e/domainmapping/domain_mapping_test.go b/test/e2e/domainmapping/domain_mapping_test.go index fc429e3f95ab..2469726510fe 100644 --- a/test/e2e/domainmapping/domain_mapping_test.go +++ b/test/e2e/domainmapping/domain_mapping_test.go @@ -129,7 +129,7 @@ func TestBYOCertificate(t *testing.T) { clients.ServingBetaClient.DomainMappings.Delete(ctx, dm.Name, metav1.DeleteOptions{}) }) - err = wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + err = wait.PollUntilContextTimeout(ctx, test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { dm, err := clients.ServingBetaClient.DomainMappings.Get(ctx, dm.Name, metav1.GetOptions{}) if err != nil { return false, err diff --git a/test/e2e/domainmapping/domain_mapping_websocket_test.go b/test/e2e/domainmapping/domain_mapping_websocket_test.go index ee62d94d0177..2f6911069d15 100644 --- a/test/e2e/domainmapping/domain_mapping_websocket_test.go +++ b/test/e2e/domainmapping/domain_mapping_websocket_test.go @@ -93,7 +93,7 @@ func TestDomainMappingWebsocket(t *testing.T) { clients.ServingBetaClient.DomainMappings.Delete(ctx, dm.Name, metav1.DeleteOptions{}) }) - waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr := wait.PollUntilContextTimeout(ctx, test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { var err error dm, err := clients.ServingBetaClient.DomainMappings.Get(ctx, dm.Name, metav1.GetOptions{}) if err != nil { diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index be3ceacfe99f..01a1a4b52d12 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -99,7 +99,7 @@ func waitForActivatorEndpoints(ctx *TestContext) error { wantAct int ) - if rerr := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) { + if rerr := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, time.Minute, true, func(context.Context) (bool, error) { // We need to fetch the activator endpoints at every check, since it can change. actEps, err := ctx.clients.KubeClient.CoreV1().Endpoints( system.Namespace()).Get(context.Background(), networking.ActivatorServiceName, metav1.GetOptions{}) diff --git a/test/e2e/externaldomaintls/config/dnssetup/main.go b/test/e2e/externaldomaintls/config/dnssetup/main.go index d215050cbb1c..c5dca41e105f 100644 --- a/test/e2e/externaldomaintls/config/dnssetup/main.go +++ b/test/e2e/externaldomaintls/config/dnssetup/main.go @@ -84,7 +84,7 @@ func waitForDNSRecordVisible(record *config.DNSRecord) error { return err } var lastErr error - if err := wait.PollImmediate(10*time.Second, 300*time.Second, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, 300*time.Second, true, func(context.Context) (bool, error) { for _, ns := range nameservers { nsIP, err := net.LookupHost(ns.Host) if err != nil { diff --git a/test/e2e/externaldomaintls/config/util.go b/test/e2e/externaldomaintls/config/util.go index 01abbba08cf5..171798386560 100644 --- a/test/e2e/externaldomaintls/config/util.go +++ b/test/e2e/externaldomaintls/config/util.go @@ -71,7 +71,7 @@ func ChangeDNSRecord(change *dns.Change, svc *dns.Service, dnsProject, dnsZone s return err } // Wait for change to be acknowledged. - return wait.PollImmediate(10*time.Second, 5*time.Minute, func() (bool, error) { + return wait.PollUntilContextTimeout(context.Background(), 10*time.Second, 5*time.Minute, true, func(context.Context) (bool, error) { tmp, err := svc.Changes.Get(dnsProject, dnsZone, chg.Id).Do() if err != nil { return false, err diff --git a/test/e2e/externaldomaintls/domain_mapping_test.go b/test/e2e/externaldomaintls/domain_mapping_test.go index 95bdfddc32f3..da358813f492 100644 --- a/test/e2e/externaldomaintls/domain_mapping_test.go +++ b/test/e2e/externaldomaintls/domain_mapping_test.go @@ -123,7 +123,7 @@ func TestDomainMappingExternalDomainTLS(t *testing.T) { }) // Wait for DomainMapping to go Ready. - if waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + if waitErr := wait.PollUntilContextTimeout(ctx, test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { state, err := clients.ServingBetaClient.DomainMappings.Get(ctx, dm.Name, metav1.GetOptions{}) // DomainMapping can go Ready if only http is available. diff --git a/test/e2e/gc/gc_test.go b/test/e2e/gc/gc_test.go index e1c16f39d932..93caa408befe 100644 --- a/test/e2e/gc/gc_test.go +++ b/test/e2e/gc/gc_test.go @@ -81,7 +81,7 @@ func TestRevisionGC(t *testing.T) { // Poll for a minute to see not_found on the original revision. var originalRevision *v1.Revision - err = wait.PollImmediate(5*time.Second, time.Minute, func() (bool, error) { + err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, time.Minute, true, func(context.Context) (bool, error) { originalRevision, err = clients.ServingClient.Revisions.Get(context.Background(), revision.GetName(), metav1.GetOptions{}) if apierrs.IsNotFound(err) { return true, nil diff --git a/test/e2e/logging_test.go b/test/e2e/logging_test.go index e1f325bd4d71..7413546308a3 100644 --- a/test/e2e/logging_test.go +++ b/test/e2e/logging_test.go @@ -150,7 +150,7 @@ func theOnlyPod(clients *test.Clients, ns, rev string) (corev1.Pod, error) { // waitForLog fetches the logs from a container of a pod decided by the given parameters // until the given condition is meet or timeout. Most of knative logs are in json format. func waitForLog(t *testing.T, clients *test.Clients, ns, podName, container string, condition func(log logLine) bool) error { - return wait.PollImmediate(time.Second, 30*time.Second, func() (bool, error) { + return wait.PollUntilContextTimeout(context.Background(), time.Second, 30*time.Second, true, func(context.Context) (bool, error) { req := clients.KubeClient.CoreV1().Pods(ns).GetLogs(podName, &corev1.PodLogOptions{ Container: container, }) diff --git a/test/e2e/minscale_readiness_test.go b/test/e2e/minscale_readiness_test.go index 9447a1638498..d84e77b239f6 100644 --- a/test/e2e/minscale_readiness_test.go +++ b/test/e2e/minscale_readiness_test.go @@ -211,7 +211,7 @@ func latestRevisionName(t *testing.T, clients *test.Clients, configName, oldRevN func privateServiceName(t *testing.T, clients *test.Clients, revisionName string) string { var privateServiceName string - if err := wait.PollImmediate(time.Second, 1*time.Minute, func() (bool, error) { + if err := wait.PollUntilContextTimeout(context.Background(), time.Second, 1*time.Minute, true, func(context.Context) (bool, error) { sks, err := clients.NetworkingClient.ServerlessServices.Get(context.Background(), revisionName, metav1.GetOptions{}) if err != nil { return false, nil @@ -231,7 +231,7 @@ func waitForDesiredScale(clients *test.Clients, serviceName string, cond func(in endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace) // See https://github.com/knative/serving/issues/7727#issuecomment-706772507 for context. - return latestReady, wait.PollImmediate(250*time.Millisecond, 3*time.Minute, func() (bool, error) { + return latestReady, wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, 3*time.Minute, true, func(context.Context) (bool, error) { endpoint, err := endpoints.Get(context.Background(), serviceName, metav1.GetOptions{}) if err != nil { return false, nil @@ -244,7 +244,7 @@ func waitForDesiredScale(clients *test.Clients, serviceName string, cond func(in func ensureDesiredScale(clients *test.Clients, t *testing.T, serviceName string, cond func(int) bool) (latestReady int, observed bool) { endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace) - err := wait.PollImmediate(250*time.Millisecond, 10*time.Second, func() (bool, error) { + err := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) { endpoint, err := endpoints.Get(context.Background(), serviceName, metav1.GetOptions{}) if err != nil { return false, nil @@ -256,9 +256,9 @@ func ensureDesiredScale(clients *test.Clients, t *testing.T, serviceName string, return false, nil }) - if !errors.Is(err, wait.ErrWaitTimeout) { + if !errors.Is(err, context.DeadlineExceeded) { t.Log("PollError =", err) } - return latestReady, errors.Is(err, wait.ErrWaitTimeout) + return latestReady, errors.Is(err, context.DeadlineExceeded) } diff --git a/test/e2e/websocket.go b/test/e2e/websocket.go index 9d9e5a4f1f7b..47055798b034 100644 --- a/test/e2e/websocket.go +++ b/test/e2e/websocket.go @@ -66,7 +66,7 @@ func connect(t *testing.T, clients *test.Clients, domain, timeout string) (*webs } var conn *websocket.Conn - waitErr := wait.PollImmediate(connectRetryInterval, connectTimeout, func() (bool, error) { + waitErr := wait.PollUntilContextTimeout(context.Background(), connectRetryInterval, connectTimeout, true, func(context.Context) (bool, error) { t.Logf("Connecting using websocket: url=%s, host=%s", u.String(), domain) dialer := &websocket.Dialer{ Proxy: http.ProxyFromEnvironment, diff --git a/test/ha/ha.go b/test/ha/ha.go index 1a12b24cb9b8..3183f433fe68 100644 --- a/test/ha/ha.go +++ b/test/ha/ha.go @@ -80,7 +80,7 @@ func assertServiceEventuallyWorks(t *testing.T, clients *test.Clients, names tes func waitForEndpointsState(client kubernetes.Interface, svcName, svcNamespace string, inState func(*corev1.Endpoints) (bool, error)) error { endpointsService := client.CoreV1().Endpoints(svcNamespace) - return wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + return wait.PollUntilContextTimeout(context.Background(), test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { endpoint, err := endpointsService.Get(context.Background(), svcName, metav1.GetOptions{}) if err != nil { return false, err diff --git a/test/performance/performance/performance.go b/test/performance/performance/performance.go index 22b096f7c66c..75de23efbec5 100644 --- a/test/performance/performance/performance.go +++ b/test/performance/performance/performance.go @@ -62,7 +62,7 @@ func ProbeTargetTillReady(target string, duration time.Duration) error { func WaitForScaleToZero(ctx context.Context, namespace string, selector labels.Selector, duration time.Duration) error { pl := podinformer.Get(ctx).Lister() begin := time.Now() - return wait.PollImmediate(time.Second, duration, func() (bool, error) { + return wait.PollUntilContextTimeout(ctx, time.Second, duration, true, func(context.Context) (bool, error) { pods, err := pl.Pods(namespace).List(selector) if err != nil { return false, err diff --git a/test/v1/configuration.go b/test/v1/configuration.go index bfdce19b5426..b5b2fedf8e5b 100644 --- a/test/v1/configuration.go +++ b/test/v1/configuration.go @@ -161,7 +161,7 @@ func WaitForConfigurationState(client *test.ServingClients, name string, inState defer span.End() var lastState *v1.Configuration - waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr := wait.PollUntilContextTimeout(context.Background(), test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { err := reconciler.RetryTestErrors(func(int) (err error) { lastState, err = client.Configs.Get(context.Background(), name, metav1.GetOptions{}) return err diff --git a/test/v1/revision.go b/test/v1/revision.go index 6fb1785557b7..591ddf94f256 100644 --- a/test/v1/revision.go +++ b/test/v1/revision.go @@ -39,7 +39,7 @@ func WaitForRevisionState(client *test.ServingClients, name string, inState func defer span.End() var lastState *v1.Revision - waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr := wait.PollUntilContextTimeout(context.Background(), test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { var err error lastState, err = client.Revisions.Get(context.Background(), name, metav1.GetOptions{}) if err != nil { diff --git a/test/v1/route.go b/test/v1/route.go index 1a4c97d66a87..5cfc1ff66ad3 100644 --- a/test/v1/route.go +++ b/test/v1/route.go @@ -90,7 +90,7 @@ func WaitForRouteState(client *test.ServingClients, name string, inState func(r defer span.End() var lastState *v1.Route - waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr := wait.PollUntilContextTimeout(context.Background(), test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { err := reconciler.RetryTestErrors(func(int) (err error) { lastState, err = client.Routes.Get(context.Background(), name, metav1.GetOptions{}) return err diff --git a/test/v1/service.go b/test/v1/service.go index d2b45e57b406..9d8db16c8e8c 100644 --- a/test/v1/service.go +++ b/test/v1/service.go @@ -258,7 +258,7 @@ func WaitForServiceState(client *test.ServingClients, name string, inState func( defer span.End() var lastState *v1.Service - waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + waitErr := wait.PollUntilContextTimeout(context.Background(), test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { err := reconciler.RetryTestErrors(func(int) (err error) { lastState, err = client.Services.Get(context.Background(), name, metav1.GetOptions{}) return err