Skip to content

Commit

Permalink
Rejoin peers into lobbies based on timeoutmanager (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
koenbollen authored Sep 27, 2023
1 parent acd7e42 commit 01d2791
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 24 deletions.
2 changes: 1 addition & 1 deletion internal/signaling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
retrievedIDCallback: manager.Reconnected,
}
defer func() {
logger.Info("peer websocket closed", zap.String("peer", peer.ID))
logger.Info("peer websocket closed", zap.String("peer", peer.ID), zap.String("game", peer.Game), zap.String("origin", r.Header.Get("Origin")))
conn.Close(websocket.StatusInternalError, "unexpceted closure")

if !peer.closedPacketReceived {
Expand Down
18 changes: 10 additions & 8 deletions internal/signaling/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Peer struct {

closedPacketReceived bool

retrievedIDCallback func(context.Context, *Peer) (bool, error)
retrievedIDCallback func(context.Context, *Peer) (bool, []string, error)

ID string
Secret string
Expand Down Expand Up @@ -209,19 +209,20 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error

hasReconnected := false
clientIsReconnecting := false
var reconnectingLobbies []string
if packet.ID != "" && packet.Secret != "" {
clientIsReconnecting = true
p.ID = packet.ID
p.Secret = packet.Secret
logger.Info("peer reconnecting", zap.String("game", p.Game), zap.String("peer", p.ID), zap.String("lobby_in_packet", packet.Lobby))
logger.Info("peer reconnecting", zap.String("game", p.Game), zap.String("peer", p.ID))
} else {
p.ID = util.GeneratePeerID(ctx)
p.Secret = util.GenerateSecret(ctx)
logger.Info("peer connecting", zap.String("game", p.Game), zap.String("peer", p.ID))
}
if clientIsReconnecting {
var err error
hasReconnected, err = p.retrievedIDCallback(ctx, p)
hasReconnected, reconnectingLobbies, err = p.retrievedIDCallback(ctx, p)
if err != nil {
return fmt.Errorf("unable to reconnect: %w", err)
}
Expand All @@ -230,20 +231,21 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error
}
}

if packet.Lobby != "" {
inLobby, err := p.store.IsPeerInLobby(ctx, p.Game, packet.Lobby, p.ID)
if hasReconnected && len(reconnectingLobbies) > 0 && reconnectingLobbies[0] != "" {
lobby := reconnectingLobbies[0]
inLobby, err := p.store.IsPeerInLobby(ctx, p.Game, lobby, p.ID)
if err != nil {
return err
}
if hasReconnected && inLobby {
if inLobby {
logger.Info("peer rejoining lobby", zap.String("game", p.Game), zap.String("peer", p.ID), zap.String("lobby", p.Lobby))
p.Lobby = packet.Lobby
p.Lobby = lobby
p.store.Subscribe(ctx, p.Game+p.Lobby+p.ID, p.ForwardMessage)
go metrics.Record(ctx, "lobby", "reconnected", p.Game, p.ID, p.Lobby)
} else {
fakeJoinPacket := JoinPacket{
Type: "join",
Lobby: packet.Lobby,
Lobby: lobby,
}
err := p.HandleJoinPacket(ctx, fakeJoinPacket)
if err != nil {
Expand Down
19 changes: 12 additions & 7 deletions internal/signaling/stores/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,20 +330,25 @@ func (s *PostgresStore) TimeoutPeer(ctx context.Context, peerID, secret, gameID
return nil
}

func (s *PostgresStore) ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, error) {
res, err := s.DB.Exec(ctx, `
func (s *PostgresStore) ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, []string, error) {
var lobbies []string
err := s.DB.QueryRow(ctx, `
DELETE FROM timeouts
WHERE peer = $1
AND secret = $2
AND game = $3
`, peerID, secret, gameID)
RETURNING lobbies
`, peerID, secret, gameID).Scan(&lobbies)
if err != nil {
return false, err
if errors.Is(err, pgx.ErrNoRows) {
return false, nil, nil
}
return false, nil, err
}
if res.RowsAffected() == 0 {
return false, nil
if len(lobbies) == 0 {
lobbies = nil
}
return true, nil
return true, lobbies, nil
}

func (s *PostgresStore) ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration, callback func(peerID, gameID string, lobbies []string) error) (more bool, err error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/signaling/stores/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Store interface {
Publish(ctx context.Context, topic string, data []byte) error

TimeoutPeer(ctx context.Context, peerID, secret, gameID string, lobbies []string) error
ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, error)
ReconnectPeer(ctx context.Context, peerID, secret, gameID string) (bool, []string, error)
ClaimNextTimedOutPeer(ctx context.Context, threshold time.Duration, callback func(peerID, gameID string, lobbies []string) error) (bool, error)
}

Expand Down
10 changes: 7 additions & 3 deletions internal/signaling/timeout_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,18 @@ func (i *TimeoutManager) Disconnected(ctx context.Context, p *Peer) {
return
}

logger.Debug("peer marked as disconnected", zap.String("id", p.ID))
err := i.Store.TimeoutPeer(ctx, p.ID, p.Secret, p.Game, []string{p.Lobby})
logger.Debug("peer marked as disconnected", zap.String("id", p.ID), zap.String("lobby", p.Lobby))
lobbies := []string{}
if p.Lobby != "" {
lobbies = []string{p.Lobby}
}
err := i.Store.TimeoutPeer(ctx, p.ID, p.Secret, p.Game, lobbies)
if err != nil {
logger.Error("failed to record timeout peer", zap.Error(err))
}
}

func (i *TimeoutManager) Reconnected(ctx context.Context, p *Peer) (bool, error) {
func (i *TimeoutManager) Reconnected(ctx context.Context, p *Peer) (bool, []string, error) {
logger := logging.GetLogger(ctx)

logger.Debug("peer marked as reconnected", zap.String("id", p.ID))
Expand Down
1 change: 0 additions & 1 deletion internal/signaling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type HelloPacket struct {
Game string `json:"game"`
ID string `json:"id"`
Secret string `json:"secret"`
Lobby string `json:"lobby"`
}

type WelcomePacket struct {
Expand Down
3 changes: 1 addition & 2 deletions lib/signaling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ export default class Signaling extends EventEmitter<SignalingListeners> {
type: 'hello',
game: this.network.gameID,
id: this.receivedID,
secret: this.receivedSecret,
lobby: this.currentLobby
secret: this.receivedSecret
})
}
const onError = (e: Event): void => {
Expand Down
1 change: 0 additions & 1 deletion lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ export interface HelloPacket extends Base {
game: string
id?: string
secret?: string
lobby?: string
}

export interface WelcomePacket extends Base {
Expand Down

0 comments on commit 01d2791

Please sign in to comment.