diff --git a/lib/player/player.go b/lib/player/player.go index f043430ff61e3..6941446ac5a86 100644 --- a/lib/player/player.go +++ b/lib/player/player.go @@ -58,6 +58,7 @@ type Player struct { advanceTo atomic.Int64 emit chan events.AuditEvent + wake chan int64 done chan struct{} // playPause holds a channel to be closed when @@ -119,6 +120,7 @@ func New(cfg *Config) (*Player, error) { streamer: cfg.Streamer, emit: make(chan events.AuditEvent, 1024), playPause: make(chan chan struct{}, 1), + wake: make(chan int64), done: make(chan struct{}), } @@ -207,7 +209,13 @@ func (p *Player) stream() { // time we were advanced to lastDelay = adv } - if err := p.applyDelay(time.Duration(currentDelay-lastDelay) * time.Millisecond); err != nil { + + switch err := p.applyDelay(lastDelay, currentDelay); { + case errors.Is(err, errSeekWhilePaused): + p.log.Debug("seeked during pause, will restart stream") + go p.stream() + return + case err != nil: close(p.emit) return } @@ -269,32 +277,113 @@ func (p *Player) Play() error { // from the beginning. A duration greater than the length of the session // will cause playback to rapidly advance to the end of the recording. func (p *Player) SetPos(d time.Duration) error { + // we use a negative value to indicate rewinding, which means we can't + // rewind to position 0 (there is no negative 0) + if d == 0 { + d = 1 * time.Millisecond + } if d.Milliseconds() < p.lastPlayed.Load() { - // if we're rewinding we store a negative value d = -1 * d } p.advanceTo.Store(d.Milliseconds()) + + // try to wake up the player if it's waiting to emit an event + select { + case p.wake <- d.Milliseconds(): + default: + } + return nil } -// applyDelay "sleeps" for d in a manner that -// can be canceled -func (p *Player) applyDelay(d time.Duration) error { - scaled := float64(d) / p.speed.Load().(float64) - select { - case <-p.done: - return errClosed - case <-p.clock.After(time.Duration(scaled)): - return nil +// applyDelay applies the timing delay between the last emitted event +// (lastDelay) and the event that will be emitted next (currentDelay). +// +// The delay can be interrupted by: +// 1. The player being closed. +// 2. The user pausing playback. +// 3. The user seeking to a new position in the playback (SetPos) +// +// A nil return value indicates that the delay has elapsed and that +// the next even can be emitted. +func (p *Player) applyDelay(lastDelay, currentDelay int64) error { +loop: + for { + // TODO(zmb3): changing play speed during a long sleep + // will not apply until after the sleep completes + speed := p.speed.Load().(float64) + scaled := float64(currentDelay-lastDelay) / speed + + timer := p.clock.NewTimer(time.Duration(scaled) * time.Millisecond) + defer timer.Stop() + + start := time.Now() + + select { + case <-p.done: + return errClosed + case newPos := <-p.wake: + // the sleep was interrupted due to the user changing playback controls + switch { + case newPos == interruptForPause: + // the user paused playback while we were waiting to emit the next event: + // 1) figure out much of the sleep we completed + dur := float64(time.Since(start).Milliseconds()) * speed + + // 2) wait here until the user resumes playback + if err := p.waitWhilePaused(); errors.Is(err, errSeekWhilePaused) { + // the user changed the playback position, so consider the delay + // applied and let the player pick up from the new position + return errSeekWhilePaused + } + + // now that we're playing again, update our delay to account + // for the portion that was already satisfied and apply the + // remaining delay + lastDelay += int64(dur) + timer.Stop() + continue loop + case newPos > currentDelay: + // the user scrubbed forward in time past the current event, + // so we can return as if the delay has elapsed naturally + return nil + case newPos < 0: + // the user has rewinded playback, which means we need to restart + // the stream and can consider this delay as having elapsed naturally + return nil + case newPos < currentDelay: + // the user has scrubbed forward in time, but not enough to + // emit the next event - we need to delay more + lastDelay = newPos + timer.Stop() + continue loop + default: + return nil + } + + case <-timer.Chan(): + return nil + } } } +// interruptForPause is a special value used to interrupt the player's +// sleep due to the user pausing playback. +const interruptForPause = math.MaxInt64 + func (p *Player) setPlaying(play bool) { ch := <-p.playPause alreadyPlaying := ch == nil if alreadyPlaying && !play { ch = make(chan struct{}) + + // try to wake up the player if it's waiting to emit an event + select { + case p.wake <- interruptForPause: + default: + } + } else if !alreadyPlaying && play { // signal waiters who are paused that it's time to resume playing close(ch) @@ -304,20 +393,34 @@ func (p *Player) setPlaying(play bool) { p.playPause <- ch } +var errSeekWhilePaused = errors.New("player seeked during pause") + // waitWhilePaused blocks while the player is in a paused state. // It returns immediately if the player is currently playing. func (p *Player) waitWhilePaused() error { - ch := <-p.playPause - p.playPause <- ch - - if alreadyPlaying := ch == nil; !alreadyPlaying { - select { - case <-p.done: - return errClosed - case <-ch: + seeked := false + for { + ch := <-p.playPause + p.playPause <- ch + + if alreadyPlaying := ch == nil; !alreadyPlaying { + select { + case <-p.done: + return errClosed + case <-p.wake: + // seek while paused, this can happen an unlimited number of times, + // we just keep waiting until we're unpaused + seeked = true + continue + case <-ch: + // we have been unpaused + } + } + if seeked { + return errSeekWhilePaused } + return nil } - return nil } // LastPlayed returns the time of the last played event, diff --git a/lib/player/player_test.go b/lib/player/player_test.go index 44414ad83ebce..60de2184b0ba5 100644 --- a/lib/player/player_test.go +++ b/lib/player/player_test.go @@ -174,6 +174,7 @@ func TestClose(t *testing.T) { func TestSeekForward(t *testing.T) { clk := clockwork.NewFakeClock() + p, err := player.New(&player.Config{ Clock: clk, SessionID: "test-session", @@ -210,6 +211,57 @@ func TestSeekForward(t *testing.T) { } } +func TestSeekForwardTwice(t *testing.T) { + clk := clockwork.NewRealClock() + p, err := player.New(&player.Config{ + Clock: clk, + SessionID: "test-session", + Streamer: &simpleStreamer{count: 1, delay: 6000}, + }) + require.NoError(t, err) + t.Cleanup(func() { p.Close() }) + require.NoError(t, p.Play()) + + time.Sleep(100 * time.Millisecond) + p.SetPos(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) + p.SetPos(5900 * time.Millisecond) + + select { + case <-p.C(): + case <-time.After(5 * time.Second): + require.FailNow(t, "event not emitted on time") + } +} + +// TestInterruptsDelay tests that the player responds to playback +// controls even when it is waiting to emit an event. +func TestInterruptsDelay(t *testing.T) { + clk := clockwork.NewFakeClock() + p, err := player.New(&player.Config{ + Clock: clk, + SessionID: "test-session", + Streamer: &simpleStreamer{count: 3, delay: 5000}, + }) + require.NoError(t, err) + require.NoError(t, p.Play()) + + t.Cleanup(func() { p.Close() }) + + clk.BlockUntil(1) // player is now waiting to emit event 0 + + // emulate the user seeking forward while the player is waiting.. + p.SetPos(10_001 * time.Millisecond) + + // expect event 0 and event 1 to be emitted right away + // even without advancing the clock + evt0 := <-p.C() + evt1 := <-p.C() + + require.Equal(t, int64(0), evt0.GetIndex()) + require.Equal(t, int64(1), evt1.GetIndex()) +} + func TestRewind(t *testing.T) { clk := clockwork.NewFakeClock() p, err := player.New(&player.Config{ diff --git a/lib/web/tty_playback.go b/lib/web/tty_playback.go index 57e3c830bda27..232ec37e525bf 100644 --- a/lib/web/tty_playback.go +++ b/lib/web/tty_playback.go @@ -221,6 +221,8 @@ func (h *Handler) ttyPlaybackHandle( return } + case *events.SessionLeave: // do nothing + default: h.log.Debugf("unexpected event type %T", evt) } diff --git a/web/packages/teleport/src/Player/DesktopPlayer.tsx b/web/packages/teleport/src/Player/DesktopPlayer.tsx index c7602e688d402..1e2c1b3a1e2d9 100644 --- a/web/packages/teleport/src/Player/DesktopPlayer.tsx +++ b/web/packages/teleport/src/Player/DesktopPlayer.tsx @@ -122,8 +122,8 @@ export const DesktopPlayer = ({ onRestart={reload} onStartMove={() => playerClient.suspendTimeUpdates()} move={pos => { - playerClient.seekTo(pos); playerClient.resumeTimeUpdates(); + playerClient.seekTo(pos); }} onPlaySpeedChange={s => playerClient.setPlaySpeed(s)} toggle={() => playerClient.togglePlayPause()} diff --git a/web/packages/teleport/src/Player/SshPlayer.tsx b/web/packages/teleport/src/Player/SshPlayer.tsx index 49dd5976ecc6f..878bc1c230caa 100644 --- a/web/packages/teleport/src/Player/SshPlayer.tsx +++ b/web/packages/teleport/src/Player/SshPlayer.tsx @@ -70,8 +70,8 @@ export default function Player({ sid, clusterId, durationMs }) { onRestart={() => window.location.reload()} onStartMove={() => tty.suspendTimeUpdates()} move={pos => { - tty.move(pos); tty.resumeTimeUpdates(); + tty.move(pos); }} toggle={() => { isPlaying ? tty.stop() : tty.play(); diff --git a/web/packages/teleport/src/lib/tdp/playerClient.ts b/web/packages/teleport/src/lib/tdp/playerClient.ts index bbba36bc9297d..1b8473d03b0c3 100644 --- a/web/packages/teleport/src/lib/tdp/playerClient.ts +++ b/web/packages/teleport/src/lib/tdp/playerClient.ts @@ -44,9 +44,12 @@ export class PlayerClient extends Client { private speed = 1.0; private paused = false; + private lastPlayedTimestamp = 0; + private skipTimeUpdatesUntil = null; + private lastTimestamp = 0; private sendTimeUpdates = true; - private lastUpdate = 0; + private lastUpdateTime = 0; private timeout = null; constructor({ url, setTime, setPlayerStatus, setStatusText }) { @@ -62,6 +65,8 @@ export class PlayerClient extends Client { if (this.sendTimeUpdates) { this._setTime(t); } + this.lastTimestamp = t; + this.lastUpdateTime = Date.now(); }, PROGRESS_UPDATE_INTERVAL_MS); } @@ -73,11 +78,11 @@ export class PlayerClient extends Client { scheduleNextUpdate(current: number) { this.timeout = setTimeout(() => { - const delta = Date.now() - this.lastUpdate; + const delta = Date.now() - this.lastUpdateTime; const next = current + delta * this.speed; this.setTime(next); - this.lastUpdate = Date.now(); + this.lastUpdateTime = Date.now(); this.scheduleNextUpdate(next); }, PROGRESS_UPDATE_INTERVAL_MS); @@ -101,11 +106,21 @@ export class PlayerClient extends Client { // togglePlayPause toggles the playback system between "playing" and "paused" states. togglePlayPause() { this.paused = !this.paused; - this.send(JSON.stringify({ action: Action.TOGGLE_PLAY_PAUSE })); + this.setPlayerStatus(this.paused ? StatusEnum.PAUSED : StatusEnum.PLAYING); if (this.paused) { this.cancelTimeUpdate(); } - this.setPlayerStatus(this.paused ? StatusEnum.PAUSED : StatusEnum.PLAYING); + + this.lastUpdateTime = Date.now(); + + if (this.isSeekingForward()) { + const next = Math.max(this.skipTimeUpdatesUntil, this.lastTimestamp); + this.scheduleNextUpdate(next); + } else { + this.scheduleNextUpdate(this.lastTimestamp); + } + + this.send(JSON.stringify({ action: Action.TOGGLE_PLAY_PAUSE })); } // setPlaySpeed sets the playback speed of the recording. @@ -126,12 +141,29 @@ export class PlayerClient extends Client { } else { this.cancelTimeUpdate(); this.lastPlayedTimestamp = json.ms; - this.lastUpdate = Date.now(); + this.lastUpdateTime = Date.now(); this.setTime(json.ms); - // schedule the next time update (in case this - // part of the recording is dead time) - if (!this.paused) { + // clear seek state if we caught up to the seek point + if ( + this.skipTimeUpdatesUntil !== null && + this.lastPlayedTimestamp >= this.skipTimeUpdatesUntil + ) { + this.skipTimeUpdatesUntil = null; + } + + if (!this.isSeekingForward()) { + this.cancelTimeUpdate(); + } + + // schedule the next time update, which ensures that + // the progress bar continues to update even if this + // section of the recording is idle time + // + // note: we don't schedule an update if we're currently + // seeking forward in time, as we're trying to get there + // as quickly as possible + if (!this.paused && !this.isSeekingForward()) { this.scheduleNextUpdate(json.ms); } @@ -144,16 +176,24 @@ export class PlayerClient extends Client { this.send(JSON.stringify({ action: Action.SEEK, pos })); + this._setTime(pos); + this.lastUpdateTime = Date.now(); + this.skipTimeUpdatesUntil = pos; + if (pos < this.lastPlayedTimestamp) { // TODO: clear canvas - } else if (this.paused) { - // if we're paused, we want the scrubber to "stick" at the new - // time until we press play (rather than waiting for us to click - // play and start receiving new data) - this._setTime(pos); + } else if (!this.paused) { + this.scheduleNextUpdate(pos); } } + isSeekingForward(): boolean { + return ( + this.skipTimeUpdatesUntil !== null && + this.skipTimeUpdatesUntil > this.lastPlayedTimestamp + ); + } + // Overrides Client implementation. handleClientScreenSpec(buffer: ArrayBuffer) { this.emit( diff --git a/web/packages/teleport/src/lib/term/ttyPlayer.js b/web/packages/teleport/src/lib/term/ttyPlayer.js index d7f04846824c6..b231e83c1c7ce 100644 --- a/web/packages/teleport/src/lib/term/ttyPlayer.js +++ b/web/packages/teleport/src/lib/term/ttyPlayer.js @@ -50,10 +50,12 @@ export default class TtyPlayer extends Tty { this._paused = false; this._lastPlayedTimestamp = 0; + this._skipTimeUpdatesUntil = null; this._sendTimeUpdates = true; this._setTime = throttle(t => setTime(t), PROGRESS_UPDATE_INTERVAL_MS); - this._lastUpdate = 0; + this._lastUpdateTime = 0; + this._lastTimestamp = 0; this._timeout = null; } @@ -97,6 +99,9 @@ export default class TtyPlayer extends Tty { if (this._sendTimeUpdates) { this._setTime(t); } + + this._lastTimestamp = t; + this._lastUpdateTime = Date.now(); } disconnect(closeCode = WebsocketCloseCode.NORMAL) { @@ -108,10 +113,11 @@ export default class TtyPlayer extends Tty { scheduleNextUpdate(current) { this._timeout = setTimeout(() => { - const delta = Date.now() - this._lastUpdate; + const delta = Date.now() - this._lastUpdateTime; const next = current + delta; + this.setTime(next); - this._lastUpdate = Date.now(); + this._lastUpdateTime = Date.now(); this.scheduleNextUpdate(next); }, PROGRESS_UPDATE_INTERVAL_MS); @@ -133,8 +139,6 @@ export default class TtyPlayer extends Tty { // see lib/web/tty_playback.go for details on this protocol switch (typ) { case messageTypePty: - this.cancelTimeUpdate(); - const delay = Number(dv.getBigUint64(3)); const data = dv.buffer.slice( dv.byteOffset + 11, @@ -143,13 +147,29 @@ export default class TtyPlayer extends Tty { this.emit(TermEvent.DATA, data); this._lastPlayedTimestamp = delay; - - this._lastUpdate = Date.now(); + this._lastUpdateTime = Date.now(); this.setTime(delay); - // schedule the next time update (in case this - // part of the recording is dead time) - if (!this._paused) { + // clear seek state if we caught up to the seek point + if ( + this._skipTimeUpdatesUntil !== null && + this._lastPlayedTimestamp >= this._skipTimeUpdatesUntil + ) { + this._skipTimeUpdatesUntil = null; + } + + if (!this.isSeekingForward()) { + this.cancelTimeUpdate(); + } + + // schedule the next time update, which ensures that + // the progress bar continues to update even if this + // section of the recording is idle time + // + // note: we don't schedule an update if we're currently + // seeking forward in time, as we're trying to get there + // as quickly as possible + if (!this._paused && !this.isSeekingForward()) { this.scheduleNextUpdate(delay); } break; @@ -187,7 +207,6 @@ export default class TtyPlayer extends Tty { move(newPos) { this.cancelTimeUpdate(); - try { const buffer = new ArrayBuffer(11); const dv = new DataView(buffer); @@ -199,16 +218,26 @@ export default class TtyPlayer extends Tty { logger.error('error seeking', e); } + this._setTime(newPos); + this._lastUpdateTime = Date.now(); + this._skipTimeUpdatesUntil = newPos; + if (newPos < this._lastPlayedTimestamp) { this.emit(TermEvent.RESET); - } else if (this._paused) { - // if we're paused, we want the scrubber to "stick" at the new - // time until we press play (rather than waiting for us to click - // play and start receiving new data) - this._setTime(newPos); + } else { + if (!this._paused) { + this.scheduleNextUpdate(newPos); + } } } + isSeekingForward() { + return ( + this._skipTimeUpdatesUntil !== null && + this._skipTimeUpdatesUntil > this._lastPlayedTimestamp + ); + } + stop() { this._paused = true; this.cancelTimeUpdate(); @@ -226,6 +255,15 @@ export default class TtyPlayer extends Tty { this._paused = false; this._setPlayerStatus(StatusEnum.PLAYING); + this._lastUpdateTime = Date.now(); + + if (this.isSeekingForward()) { + const next = Math.max(this._skipTimeUpdatesUntil, this._lastTimestamp); + this.scheduleNextUpdate(next); + } else { + this.scheduleNextUpdate(this._lastTimestamp); + } + // the very first play call happens before we've even // connected - we only need to send the websocket message // for subsequent calls