Skip to content

Commit

Permalink
Adjust apiPromise code
Browse files Browse the repository at this point in the history
Signed-off-by: KayleCoder <[email protected]>
  • Loading branch information
KayleCoder committed Oct 18, 2024
1 parent afd25a7 commit a84d3c4
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 70 deletions.
33 changes: 4 additions & 29 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import {router as psaRouter} from './routes/psa';
import * as bodyParser from 'body-parser';
const pinningAuthHandler = require('./middlewares/auth/authHandler');
const w3authHandler = require('@crustio/ipfs-w3auth-handler');
const schedule = require('node-schedule');
const Postgrator = require('postgrator');
const path = require('path');
import {updatePinObjectStatus, orderStart, pinExpireFiles} from './service/pinning';
import {logger} from './logger';
import {sendCrustOrderWarningMsg} from './service/crust/order';
import {configs} from './config/config';

const app = express();
Expand All @@ -32,30 +29,8 @@ const postgrator = new Postgrator({
});

postgrator.migrate('max').then((migrations: any) => {
app.listen(configs.server.port);
app.listen(configs.server.port);
updatePinObjectStatus();
orderStart();
pinExpireFiles();
});

schedule.scheduleJob('0 * * * * *', () => {
logger.info('pin status schedule start');
updatePinObjectStatus()
.then(() => {
logger.info('pin status schedule finished');
})
.catch((e: Error) => {
logger.error(`pin status update err: ${e.message}`);
});
});

orderStart()
.then(() => {
logger.info('order schedule finished');
})
.catch((e: Error) => {
sendCrustOrderWarningMsg('crust order crashed', `err: ${e.message}`);
logger.error(`order status err: ${e.message}`);
});

pinExpireFiles().catch((e: Error) => {
sendCrustOrderWarningMsg('pin crust expire file job crashed', `pin crust expire file job crashed err: ${e.message}`);
logger.error(`Pin expire job crashed err: ${e.message}`);
})
9 changes: 9 additions & 0 deletions src/dao/pinObjectDao.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {PinObjectsQuery, PinResults, PinStatus} from '../models/PinObjects';
import {PinObjectStatus} from "../common/commonUtils";
const {TextMatchingStrategy} = require('../common/commonUtils');
const _ = require('lodash');
const commonDao = require('./commonDao');
Expand All @@ -7,6 +8,7 @@ const pinObjectDao = {
selectPinObjectListByQuery: selectPinObjectListByQuery,
selectPinObjectByRequestIdAndUserId: selectPinObjectByRequestIdAndUserId,
deletePinObjectByRequestIdAndUserId: deletePinObjectByRequestIdAndUserId,
queryPinningObjects: queryPinningObjects,
};

async function deletePinObjectByRequestIdAndUserId(
Expand All @@ -19,6 +21,13 @@ async function deletePinObjectByRequestIdAndUserId(
);
}

async function queryPinningObjects(limit: number = 100) {
return commonDao.queryForArray(
'select * from pin_object where deleted = ? and status = ? limit ?',
[0, PinObjectStatus.pinning, limit]
);
}

async function selectPinObjectByRequestIdAndUserId(
requestId: string,
userId: number
Expand Down
18 changes: 13 additions & 5 deletions src/service/crust/api.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import {ApiPromise, WsProvider} from '@polkadot/api';
import {ApiPromise, ApiRx, WsProvider} from '@polkadot/api';
import {typesBundleForPolkadot} from '@crustio/type-definitions';
import {configs} from '../../config/config';

export const api = new ApiPromise({
provider: new WsProvider(configs.crust.chainWsUrl),
typesBundle: typesBundleForPolkadot,
});
export const apiConnect = (): ApiPromise => {
return new ApiPromise({
provider: new WsProvider(configs.crust.chainWsUrl),
typesBundle: typesBundleForPolkadot,
});
}

export const disconnectApi = async (api: ApiPromise) => {
if (api) {
await api.disconnect().catch((e) => {});
}
}
89 changes: 53 additions & 36 deletions src/service/pinning/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import {
sendCrustOrderWarningMsg,
sendTx,
} from '../crust/order';
import {api} from '../crust/api';
import {apiConnect, disconnectApi} from '../crust/api';
import createKeyring from '../crust/krp';
const commonDao = require('../../dao/commonDao');
const moment = require('moment');
const _ = require('lodash');
import {logger} from '../../logger';
import {timeoutOrError} from '../../common/promise-utils';
import {ApiPromise} from "@polkadot/api";
import {disconnect} from "node:cluster";
const Sequelize = require('sequelize');
const {sleep} = require('../../common/commonUtils');
const pinObjectDao = require('../../dao/pinObjectDao');
Expand Down Expand Up @@ -61,16 +63,16 @@ export async function pinByCid(userId: number, pin: Pin): Promise<PinStatus> {
}

export async function orderStart() {
for (;;) {
while (true) {
let apiPromise;
try {
const checkAccount = await checkAccountBalanceAndWarning(api);
apiPromise = apiConnect();
const checkAccount = await checkAccountBalanceAndWarning(apiPromise);
if (!checkAccount) {
await sleep(configs.crust.loopTimeAwait);
continue;
}
await placeOrderQueuedFiles().catch(e => {
logger.error(`place order queued files failed: ${e.message}`);
});
await placeOrderQueuedFiles(apiPromise);
await sleep(configs.crust.loopTimeAwait);
} catch (e) {
logger.error(`place order loop error: ${e.message}`);
Expand All @@ -79,11 +81,13 @@ export async function orderStart() {
`### crust-pinner(${configs.server.name}) error \n err msg: ${e.message}`
);
await sleep(configs.crust.loopTimeAwait);
} finally {
await disconnectApi(apiPromise)
}
}
}

async function placeOrderQueuedFiles() {
async function placeOrderQueuedFiles(apiPromise: ApiPromise) {
logger.info('start placeOrderQueuedFiles');
const pinObjects = await PinObjects.findAll({
where: {
Expand All @@ -98,7 +102,6 @@ async function placeOrderQueuedFiles() {
},
order: [['update_time', 'asc']],
});
// distinct by cid
if (_.isEmpty(pinObjects)) {
logger.info('not pin objects to order');
return;
Expand All @@ -112,7 +115,7 @@ async function placeOrderQueuedFiles() {
for (const cid of _.map(cidRetryGroup, (i: any, j: any) => j)) {
const needToOrder = await needOrder(cid, cidRetryGroup[cid][0].retry_times);
if (needToOrder.needOrder) {
await placeOrderInCrust(cid, needToOrder.retryTimes).catch(e => {
await placeOrderInCrust(apiPromise, cid, needToOrder.retryTimes).catch(e => {
logger.error(`order error catch: ${JSON.stringify(e)} cid: ${cid}`);
});
await sleep(configs.crust.orderTimeAwait);
Expand Down Expand Up @@ -156,7 +159,7 @@ class PinObjectState {
status: string;
}

async function placeOrderInCrust(cid: string, retryTimes = 0) {
async function placeOrderInCrust(apiPromise: ApiPromise, cid: string, retryTimes = 0) {
let pinStatus = PinObjectStatus.pinning;
let retryTimeAdd = false;
try {
Expand All @@ -170,7 +173,7 @@ async function placeOrderInCrust(cid: string, retryTimes = 0) {
const res = await timeoutOrError(
'Crust place order',
placeOrder(
api,
apiPromise,
krp,
fileCid,
fileSize,
Expand Down Expand Up @@ -213,38 +216,51 @@ async function placeOrderInCrust(cid: string, retryTimes = 0) {
}

export async function updatePinObjectStatus() {
const pinningObjects = await PinObjects.findAll({
where: {status: PinObjectStatus.pinning, deleted: 0},
});
if (!_.isEmpty(pinningObjects)) {
for (const obj of pinningObjects) {
let apiPromise;
while (true) {
try {
const res = await getOrderState(api, obj.cid);
if (res) {
if (
res.meaningfulData.reported_replica_count >=
configs.crust.validFileSize
) {
obj.status = PinObjectStatus.pinned;
} else {
obj.status = PinObjectStatus.pinning;
}
} else {
// invalid file size
obj.deleted = 1;
obj.status = PinObjectStatus.failed;
const pinningObjects = await pinObjectDao.queryPinningObjects();
if (_.isEmpty(pinningObjects)) {
await sleep(configs.crust.loopTimeAwait);
continue;
}
apiPromise = apiConnect();
for (const obj of pinningObjects) {
const res = await getOrderState(apiPromise, obj.cid);
if (res) {
if (
res.meaningfulData.reported_replica_count >=
configs.crust.validFileSize
) {
obj.status = PinObjectStatus.pinned;
} else {
obj.status = PinObjectStatus.pinning;
}
} else {
// invalid file size
obj.deleted = 1;
obj.status = PinObjectStatus.failed;
}
await PinObjects.update({
status: obj.status,
deleted: obj.deleted,
}, {
where: { id: obj.id }
});
}
await obj.save();
} catch (e) {
logger.error(`get order state err: ${e}`);
} finally {
await disconnectApi(apiPromise);
}
}
}
}

export async function pinExpireFiles() {
let apiPromise;
while(true) {
try {
apiPromise = apiConnect();
const pinningObjects = await PinObjects.findAll({
where: { status: PinObjectStatus.pinned, deleted: 0 },
order: [['id', 'asc']],
Expand All @@ -254,9 +270,8 @@ export async function pinExpireFiles() {
await sleep(1000 * 6);
continue;
}
await api.isReadyOrError;
const hash = await api.rpc.chain.getFinalizedHead();
const block = await api.rpc.chain.getBlock(hash);
const hash = await apiPromise.rpc.chain.getFinalizedHead();
const block = await apiPromise.rpc.chain.getBlock(hash);
const finalizeNumber = block.block.header.number.toNumber();
for (const p of pinningObjects) {
const existFileNotPinned = await PinObjects.findOne({
Expand All @@ -266,7 +281,7 @@ export async function pinExpireFiles() {
limit: 1
});
if (_.isEmpty(existFileNotPinned)) {
const res = await getOrderState(api, p.cid);
const res = await getOrderState(apiPromise, p.cid);
if (_.isEmpty(res) || (res.meaningfulData.expired_at <= (finalizeNumber + configs.crust.expireBlockNumber))) {
await PinObjects.update({status: PinObjectStatus.queued, retry_times: 0}, { where: { id: p.id } })
}
Expand All @@ -275,6 +290,8 @@ export async function pinExpireFiles() {
}
} catch (e) {
await sleep(1000 * 60);
} finally {
await disconnectApi(apiPromise);
}
}
}

0 comments on commit a84d3c4

Please sign in to comment.