Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use LND subscriptions #726

Merged
merged 24 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c1f8ce0
Use parallel invoice subscriptions
ekzyis Jan 1, 2024
5929983
Fix missing idempotency
ekzyis Jan 2, 2024
221a9cd
Log error
ekzyis Jan 4, 2024
d5fdfb1
Use cursor for invoice subscription
ekzyis Jan 4, 2024
e8afaee
Subscribe to outgoing payments for withdrawals
ekzyis Jan 4, 2024
2b92013
Add TODO comments regarding migration to LND subscriptions
ekzyis Jan 4, 2024
d073099
Also use isPoll variable in checkInvoice
ekzyis Jan 4, 2024
b569145
Queue status check of pending withdrawals
ekzyis Jan 4, 2024
7a42a59
Use for loop to check pending withdrawals
ekzyis Jan 4, 2024
ec25e58
Reconnect to LND gRPC API on error
ekzyis Jan 5, 2024
00a92c6
Fix hash modified of applied migrations
ekzyis Jan 5, 2024
3a370e5
Separate wallet code from worker index
ekzyis Jan 5, 2024
13698e2
refactor subscription code some more
ekzyis Jan 5, 2024
6a2539e
Migrate from polling to LND subscriptions
ekzyis Jan 5, 2024
65cc0dd
Remove unnecessary reconnect code
ekzyis Jan 5, 2024
d5a5103
Add FIXME
ekzyis Jan 6, 2024
4852fba
Add listener for HODL invoice updates
ekzyis Jan 6, 2024
e8427dc
Remove obsolete comment
ekzyis Jan 6, 2024
8eb6794
Update README
ekzyis Jan 6, 2024
cbbab5e
Add job to cancel hodl invoice if expired
ekzyis Jan 6, 2024
2ad9d8f
Fix missing else
ekzyis Jan 6, 2024
023ddda
Merge branch 'master' into worker-invoice-subscriptions
huumn Jan 7, 2024
b04d65c
small bug fixes and readability enhancements
huumn Jan 8, 2024
29eadeb
refine and add periodic redundant deposit/withdrawal checks
huumn Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ To configure the image proxy, you will need to set the `IMGPROXY_` env vars. `NE
The site is written in javascript using Next.js, a React framework. The backend API is provided via GraphQL. The database is PostgreSQL modeled with Prisma. The job queue is also maintained in PostgreSQL. We use lnd for our lightning node. A customized Bootstrap theme is used for styling.

# processes
There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. polling lnd for invoice/payment status
There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. processing images.

# wallet transaction safety
To ensure stackers balances are kept sane, all wallet updates are run in serializable transactions at the database level. Because prisma has relatively poor support for transactions all wallet touching code is written in plpgsql stored procedures and can be found in the prisma/migrations folder.
Expand Down
7 changes: 6 additions & 1 deletion api/resolvers/wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,12 @@ export default {
${expiresAt}::timestamp, ${amount * 1000}, ${user.id}::INTEGER, ${description}, NULL, NULL,
${invLimit}::INTEGER, ${balanceLimit})`)

if (hodlInvoice) await models.invoice.update({ where: { hash: invoice.id }, data: { preimage: invoice.secret } })
if (hodlInvoice) {
await models.invoice.update({ where: { hash: invoice.id }, data: { preimage: invoice.secret } })
await models.$queryRaw`
INSERT INTO pgboss.job(name, data, startafter)
VALUES ('finalizeHodlInvoice', ${JSON.stringify({ hash: invoice.id })}::JSONB, ${expiresAt})`
}

// the HMAC is only returned during invoice creation
// this makes sure that only the person who created this invoice
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
ALTER TABLE "Invoice" ADD COLUMN "confirmedIndex" BIGINT;

-- CreateIndex
CREATE INDEX "Invoice.confirmedIndex_index" ON "Invoice"("confirmedIndex");
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
-- This is an empty migration.

-- remove 'checkInvoice' job insertion since we're using LND subscriptions now
CREATE OR REPLACE FUNCTION create_invoice(hash TEXT, bolt11 TEXT, expires_at timestamp(3) without time zone,
msats_req BIGINT, user_id INTEGER, idesc TEXT, comment TEXT, lud18_data JSONB, inv_limit INTEGER, balance_limit_msats BIGINT)
RETURNS "Invoice"
LANGUAGE plpgsql
AS $$
DECLARE
invoice "Invoice";
inv_limit_reached BOOLEAN;
balance_limit_reached BOOLEAN;
inv_pending_msats BIGINT;
BEGIN
PERFORM ASSERT_SERIALIZED();

-- prevent too many pending invoices
SELECT inv_limit > 0 AND count(*) >= inv_limit, COALESCE(sum("msatsRequested"), 0) INTO inv_limit_reached, inv_pending_msats
FROM "Invoice"
WHERE "userId" = user_id AND "expiresAt" > now_utc() AND "confirmedAt" IS NULL AND cancelled = false;

IF inv_limit_reached THEN
RAISE EXCEPTION 'SN_INV_PENDING_LIMIT';
END IF;

-- prevent pending invoices + msats from exceeding the limit
SELECT balance_limit_msats > 0 AND inv_pending_msats+msats_req+msats > balance_limit_msats INTO balance_limit_reached
FROM users
WHERE id = user_id;

IF balance_limit_reached THEN
RAISE EXCEPTION 'SN_INV_EXCEED_BALANCE';
END IF;

-- we good, proceed frens
INSERT INTO "Invoice" (hash, bolt11, "expiresAt", "msatsRequested", "userId", created_at, updated_at, "desc", comment, "lud18Data")
VALUES (hash, bolt11, expires_at, msats_req, user_id, now_utc(), now_utc(), idesc, comment, lud18_data) RETURNING * INTO invoice;

RETURN invoice;
END;
$$;

-- remove 'checkWithdrawal' job insertion since we're using LND subscriptions now
CREATE OR REPLACE FUNCTION create_withdrawl(lnd_id TEXT, invoice TEXT, msats_amount BIGINT, msats_max_fee BIGINT, username TEXT)
RETURNS "Withdrawl"
LANGUAGE plpgsql
AS $$
DECLARE
user_id INTEGER;
user_msats BIGINT;
withdrawl "Withdrawl";
BEGIN
PERFORM ASSERT_SERIALIZED();

SELECT msats, id INTO user_msats, user_id FROM users WHERE name = username;
IF (msats_amount + msats_max_fee) > user_msats THEN
RAISE EXCEPTION 'SN_INSUFFICIENT_FUNDS';
END IF;

IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status IS NULL) THEN
RAISE EXCEPTION 'SN_PENDING_WITHDRAWL_EXISTS';
END IF;

IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status = 'CONFIRMED') THEN
RAISE EXCEPTION 'SN_CONFIRMED_WITHDRAWL_EXISTS';
END IF;

INSERT INTO "Withdrawl" (hash, bolt11, "msatsPaying", "msatsFeePaying", "userId", created_at, updated_at)
VALUES (lnd_id, invoice, msats_amount, msats_max_fee, user_id, now_utc(), now_utc()) RETURNING * INTO withdrawl;

UPDATE users SET msats = msats - msats_amount - msats_max_fee WHERE id = user_id;

RETURN withdrawl;
END;
$$;
2 changes: 2 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ model Invoice {
bolt11 String
expiresAt DateTime
confirmedAt DateTime?
confirmedIndex BigInt?
cancelled Boolean @default(false)
msatsRequested BigInt
msatsReceived BigInt?
Expand All @@ -550,6 +551,7 @@ model Invoice {

@@index([createdAt], map: "Invoice.created_at_index")
@@index([userId], map: "Invoice.userId_index")
@@index([confirmedIndex], map: "Invoice.confirmedIndex_index")
}

model Withdrawl {
Expand Down
8 changes: 5 additions & 3 deletions worker/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import PgBoss from 'pg-boss'
import nextEnv from '@next/env'
import { PrismaClient } from '@prisma/client'
import { checkInvoice, checkWithdrawal, autoDropBolt11s } from './wallet.js'
import { autoDropBolt11s, finalizeHodlInvoice, lndSubscriptions } from './wallet.js'
import { repin } from './repin.js'
import { trust } from './trust.js'
import { auction } from './auction.js'
Expand Down Expand Up @@ -71,8 +71,10 @@ async function work () {
}

await boss.start()
await boss.work('checkInvoice', jobWrapper(checkInvoice))
await boss.work('checkWithdrawal', jobWrapper(checkWithdrawal))

lndSubscriptions(args).catch(console.error)

await boss.work('finalizeHodlInvoice', jobWrapper(finalizeHodlInvoice))
await boss.work('autoDropBolt11s', jobWrapper(autoDropBolt11s))
await boss.work('repin-*', jobWrapper(repin))
await boss.work('trust', jobWrapper(trust))
Expand Down
188 changes: 156 additions & 32 deletions worker/wallet.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,165 @@
import serialize from '../api/resolvers/serial.js'
import { getInvoice, getPayment, cancelHodlInvoice } from 'ln-service'
import { datePivot } from '../lib/time.js'
import { getInvoice, getPayment, cancelHodlInvoice, subscribeToInvoices, subscribeToPayments, subscribeToInvoice } from 'ln-service'
import { sendUserNotification } from '../api/webPush/index.js'
import { msatsToSats, numWithUnits } from '../lib/format'
import { INVOICE_RETENTION_DAYS } from '../lib/constants'

const walletOptions = { startAfter: 5, retryLimit: 21, retryBackoff: true }
export async function lndSubscriptions (args) {
subscribeToDeposits(args).catch(console.error)
subscribeToWithdrawals(args).catch(console.error)
}

const logEvent = (name, args) => console.log(`event ${name} triggered with args`, args)
const logEventError = (name, error) => console.error(`error running ${name}`, error)

async function subscribeToDeposits (args) {
const { models, lnd } = args

const [lastConfirmed] = await models.$queryRaw`SELECT "confirmedIndex" FROM "Invoice" ORDER BY "confirmedIndex" DESC NULLS LAST LIMIT 1`

// https://www.npmjs.com/package/ln-service#subscribetoinvoices
const sub = subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex })
sub.on('invoice_updated', async (inv) => {
if (!inv.secret) {
// this is a HODL invoice. We need to use SubscribeToInvoice
// to get all state transition since SubscribeToInvoices is only for invoice creation and settlement.
// see https://api.lightning.community/api/lnd/invoices/subscribe-single-invoice
// vs https://api.lightning.community/api/lnd/lightning/subscribe-invoices
return subscribeToHodlInvoice({ hash: inv.id, ...args }).catch(console.error)
}
logEvent('invoice_updated', inv)
try {
await checkInvoice({ data: { hash: inv.id }, ...args })
} catch (error) {
logEventError('invoice_updated', error)
}
})
sub.on('error', console.error)

// NOTE:
// This can be removed when all pending invoices that were created before we switched off polling ("pre-migration invoices") have finalized.
// This is one hour after deployment since that's when these invoices expire if they weren't paid already.
// This is required to sync the database with any invoice that was paid and thus will not trigger the callback of `subscribeToInvoices` anymore.
// For pre-migration invoices that weren't paid, we can rely on the LND subscription to trigger on updates.
await checkPendingDeposits(args)
}

async function subscribeToHodlInvoice (args) {
const { lnd, hash } = args
let sub
try {
await new Promise((resolve, reject) => {
// https://www.npmjs.com/package/ln-service#subscribetoinvoice
sub = subscribeToInvoice({ id: hash, lnd })
sub.on('invoice_updated', async (inv) => {
logEvent('hodl_invoice_updated', inv)
try {
await checkInvoice({ data: { hash: inv.id }, ...args })
// If invoice is canceled, the invoice was finalized and there will be no more updates for this HODL invoice.
// On expiration, the callback will also get called with `is_canceled` set, so expiration is the same as cancelation.
// However, the callback will NOT get called if the HODL invoice was already paid.
// We run a job for this case to manually cancel the invoice if it wasn't settled yet.
// That's why we can also stop listening for updates when the invoice was already paid (`is_held` set).
if (inv.is_held || inv.is_canceled) {
return resolve()
}
} catch (error) {
logEventError('hodl_invoice_updated', error)
reject(error)
}
})
sub.on('error', reject)
})
} catch (error) {
console.error(error)
}
sub?.removeAllListeners()
}

export async function finalizeHodlInvoice ({ data: { hash }, models, lnd }) {
let inv
try {
inv = await getInvoice({ id: hash, lnd })
} catch (err) {
console.log(err, hash)
return
}
if (!inv.is_confirmed) await cancelHodlInvoice({ id: hash, lnd })
}

async function subscribeToWithdrawals (args) {
const { lnd } = args

// https://www.npmjs.com/package/ln-service#subscribetopayments
const sub = subscribeToPayments({ lnd })
sub.on('confirmed', async (payment) => {
logEvent('confirmed', payment)
try {
await checkWithdrawal({ data: { hash: payment.id }, ...args })
} catch (error) {
logEventError('confirmed', error)
}
})
sub.on('failed', async (payment) => {
logEvent('failed', payment)
try {
await checkWithdrawal({ data: { hash: payment.id }, ...args })
} catch (error) {
logEventError('failed', error)
}
})
// ignore payment attempts
sub.on('paying', (attempt) => {})
sub.on('error', console.error)

// check pending withdrawals since they might have been paid while worker was down.
await checkPendingWithdrawals(args)
}

async function checkPendingWithdrawals (args) {
const { models } = args
const pendingWithdrawals = await models.withdrawl.findMany({ where: { status: null } })
for (const w of pendingWithdrawals) {
await checkWithdrawal({ data: { id: w.id, hash: w.hash }, ...args })
}
}

async function checkPendingDeposits (args) {
const { models } = args
const pendingDeposits = await models.invoice.findMany({ where: { confirmedAt: null, cancelled: false } })
for (const d of pendingDeposits) {
await checkInvoice({ data: { id: d.id, hash: d.hash }, ...args })
}
}

// TODO this should all be done via websockets
export async function checkInvoice ({ data: { hash, isHeldSet }, boss, models, lnd }) {
async function checkInvoice ({ data: { hash }, boss, models, lnd }) {
let inv
try {
inv = await getInvoice({ id: hash, lnd })
} catch (err) {
console.log(err, hash)
// on lnd related errors, we manually retry so we don't exponentially backoff
await boss.send('checkInvoice', { hash }, walletOptions)
return
}
console.log(inv)

// check if invoice still exists since HODL invoices get deleted after usage
// check if invoice exists since it might just have been created by LND and wasn't inserted into the database yet
// but that is not a problem since this function will be called again with the update
// FIXME: there might be a race condition here if the invoice gets paid before the invoice was inserted into the db.
ekzyis marked this conversation as resolved.
Show resolved Hide resolved
const dbInv = await models.invoice.findUnique({ where: { hash } })
if (!dbInv) return
if (!dbInv) {
console.log('invoice not found in database', hash)
return
}

const expired = new Date(inv.expires_at) <= new Date()

if (inv.is_confirmed && !inv.is_held) {
// never mark hodl invoices as confirmed here because
// we manually confirm them when we settle them
await serialize(models,
models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`)
models.$executeRaw`SELECT confirm_invoice(${inv.id}, ${Number(inv.received_mtokens)})`,
models.invoice.update({ where: { hash }, data: { confirmedIndex: inv.confirmed_index } })
)
sendUserNotification(dbInv.userId, {
title: `${numWithUnits(msatsToSats(inv.received_mtokens), { abbreviate: false })} were deposited in your account`,
body: dbInv.comment || undefined,
Expand All @@ -52,40 +181,41 @@ export async function checkInvoice ({ data: { hash, isHeldSet }, boss, models, l
}))
}

if (inv.is_held && !isHeldSet) {
if (inv.is_held) {
// this is basically confirm_invoice without setting confirmed_at since it's not settled yet
// and without setting the user balance since that's done inside the same tx as the HODL invoice action.
await serialize(models,
models.invoice.update({ where: { hash }, data: { msatsReceived: Number(inv.received_mtokens), isHeld: true } }))
// remember that we already executed this if clause
// (even though the query above is idempotent but imo, this makes the flow more clear)
isHeldSet = true
}

if (!expired) {
// recheck in 5 seconds if the invoice is younger than 5 minutes
// otherwise recheck in 60 seconds
const startAfter = new Date(inv.created_at) > datePivot(new Date(), { minutes: -5 }) ? 5 : 60
await boss.send('checkInvoice', { hash, isHeldSet }, { ...walletOptions, startAfter })
models.invoice.update({ where: { hash }, data: { msatsReceived: Number(inv.received_mtokens), isHeld: true, confirmedIndex: inv.confirmed_index } }))
}

if (expired && inv.is_held) {
await cancelHodlInvoice({ id: hash, lnd })
}
}

export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd }) {
async function checkWithdrawal ({ data: { hash }, boss, models, lnd }) {
const dbWdrwl = await models.withdrawl.findFirst({ where: { hash } })
if (!dbWdrwl) {
// [WARNING] Withdrawal was not found in database!
// This might be the case if we're subscribed to outgoing payments
// but for some reason, LND paid an invoice that wasn't created via the SN GraphQL API.
// >>> If this line ever gets hit, an adversary might be draining our funds right now <<<
console.error('unexpected outgoing payment detected:', hash)
// TODO: log this in Slack
return
}
const id = dbWdrwl.id
let wdrwl
let notFound = false

try {
wdrwl = await getPayment({ id: hash, lnd })
} catch (err) {
console.log(err)
if (err[1] === 'SentPaymentNotFound') {
// withdrawal was not found by LND
notFound = true
} else {
// on lnd related errors, we manually retry so we don't exponentially backoff
await boss.send('checkWithdrawal', { id, hash }, walletOptions)
return
}
}
Expand All @@ -105,13 +235,7 @@ export async function checkWithdrawal ({ data: { id, hash }, boss, models, lnd }
status = 'PATHFINDING_TIMEOUT'
} else if (wdrwl?.failed.is_route_not_found) {
status = 'ROUTE_NOT_FOUND'
}
await serialize(models, models.$executeRaw`
SELECT reverse_withdrawl(${id}::INTEGER, ${status}::"WithdrawlStatus")`)
} else {
// we need to requeue to check again in 5 seconds
const startAfter = new Date(wdrwl.created_at) > datePivot(new Date(), { minutes: -5 }) ? 5 : 60
await boss.send('checkWithdrawal', { id, hash }, { ...walletOptions, startAfter })
} else await serialize(models, models.$executeRaw`SELECT reverse_withdrawl(${id}::INTEGER, ${status}::"WithdrawlStatus")`)
huumn marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down