Skip to content

Commit

Permalink
Merge pull request kubeedge#5649 from JiaweiGithub/feat/dmi_state_cloud
Browse files Browse the repository at this point in the history
cloud support device state
  • Loading branch information
kubeedge-bot authored Jul 19, 2024
2 parents f0bf11c + 7402bf7 commit 7358af2
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 21 deletions.
6 changes: 6 additions & 0 deletions build/crds/devices/devices_v1beta1_device.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ spec:
description: DeviceStatus reports the device state and the desired/reported
values of twin attributes.
properties:
lastOnlineTime:
description: 'Optional: The last time the device was online.'
type: string
state:
description: 'Optional: The state of the device.'
type: string
twins:
description: 'A list of device twins containing desired/reported desired/reported
values of twin properties. Optional: A passive device won''t have
Expand Down
4 changes: 3 additions & 1 deletion cloud/pkg/common/messagelayer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
ResourceDevice = "device"
ResourceTypeTwinEdgeUpdated = "twin/edge_updated"
ResourceTypeMembershipDetail = "membership/detail"
ResourceDeviceStateUpdated = "state/update"
)

// BuildResource return a string as "beehive/pkg/core/model".Message.Router.Resource
Expand Down Expand Up @@ -143,7 +144,8 @@ func GetResourceTypeForDevice(resource string) (string, error) {
return ResourceTypeTwinEdgeUpdated, nil
} else if strings.Contains(resource, ResourceTypeMembershipDetail) {
return ResourceTypeMembershipDetail, nil
} else if strings.Contains(resource, ResourceDeviceStateUpdated) {
return ResourceDeviceStateUpdated, nil
}

return "", fmt.Errorf("unknown resource, found: %s", resource)
}
1 change: 1 addition & 0 deletions cloud/pkg/devicecontroller/constants/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package constants
const (
ResourceTypeTwinEdgeUpdated = "twin/edge_updated"
ResourceTypeMembershipDetail = "membership/detail"
ResourceDeviceStateUpdated = "state/update"

// Group
GroupTwin = "twin"
Expand Down
89 changes: 83 additions & 6 deletions cloud/pkg/devicecontroller/controller/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ const (
type UpstreamController struct {
crdClient crdClientset.Interface
messageLayer messagelayer.MessageLayer
// message channel
deviceStatusChan chan model.Message

// deviceTwinsChan message channel
deviceTwinsChan chan model.Message
// deviceStates message channel
deviceStatesChan chan model.Message
// downstream controller to update device status in cache
dc *DownstreamController
}
Expand All @@ -64,7 +65,8 @@ type UpstreamController struct {
func (uc *UpstreamController) Start() error {
klog.Info("Start upstream devicecontroller")

uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
uc.deviceTwinsChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceTwins)
uc.deviceStatesChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStates)
go uc.dispatchMessage()

for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
Expand Down Expand Up @@ -98,7 +100,9 @@ func (uc *UpstreamController) dispatchMessage() {

switch resourceType {
case constants.ResourceTypeTwinEdgeUpdated:
uc.deviceStatusChan <- msg
uc.deviceTwinsChan <- msg
case constants.ResourceDeviceStateUpdated:
uc.deviceStatesChan <- msg
case constants.ResourceTypeMembershipDetail:
default:
klog.Warningf("Message: %s, with resource type: %s not intended for device controller", msg.GetID(), resourceType)
Expand All @@ -112,7 +116,67 @@ func (uc *UpstreamController) updateDeviceStatus() {
case <-beehiveContext.Done():
klog.Info("Stop updateDeviceStatus")
return
case msg := <-uc.deviceStatusChan:
case msg := <-uc.deviceStatesChan:
klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
msgState, err := uc.unmarshalDeviceStatesMessage(msg)
if err != nil {
klog.Warningf("Unmarshall failed due to error %v", err)
continue
}
deviceID, err := messagelayer.GetDeviceID(msg.GetResource())
if err != nil {
klog.Warning("Failed to get device id")
continue
}
device, ok := uc.dc.deviceManager.Device.Load(deviceID)
if !ok {
klog.Warningf("Device %s does not exist in upstream controller", deviceID)
continue
}
cacheDevice, ok := device.(*v1beta1.Device)
if !ok {
klog.Warning("Failed to assert to CacheDevice type")
continue
}

// Store the status in cache so that when update is received by informer, it is not processed by downstream controller
cacheDevice.Status.State = msgState.Device.State
cacheDevice.Status.LastOnlineTime = msgState.Device.LastOnlineTime
uc.dc.deviceManager.Device.Store(deviceID, cacheDevice)

body, err := json.Marshal(cacheDevice.Status)
if err != nil {
klog.Errorf("Failed to marshal device states %v", cacheDevice.Status)
continue
}
err = uc.crdClient.DevicesV1beta1().RESTClient().Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(cacheDevice.Name).Body(body).Do(context.Background()).Error()
if err != nil {
klog.Errorf("Failed to patch device states %v of device %v in namespace %v, err: %v", cacheDevice,
deviceID, cacheDevice.Namespace, err)
continue
}

//send confirm message to edge twin
resMsg := model.NewMessage(msg.GetID())
nodeID, err := messagelayer.GetNodeID(msg)
if err != nil {
klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
continue
}
resource, err := messagelayer.BuildResourceForDevice(nodeID, "twin", "")
if err != nil {
klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
continue
}
resMsg.BuildRouter(modules.DeviceControllerModuleName, constants.GroupTwin, resource, model.ResponseOperation)
resMsg.Content = commonconst.MessageSuccessfulContent
err = uc.messageLayer.Response(*resMsg)
if err != nil {
klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
continue
}
klog.Infof("Message: %s process successfully", msg.GetID())
case msg := <-uc.deviceTwinsChan:
klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
if err != nil {
Expand Down Expand Up @@ -217,6 +281,19 @@ func (uc *UpstreamController) unmarshalDeviceStatusMessage(msg model.Message) (*
return twinUpdate, nil
}

func (uc *UpstreamController) unmarshalDeviceStatesMessage(msg model.Message) (*types.DeviceStateUpdate, error) {
contentData, err := msg.GetContentData()
if err != nil {
return nil, err
}

stateUpdate := &types.DeviceStateUpdate{}
if err := json.Unmarshal(contentData, stateUpdate); err != nil {
return nil, err
}
return stateUpdate, nil
}

// NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
uc := &UpstreamController{
Expand Down
20 changes: 13 additions & 7 deletions cloud/pkg/devicecontroller/types/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package types

// Device the struct of device
type Device struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
State string `json:"state,omitempty"`
LastOnline string `json:"last_online,omitempty"`
Attributes map[string]*MsgAttr `json:"attributes,omitempty"`
Twin map[string]*MsgTwin `json:"twin,omitempty"`
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
State string `json:"state,omitempty"`
LastOnlineTime string `json:"lastOnlineTime,omitempty"`
Attributes map[string]*MsgAttr `json:"attributes,omitempty"`
Twin map[string]*MsgTwin `json:"twin,omitempty"`
}

// BaseMessage the base struct of event message
Expand Down Expand Up @@ -106,3 +106,9 @@ type DeviceTwinUpdate struct {
BaseMessage
Twin map[string]*MsgTwin `json:"twin"`
}

// DeviceStateUpdate the struct of device state update
type DeviceStateUpdate struct {
BaseMessage
Device Device
}
3 changes: 2 additions & 1 deletion common/constants/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ const (
DefaultRuleEndpointsEventBuffer = 1

// DeviceController
DefaultUpdateDeviceStatusBuffer = 1024
DefaultUpdateDeviceTwinsBuffer = 1024
DefaultUpdateDeviceStatesBuffer = 1024
DefaultDeviceEventBuffer = 1
DefaultDeviceModelEventBuffer = 1
DefaultUpdateDeviceStatusWorkers = 1
Expand Down
6 changes: 6 additions & 0 deletions manifests/charts/cloudcore/crds/devices_v1beta1_device.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ spec:
description: DeviceStatus reports the device state and the desired/reported
values of twin attributes.
properties:
lastOnlineTime:
description: 'Optional: The last time the device was online.'
type: string
state:
description: 'Optional: The state of the device.'
type: string
twins:
description: 'A list of device twins containing desired/reported desired/reported
values of twin properties. Optional: A passive device won''t have
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/componentconfig/cloudcore/v1alpha1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig {
DeviceController: &DeviceController{
Enable: true,
Buffer: &DeviceControllerBuffer{
UpdateDeviceStatus: constants.DefaultUpdateDeviceStatusBuffer,
UpdateDeviceTwins: constants.DefaultUpdateDeviceTwinsBuffer,
UpdateDeviceStates: constants.DefaultUpdateDeviceStatesBuffer,
DeviceEvent: constants.DefaultDeviceEventBuffer,
DeviceModelEvent: constants.DefaultDeviceModelEventBuffer,
},
Expand Down
7 changes: 5 additions & 2 deletions pkg/apis/componentconfig/cloudcore/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,12 @@ type DeviceController struct {

// DeviceControllerBuffer indicates deviceController buffer
type DeviceControllerBuffer struct {
// UpdateDeviceStatus indicates the buffer of update device status
// UpdateDeviceTwins indicates the buffer of update device twins
// default 1024
UpdateDeviceStatus int32 `json:"updateDeviceStatus,omitempty"`
UpdateDeviceTwins int32 `json:"updateDeviceTwins,omitempty"`
// UpdateDeviceStates indicates the buffer of update device states
// default 1024
UpdateDeviceStates int32 `json:"updateDeviceStatus,omitempty"`
// DeviceEvent indicates the buffer of device event
// default 1
DeviceEvent int32 `json:"deviceEvent,omitempty"`
Expand Down
11 changes: 8 additions & 3 deletions pkg/apis/devices/v1beta1/device_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type DeviceStatus struct {
// Optional: A passive device won't have twin properties and this list could be empty.
// +optional
Twins []Twin `json:"twins,omitempty"`
// Optional: The state of the device.
// +optional
State string `json:"state,omitempty"`
// Optional: The last time the device was online.
// +optional
LastOnlineTime string `json:"lastOnlineTime,omitempty"`
}

// Twin provides a logical representation of control properties (writable properties in the
Expand Down Expand Up @@ -263,9 +269,8 @@ type VisitorConfig struct {
type Device struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec DeviceSpec `json:"spec,omitempty"`
Status DeviceStatus `json:"status,omitempty"`
Spec DeviceSpec `json:"spec,omitempty"`
Status DeviceStatus `json:"status,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

0 comments on commit 7358af2

Please sign in to comment.