Skip to content

Commit

Permalink
sync works when the watcher creating
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jun 18, 2024
1 parent f9be517 commit 79b986b
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 130 deletions.
4 changes: 2 additions & 2 deletions cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewAgentCommand() *cobra.Command {
agentOption.MaxJSONRawLength = maxJSONRawLength
agentOption.CloudEventsClientCodecs = []string{"manifest", "manifestbundle"}
cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption)
cmdConfig := commonOptions.CommoOpts.
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent)

cmd := cmdConfig.NewCommandWithContext(context.TODO())
Expand Down Expand Up @@ -68,6 +68,6 @@ func NewAgentCommand() *cobra.Command {
func addFlags(fs *pflag.FlagSet) {
fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name",
commonOptions.SpokeClusterName, "Name of the consumer")
fs.BoolVar(&commonOptions.CommoOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
fs.BoolVar(&commonOptions.CommonOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
true, "Disable leader election.")
}
2 changes: 1 addition & 1 deletion examples/manifestworkclient/client-a/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
WithClientID(fmt.Sprintf("%s-client-a", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithWorkClientWatcherStore(grpcsource.NewRESTFulAPIWatcherStore(ctx, maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/manifestworkclient/client-b/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
WithClientID(fmt.Sprintf("%s-client-b", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithWorkClientWatcherStore(grpcsource.NewRESTFulAPIWatcherStore(ctx, maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ require (
k8s.io/component-base v0.29.3
k8s.io/klog/v2 v2.120.1
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1175,10 +1175,10 @@ k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0g
k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0=
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4 h1:Z3gwbMUZmblGTHFx6iOO1zlCJc2FJcAJsa2RASTKDqA=
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4/go.mod h1:RuYCuKuVJzNxRBkSoQnxyJxyUqOyCH388DlR/QDr7rE=
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6 h1:/nPyxceSdi66Vs+A9LJ+7X6kLe4xK98dBMi4adZv1d0=
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x25QZIzfZqeSFfZdNrzc2hlHm6t/JKYKu9fI=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd h1:kTVZOR7bTdh4ID7EoliyGhPR5CItpx8GehN581IxoPA=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
79 changes: 79 additions & 0 deletions pkg/client/cloudevents/grpcsource/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package grpcsource

import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
)

// workWatcher implements the watch.Interface.
type workWatcher struct {
sync.RWMutex

result chan watch.Event
done chan struct{}
stopped bool

namespace string
}

var _ watch.Interface = &workWatcher{}

func newWorkWatcher(namespace string) *workWatcher {
return &workWatcher{
result: make(chan watch.Event),
done: make(chan struct{}),
namespace: namespace,
}
}

// ResultChan implements Interface.
func (w *workWatcher) ResultChan() <-chan watch.Event {
return w.result
}

// Stop implements Interface.
func (w *workWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
w.Lock()
defer w.Unlock()
// closing a closed channel always panics, therefore check before closing
select {
case <-w.done:
close(w.result)
default:
w.stopped = true
close(w.done)
}
}

// Receive an event and sends down the result channel.
func (w *workWatcher) Receive(evt watch.Event) {
if w.isStopped() {
// this watcher is stopped, do nothing.
return
}

work, ok := evt.Object.(*workv1.ManifestWork)
if !ok {
klog.Errorf("unknown event object type %T", evt.Object)
return
}

if w.namespace != metav1.NamespaceAll && w.namespace != work.Namespace {
klog.V(4).Infof("ignore the work %s/%s for the watcher %s", work.Namespace, work.Name, w.namespace)
return
}

w.result <- evt
}

func (w *workWatcher) isStopped() bool {
w.RLock()
defer w.RUnlock()

return w.stopped
}
164 changes: 122 additions & 42 deletions pkg/client/cloudevents/grpcsource/watcherstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/openshift-online/maestro/pkg/api/openapi"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

workv1 "open-cluster-management.io/api/work/v1"

Expand All @@ -28,68 +33,87 @@ import (
type RESTFulAPIWatcherStore struct {
sync.RWMutex

result chan watch.Event
done chan struct{}
watcherStopped bool

sourceID string
apiClient *openapi.APIClient

watchers map[string]*workWatcher
workQueue cache.Queue
}

var _ store.WorkClientWatcherStore = &RESTFulAPIWatcherStore{}

func NewRESTFullAPIWatcherStore(apiClient *openapi.APIClient, sourceID string) *RESTFulAPIWatcherStore {
return &RESTFulAPIWatcherStore{
result: make(chan watch.Event),
done: make(chan struct{}),
watcherStopped: false,

func NewRESTFulAPIWatcherStore(ctx context.Context, apiClient *openapi.APIClient, sourceID string) *RESTFulAPIWatcherStore {
s := &RESTFulAPIWatcherStore{
sourceID: sourceID,
apiClient: apiClient,
watchers: make(map[string]*workWatcher),
workQueue: cache.NewFIFO(func(obj interface{}) (string, error) {
work, ok := obj.(*workv1.ManifestWork)
if !ok {
return "", fmt.Errorf("unknown object type %T", obj)
}

// ensure there is only one object in the queue for a work
return string(work.UID), nil
}),
}
}

// ResultChan implements watch interface.
func (m *RESTFulAPIWatcherStore) ResultChan() <-chan watch.Event {
return m.result
// start a goroutine to send works to the watcher
go wait.Until(s.process, time.Second, ctx.Done())

return s
}

// Stop implements watch interface.
func (m *RESTFulAPIWatcherStore) Stop() {
// Call Close() exactly once by locking and setting a flag.
m.Lock()
defer m.Unlock()
// closing a closed channel always panics, therefore check before closing
select {
case <-m.done:
close(m.result)
default:
m.watcherStopped = true
close(m.done)
// GetWatcher returns a watcher to the source work client with a specified namespace (consumer name).
// Using `metav1.NamespaceAll` to specify all namespaces.
func (m *RESTFulAPIWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
// Only list works from maestro server with the given namespace when a watcher is required
search := []string{fmt.Sprintf("source = '%s'", m.sourceID)}
if namespace != metav1.NamespaceAll {
search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace))
}

rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()).
Search(strings.Join(search, " and ")).
Page(1).
Size(-1).
Execute()
if err != nil {
return nil, err
}

// save the works to a queue
for _, rb := range rbs.Items {
work, err := ToManifestWork(&rb)
if err != nil {
return nil, err
}

if err := m.workQueue.Add(work); err != nil {
return nil, err
}
}

return m.registerWatcher(namespace), nil
}

// HandleReceivedWork sends the received works to the watch channel
func (m *RESTFulAPIWatcherStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error {
if m.isWatcherStopped() {
// watcher is stopped, do nothing.
return nil
}

switch action {
case types.StatusModified:
watchType := watch.Modified
if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) {
watchType = watch.Deleted
}

m.result <- watch.Event{Type: watchType, Object: work}
m.sendWatchEvent(watch.Event{Type: watchType, Object: work})
return nil
default:
return fmt.Errorf("unknown resource action %s", action)
}
}

// Get a work from maestro server with its namespace and name
func (m *RESTFulAPIWatcherStore) Get(namespace, name string) (*workv1.ManifestWork, error) {
id := utils.UID(m.sourceID, namespace, name)
rb, resp, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesIdGet(context.Background(), id).Execute()
Expand All @@ -104,22 +128,30 @@ func (m *RESTFulAPIWatcherStore) Get(namespace, name string) (*workv1.ManifestWo
return ToManifestWork(rb)
}

func (m *RESTFulAPIWatcherStore) List(opts metav1.ListOptions) ([]*workv1.ManifestWork, error) {
// List works from maestro server with a specified namespace and list options.
// Using `metav1.NamespaceAll` to specify all namespace
func (m *RESTFulAPIWatcherStore) List(namespace string, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) {
works := []*workv1.ManifestWork{}

// TODO consider how to support configuring page
var page int32 = 1

var size int32 = -1
if opts.Limit > 0 {
size = int32(opts.Limit)
}

apiRequest := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()).
Search(fmt.Sprintf("source = '%s'", m.sourceID)).
Page(1). // TODO consider how to support this
Size(size)

// TODO filter works by labels
search := []string{fmt.Sprintf("source = '%s'", m.sourceID)}
if namespace != metav1.NamespaceAll {
search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace))
}

rbs, _, err := apiRequest.Execute()
rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()).
Search(strings.Join(search, " and ")).
Page(page).
Size(size).
Execute()
if err != nil {
return nil, err
}
Expand All @@ -137,7 +169,7 @@ func (m *RESTFulAPIWatcherStore) List(opts metav1.ListOptions) ([]*workv1.Manife
}

func (m *RESTFulAPIWatcherStore) ListAll() ([]*workv1.ManifestWork, error) {
return m.List(metav1.ListOptions{})
return m.List(metav1.NamespaceAll, metav1.ListOptions{})
}

func (m *RESTFulAPIWatcherStore) Add(work *workv1.ManifestWork) error {
Expand All @@ -159,9 +191,57 @@ func (m *RESTFulAPIWatcherStore) HasInitiated() bool {
return true
}

func (m *RESTFulAPIWatcherStore) isWatcherStopped() bool {
// process drains the work queue and send the work to the watch channel.
func (m *RESTFulAPIWatcherStore) process() {
for {
// this will be blocked until the work queue has works
obj, err := m.workQueue.Pop(func(interface{}, bool) error {
// do nothing
return nil
})
if err != nil {
if err == cache.ErrFIFOClosed {
return
}

klog.Warningf("failed to pop the %v requeue it, %v", obj, err)
// this is the safe way to re-enqueue.
if err := m.workQueue.AddIfNotPresent(obj); err != nil {
klog.Errorf("failed to requeue the obj %v, %v", obj, err)
return
}
}

work, ok := obj.(*workv1.ManifestWork)
if !ok {
klog.Errorf("unknown the object type %T from the event queue", obj)
return
}

m.sendWatchEvent(watch.Event{Type: watch.Modified, Object: work})
}
}

func (m *RESTFulAPIWatcherStore) registerWatcher(namespace string) watch.Interface {
m.Lock()
defer m.Unlock()

watcher, ok := m.watchers[namespace]
if ok {
return watcher
}

watcher = newWorkWatcher(namespace)
m.watchers[namespace] = watcher
return watcher
}

func (m *RESTFulAPIWatcherStore) sendWatchEvent(evt watch.Event) {
m.RLock()
defer m.RUnlock()

return m.watcherStopped
for _, w := range m.watchers {
// this will be blocked until this work is consumed
w.Receive(evt)
}
}
Loading

0 comments on commit 79b986b

Please sign in to comment.