Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO_NOT_MERGE] sync effect (V2) development and testing #2952

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 84 additions & 56 deletions src/vanilla/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
const unmountCallbacks = new Set<() => void>()
const mountCallbacks = new Set<() => void>()

let inTransaction = 0
const runWithTransaction = <T>(fn: () => T): T => {
const flushCallbacks = () => {
const errors: unknown[] = []
const call = (fn: () => void) => {
try {
Expand All @@ -258,36 +257,24 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
errors.push(e)
}
}
let result: T
++inTransaction
try {
result = fn()
} finally {
if (inTransaction === 1) {
while (
changedAtoms.size ||
unmountCallbacks.size ||
mountCallbacks.size
) {
recomputeInvalidatedAtoms()
;(store as any)[INTERNAL_flushStoreHook]?.()
const callbacks = new Set<() => void>()
const add = callbacks.add.bind(callbacks)
changedAtoms.forEach((atomState) => atomState.m?.l.forEach(add))
changedAtoms.clear()
unmountCallbacks.forEach(add)
unmountCallbacks.clear()
mountCallbacks.forEach(add)
mountCallbacks.clear()
callbacks.forEach(call)
}
do {
;(store as any)[INTERNAL_flushStoreHook]?.()
const callbacks = new Set<() => void>()
const add = callbacks.add.bind(callbacks)
changedAtoms.forEach((atomState) => atomState.m?.l.forEach(add))
changedAtoms.clear()
unmountCallbacks.forEach(add)
unmountCallbacks.clear()
mountCallbacks.forEach(add)
mountCallbacks.clear()
callbacks.forEach(call)
if (changedAtoms.size) {
recomputeInvalidatedAtoms()
}
--inTransaction
}
} while (changedAtoms.size || unmountCallbacks.size || mountCallbacks.size)
if (errors.length) {
throw errors[0]
}
return result
}

const setAtomStateValueOrPromise = (
Expand Down Expand Up @@ -344,7 +331,9 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
let isSync = true
const mountDependenciesIfAsync = () => {
if (atomState.m) {
runWithTransaction(() => mountDependencies(atom, atomState))
mountDependencies(atom, atomState)
recomputeInvalidatedAtoms()
flushCallbacks()
}
}
const getter: Getter = <V>(a: Atom<V>) => {
Expand Down Expand Up @@ -440,14 +429,12 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
return dependents
}

const invalidateDependents = <Value>(atomState: AtomState<Value>) => {
const visited = new WeakSet<AtomState>()
const invalidateDependents = (atomState: AtomState) => {
const stack: AtomState[] = [atomState]
while (stack.length) {
const aState = stack.pop()!
if (!visited.has(aState)) {
visited.add(aState)
for (const [d, s] of getMountedOrPendingDependents(aState)) {
for (const [d, s] of getMountedOrPendingDependents(aState)) {
if (!invalidatedAtoms.has(d)) {
invalidatedAtoms.set(d, s.n)
stack.push(s)
}
Expand Down Expand Up @@ -529,13 +516,14 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
atom: WritableAtom<Value, Args, Result>,
...args: Args
): Result => {
let isSync = true
const getter: Getter = <V>(a: Atom<V>) => returnAtomValue(readAtomState(a))
const setter: Setter = <V, As extends unknown[], R>(
a: WritableAtom<V, As, R>,
...args: As
) => {
const aState = ensureAtomState(a)
return runWithTransaction(() => {
try {
if (isSelfAtom(atom, a)) {
if (!hasInitialValue(a)) {
// NOTE technically possible but restricted as it may cause bugs
Expand All @@ -554,23 +542,45 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
} else {
return writeAtomState(a, ...args)
}
})
} finally {
if (!isSync) {
recomputeInvalidatedAtoms()
flushCallbacks()
}
}
}
try {
return atomWrite(atom, getter, setter, ...args)
} finally {
isSync = false
}
return atomWrite(atom, getter, setter, ...args)
}

const writeAtom = <Value, Args extends unknown[], Result>(
atom: WritableAtom<Value, Args, Result>,
...args: Args
): Result => runWithTransaction(() => writeAtomState(atom, ...args))
): Result => {
try {
return writeAtomState(atom, ...args)
} finally {
recomputeInvalidatedAtoms()
flushCallbacks()
}
}

const mountDependencies = (atom: AnyAtom, atomState: AtomState) => {
if (atomState.m && !isPendingPromise(atomState.v)) {
for (const a of atomState.d.keys()) {
for (const [a, n] of atomState.d) {
if (!atomState.m.d.has(a)) {
const aMounted = mountAtom(a, ensureAtomState(a))
const aState = ensureAtomState(a)
const aMounted = mountAtom(a, aState)
aMounted.t.add(atom)
atomState.m.d.add(a)
if (n !== aState.n) {
changedAtoms.set(a, aState)
aState.u?.()
invalidateDependents(aState)
}
}
}
for (const a of atomState.m.d || []) {
Expand Down Expand Up @@ -605,11 +615,31 @@ const buildStore = (...storeArgs: StoreArgs): Store => {
if (isActuallyWritableAtom(atom)) {
const mounted = atomState.m
const processOnMount = () => {
const onUnmount = atomOnMount(atom, (...args) =>
runWithTransaction(() => writeAtomState(atom, ...args)),
)
if (onUnmount) {
mounted.u = onUnmount
let isSync = true
const setAtom = (...args: unknown[]) => {
try {
return writeAtomState(atom, ...args)
} finally {
if (!isSync) {
recomputeInvalidatedAtoms()
flushCallbacks()
}
}
}
try {
const onUnmount = atomOnMount(atom, setAtom)
if (onUnmount) {
mounted.u = () => {
isSync = true
try {
onUnmount()
} finally {
isSync = false
}
}
}
} finally {
isSync = false
}
}
mountCallbacks.add(processOnMount)
Expand Down Expand Up @@ -646,17 +676,15 @@ const buildStore = (...storeArgs: StoreArgs): Store => {

const subscribeAtom = (atom: AnyAtom, listener: () => void) => {
const atomState = ensureAtomState(atom)
return runWithTransaction(() => {
const mounted = mountAtom(atom, atomState)
const listeners = mounted.l
listeners.add(listener)
return () => {
runWithTransaction(() => {
listeners.delete(listener)
unmountAtom(atom, atomState)
})
}
})
const mounted = mountAtom(atom, atomState)
const listeners = mounted.l
listeners.add(listener)
flushCallbacks()
return () => {
listeners.delete(listener)
unmountAtom(atom, atomState)
flushCallbacks()
}
}

const unstable_derive: Store['unstable_derive'] = (fn) =>
Expand Down
Loading
Loading