Skip to content

Commit

Permalink
fix: sshDev watch rpc logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Mmx233 committed Apr 6, 2024
1 parent 4deb3f7 commit 3f207b9
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions internal/rpc/sshDev/grpc.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package sshDev

import (
"context"
"encoding/json"
"errors"
redisPkg "github.com/ncuhome/GeniusAuthoritarian/internal/db/redis"
"github.com/ncuhome/GeniusAuthoritarian/internal/rpc/middlewares"
"github.com/ncuhome/GeniusAuthoritarian/internal/rpc/sshDev/sshDevClient/proto"
Expand Down Expand Up @@ -52,22 +50,25 @@ func (a *SshAccounts) Watch(_ *emptypb.Empty, server proto.SshAccounts_WatchServ
Accounts: TransformAccountArray(sshAccounts),
})
if err != nil {
return err
return status.Error(codes.DataLoss, err.Error())
}

msgChannel := make(chan []sshDevModel.SshAccountMsg)
go func() {
msgSubChan := sub.Channel()
for {
msg, err := sub.ReceiveMessage(context.Background())
if err != nil {
msg, ok := <-msgSubChan
if !ok {
close(msgChannel)
return
}

if msg.PayloadSlice == nil && msg.Payload != "" {
msg.PayloadSlice = []string{msg.Payload}
} else {
continue
if msg.PayloadSlice == nil {
if msg.Payload != "" {
msg.PayloadSlice = []string{msg.Payload}
} else {
continue
}
}

for _, payload := range msg.PayloadSlice {
Expand All @@ -83,20 +84,20 @@ func (a *SshAccounts) Watch(_ *emptypb.Empty, server proto.SshAccounts_WatchServ
select {
case messages, ok := <-msgChannel:
if !ok {
return errors.New("ssh account status subscription exception")
return status.Error(codes.Internal, "ssh account status subscription exception")
}
err := server.Send(&proto.AccountStream{
Accounts: TransformMsgArray(messages),
})
if err != nil {
return err
return status.Error(codes.DataLoss, err.Error())
}
case <-time.After(time.Minute):
err := server.Send(&proto.AccountStream{
IsHeartBeat: true,
})
if err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}
}
}
Expand Down

0 comments on commit 3f207b9

Please sign in to comment.