Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handle closing of clients #51

Merged
merged 1 commit into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions internal/signaling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
logger.Info("peer websocket closed", zap.String("peer", peer.ID))
conn.Close(websocket.StatusInternalError, "unexpceted closure")

// At this point ctx has already been cancelled, so we create a new one to use for the disconnect.
nctx, cancel := context.WithTimeout(logging.WithLogger(context.Background(), logger), time.Second*10)
defer cancel()
manager.Disconnected(nctx, peer)
if !peer.closedPacketReceived {
// At this point ctx has already been cancelled, so we create a new one to use for the disconnect.
nctx, cancel := context.WithTimeout(logging.WithLogger(context.Background(), logger), time.Second*10)
defer cancel()
manager.Disconnected(nctx, peer)
}
}()

for ctx.Err() == nil {
Expand All @@ -75,6 +77,11 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
util.ErrorAndDisconnect(ctx, conn, err)
}

if peer.closedPacketReceived {
logger.Warn("received packet after close", zap.String("peer", peer.ID), zap.String("type", typeOnly.Type))
continue
}

switch typeOnly.Type {
case "credentials":
credentials, err := cloudflare.GetCredentials(ctx)
Expand Down
76 changes: 52 additions & 24 deletions internal/signaling/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Peer struct {
store stores.Store
conn *websocket.Conn

closedPacketReceived bool

retrievedIDCallback func(context.Context, *Peer) (bool, error)

ID string
Expand Down Expand Up @@ -121,6 +123,16 @@ func (p *Peer) HandlePacket(ctx context.Context, typ string, raw []byte) error {
return fmt.Errorf("unable to handle packet: %w", err)
}

case "close":
packet := ClosePacket{}
if err := json.Unmarshal(raw, &packet); err != nil {
return fmt.Errorf("unable to unmarshal json: %w", err)
}
err = p.HandleClosePacket(ctx, packet)
if err != nil {
return fmt.Errorf("unable to handle packet: %w", err)
}

case "list":
packet := ListPacket{}
if err := json.Unmarshal(raw, &packet); err != nil {
Expand Down Expand Up @@ -151,30 +163,7 @@ func (p *Peer) HandlePacket(ctx context.Context, typ string, raw []byte) error {
return fmt.Errorf("unable to handle packet: %w", err)
}

case "leave":
go metrics.Record(ctx, "lobby", "leave", p.Game, p.ID, p.Lobby)
logger.Info("leaving lobby", zap.String("lobby", p.Lobby))

others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
if err != nil {
return fmt.Errorf("unable to leave lobby: %w", err)
}
packet := DisconnectPacket{
Type: "disconnect",
ID: p.ID,
}
data, err := json.Marshal(packet)
if err == nil {
for _, id := range others {
if id != p.ID {
err := p.store.Publish(ctx, p.Game+p.Lobby+id, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
}
}
p.Lobby = ""
// case "leave":

case "connected": // TODO: Do we want to keep track of connections between peers?
case "disconnected": // TODO: Do we want to keep track of connections between peers?
Expand Down Expand Up @@ -269,6 +258,45 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error
})
}

func (p *Peer) HandleClosePacket(ctx context.Context, packet ClosePacket) error {
logger := logging.GetLogger(ctx)
go metrics.Record(ctx, "client", "close", p.Game, p.ID, p.Lobby)

p.closedPacketReceived = true

logger.Info("client closed",
zap.String("game", p.Game),
zap.String("peer", p.ID),
zap.String("lobby", p.Lobby),
zap.String("reason", packet.Reason),
)

if p.Lobby != "" {
others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
if err != nil {
return fmt.Errorf("unable to leave lobby: %w", err)
}
packet := DisconnectPacket{
Type: "disconnect",
ID: p.ID,
}
data, err := json.Marshal(packet)
if err == nil {
for _, id := range others {
if id != p.ID {
err := p.store.Publish(ctx, p.Game+p.Lobby+id, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
}
}
p.Lobby = ""
}

return nil
}

func (p *Peer) HandleListPacket(ctx context.Context, packet ListPacket) error {
logger := logging.GetLogger(ctx)
if p.ID == "" {
Expand Down
2 changes: 1 addition & 1 deletion internal/signaling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type DisconnectPacket struct {
Reason string `json:"reason"`
}

type LeavePacket struct {
type ClosePacket struct {
Type string `json:"type"`

ID string `json:"id"`
Expand Down
2 changes: 1 addition & 1 deletion lib/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export default class Network extends EventEmitter<NetworkListeners> {

if (this.id !== '') {
this.signaling.send({
type: 'leave',
type: 'close',
id: this.id,
reason: reason ?? 'normal closure'
})
Expand Down
6 changes: 3 additions & 3 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ interface Base {

export type SignalingPacketTypes =
| CandidatePacket
| ClosePacket
| ConnectedPacket
| ConnectPacket
| CreatePacket
Expand All @@ -39,7 +40,6 @@ export type SignalingPacketTypes =
| HelloPacket
| JoinedPacket
| JoinPacket
| LeavePacket
| ListPacket
| LobbiesPacket
| WelcomePacket
Expand Down Expand Up @@ -91,8 +91,8 @@ export interface JoinedPacket extends Base {
id: string
}

export interface LeavePacket extends Base {
type: 'leave'
export interface ClosePacket extends Base {
type: 'close'
id: string
reason: string
}
Expand Down