Skip to content

Commit

Permalink
Fixed deadlock when handling multiple stream entries errors (#576)
Browse files Browse the repository at this point in the history
This is an alternate approach for #572

The `closeCh` is used to propagate back the error to the client stream
handler. Writing on this channel should never block and it's just
necessary to write once, all other errors are just ignored.
  • Loading branch information
merlimat authored Nov 16, 2024
1 parent 6cc48c4 commit e7ef873
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,10 +886,10 @@ func (lc *leaderController) handleWriteStream(stream proto.OxiaClient_WriteStrea
req, err := stream.Recv()

if err != nil {
closeCh <- err
sendNonBlocking(closeCh, err)
return
} else if req == nil {
closeCh <- errors.New("stream closed")
sendNonBlocking(closeCh, errors.New("stream closed"))
return
}

Expand All @@ -908,7 +908,7 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS
offset int64, timestamp uint64, err error, timer metrics.Timer) {
if err != nil {
timer.Done()
closeCh <- err
sendNonBlocking(closeCh, err)
return
}

Expand All @@ -917,13 +917,13 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS
}, func(response *proto.WriteResponse, err error) {
if err != nil {
timer.Done()
closeCh <- err
sendNonBlocking(closeCh, err)
return
}

if err = stream.Send(response); err != nil {
timer.Done()
closeCh <- err
sendNonBlocking(closeCh, err)
return
}
timer.Done()
Expand Down Expand Up @@ -1136,3 +1136,10 @@ func checkStatusIsLeader(actual proto.ServingStatus) error {
}
return nil
}

func sendNonBlocking(ch chan error, err error) {
select {
case ch <- err:
default:
}
}

0 comments on commit e7ef873

Please sign in to comment.