From db2a49a0ccaaba0fe07a022eb96727fc0dfc24d3 Mon Sep 17 00:00:00 2001 From: Zac Bergquist Date: Fri, 12 Apr 2024 15:43:54 -0600 Subject: [PATCH] [v15] player: fix seeking during dead time (#40442) * player: fix seeking during dead time This addresses a bug in the new player API where the player does not respond to seeking during large periods of "dead time." For example, if there is 30 seconds of dead time between two events, the player is effectively in a 30 second sleep. Attempting to change the position of playback during this 30-second period works, but is not observed until the sleep completes. With this change, the sleep period is canceled when the user seeks to a new position. Fixes #38560 * Improve UX when the user seeks while paused Restart the stream in this case rather than trying to keep server and client state in sync. * Fix * Update desktop player * Lint/test fixes * increase timeout * Use a real clock for the new test --- lib/player/player.go | 143 +++++++++++++++--- lib/player/player_test.go | 52 +++++++ lib/web/tty_playback.go | 2 + .../teleport/src/Player/DesktopPlayer.tsx | 2 +- .../teleport/src/Player/SshPlayer.tsx | 2 +- .../teleport/src/lib/tdp/playerClient.ts | 68 +++++++-- .../teleport/src/lib/term/ttyPlayer.js | 70 +++++++-- 7 files changed, 287 insertions(+), 52 deletions(-) diff --git a/lib/player/player.go b/lib/player/player.go index f043430ff61e3..6941446ac5a86 100644 --- a/lib/player/player.go +++ b/lib/player/player.go @@ -58,6 +58,7 @@ type Player struct { advanceTo atomic.Int64 emit chan events.AuditEvent + wake chan int64 done chan struct{} // playPause holds a channel to be closed when @@ -119,6 +120,7 @@ func New(cfg *Config) (*Player, error) { streamer: cfg.Streamer, emit: make(chan events.AuditEvent, 1024), playPause: make(chan chan struct{}, 1), + wake: make(chan int64), done: make(chan struct{}), } @@ -207,7 +209,13 @@ func (p *Player) stream() { // time we were advanced to lastDelay = adv } - if err := p.applyDelay(time.Duration(currentDelay-lastDelay) * time.Millisecond); err != nil { + + switch err := p.applyDelay(lastDelay, currentDelay); { + case errors.Is(err, errSeekWhilePaused): + p.log.Debug("seeked during pause, will restart stream") + go p.stream() + return + case err != nil: close(p.emit) return } @@ -269,32 +277,113 @@ func (p *Player) Play() error { // from the beginning. A duration greater than the length of the session // will cause playback to rapidly advance to the end of the recording. func (p *Player) SetPos(d time.Duration) error { + // we use a negative value to indicate rewinding, which means we can't + // rewind to position 0 (there is no negative 0) + if d == 0 { + d = 1 * time.Millisecond + } if d.Milliseconds() < p.lastPlayed.Load() { - // if we're rewinding we store a negative value d = -1 * d } p.advanceTo.Store(d.Milliseconds()) + + // try to wake up the player if it's waiting to emit an event + select { + case p.wake <- d.Milliseconds(): + default: + } + return nil } -// applyDelay "sleeps" for d in a manner that -// can be canceled -func (p *Player) applyDelay(d time.Duration) error { - scaled := float64(d) / p.speed.Load().(float64) - select { - case <-p.done: - return errClosed - case <-p.clock.After(time.Duration(scaled)): - return nil +// applyDelay applies the timing delay between the last emitted event +// (lastDelay) and the event that will be emitted next (currentDelay). +// +// The delay can be interrupted by: +// 1. The player being closed. +// 2. The user pausing playback. +// 3. The user seeking to a new position in the playback (SetPos) +// +// A nil return value indicates that the delay has elapsed and that +// the next even can be emitted. +func (p *Player) applyDelay(lastDelay, currentDelay int64) error { +loop: + for { + // TODO(zmb3): changing play speed during a long sleep + // will not apply until after the sleep completes + speed := p.speed.Load().(float64) + scaled := float64(currentDelay-lastDelay) / speed + + timer := p.clock.NewTimer(time.Duration(scaled) * time.Millisecond) + defer timer.Stop() + + start := time.Now() + + select { + case <-p.done: + return errClosed + case newPos := <-p.wake: + // the sleep was interrupted due to the user changing playback controls + switch { + case newPos == interruptForPause: + // the user paused playback while we were waiting to emit the next event: + // 1) figure out much of the sleep we completed + dur := float64(time.Since(start).Milliseconds()) * speed + + // 2) wait here until the user resumes playback + if err := p.waitWhilePaused(); errors.Is(err, errSeekWhilePaused) { + // the user changed the playback position, so consider the delay + // applied and let the player pick up from the new position + return errSeekWhilePaused + } + + // now that we're playing again, update our delay to account + // for the portion that was already satisfied and apply the + // remaining delay + lastDelay += int64(dur) + timer.Stop() + continue loop + case newPos > currentDelay: + // the user scrubbed forward in time past the current event, + // so we can return as if the delay has elapsed naturally + return nil + case newPos < 0: + // the user has rewinded playback, which means we need to restart + // the stream and can consider this delay as having elapsed naturally + return nil + case newPos < currentDelay: + // the user has scrubbed forward in time, but not enough to + // emit the next event - we need to delay more + lastDelay = newPos + timer.Stop() + continue loop + default: + return nil + } + + case <-timer.Chan(): + return nil + } } } +// interruptForPause is a special value used to interrupt the player's +// sleep due to the user pausing playback. +const interruptForPause = math.MaxInt64 + func (p *Player) setPlaying(play bool) { ch := <-p.playPause alreadyPlaying := ch == nil if alreadyPlaying && !play { ch = make(chan struct{}) + + // try to wake up the player if it's waiting to emit an event + select { + case p.wake <- interruptForPause: + default: + } + } else if !alreadyPlaying && play { // signal waiters who are paused that it's time to resume playing close(ch) @@ -304,20 +393,34 @@ func (p *Player) setPlaying(play bool) { p.playPause <- ch } +var errSeekWhilePaused = errors.New("player seeked during pause") + // waitWhilePaused blocks while the player is in a paused state. // It returns immediately if the player is currently playing. func (p *Player) waitWhilePaused() error { - ch := <-p.playPause - p.playPause <- ch - - if alreadyPlaying := ch == nil; !alreadyPlaying { - select { - case <-p.done: - return errClosed - case <-ch: + seeked := false + for { + ch := <-p.playPause + p.playPause <- ch + + if alreadyPlaying := ch == nil; !alreadyPlaying { + select { + case <-p.done: + return errClosed + case <-p.wake: + // seek while paused, this can happen an unlimited number of times, + // we just keep waiting until we're unpaused + seeked = true + continue + case <-ch: + // we have been unpaused + } + } + if seeked { + return errSeekWhilePaused } + return nil } - return nil } // LastPlayed returns the time of the last played event, diff --git a/lib/player/player_test.go b/lib/player/player_test.go index 44414ad83ebce..60de2184b0ba5 100644 --- a/lib/player/player_test.go +++ b/lib/player/player_test.go @@ -174,6 +174,7 @@ func TestClose(t *testing.T) { func TestSeekForward(t *testing.T) { clk := clockwork.NewFakeClock() + p, err := player.New(&player.Config{ Clock: clk, SessionID: "test-session", @@ -210,6 +211,57 @@ func TestSeekForward(t *testing.T) { } } +func TestSeekForwardTwice(t *testing.T) { + clk := clockwork.NewRealClock() + p, err := player.New(&player.Config{ + Clock: clk, + SessionID: "test-session", + Streamer: &simpleStreamer{count: 1, delay: 6000}, + }) + require.NoError(t, err) + t.Cleanup(func() { p.Close() }) + require.NoError(t, p.Play()) + + time.Sleep(100 * time.Millisecond) + p.SetPos(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) + p.SetPos(5900 * time.Millisecond) + + select { + case <-p.C(): + case <-time.After(5 * time.Second): + require.FailNow(t, "event not emitted on time") + } +} + +// TestInterruptsDelay tests that the player responds to playback +// controls even when it is waiting to emit an event. +func TestInterruptsDelay(t *testing.T) { + clk := clockwork.NewFakeClock() + p, err := player.New(&player.Config{ + Clock: clk, + SessionID: "test-session", + Streamer: &simpleStreamer{count: 3, delay: 5000}, + }) + require.NoError(t, err) + require.NoError(t, p.Play()) + + t.Cleanup(func() { p.Close() }) + + clk.BlockUntil(1) // player is now waiting to emit event 0 + + // emulate the user seeking forward while the player is waiting.. + p.SetPos(10_001 * time.Millisecond) + + // expect event 0 and event 1 to be emitted right away + // even without advancing the clock + evt0 := <-p.C() + evt1 := <-p.C() + + require.Equal(t, int64(0), evt0.GetIndex()) + require.Equal(t, int64(1), evt1.GetIndex()) +} + func TestRewind(t *testing.T) { clk := clockwork.NewFakeClock() p, err := player.New(&player.Config{ diff --git a/lib/web/tty_playback.go b/lib/web/tty_playback.go index 57e3c830bda27..232ec37e525bf 100644 --- a/lib/web/tty_playback.go +++ b/lib/web/tty_playback.go @@ -221,6 +221,8 @@ func (h *Handler) ttyPlaybackHandle( return } + case *events.SessionLeave: // do nothing + default: h.log.Debugf("unexpected event type %T", evt) } diff --git a/web/packages/teleport/src/Player/DesktopPlayer.tsx b/web/packages/teleport/src/Player/DesktopPlayer.tsx index c7602e688d402..1e2c1b3a1e2d9 100644 --- a/web/packages/teleport/src/Player/DesktopPlayer.tsx +++ b/web/packages/teleport/src/Player/DesktopPlayer.tsx @@ -122,8 +122,8 @@ export const DesktopPlayer = ({ onRestart={reload} onStartMove={() => playerClient.suspendTimeUpdates()} move={pos => { - playerClient.seekTo(pos); playerClient.resumeTimeUpdates(); + playerClient.seekTo(pos); }} onPlaySpeedChange={s => playerClient.setPlaySpeed(s)} toggle={() => playerClient.togglePlayPause()} diff --git a/web/packages/teleport/src/Player/SshPlayer.tsx b/web/packages/teleport/src/Player/SshPlayer.tsx index 49dd5976ecc6f..878bc1c230caa 100644 --- a/web/packages/teleport/src/Player/SshPlayer.tsx +++ b/web/packages/teleport/src/Player/SshPlayer.tsx @@ -70,8 +70,8 @@ export default function Player({ sid, clusterId, durationMs }) { onRestart={() => window.location.reload()} onStartMove={() => tty.suspendTimeUpdates()} move={pos => { - tty.move(pos); tty.resumeTimeUpdates(); + tty.move(pos); }} toggle={() => { isPlaying ? tty.stop() : tty.play(); diff --git a/web/packages/teleport/src/lib/tdp/playerClient.ts b/web/packages/teleport/src/lib/tdp/playerClient.ts index bbba36bc9297d..1b8473d03b0c3 100644 --- a/web/packages/teleport/src/lib/tdp/playerClient.ts +++ b/web/packages/teleport/src/lib/tdp/playerClient.ts @@ -44,9 +44,12 @@ export class PlayerClient extends Client { private speed = 1.0; private paused = false; + private lastPlayedTimestamp = 0; + private skipTimeUpdatesUntil = null; + private lastTimestamp = 0; private sendTimeUpdates = true; - private lastUpdate = 0; + private lastUpdateTime = 0; private timeout = null; constructor({ url, setTime, setPlayerStatus, setStatusText }) { @@ -62,6 +65,8 @@ export class PlayerClient extends Client { if (this.sendTimeUpdates) { this._setTime(t); } + this.lastTimestamp = t; + this.lastUpdateTime = Date.now(); }, PROGRESS_UPDATE_INTERVAL_MS); } @@ -73,11 +78,11 @@ export class PlayerClient extends Client { scheduleNextUpdate(current: number) { this.timeout = setTimeout(() => { - const delta = Date.now() - this.lastUpdate; + const delta = Date.now() - this.lastUpdateTime; const next = current + delta * this.speed; this.setTime(next); - this.lastUpdate = Date.now(); + this.lastUpdateTime = Date.now(); this.scheduleNextUpdate(next); }, PROGRESS_UPDATE_INTERVAL_MS); @@ -101,11 +106,21 @@ export class PlayerClient extends Client { // togglePlayPause toggles the playback system between "playing" and "paused" states. togglePlayPause() { this.paused = !this.paused; - this.send(JSON.stringify({ action: Action.TOGGLE_PLAY_PAUSE })); + this.setPlayerStatus(this.paused ? StatusEnum.PAUSED : StatusEnum.PLAYING); if (this.paused) { this.cancelTimeUpdate(); } - this.setPlayerStatus(this.paused ? StatusEnum.PAUSED : StatusEnum.PLAYING); + + this.lastUpdateTime = Date.now(); + + if (this.isSeekingForward()) { + const next = Math.max(this.skipTimeUpdatesUntil, this.lastTimestamp); + this.scheduleNextUpdate(next); + } else { + this.scheduleNextUpdate(this.lastTimestamp); + } + + this.send(JSON.stringify({ action: Action.TOGGLE_PLAY_PAUSE })); } // setPlaySpeed sets the playback speed of the recording. @@ -126,12 +141,29 @@ export class PlayerClient extends Client { } else { this.cancelTimeUpdate(); this.lastPlayedTimestamp = json.ms; - this.lastUpdate = Date.now(); + this.lastUpdateTime = Date.now(); this.setTime(json.ms); - // schedule the next time update (in case this - // part of the recording is dead time) - if (!this.paused) { + // clear seek state if we caught up to the seek point + if ( + this.skipTimeUpdatesUntil !== null && + this.lastPlayedTimestamp >= this.skipTimeUpdatesUntil + ) { + this.skipTimeUpdatesUntil = null; + } + + if (!this.isSeekingForward()) { + this.cancelTimeUpdate(); + } + + // schedule the next time update, which ensures that + // the progress bar continues to update even if this + // section of the recording is idle time + // + // note: we don't schedule an update if we're currently + // seeking forward in time, as we're trying to get there + // as quickly as possible + if (!this.paused && !this.isSeekingForward()) { this.scheduleNextUpdate(json.ms); } @@ -144,16 +176,24 @@ export class PlayerClient extends Client { this.send(JSON.stringify({ action: Action.SEEK, pos })); + this._setTime(pos); + this.lastUpdateTime = Date.now(); + this.skipTimeUpdatesUntil = pos; + if (pos < this.lastPlayedTimestamp) { // TODO: clear canvas - } else if (this.paused) { - // if we're paused, we want the scrubber to "stick" at the new - // time until we press play (rather than waiting for us to click - // play and start receiving new data) - this._setTime(pos); + } else if (!this.paused) { + this.scheduleNextUpdate(pos); } } + isSeekingForward(): boolean { + return ( + this.skipTimeUpdatesUntil !== null && + this.skipTimeUpdatesUntil > this.lastPlayedTimestamp + ); + } + // Overrides Client implementation. handleClientScreenSpec(buffer: ArrayBuffer) { this.emit( diff --git a/web/packages/teleport/src/lib/term/ttyPlayer.js b/web/packages/teleport/src/lib/term/ttyPlayer.js index d7f04846824c6..b231e83c1c7ce 100644 --- a/web/packages/teleport/src/lib/term/ttyPlayer.js +++ b/web/packages/teleport/src/lib/term/ttyPlayer.js @@ -50,10 +50,12 @@ export default class TtyPlayer extends Tty { this._paused = false; this._lastPlayedTimestamp = 0; + this._skipTimeUpdatesUntil = null; this._sendTimeUpdates = true; this._setTime = throttle(t => setTime(t), PROGRESS_UPDATE_INTERVAL_MS); - this._lastUpdate = 0; + this._lastUpdateTime = 0; + this._lastTimestamp = 0; this._timeout = null; } @@ -97,6 +99,9 @@ export default class TtyPlayer extends Tty { if (this._sendTimeUpdates) { this._setTime(t); } + + this._lastTimestamp = t; + this._lastUpdateTime = Date.now(); } disconnect(closeCode = WebsocketCloseCode.NORMAL) { @@ -108,10 +113,11 @@ export default class TtyPlayer extends Tty { scheduleNextUpdate(current) { this._timeout = setTimeout(() => { - const delta = Date.now() - this._lastUpdate; + const delta = Date.now() - this._lastUpdateTime; const next = current + delta; + this.setTime(next); - this._lastUpdate = Date.now(); + this._lastUpdateTime = Date.now(); this.scheduleNextUpdate(next); }, PROGRESS_UPDATE_INTERVAL_MS); @@ -133,8 +139,6 @@ export default class TtyPlayer extends Tty { // see lib/web/tty_playback.go for details on this protocol switch (typ) { case messageTypePty: - this.cancelTimeUpdate(); - const delay = Number(dv.getBigUint64(3)); const data = dv.buffer.slice( dv.byteOffset + 11, @@ -143,13 +147,29 @@ export default class TtyPlayer extends Tty { this.emit(TermEvent.DATA, data); this._lastPlayedTimestamp = delay; - - this._lastUpdate = Date.now(); + this._lastUpdateTime = Date.now(); this.setTime(delay); - // schedule the next time update (in case this - // part of the recording is dead time) - if (!this._paused) { + // clear seek state if we caught up to the seek point + if ( + this._skipTimeUpdatesUntil !== null && + this._lastPlayedTimestamp >= this._skipTimeUpdatesUntil + ) { + this._skipTimeUpdatesUntil = null; + } + + if (!this.isSeekingForward()) { + this.cancelTimeUpdate(); + } + + // schedule the next time update, which ensures that + // the progress bar continues to update even if this + // section of the recording is idle time + // + // note: we don't schedule an update if we're currently + // seeking forward in time, as we're trying to get there + // as quickly as possible + if (!this._paused && !this.isSeekingForward()) { this.scheduleNextUpdate(delay); } break; @@ -187,7 +207,6 @@ export default class TtyPlayer extends Tty { move(newPos) { this.cancelTimeUpdate(); - try { const buffer = new ArrayBuffer(11); const dv = new DataView(buffer); @@ -199,16 +218,26 @@ export default class TtyPlayer extends Tty { logger.error('error seeking', e); } + this._setTime(newPos); + this._lastUpdateTime = Date.now(); + this._skipTimeUpdatesUntil = newPos; + if (newPos < this._lastPlayedTimestamp) { this.emit(TermEvent.RESET); - } else if (this._paused) { - // if we're paused, we want the scrubber to "stick" at the new - // time until we press play (rather than waiting for us to click - // play and start receiving new data) - this._setTime(newPos); + } else { + if (!this._paused) { + this.scheduleNextUpdate(newPos); + } } } + isSeekingForward() { + return ( + this._skipTimeUpdatesUntil !== null && + this._skipTimeUpdatesUntil > this._lastPlayedTimestamp + ); + } + stop() { this._paused = true; this.cancelTimeUpdate(); @@ -226,6 +255,15 @@ export default class TtyPlayer extends Tty { this._paused = false; this._setPlayerStatus(StatusEnum.PLAYING); + this._lastUpdateTime = Date.now(); + + if (this.isSeekingForward()) { + const next = Math.max(this._skipTimeUpdatesUntil, this._lastTimestamp); + this.scheduleNextUpdate(next); + } else { + this.scheduleNextUpdate(this._lastTimestamp); + } + // the very first play call happens before we've even // connected - we only need to send the websocket message // for subsequent calls