Skip to content

Commit

Permalink
Use LND subscriptions (#726)
Browse files Browse the repository at this point in the history
* Use parallel invoice subscriptions

* Fix missing idempotency

* Log error

* Use cursor for invoice subscription

* Subscribe to outgoing payments for withdrawals

* Add TODO comments regarding migration to LND subscriptions

* Also use isPoll variable in checkInvoice

* Queue status check of pending withdrawals

* Use for loop to check pending withdrawals

* Reconnect to LND gRPC API on error

* Fix hash modified of applied migrations

* Separate wallet code from worker index

* refactor subscription code some more

* remove unnecessary subWrapper abstraction
* move all wallet related code into worker/wallet.js such that only a single import is needed in worker/index.js

* Migrate from polling to LND subscriptions

* Remove unnecessary reconnect code

* Add FIXME

* Add listener for HODL invoice updates

* Remove obsolete comment

* Update README

* Add job to cancel hodl invoice if expired

* Fix missing else

* small bug fixes and readability enhancements

* refine and add periodic redundant deposit/withdrawal checks

---------

Co-authored-by: ekzyis <[email protected]>
Co-authored-by: Keyan <[email protected]>
Co-authored-by: keyan <[email protected]>
  • Loading branch information
4 people authored Jan 8, 2024
1 parent cb076ec commit 2151323
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 65 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ To configure the image proxy, you will need to set the `IMGPROXY_` env vars. `NE
The site is written in javascript using Next.js, a React framework. The backend API is provided via GraphQL. The database is PostgreSQL modeled with Prisma. The job queue is also maintained in PostgreSQL. We use lnd for our lightning node. A customized Bootstrap theme is used for styling.

# processes
There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. polling lnd for invoice/payment status
There are two. 1. the web app and 2. the worker, which dequeues jobs sent to it by the web app, e.g. processing images.

# wallet transaction safety
To ensure stackers balances are kept sane, all wallet updates are run in serializable transactions at the database level. Because prisma has relatively poor support for transactions all wallet touching code is written in plpgsql stored procedures and can be found in the prisma/migrations folder.
Expand Down
5 changes: 2 additions & 3 deletions api/resolvers/serial.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ export async function serializeInvoicable (query, { models, lnd, hash, hmac, me,
if (hash) {
invoice = await checkInvoice(models, hash, hmac, enforceFee)
trx = [
models.$queryRaw`UPDATE users SET msats = msats + ${invoice.msatsReceived} WHERE id = ${invoice.user.id}`,
...trx,
models.invoice.update({ where: { hash: invoice.hash }, data: { confirmedAt: new Date() } })
models.$executeRaw`SELECT confirm_invoice(${hash}, ${invoice.msatsReceived})`,

This comment has been minimized.

Copy link
@ekzyis

ekzyis Jan 10, 2024

Author Member

I guess we don't set confirmedIndex here since the subscription callback will set it in checkInvoice anyway? And we don't need it set here?

This comment has been minimized.

Copy link
@huumn

huumn Jan 10, 2024

Author Member

Yep!

...trx
]
}

Expand Down
4 changes: 1 addition & 3 deletions api/resolvers/wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,10 @@ export default {
console.log('invoice', balanceLimit)

const [inv] = await serialize(models,
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${invoice.request},
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${hodlInvoice ? invoice.secret : null}::TEXT, ${invoice.request},
${expiresAt}::timestamp, ${amount * 1000}, ${user.id}::INTEGER, ${description}, NULL, NULL,
${invLimit}::INTEGER, ${balanceLimit})`)

if (hodlInvoice) await models.invoice.update({ where: { hash: invoice.id }, data: { preimage: invoice.secret } })

// the HMAC is only returned during invoice creation
// this makes sure that only the person who created this invoice
// has access to the HMAC
Expand Down
4 changes: 2 additions & 2 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ export const MAX_TERRITORY_DESC_LENGTH = 1000 // 1k
export const MAX_POLL_CHOICE_LENGTH = 40
export const ITEM_SPAM_INTERVAL = '10m'
export const ANON_ITEM_SPAM_INTERVAL = '0'
export const INV_PENDING_LIMIT = 10
export const INV_PENDING_LIMIT = 100
export const BALANCE_LIMIT_MSATS = 250000000 // 250k sat
export const SN_USER_IDS = [616, 6030, 946, 4502]
export const ANON_INV_PENDING_LIMIT = 100
export const ANON_INV_PENDING_LIMIT = 1000
export const ANON_BALANCE_LIMIT_MSATS = 0 // disable
export const MAX_POLL_NUM_CHOICES = 10
export const MIN_POLL_NUM_CHOICES = 2
Expand Down
2 changes: 1 addition & 1 deletion pages/api/lnurlp/[username]/pay.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export default async ({ query: { username, amount, nostr, comment, payerdata: pa
})

await serialize(models,
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, ${invoice.request},
models.$queryRaw`SELECT * FROM create_invoice(${invoice.id}, NULL, ${invoice.request},
${expiresAt}::timestamp, ${Number(amount)}, ${user.id}::INTEGER, ${noteStr || description},
${comment || null}, ${parsedPayerData || null}::JSONB, ${INV_PENDING_LIMIT}::INTEGER, ${BALANCE_LIMIT_MSATS})`)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
ALTER TABLE "Invoice" ADD COLUMN "confirmedIndex" BIGINT;

-- CreateIndex
CREATE INDEX "Invoice.confirmedIndex_index" ON "Invoice"("confirmedIndex");
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
-- remove 'checkInvoice' job insertion since we're using LND subscriptions now
-- also allow function to take preimage as an argument
DROP FUNCTION IF EXISTS create_invoice(hash TEXT, bolt11 TEXT, expires_at timestamp(3) without time zone,
msats_req BIGINT, user_id INTEGER, idesc TEXT, comment TEXT, lud18_data JSONB, inv_limit INTEGER, balance_limit_msats BIGINT);
CREATE OR REPLACE FUNCTION create_invoice(hash TEXT, preimage TEXT, bolt11 TEXT, expires_at timestamp(3) without time zone,
msats_req BIGINT, user_id INTEGER, idesc TEXT, comment TEXT, lud18_data JSONB, inv_limit INTEGER, balance_limit_msats BIGINT)
RETURNS "Invoice"
LANGUAGE plpgsql
AS $$
DECLARE
invoice "Invoice";
inv_limit_reached BOOLEAN;
balance_limit_reached BOOLEAN;
inv_pending_msats BIGINT;
BEGIN
PERFORM ASSERT_SERIALIZED();

-- prevent too many pending invoices
SELECT inv_limit > 0 AND count(*) >= inv_limit, COALESCE(sum("msatsRequested"), 0) INTO inv_limit_reached, inv_pending_msats
FROM "Invoice"
WHERE "userId" = user_id AND "expiresAt" > now_utc() AND "confirmedAt" IS NULL AND cancelled = false;

IF inv_limit_reached THEN
RAISE EXCEPTION 'SN_INV_PENDING_LIMIT';
END IF;

-- prevent pending invoices + msats from exceeding the limit
SELECT balance_limit_msats > 0 AND inv_pending_msats+msats_req+msats > balance_limit_msats INTO balance_limit_reached
FROM users
WHERE id = user_id;

IF balance_limit_reached THEN
RAISE EXCEPTION 'SN_INV_EXCEED_BALANCE';
END IF;

-- we good, proceed frens
INSERT INTO "Invoice" (hash, preimage, bolt11, "expiresAt", "msatsRequested", "userId", created_at, updated_at, "desc", comment, "lud18Data")
VALUES (hash, preimage, bolt11, expires_at, msats_req, user_id, now_utc(), now_utc(), idesc, comment, lud18_data) RETURNING * INTO invoice;

IF preimage IS NOT NULL THEN
INSERT INTO pgboss.job (name, data, retrylimit, retrybackoff, startafter)
VALUES ('finalizeHodlInvoice', jsonb_build_object('hash', hash), 21, true, expires_at);
END IF;

RETURN invoice;
END;
$$;

-- remove 'checkWithdrawal' job insertion since we're using LND subscriptions now
CREATE OR REPLACE FUNCTION create_withdrawl(lnd_id TEXT, invoice TEXT, msats_amount BIGINT, msats_max_fee BIGINT, username TEXT)
RETURNS "Withdrawl"
LANGUAGE plpgsql
AS $$
DECLARE
user_id INTEGER;
user_msats BIGINT;
withdrawl "Withdrawl";
BEGIN
PERFORM ASSERT_SERIALIZED();

SELECT msats, id INTO user_msats, user_id FROM users WHERE name = username;
IF (msats_amount + msats_max_fee) > user_msats THEN
RAISE EXCEPTION 'SN_INSUFFICIENT_FUNDS';
END IF;

IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status IS NULL) THEN
RAISE EXCEPTION 'SN_PENDING_WITHDRAWL_EXISTS';
END IF;

IF EXISTS (SELECT 1 FROM "Withdrawl" WHERE hash = lnd_id AND status = 'CONFIRMED') THEN
RAISE EXCEPTION 'SN_CONFIRMED_WITHDRAWL_EXISTS';
END IF;

INSERT INTO "Withdrawl" (hash, bolt11, "msatsPaying", "msatsFeePaying", "userId", created_at, updated_at)
VALUES (lnd_id, invoice, msats_amount, msats_max_fee, user_id, now_utc(), now_utc()) RETURNING * INTO withdrawl;

UPDATE users SET msats = msats - msats_amount - msats_max_fee WHERE id = user_id;

RETURN withdrawl;
END;
$$;

CREATE OR REPLACE FUNCTION check_invoices_and_withdrawals()
RETURNS INTEGER
LANGUAGE plpgsql
AS $$
DECLARE
BEGIN
INSERT INTO pgboss.schedule (name, cron, timezone) VALUES ('checkPendingDeposits', '*/10 * * * *', 'America/Chicago') ON CONFLICT DO NOTHING;
INSERT INTO pgboss.schedule (name, cron, timezone) VALUES ('checkPendingWithdrawals', '*/10 * * * *', 'America/Chicago') ON CONFLICT DO NOTHING;
return 0;
EXCEPTION WHEN OTHERS THEN
return 0;
END;
$$;

SELECT check_invoices_and_withdrawals();
DROP FUNCTION check_invoices_and_withdrawals();
2 changes: 2 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ model Invoice {
bolt11 String
expiresAt DateTime
confirmedAt DateTime?
confirmedIndex BigInt?
cancelled Boolean @default(false)
msatsRequested BigInt
msatsReceived BigInt?
Expand All @@ -550,6 +551,7 @@ model Invoice {
@@index([createdAt], map: "Invoice.created_at_index")
@@index([userId], map: "Invoice.userId_index")
@@index([confirmedIndex], map: "Invoice.confirmedIndex_index")
}

model Withdrawl {
Expand Down
9 changes: 6 additions & 3 deletions worker/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import PgBoss from 'pg-boss'
import nextEnv from '@next/env'
import { PrismaClient } from '@prisma/client'
import { checkInvoice, checkWithdrawal, autoDropBolt11s } from './wallet.js'
import { autoDropBolt11s, checkPendingDeposits, checkPendingWithdrawals, finalizeHodlInvoice, subscribeToWallet } from './wallet.js'
import { repin } from './repin.js'
import { trust } from './trust.js'
import { auction } from './auction.js'
Expand Down Expand Up @@ -71,8 +71,11 @@ async function work () {
}

await boss.start()
await boss.work('checkInvoice', jobWrapper(checkInvoice))
await boss.work('checkWithdrawal', jobWrapper(checkWithdrawal))

await subscribeToWallet(args)
await boss.work('finalizeHodlInvoice', jobWrapper(finalizeHodlInvoice))
await boss.work('checkPendingDeposits', jobWrapper(checkPendingDeposits))
await boss.work('checkPendingWithdrawals', jobWrapper(checkPendingWithdrawals))
await boss.work('autoDropBolt11s', jobWrapper(autoDropBolt11s))
await boss.work('repin-*', jobWrapper(repin))
await boss.work('trust', jobWrapper(trust))
Expand Down
Loading

2 comments on commit 2151323

@ekzyis
Copy link
Member Author

@ekzyis ekzyis commented on 2151323 Jan 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The database will never register that a HODL invoice was paid while the worker was down if there is no check for is_held in checkInvoice.

To reproduce:

  1. Create hodl invoice
  2. Stop worker
  3. Pay invoice
  4. Start worker
  5. Payment stuck until finalizeHodlInvoice gets called

Patch:

diff --git a/worker/wallet.js b/worker/wallet.js
index 816f9ab..7041cd0 100644
--- a/worker/wallet.js
+++ b/worker/wallet.js
@@ -123,6 +123,16 @@ async function checkInvoice ({ data: { hash }, boss, models, lnd }) {
     return await boss.send('nip57', { hash })
   }

+  if (inv.is_held) {
+    return await models.invoice.update({
+      where: { hash },
+      data: {
+        msatsReceived: Number(inv.received_mtokens),
+        isHeld: true
+      }
+    })
+  }
+
   if (inv.is_canceled) {
     return await serialize(models,
       models.invoice.update({

@ekzyis
Copy link
Member Author

@ekzyis ekzyis commented on 2151323 Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments but other than that, this looks good and cleaner!

I don't think I'll find anything else. Makes all sense to me and easier to follow.

Please sign in to comment.