Skip to content

Commit

Permalink
Merge pull request #5147 from nextcloud/backport/5126/stable28
Browse files Browse the repository at this point in the history
[stable28] Preserve step queue during reconnect
  • Loading branch information
juliusknorr authored Dec 21, 2023
2 parents 36ce4f9 + e524f04 commit db52a6d
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 20 deletions.
2 changes: 2 additions & 0 deletions cypress/e2e/api/SyncServiceProvider.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ describe('Sync service provider', function() {
* @param {object} ydoc Yjs document
*/
function createProvider(ydoc) {
const queue = []
const syncService = new SyncService({
serialize: () => 'Serialized',
getDocumentState: () => null,
Expand All @@ -70,6 +71,7 @@ describe('Sync service provider', function() {
syncService,
fileId,
initialSession: null,
queue,
disableBc: true,
})
}
Expand Down
4 changes: 4 additions & 0 deletions src/components/Editor.vue
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ export default {
},
created() {
this.$ydoc = new Doc()
this.$queue = []
// The following can be useful for debugging ydoc updates
// this.$ydoc.on('update', function(update, origin, doc, tr) {
// console.debug('ydoc update', update, origin, doc, tr)
Expand Down Expand Up @@ -377,10 +378,13 @@ export default {

this.listenSyncServiceEvents()

this.$providers.forEach(p => p?.destroy())
this.$providers = []
const syncServiceProvider = createSyncServiceProvider({
ydoc: this.$ydoc,
syncService: this.$syncService,
fileId: this.fileId,
queue: this.$queue,
initialSession: this.initialSession,
})
this.$providers.push(syncServiceProvider)
Expand Down
5 changes: 3 additions & 2 deletions src/services/SyncServiceProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ import { logger } from '../helpers/logger.js'
* @param {object} options.ydoc - the Ydoc
* @param {object} options.syncService - sync service to build upon
* @param {number} options.fileId - file id of the file to open
* @param {number} options.queue - queue for outgoing steps
* @param {object} options.initialSession - initialSession to start from
* @param {boolean} options.disableBc - disable broadcast channel synchronization (default: disabled in debug mode, enabled otherwise)
*/
export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession, disableBc }) {
export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession, queue, disableBc }) {
if (!fileId) {
// We need a file id as a unique identifier for y.js as otherwise state might leak between different files
throw new Error('fileId is required')
}
const WebSocketPolyfill = initWebSocketPolyfill(syncService, fileId, initialSession)
const WebSocketPolyfill = initWebSocketPolyfill(syncService, fileId, initialSession, queue)
disableBc = disableBc ?? !!window?._oc_debug
const websocketProvider = new WebsocketProvider(
'ws://localhost:1234',
Expand Down
42 changes: 24 additions & 18 deletions src/services/WebSocketPolyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import { encodeArrayBuffer, decodeArrayBuffer } from '../helpers/base64.js'
* @param {object} syncService - the sync service to build upon
* @param {number} fileId - id of the file to open
* @param {object} initialSession - initial session to open
* @param {object[]} queue - queue for the outgoing steps
*/
export default function initWebSocketPolyfill(syncService, fileId, initialSession) {
export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) {
return class WebSocketPolyfill {

#url
Expand All @@ -41,11 +42,9 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
onclose
onopen
#handlers
#queue

constructor(url) {
this.url = url
this.#queue = []
logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession })
this.#registerHandlers({
opened: ({ version, session }) => {
Expand Down Expand Up @@ -83,32 +82,34 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
// Useful for debugging what steps are sent and how they were initiated
// data.forEach(logStep)

this.#queue.push(...data)
queue.push(...data)
let outbox = []
return syncService.sendSteps(() => {
outbox = [...this.#queue]
const data = {
steps: this.#steps,
awareness: this.#awareness,
version: this.#version,
}
this.#queue = []
outbox = [...queue]
logger.debug('sending steps ', data)
return data
})?.catch(err => {
logger.error(err)
// try to send the steps again
this.#queue = [...outbox, ...this.#queue]
})
})?.then(ret => {
// only keep the steps that were not send yet
queue.splice(0,
queue.length,
...queue.filter(s => !outbox.includes(s)),
)
return ret
}, err => logger.error(err))
}

get #steps() {
return this.#queue.map(s => encodeArrayBuffer(s))
return queue.map(s => encodeArrayBuffer(s))
.filter(s => s < 'AQ')
}

get #awareness() {
return this.#queue.map(s => encodeArrayBuffer(s))
return queue.map(s => encodeArrayBuffer(s))
.findLast(s => s > 'AQ') || ''
}

Expand All @@ -124,19 +125,24 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio
}

#sendRemainingSteps() {
if (this.#queue.length) {
if (queue.length) {
let outbox = []
return syncService.sendStepsNow(() => {
const data = {
steps: this.#steps,
awareness: this.#awareness,
version: this.#version,
}
this.#queue = []
outbox = [...queue]
logger.debug('sending final steps ', data)
return data
})?.catch(err => {
logger.error(err)
})
})?.then(() => {
// only keep the steps that were not send yet
queue.splice(0,
queue.length,
...queue.filter(s => !outbox.includes(s)),
)
}, err => logger.error(err))
}
}

Expand Down
114 changes: 114 additions & 0 deletions src/tests/services/WebsocketPolyfill.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import initWebSocketPolyfill from '../../services/WebSocketPolyfill.js'

describe('Init function', () => {

it('returns a websocket polyfill class', () => {
const syncService = { on: jest.fn(), open: jest.fn() }
const Polyfill = initWebSocketPolyfill(syncService)
const websocket = new Polyfill('url')
expect(websocket).toBeInstanceOf(Polyfill)
})

it('registers handlers', () => {
const syncService = { on: jest.fn(), open: jest.fn() }
const Polyfill = initWebSocketPolyfill(syncService)
const websocket = new Polyfill('url')
expect(syncService.on).toHaveBeenCalled()
})

it('opens sync service', () => {
const syncService = { on: jest.fn(), open: jest.fn() }
const fileId = 123
const initialSession = { }
const Polyfill = initWebSocketPolyfill(syncService, fileId, initialSession)
const websocket = new Polyfill('url')
expect(syncService.open).toHaveBeenCalledWith({ fileId, initialSession })
})

it('sends steps to sync service', async () => {
const syncService = {
on: jest.fn(),
open: jest.fn(),
sendSteps: async getData => getData(),
}
const queue = [ 'initial' ]
const data = { dummy: 'data' }
const Polyfill = initWebSocketPolyfill(syncService, null, null, queue)
const websocket = new Polyfill('url')
const result = websocket.send(data)
expect(result).toBeInstanceOf(Promise)
expect(queue).toEqual([ 'initial' , data ])
const dataSendOut = await result
expect(queue).toEqual([])
expect(dataSendOut).toHaveProperty('awareness')
expect(dataSendOut).toHaveProperty('steps')
expect(dataSendOut).toHaveProperty('version')
})

it('handles early reject', async () => {
const syncService = {
on: jest.fn(),
open: jest.fn(),
sendSteps: jest.fn().mockRejectedValue('error before reading steps in sync service'),
}
const queue = [ 'initial' ]
const data = { dummy: 'data' }
const Polyfill = initWebSocketPolyfill(syncService, null, null, queue)
const websocket = new Polyfill('url')
const result = websocket.send(data)
expect(queue).toEqual([ 'initial' , data ])
expect(result).toBeInstanceOf(Promise)
const returned = await result
expect(returned).toBeUndefined()
expect(queue).toEqual([ 'initial' , data ])
})

it('handles reject after reading data', async () => {
const syncService = {
on: jest.fn(),
open: jest.fn(),
sendSteps: jest.fn().mockImplementation( async getData => {
getData()
throw 'error when sending in sync service'
}),
}
const queue = [ 'initial' ]
const data = { dummy: 'data' }
const Polyfill = initWebSocketPolyfill(syncService, null, null, queue)
const websocket = new Polyfill('url')
const result = websocket.send(data)
expect(queue).toEqual([ 'initial' , data ])
expect(result).toBeInstanceOf(Promise)
const returned = await result
expect(returned).toBeUndefined()
expect(queue).toEqual([ 'initial' , data ])
})

it('queue survives a close', async () => {
const syncService = {
on: jest.fn(),
open: jest.fn(),
sendSteps: jest.fn().mockImplementation( async getData => {
getData()
throw 'error when sending in sync service'
}),
sendStepsNow: jest.fn().mockImplementation( async getData => {
getData()
throw 'sendStepsNow error when sending'
}),
off: jest.fn(),
close: jest.fn( async data => data ),
}
const queue = [ 'initial' ]
const data = { dummy: 'data' }
const Polyfill = initWebSocketPolyfill(syncService, null, null, queue)
const websocket = new Polyfill('url')
websocket.onclose = jest.fn()
await websocket.send(data)
const promise = websocket.close()
expect(queue).toEqual([ 'initial' , data ])
await promise
expect(queue).toEqual([ 'initial' , data ])
})

})

0 comments on commit db52a6d

Please sign in to comment.