diff --git a/internal/signaling/handler.go b/internal/signaling/handler.go index cdd31f8..3ac63aa 100644 --- a/internal/signaling/handler.go +++ b/internal/signaling/handler.go @@ -58,10 +58,12 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre logger.Info("peer websocket closed", zap.String("peer", peer.ID)) conn.Close(websocket.StatusInternalError, "unexpceted closure") - // At this point ctx has already been cancelled, so we create a new one to use for the disconnect. - nctx, cancel := context.WithTimeout(logging.WithLogger(context.Background(), logger), time.Second*10) - defer cancel() - manager.Disconnected(nctx, peer) + if !peer.closedPacketReceived { + // At this point ctx has already been cancelled, so we create a new one to use for the disconnect. + nctx, cancel := context.WithTimeout(logging.WithLogger(context.Background(), logger), time.Second*10) + defer cancel() + manager.Disconnected(nctx, peer) + } }() for ctx.Err() == nil { @@ -75,6 +77,11 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre util.ErrorAndDisconnect(ctx, conn, err) } + if peer.closedPacketReceived { + logger.Warn("received packet after close", zap.String("peer", peer.ID), zap.String("type", typeOnly.Type)) + continue + } + switch typeOnly.Type { case "credentials": credentials, err := cloudflare.GetCredentials(ctx) diff --git a/internal/signaling/peer.go b/internal/signaling/peer.go index 9d728d2..79107e2 100644 --- a/internal/signaling/peer.go +++ b/internal/signaling/peer.go @@ -19,6 +19,8 @@ type Peer struct { store stores.Store conn *websocket.Conn + closedPacketReceived bool + retrievedIDCallback func(context.Context, *Peer) (bool, error) ID string @@ -121,6 +123,16 @@ func (p *Peer) HandlePacket(ctx context.Context, typ string, raw []byte) error { return fmt.Errorf("unable to handle packet: %w", err) } + case "close": + packet := ClosePacket{} + if err := json.Unmarshal(raw, &packet); err != nil { + return fmt.Errorf("unable to unmarshal json: %w", err) + } + err = p.HandleClosePacket(ctx, packet) + if err != nil { + return fmt.Errorf("unable to handle packet: %w", err) + } + case "list": packet := ListPacket{} if err := json.Unmarshal(raw, &packet); err != nil { @@ -151,30 +163,7 @@ func (p *Peer) HandlePacket(ctx context.Context, typ string, raw []byte) error { return fmt.Errorf("unable to handle packet: %w", err) } - case "leave": - go metrics.Record(ctx, "lobby", "leave", p.Game, p.ID, p.Lobby) - logger.Info("leaving lobby", zap.String("lobby", p.Lobby)) - - others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID) - if err != nil { - return fmt.Errorf("unable to leave lobby: %w", err) - } - packet := DisconnectPacket{ - Type: "disconnect", - ID: p.ID, - } - data, err := json.Marshal(packet) - if err == nil { - for _, id := range others { - if id != p.ID { - err := p.store.Publish(ctx, p.Game+p.Lobby+id, data) - if err != nil { - logger.Error("failed to publish disconnect packet", zap.Error(err)) - } - } - } - } - p.Lobby = "" + // case "leave": case "connected": // TODO: Do we want to keep track of connections between peers? case "disconnected": // TODO: Do we want to keep track of connections between peers? @@ -269,6 +258,45 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error }) } +func (p *Peer) HandleClosePacket(ctx context.Context, packet ClosePacket) error { + logger := logging.GetLogger(ctx) + go metrics.Record(ctx, "client", "close", p.Game, p.ID, p.Lobby) + + p.closedPacketReceived = true + + logger.Info("client closed", + zap.String("game", p.Game), + zap.String("peer", p.ID), + zap.String("lobby", p.Lobby), + zap.String("reason", packet.Reason), + ) + + if p.Lobby != "" { + others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID) + if err != nil { + return fmt.Errorf("unable to leave lobby: %w", err) + } + packet := DisconnectPacket{ + Type: "disconnect", + ID: p.ID, + } + data, err := json.Marshal(packet) + if err == nil { + for _, id := range others { + if id != p.ID { + err := p.store.Publish(ctx, p.Game+p.Lobby+id, data) + if err != nil { + logger.Error("failed to publish disconnect packet", zap.Error(err)) + } + } + } + } + p.Lobby = "" + } + + return nil +} + func (p *Peer) HandleListPacket(ctx context.Context, packet ListPacket) error { logger := logging.GetLogger(ctx) if p.ID == "" { diff --git a/internal/signaling/types.go b/internal/signaling/types.go index 2ce5be0..243cb40 100644 --- a/internal/signaling/types.go +++ b/internal/signaling/types.go @@ -77,7 +77,7 @@ type DisconnectPacket struct { Reason string `json:"reason"` } -type LeavePacket struct { +type ClosePacket struct { Type string `json:"type"` ID string `json:"id"` diff --git a/lib/network.ts b/lib/network.ts index 67f0c6c..5fbb85c 100644 --- a/lib/network.ts +++ b/lib/network.ts @@ -92,7 +92,7 @@ export default class Network extends EventEmitter { if (this.id !== '') { this.signaling.send({ - type: 'leave', + type: 'close', id: this.id, reason: reason ?? 'normal closure' }) diff --git a/lib/types.ts b/lib/types.ts index c8b91ad..b9f8c84 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -27,6 +27,7 @@ interface Base { export type SignalingPacketTypes = | CandidatePacket +| ClosePacket | ConnectedPacket | ConnectPacket | CreatePacket @@ -39,7 +40,6 @@ export type SignalingPacketTypes = | HelloPacket | JoinedPacket | JoinPacket -| LeavePacket | ListPacket | LobbiesPacket | WelcomePacket @@ -91,8 +91,8 @@ export interface JoinedPacket extends Base { id: string } -export interface LeavePacket extends Base { - type: 'leave' +export interface ClosePacket extends Base { + type: 'close' id: string reason: string }