diff --git a/README.md b/README.md index 1133dcd47..733c2658d 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ To configure the image proxy, you will need to set the `IMGPROXY_` env vars. `NE The site is written in javascript using Next.js, a React framework. The backend API is provided via GraphQL. The database is PostgreSQL modeled with Prisma. The job queue is also maintained in PostgreSQL. We use lnd for our lightning node. A customized Bootstrap theme is used for styling. # processes -There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. polling lnd for invoice/payment status +There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. processing images. # wallet transaction safety To ensure stackers balances are kept sane, all wallet updates are run in serializable transactions at the database level. Because prisma has relatively poor support for transactions all wallet touching code is written in plpgsql stored procedures and can be found in the prisma/migrations folder. diff --git a/api/resolvers/serial.js b/api/resolvers/serial.js index 3731e4902..e160f32c9 100644 --- a/api/resolvers/serial.js +++ b/api/resolvers/serial.js @@ -77,9 +77,8 @@ export async function serializeInvoicable (query, { models, lnd, hash, hmac, me, if (hash) { invoice = await checkInvoice(models, hash, hmac, enforceFee) trx = [ - models.$queryRaw`UPDATE users SET msats = msats + ${invoice.msatsReceived} WHERE id = ${invoice.user.id}`, - ...trx, - models.invoice.update({ where: { hash: invoice.hash }, data: { confirmedAt: new Date() } }) + models.$executeRaw`SELECT confirm_invoice(${hash}, ${invoice.msatsReceived})`, + ...trx ] } diff --git a/api/resolvers/wallet.js b/api/resolvers/wallet.js index 86047b9df..05de8f8ad 100644 --- a/api/resolvers/wallet.js +++ b/api/resolvers/wallet.js @@ -297,12 +297,10 @@ export default { console.log('invoice', balanceLimit) const [inv] = await serialize(models, - models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${invoice.request}, + models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${hodlInvoice ? invoice.secret : null}::TEXT, ${invoice.request}, ${expiresAt}::timestamp, ${amount * 1000}, ${user.id}::INTEGER, ${description}, NULL, NULL, ${invLimit}::INTEGER, ${balanceLimit})`) - if (hodlInvoice) await models.invoice.update({ where: { hash: invoice.id }, data: { preimage: invoice.secret } }) - // the HMAC is only returned during invoice creation // this makes sure that only the person who created this invoice // has access to the HMAC diff --git a/lib/constants.js b/lib/constants.js index c09cfa394..9d00c7483 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -31,10 +31,10 @@ export const MAX_TERRITORY_DESC_LENGTH = 1000 // 1k export const MAX_POLL_CHOICE_LENGTH = 40 export const ITEM_SPAM_INTERVAL = '10m' export const ANON_ITEM_SPAM_INTERVAL = '0' -export const INV_PENDING_LIMIT = 10 +export const INV_PENDING_LIMIT = 100 export const BALANCE_LIMIT_MSATS = 250000000 // 250k sat export const SN_USER_IDS = [616, 6030, 946, 4502] -export const ANON_INV_PENDING_LIMIT = 100 +export const ANON_INV_PENDING_LIMIT = 1000 export const ANON_BALANCE_LIMIT_MSATS = 0 // disable export const MAX_POLL_NUM_CHOICES = 10 export const MIN_POLL_NUM_CHOICES = 2 diff --git a/pages/api/lnurlp/[username]/pay.js b/pages/api/lnurlp/[username]/pay.js index df130fe2e..eaa300428 100644 --- a/pages/api/lnurlp/[username]/pay.js +++ b/pages/api/lnurlp/[username]/pay.js @@ -81,7 +81,7 @@ export default async ({ query: { username, amount, nostr, comment, payerdata: pa }) await serialize(models, - models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${invoice.request}, + models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, NULL, ${invoice.request}, ${expiresAt}::timestamp, ${Number(amount)}, ${user.id}::INTEGER, ${noteStr || description}, ${comment || null}, ${parsedPayerData || null}::JSONB, ${INV_PENDING_LIMIT}::INTEGER, ${BALANCE_LIMIT_MSATS})`) diff --git a/prisma/migrations/20240104004135_invoice_confirmed_index/migration.sql b/prisma/migrations/20240104004135_invoice_confirmed_index/migration.sql new file mode 100644 index 000000000..aaa263cb4 --- /dev/null +++ b/prisma/migrations/20240104004135_invoice_confirmed_index/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable +ALTER TABLE "Invoice" ADD COLUMN "confirmedIndex" BIGINT; + +-- CreateIndex +CREATE INDEX "Invoice.confirmedIndex_index" ON "Invoice"("confirmedIndex"); diff --git a/prisma/migrations/20240105190205_replace_polling_with_lnd_subscriptions/migration.sql b/prisma/migrations/20240105190205_replace_polling_with_lnd_subscriptions/migration.sql new file mode 100644 index 000000000..bcf1cbf00 --- /dev/null +++ b/prisma/migrations/20240105190205_replace_polling_with_lnd_subscriptions/migration.sql @@ -0,0 +1,98 @@ +-- remove 'checkInvoice' job insertion since we're using LND subscriptions now +-- also allow function to take preimage as an argument +DROP FUNCTION IF EXISTS create_invoice(hash TEXT, bolt11 TEXT, expires_at timestamp(3) without time zone, + msats_req BIGINT, user_id INTEGER, idesc TEXT, comment TEXT, lud18_data JSONB, inv_limit INTEGER, balance_limit_msats BIGINT); +CREATE OR REPLACE FUNCTION create_invoice(hash TEXT, preimage TEXT, bolt11 TEXT, expires_at timestamp(3) without time zone, + msats_req BIGINT, user_id INTEGER, idesc TEXT, comment TEXT, lud18_data JSONB, inv_limit INTEGER, balance_limit_msats BIGINT) +RETURNS "Invoice" +LANGUAGE plpgsql +AS $$ +DECLARE + invoice "Invoice"; + inv_limit_reached BOOLEAN; + balance_limit_reached BOOLEAN; + inv_pending_msats BIGINT; +BEGIN + PERFORM ASSERT_SERIALIZED(); + + -- prevent too many pending invoices + SELECT inv_limit > 0 AND count(*) >= inv_limit, COALESCE(sum("msatsRequested"), 0) INTO inv_limit_reached, inv_pending_msats + FROM "Invoice" + WHERE "userId" = user_id AND "expiresAt" > now_utc() AND "confirmedAt" IS NULL AND cancelled = false; + + IF inv_limit_reached THEN + RAISE EXCEPTION 'SN_INV_PENDING_LIMIT'; + END IF; + + -- prevent pending invoices + msats from exceeding the limit + SELECT balance_limit_msats > 0 AND inv_pending_msats+msats_req+msats > balance_limit_msats INTO balance_limit_reached + FROM users + WHERE id = user_id; + + IF balance_limit_reached THEN + RAISE EXCEPTION 'SN_INV_EXCEED_BALANCE'; + END IF; + + -- we good, proceed frens + INSERT INTO "Invoice" (hash, preimage, bolt11, "expiresAt", "msatsRequested", "userId", created_at, updated_at, "desc", comment, "lud18Data") + VALUES (hash, preimage, bolt11, expires_at, msats_req, user_id, now_utc(), now_utc(), idesc, comment, lud18_data) RETURNING * INTO invoice; + + IF preimage IS NOT NULL THEN + INSERT INTO pgboss.job (name, data, retrylimit, retrybackoff, startafter) + VALUES ('finalizeHodlInvoice', jsonb_build_object('hash', hash), 21, true, expires_at); + END IF; + + RETURN invoice; +END; +$$; + +-- remove 'checkWithdrawal' job insertion since we're using LND subscriptions now +CREATE OR REPLACE FUNCTION create_withdrawl(lnd_id TEXT, invoice TEXT, msats_amount BIGINT, msats_max_fee BIGINT, username TEXT) +RETURNS "Withdrawl" +LANGUAGE plpgsql +AS $$ +DECLARE + user_id INTEGER; + user_msats BIGINT; + withdrawl "Withdrawl"; +BEGIN + PERFORM ASSERT_SERIALIZED(); + + SELECT msats, id INTO user_msats, user_id FROM users WHERE name = username; + IF (msats_amount + msats_max_fee) > user_msats THEN + RAISE EXCEPTION 'SN_INSUFFICIENT_FUNDS'; + END IF; + + IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status IS NULL) THEN + RAISE EXCEPTION 'SN_PENDING_WITHDRAWL_EXISTS'; + END IF; + + IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status = 'CONFIRMED') THEN + RAISE EXCEPTION 'SN_CONFIRMED_WITHDRAWL_EXISTS'; + END IF; + + INSERT INTO "Withdrawl" (hash, bolt11, "msatsPaying", "msatsFeePaying", "userId", created_at, updated_at) + VALUES (lnd_id, invoice, msats_amount, msats_max_fee, user_id, now_utc(), now_utc()) RETURNING * INTO withdrawl; + + UPDATE users SET msats = msats - msats_amount - msats_max_fee WHERE id = user_id; + + RETURN withdrawl; +END; +$$; + +CREATE OR REPLACE FUNCTION check_invoices_and_withdrawals() +RETURNS INTEGER +LANGUAGE plpgsql +AS $$ +DECLARE +BEGIN + INSERT INTO pgboss.schedule (name, cron, timezone) VALUES ('checkPendingDeposits', '*/10 * * * *', 'America/Chicago') ON CONFLICT DO NOTHING; + INSERT INTO pgboss.schedule (name, cron, timezone) VALUES ('checkPendingWithdrawals', '*/10 * * * *', 'America/Chicago') ON CONFLICT DO NOTHING; + return 0; +EXCEPTION WHEN OTHERS THEN + return 0; +END; +$$; + +SELECT check_invoices_and_withdrawals(); +DROP FUNCTION check_invoices_and_withdrawals(); \ No newline at end of file diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 171bf6c97..3610f85f8 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -540,6 +540,7 @@ model Invoice { bolt11 String expiresAt DateTime confirmedAt DateTime? + confirmedIndex BigInt? cancelled Boolean @default(false) msatsRequested BigInt msatsReceived BigInt? @@ -550,6 +551,7 @@ model Invoice { @@index([createdAt], map: "Invoice.created_at_index") @@index([userId], map: "Invoice.userId_index") + @@index([confirmedIndex], map: "Invoice.confirmedIndex_index") } model Withdrawl { diff --git a/worker/index.js b/worker/index.js index 38988d0e3..4036b2d90 100644 --- a/worker/index.js +++ b/worker/index.js @@ -1,7 +1,7 @@ import PgBoss from 'pg-boss' import nextEnv from '@next/env' import { PrismaClient } from '@prisma/client' -import { checkInvoice, checkWithdrawal, autoDropBolt11s } from './wallet.js' +import { autoDropBolt11s, checkPendingDeposits, checkPendingWithdrawals, finalizeHodlInvoice, subscribeToWallet } from './wallet.js' import { repin } from './repin.js' import { trust } from './trust.js' import { auction } from './auction.js' @@ -71,8 +71,11 @@ async function work () { } await boss.start() - await boss.work('checkInvoice', jobWrapper(checkInvoice)) - await boss.work('checkWithdrawal', jobWrapper(checkWithdrawal)) + + await subscribeToWallet(args) + await boss.work('finalizeHodlInvoice', jobWrapper(finalizeHodlInvoice)) + await boss.work('checkPendingDeposits', jobWrapper(checkPendingDeposits)) + await boss.work('checkPendingWithdrawals', jobWrapper(checkPendingWithdrawals)) await boss.work('autoDropBolt11s', jobWrapper(autoDropBolt11s)) await boss.work('repin-*', jobWrapper(repin)) await boss.work('trust', jobWrapper(trust)) diff --git a/worker/wallet.js b/worker/wallet.js index 463105efe..816f9abc1 100644 --- a/worker/wallet.js +++ b/worker/wallet.js @@ -1,47 +1,130 @@ import serialize from '../api/resolvers/serial.js' -import { getInvoice, getPayment, cancelHodlInvoice } from 'ln-service' -import { datePivot } from '../lib/time.js' +import { + getInvoice, getPayment, cancelHodlInvoice, + subscribeToInvoices, subscribeToPayments, subscribeToInvoice +} from 'ln-service' import { sendUserNotification } from '../api/webPush/index.js' import { msatsToSats, numWithUnits } from '../lib/format' import { INVOICE_RETENTION_DAYS } from '../lib/constants' +import { sleep } from '../lib/time.js' -const walletOptions = { startAfter: 5, retryLimit: 21, retryBackoff: true } +export async function subscribeToWallet (args) { + await subscribeToDeposits(args) + await subscribeToWithdrawals(args) +} + +const logEvent = (name, args) => console.log(`event ${name} triggered with args`, args) +const logEventError = (name, error) => console.error(`error running ${name}`, error) + +async function subscribeToDeposits (args) { + const { models, lnd } = args + + const [lastConfirmed] = await models.$queryRaw` + SELECT "confirmedIndex" + FROM "Invoice" + ORDER BY "confirmedIndex" DESC NULLS LAST + LIMIT 1` + + // https://www.npmjs.com/package/ln-service#subscribetoinvoices + const sub = subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex }) + sub.on('invoice_updated', async (inv) => { + try { + if (inv.secret) { + logEvent('invoice_updated', inv) + await checkInvoice({ data: { hash: inv.id }, ...args }) + } else { + // this is a HODL invoice. We need to use SubscribeToInvoice which has is_held transitions + // https://api.lightning.community/api/lnd/invoices/subscribe-single-invoice + // SubscribeToInvoices is only for invoice creation and settlement transitions + // https://api.lightning.community/api/lnd/lightning/subscribe-invoices + await subscribeToHodlInvoice({ hash: inv.id, ...args }) + } + } catch (error) { + // XXX This is a critical error + // It might mean that we failed to record an invoice confirming + // and we won't get another chance to record it until restart + logEventError('invoice_updated', error) + } + }) + sub.on('error', console.error) -// TODO this should all be done via websockets -export async function checkInvoice ({ data: { hash, isHeldSet }, boss, models, lnd }) { - let inv + // check pending deposits as a redundancy in case we failed to record + // an invoice_updated event + await checkPendingDeposits(args) +} + +async function subscribeToHodlInvoice (args) { + const { lnd, hash, models } = args + let sub try { - inv = await getInvoice({ id: hash, lnd }) - } catch (err) { - console.log(err, hash) - // on lnd related errors, we manually retry so we don't exponentially backoff - await boss.send('checkInvoice', { hash }, walletOptions) - return + await new Promise((resolve, reject) => { + // https://www.npmjs.com/package/ln-service#subscribetoinvoice + sub = subscribeToInvoice({ id: hash, lnd }) + sub.on('invoice_updated', async (inv) => { + logEvent('hodl_invoice_updated', inv) + try { + // record the is_held transition + if (inv.is_held) { + // this is basically confirm_invoice without setting confirmed_at + // and without setting the user balance + // those will be set when the invoice is settled by user action + await models.invoice.update({ + where: { hash }, + data: { + msatsReceived: Number(inv.received_mtokens), + isHeld: true + } + }) + // after that we can stop listening for updates + resolve() + } + } catch (error) { + logEventError('hodl_invoice_updated', error) + reject(error) + } + }) + sub.on('error', reject) + }) + } finally { + sub?.removeAllListeners() } - console.log(inv) +} - // check if invoice still exists since HODL invoices get deleted after usage - const dbInv = await models.invoice.findUnique({ where: { hash } }) - if (!dbInv) return +async function checkInvoice ({ data: { hash }, boss, models, lnd }) { + const inv = await getInvoice({ id: hash, lnd }) - const expired = new Date(inv.expires_at) <= new Date() + // invoice could be created by LND but wasn't inserted into the database yet + // this is expected and the function will be called again with the updates + const dbInv = await models.invoice.findUnique({ where: { hash } }) + if (!dbInv) { + console.log('invoice not found in database', hash) + return + } - if (inv.is_confirmed && !inv.is_held) { - // never mark hodl invoices as confirmed here because - // we manually confirm them when we settle them + if (inv.is_confirmed) { + // NOTE: confirm invoice prevents double confirmations (idempotent) + // ALSO: is_confirmed and is_held are mutually exclusive + // that is, a hold invoice will first be is_held but not is_confirmed + // and once it's settled it will be is_confirmed but not is_held await serialize(models, - models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`) + models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`, + models.invoice.update({ where: { hash }, data: { confirmedIndex: inv.confirmed_index } }) + ) + + // don't send notifications for hodl invoices + if (dbInv.preimage) return + sendUserNotification(dbInv.userId, { title: `${numWithUnits(msatsToSats(inv.received_mtokens), { abbreviate: false })} were deposited in your account`, body: dbInv.comment || undefined, tag: 'DEPOSIT', data: { sats: msatsToSats(inv.received_mtokens) } }).catch(console.error) - return boss.send('nip57', { hash }) + return await boss.send('nip57', { hash }) } if (inv.is_canceled) { - return serialize(models, + return await serialize(models, models.invoice.update({ where: { hash: inv.id @@ -51,41 +134,62 @@ export async function checkInvoice ({ data: { hash, isHeldSet }, boss, models, l } })) } +} - if (inv.is_held && !isHeldSet) { - // this is basically confirm_invoice without setting confirmed_at since it's not settled yet - // and without setting the user balance since that's done inside the same tx as the HODL invoice action. - await serialize(models, - models.invoice.update({ where: { hash }, data: { msatsReceived: Number(inv.received_mtokens), isHeld: true } })) - // remember that we already executed this if clause - // (even though the query above is idempotent but imo, this makes the flow more clear) - isHeldSet = true - } +async function subscribeToWithdrawals (args) { + const { lnd } = args - if (!expired) { - // recheck in 5 seconds if the invoice is younger than 5 minutes - // otherwise recheck in 60 seconds - const startAfter = new Date(inv.created_at) > datePivot(new Date(), { minutes: -5 }) ? 5 : 60 - await boss.send('checkInvoice', { hash, isHeldSet }, { ...walletOptions, startAfter }) - } + // https://www.npmjs.com/package/ln-service#subscribetopayments + const sub = subscribeToPayments({ lnd }) + sub.on('confirmed', async (payment) => { + logEvent('confirmed', payment) + try { + await checkWithdrawal({ data: { hash: payment.id }, ...args }) + } catch (error) { + // XXX This is a critical error + // It might mean that we failed to record an invoice confirming + // and we won't get another chance to record it until restart + logEventError('confirmed', error) + } + }) + sub.on('failed', async (payment) => { + logEvent('failed', payment) + try { + await checkWithdrawal({ data: { hash: payment.id }, ...args }) + } catch (error) { + // XXX This is a critical error + // It might mean that we failed to record an invoice confirming + // and we won't get another chance to record it until restart + logEventError('failed', error) + } + }) + // ignore payment attempts + sub.on('paying', (attempt) => {}) + sub.on('error', console.error) - if (expired && inv.is_held) { - await cancelHodlInvoice({ id: hash, lnd }) - } + // check pending withdrawals since they might have been paid while worker was down + await checkPendingWithdrawals(args) } -export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd }) { +async function checkWithdrawal ({ data: { hash }, boss, models, lnd }) { + const dbWdrwl = await models.withdrawl.findFirst({ where: { hash } }) + if (!dbWdrwl) { + // [WARNING] LND paid an invoice that wasn't created via the SN GraphQL API. + // >>> an adversary might be draining our funds right now <<< + console.error('unexpected outgoing payment detected:', hash) + // TODO: log this in Slack + return + } + let wdrwl let notFound = false try { wdrwl = await getPayment({ id: hash, lnd }) } catch (err) { - console.log(err) if (err[1] === 'SentPaymentNotFound') { notFound = true } else { - // on lnd related errors, we manually retry so we don't exponentially backoff - await boss.send('checkWithdrawal', { id, hash }, walletOptions) + console.error('error getting payment', err) return } } @@ -94,7 +198,7 @@ export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd } const fee = Number(wdrwl.payment.fee_mtokens) const paid = Number(wdrwl.payment.mtokens) - fee await serialize(models, models.$executeRaw` - SELECT confirm_withdrawl(${id}::INTEGER, ${paid}, ${fee})`) + SELECT confirm_withdrawl(${dbWdrwl.id}::INTEGER, ${paid}, ${fee})`) } else if (wdrwl?.is_failed || notFound) { let status = 'UNKNOWN_FAILURE' if (wdrwl?.failed.is_insufficient_balance) { @@ -106,12 +210,11 @@ export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd } } else if (wdrwl?.failed.is_route_not_found) { status = 'ROUTE_NOT_FOUND' } - await serialize(models, models.$executeRaw` - SELECT reverse_withdrawl(${id}::INTEGER, ${status}::"WithdrawlStatus")`) - } else { - // we need to requeue to check again in 5 seconds - const startAfter = new Date(wdrwl.created_at) > datePivot(new Date(), { minutes: -5 }) ? 5 : 60 - await boss.send('checkWithdrawal', { id, hash }, { ...walletOptions, startAfter }) + + await serialize(models, + models.$executeRaw` + SELECT reverse_withdrawl(${dbWdrwl.id}::INTEGER, ${status}::"WithdrawlStatus")` + ) } } @@ -124,3 +227,40 @@ export async function autoDropBolt11s ({ models }) { AND hash IS NOT NULL;` ) } + +// The callback subscriptions above will NOT get called for HODL invoices that are already paid. +// So we manually cancel the HODL invoice here if it wasn't settled by user action +export async function finalizeHodlInvoice ({ data: { hash }, models, lnd }) { + const inv = await getInvoice({ id: hash, lnd }) + if (inv.is_confirmed) { + return + } + + await cancelHodlInvoice({ id: hash, lnd }) +} + +export async function checkPendingDeposits (args) { + const { models } = args + const pendingDeposits = await models.invoice.findMany({ where: { confirmedAt: null, cancelled: false } }) + for (const d of pendingDeposits) { + try { + await checkInvoice({ data: { id: d.id, hash: d.hash }, ...args }) + await sleep(10) + } catch { + console.error('error checking invoice', d.hash) + } + } +} + +export async function checkPendingWithdrawals (args) { + const { models } = args + const pendingWithdrawals = await models.withdrawl.findMany({ where: { status: null } }) + for (const w of pendingWithdrawals) { + try { + await checkWithdrawal({ data: { id: w.id, hash: w.hash }, ...args }) + await sleep(10) + } catch { + console.error('error checking withdrawal', w.hash) + } + } +}