diff --git a/src/app.ts b/src/app.ts index 9d21e46..31c15b0 100644 --- a/src/app.ts +++ b/src/app.ts @@ -4,12 +4,9 @@ import {router as psaRouter} from './routes/psa'; import * as bodyParser from 'body-parser'; const pinningAuthHandler = require('./middlewares/auth/authHandler'); const w3authHandler = require('@crustio/ipfs-w3auth-handler'); -const schedule = require('node-schedule'); const Postgrator = require('postgrator'); const path = require('path'); import {updatePinObjectStatus, orderStart, pinExpireFiles} from './service/pinning'; -import {logger} from './logger'; -import {sendCrustOrderWarningMsg} from './service/crust/order'; import {configs} from './config/config'; const app = express(); @@ -32,30 +29,8 @@ const postgrator = new Postgrator({ }); postgrator.migrate('max').then((migrations: any) => { - app.listen(configs.server.port); + app.listen(configs.server.port); + updatePinObjectStatus(); + orderStart(); + pinExpireFiles(); }); - -schedule.scheduleJob('0 * * * * *', () => { - logger.info('pin status schedule start'); - updatePinObjectStatus() - .then(() => { - logger.info('pin status schedule finished'); - }) - .catch((e: Error) => { - logger.error(`pin status update err: ${e.message}`); - }); -}); - -orderStart() - .then(() => { - logger.info('order schedule finished'); - }) - .catch((e: Error) => { - sendCrustOrderWarningMsg('crust order crashed', `err: ${e.message}`); - logger.error(`order status err: ${e.message}`); - }); - -pinExpireFiles().catch((e: Error) => { - sendCrustOrderWarningMsg('pin crust expire file job crashed', `pin crust expire file job crashed err: ${e.message}`); - logger.error(`Pin expire job crashed err: ${e.message}`); -}) diff --git a/src/dao/pinObjectDao.ts b/src/dao/pinObjectDao.ts index 01c199f..2198a22 100644 --- a/src/dao/pinObjectDao.ts +++ b/src/dao/pinObjectDao.ts @@ -1,4 +1,5 @@ import {PinObjectsQuery, PinResults, PinStatus} from '../models/PinObjects'; +import {PinObjectStatus} from "../common/commonUtils"; const {TextMatchingStrategy} = require('../common/commonUtils'); const _ = require('lodash'); const commonDao = require('./commonDao'); @@ -7,6 +8,7 @@ const pinObjectDao = { selectPinObjectListByQuery: selectPinObjectListByQuery, selectPinObjectByRequestIdAndUserId: selectPinObjectByRequestIdAndUserId, deletePinObjectByRequestIdAndUserId: deletePinObjectByRequestIdAndUserId, + queryPinningObjects: queryPinningObjects, }; async function deletePinObjectByRequestIdAndUserId( @@ -19,6 +21,13 @@ async function deletePinObjectByRequestIdAndUserId( ); } +async function queryPinningObjects(limit: number = 100) { + return commonDao.queryForArray( + 'select * from pin_object where deleted = ? and status = ? limit ?', + [0, PinObjectStatus.pinning, limit] + ); +} + async function selectPinObjectByRequestIdAndUserId( requestId: string, userId: number diff --git a/src/service/crust/api.ts b/src/service/crust/api.ts index a205af1..8b5aed7 100644 --- a/src/service/crust/api.ts +++ b/src/service/crust/api.ts @@ -1,8 +1,16 @@ -import {ApiPromise, WsProvider} from '@polkadot/api'; +import {ApiPromise, ApiRx, WsProvider} from '@polkadot/api'; import {typesBundleForPolkadot} from '@crustio/type-definitions'; import {configs} from '../../config/config'; -export const api = new ApiPromise({ - provider: new WsProvider(configs.crust.chainWsUrl), - typesBundle: typesBundleForPolkadot, -}); +export const apiConnect = (): ApiPromise => { + return new ApiPromise({ + provider: new WsProvider(configs.crust.chainWsUrl), + typesBundle: typesBundleForPolkadot, + }); +} + +export const disconnectApi = async (api: ApiPromise) => { + if (api) { + await api.disconnect().catch((e) => {}); + } +} diff --git a/src/service/pinning/index.ts b/src/service/pinning/index.ts index f166f00..35e83cc 100644 --- a/src/service/pinning/index.ts +++ b/src/service/pinning/index.ts @@ -8,13 +8,15 @@ import { sendCrustOrderWarningMsg, sendTx, } from '../crust/order'; -import {api} from '../crust/api'; +import {apiConnect, disconnectApi} from '../crust/api'; import createKeyring from '../crust/krp'; const commonDao = require('../../dao/commonDao'); const moment = require('moment'); const _ = require('lodash'); import {logger} from '../../logger'; import {timeoutOrError} from '../../common/promise-utils'; +import {ApiPromise} from "@polkadot/api"; +import {disconnect} from "node:cluster"; const Sequelize = require('sequelize'); const {sleep} = require('../../common/commonUtils'); const pinObjectDao = require('../../dao/pinObjectDao'); @@ -61,16 +63,16 @@ export async function pinByCid(userId: number, pin: Pin): Promise { } export async function orderStart() { - for (;;) { + while (true) { + let apiPromise; try { - const checkAccount = await checkAccountBalanceAndWarning(api); + apiPromise = apiConnect(); + const checkAccount = await checkAccountBalanceAndWarning(apiPromise); if (!checkAccount) { await sleep(configs.crust.loopTimeAwait); continue; } - await placeOrderQueuedFiles().catch(e => { - logger.error(`place order queued files failed: ${e.message}`); - }); + await placeOrderQueuedFiles(apiPromise); await sleep(configs.crust.loopTimeAwait); } catch (e) { logger.error(`place order loop error: ${e.message}`); @@ -79,11 +81,13 @@ export async function orderStart() { `### crust-pinner(${configs.server.name}) error \n err msg: ${e.message}` ); await sleep(configs.crust.loopTimeAwait); + } finally { + await disconnectApi(apiPromise) } } } -async function placeOrderQueuedFiles() { +async function placeOrderQueuedFiles(apiPromise: ApiPromise) { logger.info('start placeOrderQueuedFiles'); const pinObjects = await PinObjects.findAll({ where: { @@ -98,7 +102,6 @@ async function placeOrderQueuedFiles() { }, order: [['update_time', 'asc']], }); - // distinct by cid if (_.isEmpty(pinObjects)) { logger.info('not pin objects to order'); return; @@ -112,7 +115,7 @@ async function placeOrderQueuedFiles() { for (const cid of _.map(cidRetryGroup, (i: any, j: any) => j)) { const needToOrder = await needOrder(cid, cidRetryGroup[cid][0].retry_times); if (needToOrder.needOrder) { - await placeOrderInCrust(cid, needToOrder.retryTimes).catch(e => { + await placeOrderInCrust(apiPromise, cid, needToOrder.retryTimes).catch(e => { logger.error(`order error catch: ${JSON.stringify(e)} cid: ${cid}`); }); await sleep(configs.crust.orderTimeAwait); @@ -156,7 +159,7 @@ class PinObjectState { status: string; } -async function placeOrderInCrust(cid: string, retryTimes = 0) { +async function placeOrderInCrust(apiPromise: ApiPromise, cid: string, retryTimes = 0) { let pinStatus = PinObjectStatus.pinning; let retryTimeAdd = false; try { @@ -170,7 +173,7 @@ async function placeOrderInCrust(cid: string, retryTimes = 0) { const res = await timeoutOrError( 'Crust place order', placeOrder( - api, + apiPromise, krp, fileCid, fileSize, @@ -213,38 +216,51 @@ async function placeOrderInCrust(cid: string, retryTimes = 0) { } export async function updatePinObjectStatus() { - const pinningObjects = await PinObjects.findAll({ - where: {status: PinObjectStatus.pinning, deleted: 0}, - }); - if (!_.isEmpty(pinningObjects)) { - for (const obj of pinningObjects) { + let apiPromise; + while (true) { try { - const res = await getOrderState(api, obj.cid); - if (res) { - if ( - res.meaningfulData.reported_replica_count >= - configs.crust.validFileSize - ) { - obj.status = PinObjectStatus.pinned; - } else { - obj.status = PinObjectStatus.pinning; - } - } else { - // invalid file size - obj.deleted = 1; - obj.status = PinObjectStatus.failed; + const pinningObjects = await pinObjectDao.queryPinningObjects(); + if (_.isEmpty(pinningObjects)) { + await sleep(configs.crust.loopTimeAwait); + continue; + } + apiPromise = apiConnect(); + for (const obj of pinningObjects) { + const res = await getOrderState(apiPromise, obj.cid); + if (res) { + if ( + res.meaningfulData.reported_replica_count >= + configs.crust.validFileSize + ) { + obj.status = PinObjectStatus.pinned; + } else { + obj.status = PinObjectStatus.pinning; + } + } else { + // invalid file size + obj.deleted = 1; + obj.status = PinObjectStatus.failed; + } + await PinObjects.update({ + status: obj.status, + deleted: obj.deleted, + }, { + where: { id: obj.id } + }); } - await obj.save(); } catch (e) { logger.error(`get order state err: ${e}`); + } finally { + await disconnectApi(apiPromise); } - } } } export async function pinExpireFiles() { + let apiPromise; while(true) { try { + apiPromise = apiConnect(); const pinningObjects = await PinObjects.findAll({ where: { status: PinObjectStatus.pinned, deleted: 0 }, order: [['id', 'asc']], @@ -254,9 +270,8 @@ export async function pinExpireFiles() { await sleep(1000 * 6); continue; } - await api.isReadyOrError; - const hash = await api.rpc.chain.getFinalizedHead(); - const block = await api.rpc.chain.getBlock(hash); + const hash = await apiPromise.rpc.chain.getFinalizedHead(); + const block = await apiPromise.rpc.chain.getBlock(hash); const finalizeNumber = block.block.header.number.toNumber(); for (const p of pinningObjects) { const existFileNotPinned = await PinObjects.findOne({ @@ -266,7 +281,7 @@ export async function pinExpireFiles() { limit: 1 }); if (_.isEmpty(existFileNotPinned)) { - const res = await getOrderState(api, p.cid); + const res = await getOrderState(apiPromise, p.cid); if (_.isEmpty(res) || (res.meaningfulData.expired_at <= (finalizeNumber + configs.crust.expireBlockNumber))) { await PinObjects.update({status: PinObjectStatus.queued, retry_times: 0}, { where: { id: p.id } }) } @@ -275,6 +290,8 @@ export async function pinExpireFiles() { } } catch (e) { await sleep(1000 * 60); + } finally { + await disconnectApi(apiPromise); } } }