diff --git a/internal/rpc/sshDev/grpc.go b/internal/rpc/sshDev/grpc.go index 5648d3b..958f090 100644 --- a/internal/rpc/sshDev/grpc.go +++ b/internal/rpc/sshDev/grpc.go @@ -1,9 +1,7 @@ package sshDev import ( - "context" "encoding/json" - "errors" redisPkg "github.com/ncuhome/GeniusAuthoritarian/internal/db/redis" "github.com/ncuhome/GeniusAuthoritarian/internal/rpc/middlewares" "github.com/ncuhome/GeniusAuthoritarian/internal/rpc/sshDev/sshDevClient/proto" @@ -52,22 +50,25 @@ func (a *SshAccounts) Watch(_ *emptypb.Empty, server proto.SshAccounts_WatchServ Accounts: TransformAccountArray(sshAccounts), }) if err != nil { - return err + return status.Error(codes.DataLoss, err.Error()) } msgChannel := make(chan []sshDevModel.SshAccountMsg) go func() { + msgSubChan := sub.Channel() for { - msg, err := sub.ReceiveMessage(context.Background()) - if err != nil { + msg, ok := <-msgSubChan + if !ok { close(msgChannel) return } - if msg.PayloadSlice == nil && msg.Payload != "" { - msg.PayloadSlice = []string{msg.Payload} - } else { - continue + if msg.PayloadSlice == nil { + if msg.Payload != "" { + msg.PayloadSlice = []string{msg.Payload} + } else { + continue + } } for _, payload := range msg.PayloadSlice { @@ -83,20 +84,20 @@ func (a *SshAccounts) Watch(_ *emptypb.Empty, server proto.SshAccounts_WatchServ select { case messages, ok := <-msgChannel: if !ok { - return errors.New("ssh account status subscription exception") + return status.Error(codes.Internal, "ssh account status subscription exception") } err := server.Send(&proto.AccountStream{ Accounts: TransformMsgArray(messages), }) if err != nil { - return err + return status.Error(codes.DataLoss, err.Error()) } case <-time.After(time.Minute): err := server.Send(&proto.AccountStream{ IsHeartBeat: true, }) if err != nil { - return err + return status.Error(codes.Unknown, err.Error()) } } }