Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support full sync when the work watcher creating or the work client reconnected #133

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewAgentCommand() *cobra.Command {
agentOption.MaxJSONRawLength = maxJSONRawLength
agentOption.CloudEventsClientCodecs = []string{"manifest", "manifestbundle"}
cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption)
cmdConfig := commonOptions.CommoOpts.
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent)

cmd := cmdConfig.NewCommandWithContext(context.TODO())
Expand Down Expand Up @@ -68,6 +68,6 @@ func NewAgentCommand() *cobra.Command {
func addFlags(fs *pflag.FlagSet) {
fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name",
commonOptions.SpokeClusterName, "Name of the consumer")
fs.BoolVar(&commonOptions.CommoOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
fs.BoolVar(&commonOptions.CommonOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
true, "Disable leader election.")
}
19 changes: 8 additions & 11 deletions examples/manifestworkclient/README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
# gRPC Source ManifestWork Client

This example shows how to build a source ManifestWork client with `RESTFullAPIWatcherStore` and watch/create/get/update/delete works by the client.
This example shows how to build a source ManifestWork client with Maestro gRPC service and watch/create/get/update/delete works by this client.

## Build the client

Using sdk-go to build a source ManifestWork client with `RESTFullAPIWatcherStore`

```golang
sourceID := "mw-client-example"

workClient, err := work.NewClientHolderBuilder(grpcOptions).
WithClientID(fmt.Sprintf("%s-client", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(ctx, maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient(
ctx,
maestroAPIClient,
maestroGRPCOptions,
sourceID,
)

if err != nil {
log.Fatal(err)
log.Fatal(err)
}

// watch/create/patch/get/delete/list by workClient
Expand Down
37 changes: 19 additions & 18 deletions examples/manifestworkclient/client-a/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (

workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

const sourceID = "mw-client-example"
Expand All @@ -31,6 +29,7 @@ var (
maestroServerAddr = flag.String("maestro-server", "https://127.0.0.1:30080", "The Maestro server address")
grpcServerAddr = flag.String("grpc-server", "127.0.0.1:30090", "The GRPC server address")
consumerName = flag.String("consumer-name", "", "The Consumer Name")
printWorkDetails = flag.Bool("print-work-details", false, "Print work details")
)

func main() {
Expand Down Expand Up @@ -71,13 +70,12 @@ func main() {
grpcOptions := grpc.NewGRPCOptions()
grpcOptions.URL = *grpcServerAddr

workClient, err := work.NewClientHolderBuilder(grpcOptions).
WithClientID(fmt.Sprintf("%s-client-a", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient(
ctx,
maestroAPIClient,
grpcOptions,
sourceID,
)
if err != nil {
log.Fatal(err)
}
Expand All @@ -99,11 +97,9 @@ func main() {
}
switch event.Type {
case watch.Modified:
fmt.Printf("watched work (uid=%s) is modified\n", event.Object.(*workv1.ManifestWork).UID)
PrintWork(event.Object.(*workv1.ManifestWork))
Print(event, *printWorkDetails)
case watch.Deleted:
fmt.Printf("watched work (uid=%s) is deleted\n", event.Object.(*workv1.ManifestWork).UID)
PrintWork(event.Object.(*workv1.ManifestWork))
Print(event, *printWorkDetails)
}
}
}
Expand All @@ -112,10 +108,15 @@ func main() {
<-ctx.Done()
}

func PrintWork(work *workv1.ManifestWork) {
workJson, err := json.MarshalIndent(work, "", " ")
if err != nil {
log.Fatal(err)
func Print(event watch.Event, printDetails bool) {
work := event.Object.(*workv1.ManifestWork)
fmt.Printf("watched work (uid=%s) is %s\n", work.UID, event.Type)

if printDetails {
workJson, err := json.MarshalIndent(work, "", " ")
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", string(workJson))
}
fmt.Printf("%s\n", string(workJson))
}
15 changes: 6 additions & 9 deletions examples/manifestworkclient/client-b/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
workv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

const sourceID = "mw-client-example"
Expand Down Expand Up @@ -67,13 +65,12 @@ func main() {
grpcOptions := grpc.NewGRPCOptions()
grpcOptions.URL = *grpcServerAddr

workClient, err := work.NewClientHolderBuilder(grpcOptions).
WithClientID(fmt.Sprintf("%s-client-b", sourceID)).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)).
WithResyncEnabled(false).
NewSourceClientHolder(ctx)
workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient(
ctx,
maestroAPIClient,
grpcOptions,
sourceID,
)
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ require (
k8s.io/component-base v0.29.3
k8s.io/klog/v2 v2.120.1
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -823,10 +823,10 @@ k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0g
k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg=
open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0=
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4 h1:Z3gwbMUZmblGTHFx6iOO1zlCJc2FJcAJsa2RASTKDqA=
open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4/go.mod h1:RuYCuKuVJzNxRBkSoQnxyJxyUqOyCH388DlR/QDr7rE=
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6 h1:/nPyxceSdi66Vs+A9LJ+7X6kLe4xK98dBMi4adZv1d0=
open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x25QZIzfZqeSFfZdNrzc2hlHm6t/JKYKu9fI=
open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd h1:kTVZOR7bTdh4ID7EoliyGhPR5CItpx8GehN581IxoPA=
open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk=
Expand Down
88 changes: 88 additions & 0 deletions pkg/client/cloudevents/grpcsource/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package grpcsource

import (
"context"
"fmt"

"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client"
sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

func NewMaestroGRPCSourceWorkClient(
ctx context.Context,
apiClient *openapi.APIClient,
opts *grpc.GRPCOptions,
sourceID string,
) (workv1client.WorkV1Interface, error) {
if len(sourceID) == 0 {
return nil, fmt.Errorf("source id is required")
}

options, err := generic.BuildCloudEventsSourceOptions(opts, fmt.Sprintf("%s-maestro", sourceID), sourceID)
machi1990 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

watcherStore := newRESTFulAPIWatcherStore(ctx, apiClient, sourceID)

cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork](
ctx,
options,
nil, // resync is disabled, so lister is not required
nil, // resync is disabled, so status hash is not required
sourcecodec.NewManifestBundleCodec(),
)
if err != nil {
return nil, err
}

cloudEventsClient.Subscribe(ctx, watcherStore.HandleReceivedWork)

// start a go routine to receive client reconnect signal
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
machi1990 marked this conversation as resolved.
Show resolved Hide resolved
// reconnect happened, sync the works for current watchers
if err := watcherStore.Sync(); err != nil {
klog.Errorf("failed to sync the works %v", err)
}
}
}
}()

manifestWorkClient := sourceclient.NewManifestWorkSourceClient(sourceID, cloudEventsClient, watcherStore)
return &WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}, nil

}

// WorkV1ClientWrapper wraps a ManifestWork client to a WorkV1Interface
type WorkV1ClientWrapper struct {
ManifestWorkClient *sourceclient.ManifestWorkSourceClient
}

var _ workv1client.WorkV1Interface = &WorkV1ClientWrapper{}

func (c *WorkV1ClientWrapper) ManifestWorks(namespace string) workv1client.ManifestWorkInterface {
c.ManifestWorkClient.SetNamespace(namespace)
return c.ManifestWorkClient
}

func (c *WorkV1ClientWrapper) AppliedManifestWorks() workv1client.AppliedManifestWorkInterface {
return nil
}

func (c *WorkV1ClientWrapper) RESTClient() rest.Interface {
return nil
}
79 changes: 79 additions & 0 deletions pkg/client/cloudevents/grpcsource/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package grpcsource

import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
)

// workWatcher implements the watch.Interface.
type workWatcher struct {
sync.RWMutex

result chan watch.Event
done chan struct{}
stopped bool

namespace string
}

var _ watch.Interface = &workWatcher{}

func newWorkWatcher(namespace string) *workWatcher {
return &workWatcher{
result: make(chan watch.Event),
done: make(chan struct{}),
namespace: namespace,
}
}

// ResultChan implements Interface.
func (w *workWatcher) ResultChan() <-chan watch.Event {
return w.result
}

// Stop implements Interface.
func (w *workWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
w.Lock()
defer w.Unlock()
// closing a closed channel always panics, therefore check before closing
select {
case <-w.done:
close(w.result)
default:
w.stopped = true
close(w.done)
}
}

// Receive an event and sends down the result channel.
func (w *workWatcher) Receive(evt watch.Event) {
if w.isStopped() {
// this watcher is stopped, do nothing.
return
}

work, ok := evt.Object.(*workv1.ManifestWork)
if !ok {
klog.Errorf("unknown event object type %T", evt.Object)
return
}

if w.namespace != metav1.NamespaceAll && w.namespace != work.Namespace {
klog.V(4).Infof("ignore the work %s/%s for the watcher %s", work.Namespace, work.Name, w.namespace)
return
}

w.result <- evt
}

func (w *workWatcher) isStopped() bool {
w.RLock()
defer w.RUnlock()

return w.stopped
}
Loading
Loading