Skip to content

Commit

Permalink
fix maestro server resync after reconnecting.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jun 21, 2024
1 parent 85524b1 commit 3ab80a4
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pkg/client/cloudevents/source_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SourceClient interface {
OnDelete(ctx context.Context, id string) error
Subscribe(ctx context.Context, handlers ...cegeneric.ResourceHandler[*api.Resource])
Resync(ctx context.Context, consumers []string) error
ReconnectedChan() <-chan struct{}
}

type SourceClientImpl struct {
Expand Down Expand Up @@ -145,6 +146,10 @@ func (s *SourceClientImpl) Resync(ctx context.Context, consumers []string) error
return nil
}

func (s *SourceClientImpl) ReconnectedChan() <-chan struct{} {
return s.CloudEventSourceClient.ReconnectedChan()
}

func ResourceStatusHashGetter(res *api.Resource) (string, error) {
status, err := api.DecodeStatus(res.Status)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/client/cloudevents/source_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,7 @@ func (s *SourceClientMock) Subscribe(ctx context.Context, handlers ...cegeneric.
func (s *SourceClientMock) Resync(ctx context.Context, consumers []string) error {
return nil
}

func (s *SourceClientMock) ReconnectedChan() <-chan struct{} {
return nil
}
20 changes: 20 additions & 0 deletions pkg/dispatcher/hash_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,31 @@ func (d *HashDispatcher) Start(ctx context.Context) {
// start a goroutine to periodically check the instances and consumers.
go wait.UntilWithContext(ctx, d.check, 5*time.Second)

// start a goroutine to resync current consumers for this source when the client is reconnected
go d.resyncOnReconnect(ctx)

// wait until context is canceled
<-ctx.Done()
d.workQueue.ShutDown()
}

// resyncOnReconnect listens for the client reconnected signal and resyncs current consumers for this source.
func (d *HashDispatcher) resyncOnReconnect(ctx context.Context) {
log := logger.NewOCMLogger(ctx)
// receive client reconnect signal and resync current consumers for this source
for {
select {
case <-ctx.Done():
return
case <-d.sourceClient.ReconnectedChan():
// when receiving a client reconnected signal, we resync current consumers for this source
if err := d.sourceClient.Resync(ctx, d.consumerSet.ToSlice()); err != nil {
log.Error(fmt.Sprintf("failed to resync resourcs status for consumers (%s), %v", d.consumerSet.ToSlice(), err))
}
}
}
}

// Dispatch checks if the provided consumer ID is owned by the current maestro instance.
// It returns true if the consumer is part of the current instance's consumer set;
// otherwise, it returns false.
Expand Down
31 changes: 31 additions & 0 deletions pkg/dispatcher/noop_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/logger"
)

var _ Dispatcher = &NoopDispatcher{}
Expand All @@ -28,6 +29,36 @@ func NewNoopDispatcher(consumerDao dao.ConsumerDao, sourceClient cloudevents.Sou

// Start is a no-op implementation.
func (d *NoopDispatcher) Start(ctx context.Context) {
// handle client reconnected signal and resync status from consumers for this source
d.resyncOnReconnect(ctx)
}

// resyncOnReconnect listens for client reconnected signal and resyncs all consumers for this source.
func (d *NoopDispatcher) resyncOnReconnect(ctx context.Context) {
log := logger.NewOCMLogger(ctx)
// receive client reconnect signal and resync current consumers for this source
for {
select {
case <-ctx.Done():
return
case <-d.sourceClient.ReconnectedChan():
// when receiving a client reconnected signal, we resync all consumers for this source
// TODO: optimize this to only resync resource status for necessary consumers
consumerIDs := []string{}
consumers, err := d.consumerDao.All(ctx)
if err != nil {
log.Error(fmt.Sprintf("failed to get all consumers: %v", err))
continue
}

for _, c := range consumers {
consumerIDs = append(consumerIDs, c.ID)
}
if err := d.sourceClient.Resync(ctx, consumerIDs); err != nil {
log.Error(fmt.Sprintf("failed to resync resourcs status for consumers (%s), %v", consumerIDs, err))
}
}
}
}

// Dispatch always returns true, indicating that the current maestro instance should process the resource status update.
Expand Down
194 changes: 193 additions & 1 deletion test/e2e/pkg/status_resync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), func() {

var resource *openapi.Resource
var maestroServerReplicas int
var mqttReplicas, maestroServerReplicas int

Context("Resource resync resource status after maestro server restarts", func() {

Expand Down Expand Up @@ -184,4 +185,195 @@ var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), fun
})

})

Context("Resource resync resource status after maestro server reconnects", func() {

It("post the nginx resource with non-default service account to the maestro api", func() {

res := helper.NewAPIResourceWithSA(consumer_name, 1, "nginx")
var resp *http.Response
var err error
resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusCreated))
Expect(*resource.Id).ShouldNot(BeEmpty())

Eventually(func() error {
deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
return err
}
if *deploy.Spec.Replicas != 1 {
return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas)
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())

gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(*gotResource.Id).To(Equal(*resource.Id))
Expect(*gotResource.Version).To(Equal(*resource.Version))

Eventually(func() error {
gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
if err != nil {
return err
}
statusJSON, err := json.Marshal(gotResource.Status)
if err != nil {
return err
}
if !strings.Contains(string(statusJSON), "error looking up service account default/nginx") {
return fmt.Errorf("unexpected status, expected error looking up service account default/nginx, got %s", string(statusJSON))
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("delete the mqtt-broker service for server", func() {

err := kubeClient.CoreV1().Services("maestro").Delete(context.Background(), "maestro-mqtt-server", metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

It("create default/nginx serviceaccount", func() {

_, err := kubeClient.CoreV1().ServiceAccounts("default").Create(context.Background(), &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx",
},
}, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())

// delete the nginx deployment to tigger recreating
err = kubeClient.AppsV1().Deployments("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

It("Rollout the mqtt-broker", func() {

deploy, err := kubeClient.AppsV1().Deployments("maestro").Get(context.Background(), "maestro-mqtt", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
mqttReplicas = int(*deploy.Spec.Replicas)
deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{
FieldManager: "testKubeClient",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(*deploy.Spec.Replicas).To(Equal(int32(0)))

// ensure no running mqtt-broker pods
Eventually(func() error {
pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
LabelSelector: "name=maestro-mqtt",
})
if err != nil {
return err
}
if len(pods.Items) > 0 {
return fmt.Errorf("maestro-mqtt pods still running")
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())

// patch mqtt-broker replicas to mqttReplicas
deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{
FieldManager: "testKubeClient",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas)))

// ensure mqtt-broker pod is up and running
Eventually(func() error {
pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
LabelSelector: "name=maestro-mqtt",
})
if err != nil {
return err
}
if len(pods.Items) != mqttReplicas {
return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items))
}
for _, pod := range pods.Items {
if pod.Status.Phase != "Running" {
return fmt.Errorf("maestro-mqtt pod not in running state")
}
if pod.Status.ContainerStatuses[0].State.Running == nil {
return fmt.Errorf("maestro-mqtt container not in running state")
}
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("recreate the mqtt-broker service for server", func() {

mqttServerService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "maestro-mqtt-server",
Namespace: "maestro",
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"name": "maestro-mqtt",
},
Ports: []corev1.ServicePort{
{
Name: "mosquitto",
Protocol: corev1.ProtocolTCP,
Port: 1883,
TargetPort: intstr.FromInt(1883),
},
},
Type: corev1.ServiceTypeClusterIP,
},
}

_, err := kubeClient.CoreV1().Services("maestro").Create(context.Background(), mqttServerService, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

It("ensure the resource status is resynced", func() {

Eventually(func() error {
gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
if err != nil {
return err
}
if _, ok := gotResource.Status["ContentStatus"]; !ok {
return fmt.Errorf("unexpected status, expected contains ContentStatus, got %v", gotResource.Status)
}
statusJSON, err := json.Marshal(gotResource.Status)
if err != nil {
return err
}
if strings.Contains(string(statusJSON), "error looking up service account default/nginx") {
return fmt.Errorf("unexpected status, should not contain error looking up service account default/nginx, got %s", string(statusJSON))
}
return nil
}, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred())
})

It("delete the nginx resource", func() {

resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNoContent))

Eventually(func() error {
_, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return fmt.Errorf("nginx deployment still exists")
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())

err = kubeClient.CoreV1().ServiceAccounts("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

})
})

0 comments on commit 3ab80a4

Please sign in to comment.