Skip to content

Commit

Permalink
fix remotesub demand changes detection
Browse files Browse the repository at this point in the history
  • Loading branch information
totegamma committed Nov 8, 2023
1 parent 4c0173e commit c9c24a1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
45 changes: 25 additions & 20 deletions x/socket/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func (m *manager) Subscribe(conn *websocket.Conn, streams []string) {
// Unsubscribe unsubscribes a client from a stream
func (m *manager) Unsubscribe(conn *websocket.Conn) {
if _, ok := m.clientSubs[conn]; ok {
log.Printf("[remote] unsubscribe: %v", conn.RemoteAddr())
delete(m.clientSubs, conn)
}
}
Expand All @@ -104,34 +103,40 @@ func (m *manager) GetAllRemoteSubs() []string {
return allSubs
}

// update m.remoteSubs
// also update remoteConns if needed
func (m *manager) createInsufficientSubs() {
currentSubs := make(map[string][]string)
currentSubs := make(map[string]bool)
for _, streams := range m.clientSubs {
for _, stream := range streams {
split := strings.Split(stream, "@")
if len(split) != 2 {
continue
}
domain := split[1]
if _, ok := currentSubs[domain]; !ok {
currentSubs[domain] = append(currentSubs[domain], stream)
}
currentSubs[stream] = true
}
}

// on this func, update only if there is a new subscription
// update remoteSubs
// only add new subscriptions
// also detect remote subscription changes
changedRemotes := make([]string, 0)
for domain, streams := range currentSubs {
for stream := range currentSubs {
split := strings.Split(stream, "@")
if len(split) != 2 {
continue
}
domain := split[1]

if domain == m.config.Concurrent.FQDN {
continue
}

if _, ok := m.remoteSubs[domain]; !ok {
m.remoteSubs[domain] = streams
changedRemotes = append(changedRemotes, domain)
m.remoteSubs[domain] = []string{stream}
if !slices.Contains(changedRemotes, domain) {
changedRemotes = append(changedRemotes, domain)
}
} else {
for _, stream := range streams {
if !slices.Contains(m.remoteSubs[domain], stream) {
m.remoteSubs[domain] = append(m.remoteSubs[domain], stream)
if !slices.Contains(m.remoteSubs[domain], stream) {
m.remoteSubs[domain] = append(m.remoteSubs[domain], stream)
if !slices.Contains(changedRemotes, domain) {
changedRemotes = append(changedRemotes, domain)
}
}
Expand Down Expand Up @@ -214,7 +219,7 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) {
c.Close()
}
delete(m.remoteConns, domain)
log.Printf("##### remote connection closed: %s", domain)
log.Printf("[remote ws.reader] remote connection closed: %s", domain)
}()
for {
// check if the connection is still alive
Expand All @@ -240,7 +245,7 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) {
}
pingTicker.Stop()
delete(m.remoteConns, domain)
log.Printf("##### remote connection closed: %s", domain)
log.Printf("[remote ws.publisher] remote connection closed: %s", domain)
}()

var lastPong time.Time = time.Now()
Expand Down Expand Up @@ -301,7 +306,7 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) {
}
err := m.remoteConns[domain].WriteJSON(request)
if err != nil {
log.Printf("fail to send subscribe request to remote server %v: %v", domain, err)
log.Printf("[remote] fail to send subscribe request to remote server %v: %v", domain, err)
delete(m.remoteConns, domain)
return
}
Expand Down
3 changes: 0 additions & 3 deletions x/stream/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ func (r *repository) GetChunksFromRemote(ctx context.Context, host string, strea
for streamID, chunk := range chunkResp.Content {
if slices.Contains(currentSubsciptions, streamID) {
cacheChunks[streamID] = chunk
log.Printf("stream %s is subscribed", streamID)
} else {
log.Printf("stream %s is not subscribed", streamID)
}
}

Expand Down

0 comments on commit c9c24a1

Please sign in to comment.