Skip to content

Commit

Permalink
Reconnect to LND gRPC API on error
Browse files Browse the repository at this point in the history
  • Loading branch information
ekzyis committed Jan 5, 2024
1 parent 7a42a59 commit ec25e58
Showing 1 changed file with 33 additions and 21 deletions.
54 changes: 33 additions & 21 deletions worker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { deleteItem } from './ephemeralItems.js'
import { deleteUnusedImages } from './deleteUnusedImages.js'
import { territoryBilling } from './territory.js'
import { ofac } from './ofac.js'
import { sleep } from '../lib/time.js'

const { loadEnvConfig } = nextEnv
const { ApolloClient, HttpLink, InMemoryCache } = apolloClient
Expand Down Expand Up @@ -70,36 +71,47 @@ async function work () {
}
}

async function subWrapper (sub, ...eventFns) {
for (let i = 0; i < eventFns.length; i += 2) {
const [event, fn] = [eventFns[i], eventFns[i + 1]]
sub.on(event, async (...args) => {
console.log(`event ${event} triggered with args`, args)
try {
await fn(...args)
} catch (error) {
console.error(`error running ${event}`, error)
return
}
console.log(`finished ${event}`)
})
async function subWrapper (subFn, ...eventFns) {
while (true) {
try {
await new Promise((resolve, reject) => {
const sub = subFn({ lnd })
for (let i = 0; i < eventFns.length; i += 2) {
const [event, fn] = [eventFns[i], eventFns[i + 1]]
sub.on(event, async (...args) => {
console.log(`event ${event} triggered with args`, args)
try {
await fn(...args)
} catch (error) {
console.error(`error running ${event}`, error)
return
}
console.log(`finished ${event}`)
})
}
sub.on('error', (err) => {
// LND connection lost
// see https://www.npmjs.com/package/ln-service#subscriptions
sub.removeAllListeners()
reject(err)
})
})
} catch (err) {
console.error(err)
}
await sleep(5000)
console.log('attempting to reconnect to LND gRPC API ...')
}
sub.on('error', (err) => {
console.error(err)
// LND connection lost
// see https://www.npmjs.com/package/ln-service#subscriptions
sub.removeAllListeners()
})
}

await boss.start()

const [lastConfirmed] = await models.$queryRaw`SELECT "confirmedIndex" FROM "Invoice" ORDER BY "confirmedIndex" DESC NULLS LAST LIMIT 1`
subWrapper(subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex }),
subWrapper(() => subscribeToInvoices({ lnd, confirmed_after: lastConfirmed?.confirmedIndex }),
'invoice_updated', (inv) => checkInvoice({ data: { hash: inv.id, sub: true }, ...args }))
await boss.work('checkInvoice', jobWrapper(checkInvoice))

subWrapper(subscribeToPayments({ lnd }),
subWrapper(subscribeToPayments,
'confirmed', (inv) => checkWithdrawal({ data: { hash: inv.id, sub: true }, ...args }),
'failed', (inv) => checkWithdrawal({ data: { hash: inv.id, sub: true } }, ...args),
'paying', (inv) => {} // ignore payment attempts
Expand Down

0 comments on commit ec25e58

Please sign in to comment.