Skip to content

Commit

Permalink
feat: support chunk promises for injectToStream() (closes #40) (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
brillout authored Jul 3, 2024
1 parent 29458e8 commit 300308e
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/server/renderToStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async function renderToStream(element: React.ReactNode, options: Options = {}):
assertUsage(!options.renderToPipeableStream && !options.renderToReadableStream, 'using deprecated options')

element = React.createElement(SuspenseData, null, element)
let injectToStream: InjectToStream = (chunk) => {
let injectToStream: InjectToStream = async (chunk) => {
buffer.push(chunk)
}
const buffer: Chunk[] = []
Expand Down
63 changes: 35 additions & 28 deletions src/server/renderToStream/createBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,34 @@ export type { InjectToStream }
export type { StreamOperations }
export type { Chunk }

import { assert, assertUsage, createDebugger } from '../utils'
import { assert, assertUsage, createDebugger, isPromise } from '../utils'

const debug = createDebugger('react-streaming:buffer')

// - According to sebmarkbage chunks should be injected "before React writes".
// - https://github.com/reactwg/react-18/discussions/114#:~:text=Injecting%20Into%20the%20SSR%20Stream
// - Indeed, chunks cannot be arbitrary injected between React chunks.
// - It isn't clear what "before React writes" means. I interpret it like this: nothing should be injected between two React synchronous writes. My interpreted rule seems to be working so far.
// - It's also the interpretation of the Apollo GraphQL team: https://github.com/apollographql/apollo-client-nextjs/issues/325#issuecomment-2205375796
// - Being able to pass a chunk promise to injectToStream() is required for integrating Apollo GraphQL, see:
// - https://github.com/apollographql/apollo-client-nextjs/issues/325#issuecomment-2199664143
// - https://github.com/brillout/react-streaming/issues/40
type InjectToStreamOptions = {
flush?: boolean
/* We used to have this option (https://github.com/brillout/react-streaming/commit/2f5bf270832a8a45f04af6821d709f590cc9cb7f) but it isn't needed anymore
/* We used to have this option (https://github.com/brillout/react-streaming/commit/2f5bf270832a8a45f04af6821d709f590cc9cb7f) but it isn't needed anymore.
tolerateStreamEnded?: boolean
*/
}
// A chunk doesn't have to be a string: let's wait for users to complain and let's progressively add all expected types.
type Chunk = string
type InjectToStream = (chunk: Chunk, options?: InjectToStreamOptions) => void
type Chunk = string | Promise<string> // A chunk doesn't have to be a string. Let's progressively add all expected types as users complain.
type InjectToStream = (chunk: Chunk, options?: InjectToStreamOptions) => Promise<void>

type StreamOperations = {
operations: null | { writeChunk: (chunk: unknown) => void; flush: null | (() => void) }
}

function createBuffer(streamOperations: StreamOperations): {
injectToStream: InjectToStream
onReactWriteBefore: (chunk: unknown) => void
onReactWriteAfter: () => void
onBeforeEnd: () => void
onReactWrite: (chunk: unknown) => Promise<void>
onBeforeEnd: () => Promise<void>
hasStreamEnded: () => boolean
} {
const buffer: { chunk: Chunk; flush: undefined | boolean }[] = []
Expand All @@ -36,9 +42,9 @@ function createBuffer(streamOperations: StreamOperations): {
// - Thus, we delay any write to the stream until react wrote its first chunk.
let writePermission = false

return { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded }
return { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded }

function injectToStream(chunk: Chunk, options?: InjectToStreamOptions) {
async function injectToStream(chunk: Chunk, options?: InjectToStreamOptions) {
if (debug.isEnabled) {
debug('injectToStream()', getChunkAsString(chunk))
}
Expand All @@ -51,10 +57,10 @@ function createBuffer(streamOperations: StreamOperations): {
)
}
buffer.push({ chunk, flush: options?.flush })
flushBuffer()
await flushBuffer()
}

function flushBuffer() {
async function flushBuffer() {
if (!writePermission) {
return
}
Expand All @@ -66,14 +72,13 @@ function createBuffer(streamOperations: StreamOperations): {
return
}
let flushStream = false
buffer.forEach((bufferEntry) => {
for (let { chunk, flush } of buffer) {
assert(streamOperations.operations)
const { writeChunk } = streamOperations.operations
writeChunk(bufferEntry.chunk)
if (bufferEntry.flush) {
flushStream = true
}
})
if (isPromise(chunk)) chunk = await chunk
writeChunk(chunk)
if (flush) flushStream = true
}
buffer.length = 0
assert(streamOperations.operations)
if (flushStream && streamOperations.operations.flush !== null) {
Expand All @@ -82,23 +87,25 @@ function createBuffer(streamOperations: StreamOperations): {
}
}

function onReactWriteAfter() {
const writeWasBlocked = !writePermission
writePermission = true
if (writeWasBlocked) flushBuffer()
}
function onReactWriteBefore(chunk: unknown) {
async function onReactWrite(chunk: unknown) {
state === 'UNSTARTED' && debug('>>> START')
if (debug.isEnabled) {
debug('react write', getChunkAsString(chunk))
}
state = 'STREAMING'
flushBuffer()
const bufferReactEntry = { chunk: chunk as any, flush: true }
if (!writePermission) {
buffer.unshift(bufferReactEntry)
} else {
buffer.push(bufferReactEntry)
}
writePermission = true
await flushBuffer()
}

function onBeforeEnd() {
async function onBeforeEnd() {
writePermission = true // in case React didn't write anything
flushBuffer()
await flushBuffer()
assert(buffer.length === 0)
state = 'ENDED'
debug('>>> END')
Expand Down
17 changes: 8 additions & 9 deletions src/server/renderToStream/createPipeWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ async function createPipeWrapper(
const streamOperations: StreamOperations = {
operations: null,
}
const { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded } =
createBuffer(streamOperations)
const { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded } = createBuffer(streamOperations)
return { pipeForUser, streamEnd, injectToStream, hasStreamEnded }

function createPipeForUser(): { pipeForUser: Pipe; streamEnd: Promise<void> } {
Expand All @@ -36,21 +35,21 @@ async function createPipeWrapper(
})
const pipeForUser: Pipe = (writableFromUser: StreamNodeWritable) => {
const writableForReact = new Writable({
write(chunk: unknown, encoding, callback) {
async write(chunk: unknown, encoding, callback) {
debug('write')
onReactWriteBefore(chunk)
if (!writableFromUser.destroyed) {
writableFromUser.write(chunk, encoding, callback)
onReactWriteAfter()
await onReactWrite(chunk)
} else {
// Destroying twice is fine: https://github.com/brillout/react-streaming/pull/21#issuecomment-1554517163
// - E.g. when the server closes the connection.
// - Destroying twice is fine: https://github.com/brillout/react-streaming/pull/21#issuecomment-1554517163
writableForReact.destroy()
}
callback()
},
final(callback) {
async final(callback) {
debug('final')
stopTimeout?.()
onBeforeEnd()
await onBeforeEnd()
writableFromUser.end()
onEnded()
callback()
Expand Down
11 changes: 4 additions & 7 deletions src/server/renderToStream/createReadableWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ function createReadableWrapper(readableFromReact: ReadableStream, { stopTimeout
onReady(onEnded)
},
})
const { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded } =
createBuffer(streamOperations)
const { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded } = createBuffer(streamOperations)
return { readableForUser, streamEnd, injectToStream, hasStreamEnded }

async function onReady(onEnded: () => void) {
Expand All @@ -47,18 +46,16 @@ function createReadableWrapper(readableFromReact: ReadableStream, { stopTimeout
if (done) {
break
}
onReactWriteBefore(value)
streamOperations.operations.writeChunk(value)
onReactWriteAfter()
await onReactWrite(value)
}

stopTimeout?.()

// Collect injectToStream() calls stuck in an async call.
// Workaround for: https://github.com/brillout/react-streaming/issues/40#issuecomment-2199424650
// We should probably remove this workaround once we have a proper solution.
setTimeout(() => {
onBeforeEnd()
setTimeout(async () => {
await onBeforeEnd()
controllerOfUserStream.close()
onEnded()
}, 0)
Expand Down
2 changes: 1 addition & 1 deletion test/renderToStream.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('renderToStream()', async () => {
expect(timeoutResolved).toBe(true)

try {
injectToStream('someChunk')
await injectToStream('someChunk')
expect(1).toBe(2)
} catch (err) {
expect(err.message).include('Cannot inject the following chunk because the stream has already ended')
Expand Down

0 comments on commit 300308e

Please sign in to comment.