Skip to content

Commit

Permalink
player: fix seeking during dead time (#38914)
Browse files Browse the repository at this point in the history
* player: fix seeking during dead time

This addresses a bug in the new player API where the player does
not respond to seeking during large periods of "dead time."
For example, if there is 30 seconds of dead time between two events,
the player is effectively in a 30 second sleep. Attempting to change
the position of playback during this 30-second period works, but is
not observed until the sleep completes.

With this change, the sleep period is canceled when the user seeks
to a new position.

Fixes #38560

* Improve UX when the user seeks while paused

Restart the stream in this case rather than trying to keep server
and client state in sync.

* Fix

* Update desktop player

* Lint/test fixes

* increase timeout

* Use a real clock for the new test
  • Loading branch information
zmb3 authored Apr 10, 2024
1 parent 4af2d10 commit 41d514b
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 52 deletions.
143 changes: 123 additions & 20 deletions lib/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Player struct {
advanceTo atomic.Int64

emit chan events.AuditEvent
wake chan int64
done chan struct{}

// playPause holds a channel to be closed when
Expand Down Expand Up @@ -119,6 +120,7 @@ func New(cfg *Config) (*Player, error) {
streamer: cfg.Streamer,
emit: make(chan events.AuditEvent, 1024),
playPause: make(chan chan struct{}, 1),
wake: make(chan int64),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -207,7 +209,13 @@ func (p *Player) stream() {
// time we were advanced to
lastDelay = adv
}
if err := p.applyDelay(time.Duration(currentDelay-lastDelay) * time.Millisecond); err != nil {

switch err := p.applyDelay(lastDelay, currentDelay); {
case errors.Is(err, errSeekWhilePaused):
p.log.Debug("seeked during pause, will restart stream")
go p.stream()
return
case err != nil:
close(p.emit)
return
}
Expand Down Expand Up @@ -269,32 +277,113 @@ func (p *Player) Play() error {
// from the beginning. A duration greater than the length of the session
// will cause playback to rapidly advance to the end of the recording.
func (p *Player) SetPos(d time.Duration) error {
// we use a negative value to indicate rewinding, which means we can't
// rewind to position 0 (there is no negative 0)
if d == 0 {
d = 1 * time.Millisecond
}
if d.Milliseconds() < p.lastPlayed.Load() {
// if we're rewinding we store a negative value
d = -1 * d
}
p.advanceTo.Store(d.Milliseconds())

// try to wake up the player if it's waiting to emit an event
select {
case p.wake <- d.Milliseconds():
default:
}

return nil
}

// applyDelay "sleeps" for d in a manner that
// can be canceled
func (p *Player) applyDelay(d time.Duration) error {
scaled := float64(d) / p.speed.Load().(float64)
select {
case <-p.done:
return errClosed
case <-p.clock.After(time.Duration(scaled)):
return nil
// applyDelay applies the timing delay between the last emitted event
// (lastDelay) and the event that will be emitted next (currentDelay).
//
// The delay can be interrupted by:
// 1. The player being closed.
// 2. The user pausing playback.
// 3. The user seeking to a new position in the playback (SetPos)
//
// A nil return value indicates that the delay has elapsed and that
// the next even can be emitted.
func (p *Player) applyDelay(lastDelay, currentDelay int64) error {
loop:
for {
// TODO(zmb3): changing play speed during a long sleep
// will not apply until after the sleep completes
speed := p.speed.Load().(float64)
scaled := float64(currentDelay-lastDelay) / speed

timer := p.clock.NewTimer(time.Duration(scaled) * time.Millisecond)
defer timer.Stop()

start := time.Now()

select {
case <-p.done:
return errClosed
case newPos := <-p.wake:
// the sleep was interrupted due to the user changing playback controls
switch {
case newPos == interruptForPause:
// the user paused playback while we were waiting to emit the next event:
// 1) figure out much of the sleep we completed
dur := float64(time.Since(start).Milliseconds()) * speed

// 2) wait here until the user resumes playback
if err := p.waitWhilePaused(); errors.Is(err, errSeekWhilePaused) {
// the user changed the playback position, so consider the delay
// applied and let the player pick up from the new position
return errSeekWhilePaused
}

// now that we're playing again, update our delay to account
// for the portion that was already satisfied and apply the
// remaining delay
lastDelay += int64(dur)
timer.Stop()
continue loop
case newPos > currentDelay:
// the user scrubbed forward in time past the current event,
// so we can return as if the delay has elapsed naturally
return nil
case newPos < 0:
// the user has rewinded playback, which means we need to restart
// the stream and can consider this delay as having elapsed naturally
return nil
case newPos < currentDelay:
// the user has scrubbed forward in time, but not enough to
// emit the next event - we need to delay more
lastDelay = newPos
timer.Stop()
continue loop
default:
return nil
}

case <-timer.Chan():
return nil
}
}
}

// interruptForPause is a special value used to interrupt the player's
// sleep due to the user pausing playback.
const interruptForPause = math.MaxInt64

func (p *Player) setPlaying(play bool) {
ch := <-p.playPause
alreadyPlaying := ch == nil

if alreadyPlaying && !play {
ch = make(chan struct{})

// try to wake up the player if it's waiting to emit an event
select {
case p.wake <- interruptForPause:
default:
}

} else if !alreadyPlaying && play {
// signal waiters who are paused that it's time to resume playing
close(ch)
Expand All @@ -304,20 +393,34 @@ func (p *Player) setPlaying(play bool) {
p.playPause <- ch
}

var errSeekWhilePaused = errors.New("player seeked during pause")

// waitWhilePaused blocks while the player is in a paused state.
// It returns immediately if the player is currently playing.
func (p *Player) waitWhilePaused() error {
ch := <-p.playPause
p.playPause <- ch

if alreadyPlaying := ch == nil; !alreadyPlaying {
select {
case <-p.done:
return errClosed
case <-ch:
seeked := false
for {
ch := <-p.playPause
p.playPause <- ch

if alreadyPlaying := ch == nil; !alreadyPlaying {
select {
case <-p.done:
return errClosed
case <-p.wake:
// seek while paused, this can happen an unlimited number of times,
// we just keep waiting until we're unpaused
seeked = true
continue
case <-ch:
// we have been unpaused
}
}
if seeked {
return errSeekWhilePaused
}
return nil
}
return nil
}

// LastPlayed returns the time of the last played event,
Expand Down
52 changes: 52 additions & 0 deletions lib/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func TestClose(t *testing.T) {

func TestSeekForward(t *testing.T) {
clk := clockwork.NewFakeClock()

p, err := player.New(&player.Config{
Clock: clk,
SessionID: "test-session",
Expand Down Expand Up @@ -210,6 +211,57 @@ func TestSeekForward(t *testing.T) {
}
}

func TestSeekForwardTwice(t *testing.T) {
clk := clockwork.NewRealClock()
p, err := player.New(&player.Config{
Clock: clk,
SessionID: "test-session",
Streamer: &simpleStreamer{count: 1, delay: 6000},
})
require.NoError(t, err)
t.Cleanup(func() { p.Close() })
require.NoError(t, p.Play())

time.Sleep(100 * time.Millisecond)
p.SetPos(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
p.SetPos(5900 * time.Millisecond)

select {
case <-p.C():
case <-time.After(5 * time.Second):
require.FailNow(t, "event not emitted on time")
}
}

// TestInterruptsDelay tests that the player responds to playback
// controls even when it is waiting to emit an event.
func TestInterruptsDelay(t *testing.T) {
clk := clockwork.NewFakeClock()
p, err := player.New(&player.Config{
Clock: clk,
SessionID: "test-session",
Streamer: &simpleStreamer{count: 3, delay: 5000},
})
require.NoError(t, err)
require.NoError(t, p.Play())

t.Cleanup(func() { p.Close() })

clk.BlockUntil(1) // player is now waiting to emit event 0

// emulate the user seeking forward while the player is waiting..
p.SetPos(10_001 * time.Millisecond)

// expect event 0 and event 1 to be emitted right away
// even without advancing the clock
evt0 := <-p.C()
evt1 := <-p.C()

require.Equal(t, int64(0), evt0.GetIndex())
require.Equal(t, int64(1), evt1.GetIndex())
}

func TestRewind(t *testing.T) {
clk := clockwork.NewFakeClock()
p, err := player.New(&player.Config{
Expand Down
2 changes: 2 additions & 0 deletions lib/web/tty_playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ func (h *Handler) ttyPlaybackHandle(
return
}

case *events.SessionLeave: // do nothing

default:
h.log.Debugf("unexpected event type %T", evt)
}
Expand Down
2 changes: 1 addition & 1 deletion web/packages/teleport/src/Player/DesktopPlayer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ export const DesktopPlayer = ({
onRestart={reload}
onStartMove={() => playerClient.suspendTimeUpdates()}
move={pos => {
playerClient.seekTo(pos);
playerClient.resumeTimeUpdates();
playerClient.seekTo(pos);
}}
onPlaySpeedChange={s => playerClient.setPlaySpeed(s)}
toggle={() => playerClient.togglePlayPause()}
Expand Down
2 changes: 1 addition & 1 deletion web/packages/teleport/src/Player/SshPlayer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ export default function Player({ sid, clusterId, durationMs }) {
onRestart={() => window.location.reload()}
onStartMove={() => tty.suspendTimeUpdates()}
move={pos => {
tty.move(pos);
tty.resumeTimeUpdates();
tty.move(pos);
}}
toggle={() => {
isPlaying ? tty.stop() : tty.play();
Expand Down
Loading

0 comments on commit 41d514b

Please sign in to comment.