From 5194c45eb98d16af733f335f052d6275c743254e Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 3 Dec 2024 16:04:13 +0000 Subject: [PATCH] Loading faster, and a loading screen --- demos/linearlite/db/generate_data.js | 2 +- demos/linearlite/db/load_data.js | 14 +- .../db/migrations-client/01-create_tables.sql | 20 +- .../post-initial-sync-fts-index.sql | 1 + .../post-initial-sync-indexes.sql | 11 + demos/linearlite/src/App.tsx | 71 ++++- demos/linearlite/src/components/TopFilter.tsx | 34 +- demos/linearlite/src/migrations.ts | 28 +- demos/linearlite/src/pglite-worker.ts | 230 +------------- demos/linearlite/src/sync.ts | 296 ++++++++++++++++++ demos/linearlite/src/utils/filterState.ts | 6 +- packages/pglite-sync/src/index.ts | 21 +- 12 files changed, 470 insertions(+), 264 deletions(-) create mode 100644 demos/linearlite/db/migrations-client/post-initial-sync-fts-index.sql create mode 100644 demos/linearlite/db/migrations-client/post-initial-sync-indexes.sql create mode 100644 demos/linearlite/src/sync.ts diff --git a/demos/linearlite/db/generate_data.js b/demos/linearlite/db/generate_data.js index c9d101dbf..a2e5240c9 100644 --- a/demos/linearlite/db/generate_data.js +++ b/demos/linearlite/db/generate_data.js @@ -35,7 +35,7 @@ function generateIssue(kanbanKey) { username: faker.internet.userName(), comments: faker.helpers.multiple( () => generateComment(issueId, createdAt), - { count: faker.number.int({ min: 0, max: 10 }) } + { count: faker.number.int({ min: 0, max: 1 }) } ), } } diff --git a/demos/linearlite/db/load_data.js b/demos/linearlite/db/load_data.js index b1503c70e..4a4f494e6 100644 --- a/demos/linearlite/db/load_data.js +++ b/demos/linearlite/db/load_data.js @@ -34,7 +34,7 @@ try { // Process data in batches for (let i = 0; i < issues.length; i += BATCH_SIZE) { const issueBatch = issues.slice(i, i + BATCH_SIZE) - + await sql.begin(async (sql) => { // Disable FK checks await sql`SET CONSTRAINTS ALL DEFERRED` @@ -47,12 +47,20 @@ try { // Insert related comments const batchComments = issueBatch.flatMap((issue) => issue.comments) const commentColumns = Object.keys(batchComments[0]) - await batchInsert(sql, 'comment', commentColumns, batchComments, BATCH_SIZE) + await batchInsert( + sql, + 'comment', + commentColumns, + batchComments, + BATCH_SIZE + ) commentCount += batchComments.length }) - process.stdout.write(`\nProcessed batch ${Math.floor(i / BATCH_SIZE) + 1}: ${Math.min(i + BATCH_SIZE, issues.length)} of ${issues.length} issues\n`) + process.stdout.write( + `\nProcessed batch ${Math.floor(i / BATCH_SIZE) + 1}: ${Math.min(i + BATCH_SIZE, issues.length)} of ${issues.length} issues\n` + ) } console.info(`Loaded ${issueCount} issues with ${commentCount} comments.`) diff --git a/demos/linearlite/db/migrations-client/01-create_tables.sql b/demos/linearlite/db/migrations-client/01-create_tables.sql index d1f9e70e1..7129fac7e 100644 --- a/demos/linearlite/db/migrations-client/01-create_tables.sql +++ b/demos/linearlite/db/migrations-client/01-create_tables.sql @@ -15,10 +15,6 @@ CREATE TABLE IF NOT EXISTS "issue" ( "sent_to_server" BOOLEAN NOT NULL DEFAULT FALSE, -- Flag to track if the row has been sent to the server "synced" BOOLEAN GENERATED ALWAYS AS (ARRAY_LENGTH(modified_columns, 1) IS NULL AND NOT deleted AND NOT new) STORED, "backup" JSONB, -- JSONB column to store the backup of the row data for modified columns - "search_vector" tsvector GENERATED ALWAYS AS ( - setweight(to_tsvector('simple', coalesce(title, '')), 'A') || - setweight(to_tsvector('simple', coalesce(description, '')), 'B') - ) STORED, CONSTRAINT "issue_pkey" PRIMARY KEY ("id") ); @@ -39,20 +35,8 @@ CREATE TABLE IF NOT EXISTS "comment" ( ); CREATE INDEX IF NOT EXISTS "issue_id_idx" ON "issue" ("id"); -CREATE INDEX IF NOT EXISTS "issue_priority_idx" ON "issue" ("priority"); -CREATE INDEX IF NOT EXISTS "issue_status_idx" ON "issue" ("status"); -CREATE INDEX IF NOT EXISTS "issue_modified_idx" ON "issue" ("modified"); -CREATE INDEX IF NOT EXISTS "issue_created_idx" ON "issue" ("created"); -CREATE INDEX IF NOT EXISTS "issue_kanbanorder_idx" ON "issue" ("kanbanorder"); -CREATE INDEX IF NOT EXISTS "issue_deleted_idx" ON "issue" ("deleted"); -CREATE INDEX IF NOT EXISTS "issue_synced_idx" ON "issue" ("synced"); -CREATE INDEX IF NOT EXISTS "issue_search_idx" ON "issue" USING GIN ("search_vector"); CREATE INDEX IF NOT EXISTS "comment_id_idx" ON "comment" ("id"); -CREATE INDEX IF NOT EXISTS "comment_issue_id_idx" ON "comment" ("issue_id"); -CREATE INDEX IF NOT EXISTS "comment_created_idx" ON "comment" ("created"); -CREATE INDEX IF NOT EXISTS "comment_deleted_idx" ON "comment" ("deleted"); -CREATE INDEX IF NOT EXISTS "comment_synced_idx" ON "comment" ("synced"); -- During sync the electric.syncing config var is set to true -- We can use this in triggers to determine the action that should be performed @@ -297,3 +281,7 @@ $$ LANGUAGE plpgsql; -- Example usage: -- SELECT revert_local_changes('issue', '123e4567-e89b-12d3-a456-426614174000'); -- SELECT revert_local_changes('comment', '123e4567-e89b-12d3-a456-426614174001'); + + +ALTER TABLE issue DISABLE TRIGGER ALL; +ALTER TABLE comment DISABLE TRIGGER ALL; diff --git a/demos/linearlite/db/migrations-client/post-initial-sync-fts-index.sql b/demos/linearlite/db/migrations-client/post-initial-sync-fts-index.sql new file mode 100644 index 000000000..9e292c053 --- /dev/null +++ b/demos/linearlite/db/migrations-client/post-initial-sync-fts-index.sql @@ -0,0 +1 @@ +CREATE INDEX IF NOT EXISTS "issue_search_idx" ON "issue" USING GIN ((setweight(to_tsvector('simple', coalesce(title, '')), 'A') || setweight(to_tsvector('simple', coalesce(description, '')), 'B'))); \ No newline at end of file diff --git a/demos/linearlite/db/migrations-client/post-initial-sync-indexes.sql b/demos/linearlite/db/migrations-client/post-initial-sync-indexes.sql new file mode 100644 index 000000000..30328e467 --- /dev/null +++ b/demos/linearlite/db/migrations-client/post-initial-sync-indexes.sql @@ -0,0 +1,11 @@ +CREATE INDEX IF NOT EXISTS "issue_priority_idx" ON "issue" ("priority"); +CREATE INDEX IF NOT EXISTS "issue_status_idx" ON "issue" ("status"); +CREATE INDEX IF NOT EXISTS "issue_modified_idx" ON "issue" ("modified"); +CREATE INDEX IF NOT EXISTS "issue_created_idx" ON "issue" ("created"); +CREATE INDEX IF NOT EXISTS "issue_kanbanorder_idx" ON "issue" ("kanbanorder"); +CREATE INDEX IF NOT EXISTS "issue_deleted_idx" ON "issue" ("deleted"); +CREATE INDEX IF NOT EXISTS "issue_synced_idx" ON "issue" ("synced"); +CREATE INDEX IF NOT EXISTS "comment_issue_id_idxx" ON "comment" ("issue_id"); +CREATE INDEX IF NOT EXISTS "comment_created_idx" ON "comment" ("created"); +CREATE INDEX IF NOT EXISTS "comment_deleted_idx" ON "comment" ("deleted"); +CREATE INDEX IF NOT EXISTS "comment_synced_idx" ON "comment" ("synced"); \ No newline at end of file diff --git a/demos/linearlite/src/App.tsx b/demos/linearlite/src/App.tsx index 169414f33..de86fced5 100644 --- a/demos/linearlite/src/App.tsx +++ b/demos/linearlite/src/App.tsx @@ -20,6 +20,9 @@ import { FilterState, } from './utils/filterState' import { Issue as IssueType, Status, StatusValue } from './types/types' +import { startSync, useSyncStatus, waitForInitialSyncDone } from './sync' +import { electricSync } from '@electric-sql/pglite-sync' +import { ImSpinner8 } from 'react-icons/im' interface MenuContextInterface { showMenu: boolean @@ -30,13 +33,36 @@ export const MenuContext = createContext(null as MenuContextInterface | null) type PGliteWorkerWithLive = PGliteWorker & { live: LiveNamespace } -const pgPromise = PGliteWorker.create(new PGWorker(), { - extensions: { - live, - }, +async function createPGlite() { + return PGliteWorker.create(new PGWorker(), { + extensions: { + live, + sync: electricSync(), + }, + }) +} + +const pgPromise = createPGlite() + +let syncStarted = false +pgPromise.then(async (pg) => { + console.log('PGlite worker started') + pg.onLeaderChange(() => { + console.log('Leader changed, isLeader:', pg.isLeader) + if (pg.isLeader && !syncStarted) { + syncStarted = true + startSync(pg) + } + }) +}) + +let resolveFirstLoaderPromise: (value: void | PromiseLike) => void +const firstLoaderPromise = new Promise((resolve) => { + resolveFirstLoaderPromise = resolve }) async function issueListLoader({ request }: { request: Request }) { + await waitForInitialSyncDone() const pg = await pgPromise const url = new URL(request.url) const filterState = getFilterStateFromSearchParams(url.searchParams) @@ -48,10 +74,12 @@ async function issueListLoader({ request }: { request: Request }) { offset: 0, limit: 100, }) + resolveFirstLoaderPromise() return { liveIssues, filterState } } async function boardIssueListLoader({ request }: { request: Request }) { + await waitForInitialSyncDone() const pg = await pgPromise const url = new URL(request.url) const filterState = getFilterStateFromSearchParams(url.searchParams) @@ -78,6 +106,8 @@ async function boardIssueListLoader({ request }: { request: Request }) { columnsLiveIssues[status] = colLiveIssues } + resolveFirstLoaderPromise() + return { columnsLiveIssues: columnsLiveIssues as Record< StatusValue, @@ -132,21 +162,52 @@ const router = createBrowserRouter([ }, ]) +const LoadingScreen = ({ children }: { children: React.ReactNode }) => { + return ( +
+ +
+ {children} +
+
+ ) +} + const App = () => { const [showMenu, setShowMenu] = useState(false) const [pgForProvider, setPgForProvider] = useState(null) + const [syncStatus, syncMessage] = useSyncStatus() + const [firstLoaderDone, setFirstLoaderDone] = useState(false) useEffect(() => { pgPromise.then(setPgForProvider) }, []) + useEffect(() => { + if (firstLoaderDone) return + firstLoaderPromise.then(() => { + setFirstLoaderDone(true) + }) + }, [firstLoaderDone]) + const menuContextValue = useMemo( () => ({ showMenu, setShowMenu }), [showMenu] ) - if (!pgForProvider) return
Loading...
+ if (!pgForProvider) return Starting PGlite... + + if (syncStatus === 'initial-sync') + return ( + + Performing initial sync... +
+ {syncMessage} +
+ ) + + if (!firstLoaderDone) return Loading... return ( diff --git a/demos/linearlite/src/components/TopFilter.tsx b/demos/linearlite/src/components/TopFilter.tsx index 0169442da..5724b4e2e 100644 --- a/demos/linearlite/src/components/TopFilter.tsx +++ b/demos/linearlite/src/components/TopFilter.tsx @@ -1,13 +1,14 @@ import { ReactComponent as MenuIcon } from '../assets/icons/menu.svg' -import { useState, useContext } from 'react' +import { useState, useContext, useEffect } from 'react' import { BsSortUp, BsPlus, BsX, BsSearch as SearchIcon } from 'react-icons/bs' -import { useLiveQuery } from '@electric-sql/pglite-react' +import { useLiveQuery, usePGlite } from '@electric-sql/pglite-react' import ViewOptionMenu from './ViewOptionMenu' import { MenuContext } from '../App' import FilterMenu from './contextmenu/FilterMenu' import { FilterState, useFilterState } from '../utils/filterState' import { PriorityDisplay, StatusDisplay } from '../types/types' import debounce from 'lodash.debounce' +import { createFTSIndex } from '../migrations' interface Props { filteredIssuesCount: number @@ -28,6 +29,8 @@ export default function ({ const [showViewOption, setShowViewOption] = useState(false) const { showMenu, setShowMenu } = useContext(MenuContext)! const [searchQuery, setSearchQuery] = useState(``) + const [FTSIndexReady, setFTSIndexReady] = useState(true) + const pg = usePGlite() filterState ??= usedFilterState @@ -63,6 +66,24 @@ export default function ({ } } + useEffect(() => { + if (!showSearch) return + const checkFTSIndex = async () => { + const res = await pg.query( + `SELECT 1 FROM pg_indexes WHERE indexname = 'issue_search_idx';` + ) + const indexReady = res.rows.length > 0 + if (!indexReady) { + setFTSIndexReady(false) + await createFTSIndex(pg) + } + setFTSIndexReady(true) + } + checkFTSIndex() + }, [showSearch, pg]) + + const showFTSIndexProgress = showSearch && !FTSIndexReady + return ( <>
@@ -177,6 +198,15 @@ export default function ({
)} + {showFTSIndexProgress && ( +
+
+
+ Building full text search index... (only happens once) +
+
+ )} + setShowViewOption(false)} diff --git a/demos/linearlite/src/migrations.ts b/demos/linearlite/src/migrations.ts index 62ae2dbbf..143a6fb48 100644 --- a/demos/linearlite/src/migrations.ts +++ b/demos/linearlite/src/migrations.ts @@ -1,6 +1,30 @@ -import type { PGlite } from '@electric-sql/pglite' +import type { PGlite, PGliteInterface } from '@electric-sql/pglite' import m1 from '../db/migrations-client/01-create_tables.sql?raw' +import postInitialSyncIndexes from '../db/migrations-client/post-initial-sync-indexes.sql?raw' +import postInitialSyncFtsIndex from '../db/migrations-client/post-initial-sync-fts-index.sql?raw' export async function migrate(pg: PGlite) { - await pg.exec(m1) + const tables = await pg.query( + `SELECT table_name FROM information_schema.tables WHERE table_schema='public'` + ) + if (tables.rows.length === 0) { + await pg.exec(m1) + } +} + +export async function postInitialSync(pg: PGliteInterface) { + const commands = postInitialSyncIndexes + .split('\n') + .map((c) => c.trim()) + .filter((c) => c.length > 0) + for (const command of commands) { + // wait 100ms between commands + console.time(`command: ${command}`) + await pg.exec(command) + console.timeEnd(`command: ${command}`) + } +} + +export async function createFTSIndex(pg: PGliteInterface) { + await pg.exec(postInitialSyncFtsIndex) } diff --git a/demos/linearlite/src/pglite-worker.ts b/demos/linearlite/src/pglite-worker.ts index ce0b35a32..598424e24 100644 --- a/demos/linearlite/src/pglite-worker.ts +++ b/demos/linearlite/src/pglite-worker.ts @@ -1,243 +1,15 @@ import { worker } from '@electric-sql/pglite/worker' -import { PGlite, Mutex } from '@electric-sql/pglite' -import { live, type PGliteWithLive } from '@electric-sql/pglite/live' -import { electricSync, type PGliteWithSync } from '@electric-sql/pglite-sync' +import { PGlite } from '@electric-sql/pglite' import { migrate } from './migrations' -import type { IssueChange, CommentChange, ChangeSet } from './utils/changes' - -const WRITE_SERVER_URL = import.meta.env.VITE_WRITE_SERVER_URL -const ELECTRIC_URL = import.meta.env.VITE_ELECTRIC_URL -const APPLY_CHANGES_URL = `${WRITE_SERVER_URL}/apply-changes` - -type PGliteWithExtensions = PGliteWithLive & PGliteWithSync worker({ async init() { const pg = await PGlite.create({ - // debug: 1, dataDir: 'idb://linearlite2', relaxedDurability: true, - extensions: { - sync: electricSync(), - live, - }, }) - // Migrate the database to the latest schema await migrate(pg) - - // This waits for the last weeks data to sync to the database - await startSyncToDatabase(pg) - - startWritePath(pg) - return pg }, }) - -const INITIAL_SYNC_DAYS = 7 -// We can set this to a specific date to sync from, or leave it blank to sync from 30 days ago -// this is used for the demo to sync from a specific date based on what we have in the demo data -const INITIAL_SYNC_FROM_DATE = import.meta.env.VITE_INITIAL_SYNC_FROM_DATE ?? '2024-11-28T00:00:00.000Z' - -async function initialSyncToDatabase(pg: PGliteWithExtensions) { - // We are going to first sync just the last weeks data. - // To make this cache efficient lets sync to the previous Monday that is at least - // 7 days prior to today. - const today = new Date() - const syncFrom = new Date(INITIAL_SYNC_FROM_DATE ?? today) - if (!INITIAL_SYNC_FROM_DATE) { - syncFrom.setDate( - today.getDate() - (INITIAL_SYNC_DAYS + ((today.getDay() + 6) % 7)) - ) - } - - console.log('syncing from', syncFrom.toISOString()) - - const issuesSync = await pg.sync.syncShapeToTable({ - shape: { - url: `${ELECTRIC_URL}/v1/shape`, - table: 'issue', - where: `created >= '${syncFrom.toISOString()}'`, - }, - table: 'issue', - primaryKey: ['id'], - }) - const issueSyncUpToDate = new Promise((resolve, reject) => { - issuesSync.subscribe(() => { - // Subscribe will be called when the sync is up to date - // at which point we can unsubscribe and resolve the promise - console.log('issue sync up to date') - issuesSync.unsubscribe() - resolve() - }, reject) - }) - const commentsSync = await pg.sync.syncShapeToTable({ - shape: { - url: `${ELECTRIC_URL}/v1/shape`, - table: 'comment', - where: `created >= '${syncFrom.toISOString()}'`, - }, - table: 'comment', - primaryKey: ['id'], - }) - const commentSyncUpToDate = new Promise((resolve, reject) => { - commentsSync.subscribe(() => { - // Subscribe will be called when the sync is up to date - // at which point we can unsubscribe and resolve the promise - console.log('comment sync up to date') - commentsSync.unsubscribe() - resolve() - }, reject) - }) - // Wait for both syncs to complete - await Promise.all([issueSyncUpToDate, commentSyncUpToDate]) -} - -async function startSyncToDatabase(pg: PGliteWithExtensions) { - // First sync the last weeks data if the database is empty - const issueCount = await pg.query<{ count: number }>(` - SELECT count(id) as count FROM issue - `) - if (issueCount.rows[0].count === 0) { - console.log('initial sync to database') - await initialSyncToDatabase(pg) - console.log('initial sync to database complete') - } - - // Finally start the full sync - const throttle = 100 // used during initial sync to prevent too many renders - pg.sync.syncShapeToTable({ - shape: { - url: `${ELECTRIC_URL}/v1/shape`, - table: 'issue', - }, - table: 'issue', - primaryKey: ['id'], - shapeKey: 'issues', - commitGranularity: 'transaction', - commitThrottle: throttle, - }) - pg.sync.syncShapeToTable({ - shape: { - url: `${ELECTRIC_URL}/v1/shape`, - table: 'comment', - }, - table: 'comment', - primaryKey: ['id'], - shapeKey: 'comments', - commitGranularity: 'transaction', - commitThrottle: throttle, - }) -} - -const syncMutex = new Mutex() - -async function startWritePath(pg: PGliteWithExtensions) { - // Use a live query to watch for changes to the local tables that need to be synced - pg.live.query<{ - issue_count: number - comment_count: number - }>( - ` - SELECT * FROM - (SELECT count(id) as issue_count FROM issue WHERE synced = false), - (SELECT count(id) as comment_count FROM comment WHERE synced = false) - `, - [], - async (results) => { - const { issue_count, comment_count } = results.rows[0] - if (issue_count > 0 || comment_count > 0) { - await syncMutex.acquire() - try { - doSyncToServer(pg) - } finally { - syncMutex.release() - } - } - } - ) -} - -// Call wrapped in mutex to prevent multiple syncs from happening at the same time -async function doSyncToServer(pg: PGliteWithExtensions) { - let issueChanges: IssueChange[] - let commentChanges: CommentChange[] - await pg.transaction(async (tx) => { - const issueRes = await tx.query(` - SELECT - id, - title, - description, - priority, - status, - modified, - created, - kanbanorder, - username, - modified_columns, - deleted, - new - FROM issue - WHERE synced = false AND sent_to_server = false - `) - const commentRes = await tx.query(` - SELECT - id, - body, - username, - issue_id, - modified, - created, - modified_columns, - deleted, - new - FROM comment - WHERE synced = false AND sent_to_server = false - `) - issueChanges = issueRes.rows - commentChanges = commentRes.rows - }) - const changeSet: ChangeSet = { - issues: issueChanges!, - comments: commentChanges!, - } - const response = await fetch(APPLY_CHANGES_URL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(changeSet), - }) - if (!response.ok) { - throw new Error('Failed to apply changes') - } - await pg.transaction(async (tx) => { - // Mark all changes as sent to server, but check that the modified timestamp - // has not changed in the meantime - - tx.exec('SET LOCAL electric.bypass_triggers = true') - - for (const issue of issueChanges!) { - await tx.query( - ` - UPDATE issue - SET sent_to_server = true - WHERE id = $1 AND modified = $2 - `, - [issue.id, issue.modified] - ) - } - - for (const comment of commentChanges!) { - await tx.query( - ` - UPDATE comment - SET sent_to_server = true - WHERE id = $1 AND modified = $2 - `, - [comment.id, comment.modified] - ) - } - }) -} diff --git a/demos/linearlite/src/sync.ts b/demos/linearlite/src/sync.ts new file mode 100644 index 000000000..c4387f8c7 --- /dev/null +++ b/demos/linearlite/src/sync.ts @@ -0,0 +1,296 @@ +import { Mutex } from '@electric-sql/pglite' +import { type PGliteWithLive } from '@electric-sql/pglite/live' +import { type PGliteWithSync } from '@electric-sql/pglite-sync' +import type { IssueChange, CommentChange, ChangeSet } from './utils/changes' +import { postInitialSync } from './migrations' +import { useEffect, useState } from 'react' + +const WRITE_SERVER_URL = import.meta.env.VITE_WRITE_SERVER_URL +const ELECTRIC_URL = import.meta.env.VITE_ELECTRIC_URL +const APPLY_CHANGES_URL = `${WRITE_SERVER_URL}/apply-changes` + +type SyncStatus = 'initial-sync' | 'done' + +type PGliteWithExtensions = PGliteWithLive & PGliteWithSync + +export async function startSync(pg: PGliteWithExtensions) { + await startSyncToDatabase(pg) + startWritePath(pg) +} + +async function startSyncToDatabase(pg: PGliteWithExtensions) { + // Check if there are any issues already in the database + const issues = await pg.query(`SELECT 1 FROM issue LIMIT 1`) + const hasIssuesAtStart = issues.rows.length > 0 + + if (!hasIssuesAtStart) { + updateSyncStatus('initial-sync', 'Downloading shape data...') + } + + let issueShapeInitialSyncDone = false + let commentShapeInitialSyncDone = false + let postInitialSyncDone = false + + let postInitialSyncDoneResolver: () => void + const postInitialSyncDonePromise = new Promise((resolve) => { + postInitialSyncDoneResolver = resolve + }) + + const doPostInitialSync = async () => { + if ( + !hasIssuesAtStart && + issueShapeInitialSyncDone && + commentShapeInitialSyncDone && + !postInitialSyncDone + ) { + postInitialSyncDone = true + updateSyncStatus('initial-sync', 'Creating indexes...') + await postInitialSync(pg) + postInitialSyncDoneResolver() + } + } + + // Issues Sync + const issuesSync = await pg.sync.syncShapeToTable({ + shape: { + url: `${ELECTRIC_URL}/v1/shape`, + table: 'issue', + }, + table: 'issue', + primaryKey: ['id'], + shapeKey: 'issues', + commitGranularity: 'up-to-date', + useCopy: true, + onInitialSync: async () => { + issueShapeInitialSyncDone = true + await pg.exec(`ALTER TABLE issue ENABLE TRIGGER ALL`) + doPostInitialSync() + }, + }) + issuesSync.subscribe( + () => { + if (!hasIssuesAtStart) { + updateSyncStatus('initial-sync', 'Inserting issues...') + } + }, + (error) => { + console.error('issuesSync error', error) + } + ) + + // Comments Sync + const commentsSync = await pg.sync.syncShapeToTable({ + shape: { + url: `${ELECTRIC_URL}/v1/shape`, + table: 'comment', + }, + table: 'comment', + primaryKey: ['id'], + shapeKey: 'comments', + commitGranularity: 'up-to-date', + useCopy: true, + onInitialSync: async () => { + commentShapeInitialSyncDone = true + await pg.exec(`ALTER TABLE comment ENABLE TRIGGER ALL`) + doPostInitialSync() + }, + }) + commentsSync.subscribe( + () => { + if (!hasIssuesAtStart) { + updateSyncStatus('initial-sync', 'Inserting comments...') + } + }, + (error) => { + console.error('commentsSync error', error) + } + ) + + if (!hasIssuesAtStart) { + await postInitialSyncDonePromise + await pg.query(`SELECT 1;`) // Do a query to ensure PGlite is idle + } + updateSyncStatus('done') +} + +const syncMutex = new Mutex() + +async function startWritePath(pg: PGliteWithExtensions) { + // Use a live query to watch for changes to the local tables that need to be synced + pg.live.query<{ + issue_count: number + comment_count: number + }>( + ` + SELECT * FROM + (SELECT count(id) as issue_count FROM issue WHERE synced = false), + (SELECT count(id) as comment_count FROM comment WHERE synced = false) + `, + [], + async (results) => { + const { issue_count, comment_count } = results.rows[0] + if (issue_count > 0 || comment_count > 0) { + await syncMutex.acquire() + try { + doSyncToServer(pg) + } finally { + syncMutex.release() + } + } + } + ) +} + +// Call wrapped in mutex to prevent multiple syncs from happening at the same time +async function doSyncToServer(pg: PGliteWithExtensions) { + let issueChanges: IssueChange[] + let commentChanges: CommentChange[] + await pg.transaction(async (tx) => { + const issueRes = await tx.query(` + SELECT + id, + title, + description, + priority, + status, + modified, + created, + kanbanorder, + username, + modified_columns, + deleted, + new + FROM issue + WHERE synced = false AND sent_to_server = false + `) + const commentRes = await tx.query(` + SELECT + id, + body, + username, + issue_id, + modified, + created, + modified_columns, + deleted, + new + FROM comment + WHERE synced = false AND sent_to_server = false + `) + issueChanges = issueRes.rows + commentChanges = commentRes.rows + }) + const changeSet: ChangeSet = { + issues: issueChanges!, + comments: commentChanges!, + } + const response = await fetch(APPLY_CHANGES_URL, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(changeSet), + }) + if (!response.ok) { + throw new Error('Failed to apply changes') + } + await pg.transaction(async (tx) => { + // Mark all changes as sent to server, but check that the modified timestamp + // has not changed in the meantime + + tx.exec('SET LOCAL electric.bypass_triggers = true') + + for (const issue of issueChanges!) { + await tx.query( + ` + UPDATE issue + SET sent_to_server = true + WHERE id = $1 AND modified = $2 + `, + [issue.id, issue.modified] + ) + } + + for (const comment of commentChanges!) { + await tx.query( + ` + UPDATE comment + SET sent_to_server = true + WHERE id = $1 AND modified = $2 + `, + [comment.id, comment.modified] + ) + } + }) +} + +export function updateSyncStatus(newStatus: SyncStatus, message?: string) { + localStorage.setItem('syncStatus', JSON.stringify([newStatus, message])) + // Fire a storage event on this tab as this doesn't happen automatically + window.dispatchEvent( + new StorageEvent('storage', { + key: 'syncStatus', + newValue: JSON.stringify([newStatus, message]), + }) + ) +} + +export function useSyncStatus() { + const currentSyncStatusJson = localStorage.getItem('syncStatus') + const currentSyncStatus: [SyncStatus, string] = currentSyncStatusJson + ? JSON.parse(currentSyncStatusJson) + : ['initial-sync', 'Starting sync...'] + const [syncStatus, setSyncStatus] = + useState<[SyncStatus, string]>(currentSyncStatus) + + useEffect(() => { + const handleStorageChange = (e: StorageEvent) => { + if (e.key === 'syncStatus' && e.newValue) { + const [newStatus, message] = JSON.parse(e.newValue) + setSyncStatus([newStatus, message]) + } + } + + window.addEventListener('storage', handleStorageChange) + + return () => { + window.removeEventListener('storage', handleStorageChange) + } + }, []) + + return syncStatus +} + +let initialSyncDone = false + +export function waitForInitialSyncDone() { + return new Promise((resolve) => { + if (initialSyncDone) { + resolve() + return + } + const handleStorageChange = (e: StorageEvent) => { + if (e.key === 'syncStatus' && e.newValue) { + const [newStatus] = JSON.parse(e.newValue) + if (newStatus === 'done') { + window.removeEventListener('storage', handleStorageChange) + initialSyncDone = true + resolve() + } + } + } + + // Check current status first + const currentSyncStatusJson = localStorage.getItem('syncStatus') + const [currentStatus] = currentSyncStatusJson + ? JSON.parse(currentSyncStatusJson) + : ['initial-sync'] + + if (currentStatus === 'done') { + initialSyncDone = true + resolve() + } else { + window.addEventListener('storage', handleStorageChange) + } + }) +} diff --git a/demos/linearlite/src/utils/filterState.ts b/demos/linearlite/src/utils/filterState.ts index d7ef3c904..490909c61 100644 --- a/demos/linearlite/src/utils/filterState.ts +++ b/demos/linearlite/src/utils/filterState.ts @@ -94,7 +94,11 @@ export function filterStateToSql(filterState: FilterState) { sqlParams.push(...filterState.priority) } if (filterState.query) { - sqlWhere.push(`search_vector @@ plainto_tsquery('simple', $${i++})`) + sqlWhere.push(` + (setweight(to_tsvector('simple', coalesce(title, '')), 'A') || + setweight(to_tsvector('simple', coalesce(description, '')), 'B')) + @@ plainto_tsquery('simple', $${i++}) + `) sqlParams.push(filterState.query) } const sql = ` diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index 687bf9baa..df5d46b08 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -44,6 +44,7 @@ export interface SyncShapeToTableOptions { useCopy?: boolean commitGranularity?: CommitGranularity commitThrottle?: number + onInitialSync?: () => void } export interface ElectricSyncOptions { @@ -69,6 +70,7 @@ async function createPlugin( const namespaceObj = { syncShapeToTable: async (options: SyncShapeToTableOptions) => { + await firstRun() options = { commitGranularity: 'up-to-date', ...options, @@ -128,7 +130,10 @@ async function createPlugin( const commit = async () => { if (messageAggregator.length === 0 && !truncateNeeded) return await pg.transaction(async (tx) => { - if (debug) console.log('up-to-date, committing all messages') + if (debug) { + console.log('committing message batch', messageAggregator.length) + console.time('commit') + } // Set the syncing flag to true during this transaction so that // user defined triggers on the table are able to chose how to run @@ -215,6 +220,7 @@ async function createPlugin( }) } }) + if (debug) console.timeEnd('commit') messageAggregator = [] // Await a timeout to start a new task and allow other connections to do work await new Promise((resolve) => setTimeout(resolve, 0)) @@ -272,6 +278,9 @@ async function createPlugin( case 'up-to-date': // perform all accumulated changes and store stream state await commit() // not throttled, we want this to happen ASAP + if (isNewSubscription && options.onInitialSync) { + options.onInitialSync() + } break } } @@ -313,7 +322,11 @@ async function createPlugin( } } - const init = async () => { + let firstRunDone = false + + const firstRun = async () => { + if (firstRunDone) return + firstRunDone = true await migrateShapeMetadataTables({ pg, metadataSchema, @@ -323,7 +336,6 @@ async function createPlugin( return { namespaceObj, close, - init, } } @@ -339,11 +351,10 @@ export function electricSync(options?: ElectricSyncOptions) { return { name: 'ElectricSQL Sync', setup: async (pg: PGliteInterface) => { - const { namespaceObj, close, init } = await createPlugin(pg, options) + const { namespaceObj, close } = await createPlugin(pg, options) return { namespaceObj, close, - init, } }, } satisfies Extension