diff --git a/pkg/hls/server_handler.go b/pkg/hls/server_handler.go index 350182ed..36f5c59c 100644 --- a/pkg/hls/server_handler.go +++ b/pkg/hls/server_handler.go @@ -80,14 +80,17 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http. if s.isSubSessionModeEnable() { sessionIdHash = urlObj.Query().Get("session_id") if filetype == "ts" && sessionIdHash != "" { - // 注意,为了增强容错性,不管是session_id字段无效,还是session_id为空,我们都依然返回ts文件内容给播放端 if sessionIdHash != "" { err = s.keepSessionAlive(sessionIdHash) if err != nil { Log.Warnf("keepSessionAlive failed. session=%s, err=%+v", sessionIdHash, err) + resp.WriteHeader(http.StatusNotFound) + return } } else { - // noop + Log.Warnf("session_id not exist. session=%s, err=%+v", sessionIdHash, err) + resp.WriteHeader(http.StatusNotFound) + return } } else if filetype == "m3u8" { neededRedirect := false @@ -96,7 +99,8 @@ func (s *ServerHandler) ServeHTTPWithUrlCtx(resp http.ResponseWriter, req *http. err = s.keepSessionAlive(sessionIdHash) if err != nil { Log.Warnf("keepSessionAlive failed. session=%s, err=%+v", sessionIdHash, err) - neededRedirect = true + resp.WriteHeader(http.StatusNotFound) + return } } else { neededRedirect = true @@ -201,7 +205,7 @@ func (s *ServerHandler) clearExpireSession() { s.mutex.Lock() defer s.mutex.Unlock() for sessionIdHash, session := range s.sessionMap { - if session.IsExpired() { + if session.IsExpired() || session.IsDisposed() { delete(s.sessionMap, sessionIdHash) s.observer.OnDelHlsSubSession(session) } diff --git a/pkg/hls/server_sub_session.go b/pkg/hls/server_sub_session.go index 232659c4..35aeafc8 100644 --- a/pkg/hls/server_sub_session.go +++ b/pkg/hls/server_sub_session.go @@ -9,6 +9,7 @@ package hls import ( + "github.com/q191201771/naza/pkg/nazaatomic" "net/http" "strings" "time" @@ -29,6 +30,8 @@ type SubSession struct { stat base.StatSession prevStat connection.Stat currStat connection.StatAtomic + + disposedFlag nazaatomic.Bool } func (s *SubSession) UniqueKey() string { @@ -105,10 +108,18 @@ func (s *SubSession) IsExpired() bool { return s.LastRequestTime.Add(s.timeout).Before(time.Now()) } +func (s *SubSession) IsDisposed() bool { + return s.disposedFlag.Load() +} + func (s *SubSession) KeepAlive() { s.LastRequestTime = time.Now() } +func (s *SubSession) Dispose() { + s.disposedFlag.Store(true) +} + func GetAppNameFromUrlCtx(urlCtx base.UrlContext, hlsUrlPattern string) string { if hlsUrlPattern == "" { return urlCtx.PathWithoutLastItem diff --git a/pkg/logic/group__.go b/pkg/logic/group__.go index d3580ebc..887392d9 100644 --- a/pkg/logic/group__.go +++ b/pkg/logic/group__.go @@ -379,6 +379,13 @@ func (group *Group) KickSession(sessionId string) bool { return true } } + } else if strings.HasPrefix(sessionId, base.UkPreHlsSubSession) { + for s := range group.hlsSubSessionSet { + if s.UniqueKey() == sessionId { + s.Dispose() + return true + } + } } else { Log.Errorf("[%s] kick session while session id format invalid. %s", group.UniqueKey, sessionId) }