-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sync works when the watcher creating
Signed-off-by: Wei Liu <[email protected]>
- Loading branch information
Showing
13 changed files
with
551 additions
and
153 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package grpcsource | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/openshift-online/maestro/pkg/api/openapi" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/klog/v2" | ||
|
||
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" | ||
workv1 "open-cluster-management.io/api/work/v1" | ||
|
||
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic" | ||
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" | ||
sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client" | ||
sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" | ||
) | ||
|
||
func NewMaestroGRPCSourceWorkClient( | ||
ctx context.Context, | ||
apiClient *openapi.APIClient, | ||
opts *grpc.GRPCOptions, | ||
sourceID string, | ||
) (workv1client.WorkV1Interface, error) { | ||
if len(sourceID) == 0 { | ||
return nil, fmt.Errorf("source id is required") | ||
} | ||
|
||
options, err := generic.BuildCloudEventsSourceOptions(opts, fmt.Sprintf("%s-maestro", sourceID), sourceID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
watcherStore := newRESTFulAPIWatcherStore(ctx, apiClient, sourceID) | ||
|
||
cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork]( | ||
ctx, | ||
options, | ||
nil, // resync is disabled, so lister is not required | ||
nil, // resync is disabled, so status hash is not required | ||
sourcecodec.NewManifestBundleCodec(), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
cloudEventsClient.Subscribe(ctx, watcherStore.HandleReceivedWork) | ||
|
||
// start a go routine to receive client reconnect signal | ||
go func() { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-cloudEventsClient.ReconnectedChan(): | ||
if err := watcherStore.Sync(); err != nil { | ||
klog.Errorf("failed to sync the works %v", err) | ||
} | ||
} | ||
} | ||
}() | ||
|
||
manifestWorkClient := sourceclient.NewManifestWorkSourceClient(sourceID, cloudEventsClient, watcherStore) | ||
return &WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}, nil | ||
|
||
} | ||
|
||
// WorkV1ClientWrapper wraps a ManifestWork client to a WorkV1Interface | ||
type WorkV1ClientWrapper struct { | ||
ManifestWorkClient *sourceclient.ManifestWorkSourceClient | ||
} | ||
|
||
var _ workv1client.WorkV1Interface = &WorkV1ClientWrapper{} | ||
|
||
func (c *WorkV1ClientWrapper) ManifestWorks(namespace string) workv1client.ManifestWorkInterface { | ||
c.ManifestWorkClient.SetNamespace(namespace) | ||
return c.ManifestWorkClient | ||
} | ||
|
||
func (c *WorkV1ClientWrapper) AppliedManifestWorks() workv1client.AppliedManifestWorkInterface { | ||
return nil | ||
} | ||
|
||
func (c *WorkV1ClientWrapper) RESTClient() rest.Interface { | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package grpcsource | ||
|
||
import ( | ||
"sync" | ||
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/watch" | ||
"k8s.io/klog/v2" | ||
workv1 "open-cluster-management.io/api/work/v1" | ||
) | ||
|
||
// workWatcher implements the watch.Interface. | ||
type workWatcher struct { | ||
sync.RWMutex | ||
|
||
result chan watch.Event | ||
done chan struct{} | ||
stopped bool | ||
|
||
namespace string | ||
} | ||
|
||
var _ watch.Interface = &workWatcher{} | ||
|
||
func newWorkWatcher(namespace string) *workWatcher { | ||
return &workWatcher{ | ||
result: make(chan watch.Event), | ||
done: make(chan struct{}), | ||
namespace: namespace, | ||
} | ||
} | ||
|
||
// ResultChan implements Interface. | ||
func (w *workWatcher) ResultChan() <-chan watch.Event { | ||
return w.result | ||
} | ||
|
||
// Stop implements Interface. | ||
func (w *workWatcher) Stop() { | ||
// Call Close() exactly once by locking and setting a flag. | ||
w.Lock() | ||
defer w.Unlock() | ||
// closing a closed channel always panics, therefore check before closing | ||
select { | ||
case <-w.done: | ||
close(w.result) | ||
default: | ||
w.stopped = true | ||
close(w.done) | ||
} | ||
} | ||
|
||
// Receive an event and sends down the result channel. | ||
func (w *workWatcher) Receive(evt watch.Event) { | ||
if w.isStopped() { | ||
// this watcher is stopped, do nothing. | ||
return | ||
} | ||
|
||
work, ok := evt.Object.(*workv1.ManifestWork) | ||
if !ok { | ||
klog.Errorf("unknown event object type %T", evt.Object) | ||
return | ||
} | ||
|
||
if w.namespace != metav1.NamespaceAll && w.namespace != work.Namespace { | ||
klog.V(4).Infof("ignore the work %s/%s for the watcher %s", work.Namespace, work.Name, w.namespace) | ||
return | ||
} | ||
|
||
w.result <- evt | ||
} | ||
|
||
func (w *workWatcher) isStopped() bool { | ||
w.RLock() | ||
defer w.RUnlock() | ||
|
||
return w.stopped | ||
} |
Oops, something went wrong.