Skip to content

Commit

Permalink
fix: redis subcribe
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Sep 5, 2024
1 parent de788d4 commit 6926e58
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions internal/utils/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,20 +453,39 @@ func Publish(channel string, message any, context ...redis.Cmdable) error {
func Subscribe[T any](channel string) (<-chan T, func()) {
pubsub := client.Subscribe(ctx, channel)
ch := make(chan T)
connection_established := make(chan bool)

go func() {
defer close(ch)
defer close(connection_established)

for msg := range pubsub.Channel() {
v, err := parser.UnmarshalJson[T](msg.Payload)
alive := true
for alive {
iface, err := pubsub.Receive(context.Background())
if err != nil {
continue
alive = false
break
}
switch data := iface.(type) {
case *redis.Subscription:
connection_established <- true
case *redis.Message:
v, err := parser.UnmarshalJson[T](data.Payload)
if err != nil {
continue
}

ch <- v
case *redis.Pong:
default:
alive = false
}

ch <- v
}
}()

// wait for the connection to be established
<-connection_established

return ch, func() {
pubsub.Close()
}
Expand Down

0 comments on commit 6926e58

Please sign in to comment.