Skip to content

Commit

Permalink
Have a topic per lobby
Browse files Browse the repository at this point in the history
Instead of always getting a list of peers and pushing a message to each
of then, we Subscribe on a lobby topic as well so we can publish on
that.
  • Loading branch information
erikdubbelboer committed Jun 16, 2024
1 parent 9ad3c08 commit 2c4aa09
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 46 deletions.
2 changes: 1 addition & 1 deletion features/basic.feature
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ Feature: Players can create and connect a network of players
When "green" connects to the lobby "dhgp75mn2bll"
And "blue" receives the network event "connected" with the argument "[Peer: ka9qy8em4vxr]"
And "yellow" receives the network event "connected" with the argument "[Peer: ka9qy8em4vxr]"
And "green" receives the network event "connected" with the argument "[Peer: h5yzwyizlwao]"
And "green" receives the network event "connected" with the argument "[Peer: 3t3cfgcqup9e]"
And "green" receives the network event "connected" with the argument "[Peer: h5yzwyizlwao]"

When "blue" boardcasts "Hello, world!" over the reliable channel
Then "yellow" receives the network event "message" with the arguments "[Peer: ka9qy8em4vxr]", "reliable" and "Hello, world!"
Expand Down
33 changes: 14 additions & 19 deletions internal/signaling/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,13 @@ func (p *Peer) Close(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
if err != nil {
logger.Warn("failed to leave lobby", zap.Error(err))
} else {
for _, id := range others {
if id != p.ID {
err := p.store.Publish(ctx, p.Game+p.Lobby+id, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
err := p.store.Publish(ctx, p.Game+p.Lobby, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
}
Expand Down Expand Up @@ -247,6 +243,7 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error
if inLobby {
logger.Info("peer rejoining lobby", zap.String("game", p.Game), zap.String("peer", p.ID), zap.String("lobby", p.Lobby))
p.Lobby = lobby
p.store.Subscribe(ctx, p.Game+p.Lobby, p.ForwardMessage)
p.store.Subscribe(ctx, p.Game+p.Lobby+p.ID, p.ForwardMessage)
go metrics.Record(ctx, "lobby", "reconnected", p.Game, p.ID, p.Lobby)
} else {
Expand Down Expand Up @@ -283,7 +280,7 @@ func (p *Peer) HandleClosePacket(ctx context.Context, packet ClosePacket) error
)

if p.Lobby != "" {
others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
if err != nil {
return fmt.Errorf("unable to leave lobby: %w", err)
}
Expand All @@ -293,13 +290,9 @@ func (p *Peer) HandleClosePacket(ctx context.Context, packet ClosePacket) error
}
data, err := json.Marshal(packet)
if err == nil {
for _, id := range others {
if id != p.ID {
err := p.store.Publish(ctx, p.Game+p.Lobby+id, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
err := p.store.Publish(ctx, p.Game+p.Lobby, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
p.Lobby = ""
Expand Down Expand Up @@ -359,6 +352,7 @@ func (p *Peer) HandleCreatePacket(ctx context.Context, packet CreatePacket) erro
return fmt.Errorf("unable to create lobby, too many attempts to find a unique code")
}

p.store.Subscribe(ctx, p.Game+p.Lobby, p.ForwardMessage)
p.store.Subscribe(ctx, p.Game+p.Lobby+p.ID, p.ForwardMessage)

lobby, err := p.store.GetLobby(ctx, p.Game, p.Lobby)
Expand Down Expand Up @@ -392,7 +386,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error {
return fmt.Errorf("lobby code too long")
}

others, err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID)
err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID)
if err != nil {
return err
}
Expand All @@ -403,6 +397,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error {
}

p.Lobby = packet.Lobby
p.store.Subscribe(ctx, p.Game+p.Lobby, p.ForwardMessage)
p.store.Subscribe(ctx, p.Game+p.Lobby+p.ID, p.ForwardMessage)

err = p.Send(ctx, JoinedPacket{
Expand All @@ -415,7 +410,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error {
return err
}

for _, otherID := range others {
for _, otherID := range lobby.Peers {
err := p.RequestConnection(ctx, otherID)
if err != nil {
return err
Expand All @@ -426,7 +421,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error {
zap.String("game", p.Game),
zap.String("lobby", p.Lobby),
zap.String("peer", p.ID),
zap.Strings("others", others))
zap.Strings("peers", lobby.Peers))
go metrics.Record(ctx, "lobby", "joined", p.Game, p.ID, p.Lobby)

return nil
Expand Down
30 changes: 14 additions & 16 deletions internal/signaling/stores/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,18 @@ func (s *PostgresStore) CreateLobby(ctx context.Context, game, lobbyCode, peerID
return nil
}

func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID string) ([]string, error) {
func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID string) error {
if len(peerID) > 20 {
logger := logging.GetLogger(ctx)
logger.Warn("peer id too long", zap.String("peerID", peerID))
return nil, ErrInvalidPeerID
return ErrInvalidPeerID
}

now := util.Now(ctx)

tx, err := s.DB.Begin(ctx)
if err != nil {
return nil, err
return err
}
defer tx.Rollback(context.Background()) //nolint:errcheck

Expand All @@ -193,14 +193,14 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID s
`, lobbyCode, game).Scan(&peerlist)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
return ErrNotFound
}
return nil, err
return err
}

for _, peer := range peerlist {
if peer == peerID {
return nil, ErrAlreadyInLobby
return ErrAlreadyInLobby
}
}

Expand All @@ -213,15 +213,15 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID s
AND game = $4
`, peerID, now, lobbyCode, game)
if err != nil {
return nil, err
return err
}

err = tx.Commit(ctx)
if err != nil {
return nil, err
return err
}

return peerlist, nil
return nil
}

func (s *PostgresStore) IsPeerInLobby(ctx context.Context, game, lobbyCode, peerID string) (bool, error) {
Expand All @@ -239,23 +239,21 @@ func (s *PostgresStore) IsPeerInLobby(ctx context.Context, game, lobbyCode, peer
return count > 0, nil
}

func (s *PostgresStore) LeaveLobby(ctx context.Context, game, lobbyCode, peerID string) ([]string, error) {
func (s *PostgresStore) LeaveLobby(ctx context.Context, game, lobbyCode, peerID string) error {
now := util.Now(ctx)

var peerlist []string
err := s.DB.QueryRow(ctx, `
_, err := s.DB.Exec(ctx, `
UPDATE lobbies
SET
peers = array_remove(peers, $1),
updated_at = $2
WHERE code = $3
AND game = $4
RETURNING peers
`, peerID, now, lobbyCode, game).Scan(&peerlist)
`, peerID, now, lobbyCode, game)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, err
return err
}
return peerlist, nil
return nil
}

func (s *PostgresStore) GetLobby(ctx context.Context, game, lobbyCode string) (Lobby, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/signaling/stores/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type SubscriptionCallback func(context.Context, []byte)

type Store interface {
CreateLobby(ctx context.Context, game, lobby, peerID string, public bool, customData map[string]any) error
JoinLobby(ctx context.Context, game, lobby, id string) ([]string, error)
JoinLobby(ctx context.Context, game, lobby, id string) error
IsPeerInLobby(ctx context.Context, game, lobby, id string) (bool, error)
LeaveLobby(ctx context.Context, game, lobby, id string) ([]string, error)
LeaveLobby(ctx context.Context, game, lobby, id string) error
GetLobby(ctx context.Context, game, lobby string) (Lobby, error)
ListLobbies(ctx context.Context, game, filter string) ([]Lobby, error)

Expand Down
13 changes: 5 additions & 8 deletions internal/signaling/timeout_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,15 @@ func (i *TimeoutManager) disconnectPeerInLobby(ctx context.Context, peerID strin
}
data, _ := json.Marshal(packet)

others, err := i.Store.LeaveLobby(ctx, gameID, lobby, peerID)
err := i.Store.LeaveLobby(ctx, gameID, lobby, peerID)
if err != nil {
logger.Warn("failed to leave lobby", zap.Error(err))
return err
}
for _, id := range others {
if id != peerID {
err := i.Store.Publish(ctx, gameID+lobby+id, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}

err = i.Store.Publish(ctx, gameID+lobby, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
return nil
}
Expand Down

0 comments on commit 2c4aa09

Please sign in to comment.