Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/account data indexing #594

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use server';

import { z } from 'zod';
import { act } from '~/systems/Core/utils/act-server';
import { sdk } from '~/systems/Core/utils/sdk';

// Schema to validate inputs
const schema = z.object({
sortBy: z.string().optional(), // Sorting criteria (balance, transaction_count, etc.)
sortOrder: z.string().optional(), // asc or desc
first: z.number().optional().nullable(), // Number of accounts to fetch, can be null
cursor: z.string().optional().nullable(), // Pagination cursor
});

// Common function to fetch top accounts
async function fetchTopAccounts(
cursor?: string | null,
sortBy = 'transaction_count', // Default to transaction_count
sortOrder: 'asc' | 'desc' = 'desc', // Default to descending order
first: number | null = null, // Allow null to fetch all records if no limit is provided
) {
const queryParams: Record<string, any> = {
cursor,
first,
sortBy,
sortOrder,
};

console.log('Params Here', queryParams);

const data = await sdk.paginatedAccounts(queryParams);

if (!data.data.paginatedAccounts.nodes.length) {
return {
accounts: [],
pageInfo: {
hasNextPage: false,
hasPreviousPage: false,
startCursor: null,
endCursor: null,
},
};
}

const { nodes, pageInfo } = data.data.paginatedAccounts;

const accounts = nodes.map((account: any) => ({
id: account.id,
account_id: account.account_id,
balance: account.balance,
transaction_count: account.transaction_count,
}));

return {
accounts,
pageInfo: {
hasNextPage: pageInfo.hasNextPage,
hasPreviousPage: pageInfo.hasPreviousPage,
startCursor: pageInfo.startCursor,
endCursor: pageInfo.endCursor,
},
};
}

export const getTopAccounts = act(schema, async (params) => {
const sortBy = params.sortBy || 'transaction_count';
const sortOrder = (params.sortOrder || 'desc') as 'asc' | 'desc';
const first = params.first === null ? null : params.first;
const cursor = params.cursor || null;

return fetchTopAccounts(cursor, sortBy, sortOrder, first);
});
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ export function TopNav() {
return (
<Nav>
<Nav.Desktop className={'px-10 justify-between'}>
<Nav.Menu>
{logo}
</Nav.Menu>
<Nav.Menu>{logo}</Nav.Menu>
<Nav.Menu>
<SearchWidget />
</Nav.Menu>
Expand Down
14 changes: 14 additions & 0 deletions packages/graphql/database/2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
DROP TABLE indexer.accounts cascade;
CREATE TABLE indexer.accounts (
_id SERIAL PRIMARY KEY,
account_id character varying(66) NOT NULL UNIQUE,
balance BIGINT NOT NULL DEFAULT 0,
transaction_count INTEGER NOT NULL DEFAULT 0,
first_transaction_timestamp timestamp without time zone NOT NULL,
recent_transaction_timestamp timestamp without time zone NOT NULL
);
CREATE UNIQUE INDEX ON indexer.accounts(_id);
CREATE UNIQUE INDEX ON indexer.accounts(account_id);
CREATE INDEX ON indexer.accounts(transaction_count);
CREATE INDEX ON indexer.accounts(recent_transaction_timestamp);
CREATE INDEX ON indexer.accounts(first_transaction_timestamp);
142 changes: 135 additions & 7 deletions packages/graphql/src/application/uc/NewAddBlockRange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,36 @@ import {
import Block from '~/infra/dao/Block';
import Transaction from '~/infra/dao/Transaction';
import { DatabaseConnection } from '~/infra/database/DatabaseConnection';
import { AccountEntity } from '../../domain/Account/AccountEntity';
import AccountDAO from '../../infra/dao/AccountDAO';
import IndexAsset from './IndexAsset';

export default class NewAddBlockRange {
private accountDAO = new AccountDAO();

async execute(input: Input) {
const indexAsset = new IndexAsset();
const uniqueAccountOwners = new Set<string>();
const { from, to } = input;
logger.info(`🔗 Syncing blocks: #${from} - #${to}`);

const blocksData = await this.getBlocks(from, to);
if (blocksData.length === 0) {
logger.info(`🔗 No blocks to sync: #${from} - #${to}`);
return;
}

const start = performance.now();
const connection = DatabaseConnection.getInstance();

for (const blockData of blocksData) {
const queries: { statement: string; params: any }[] = [];
const block = new Block({ data: blockData });

// Add block data to queries
queries.push({
statement:
'insert into indexer.blocks (_id, id, timestamp, data, gas_used, producer) values ($1, $2, $3, $4, $5, $6) on conflict do nothing',
'INSERT INTO indexer.blocks (_id, id, timestamp, data, gas_used, producer) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING',
params: [
block.id,
block.blockHash,
Expand All @@ -41,11 +51,14 @@ export default class NewAddBlockRange {
block.producer,
],
});

// Process each transaction within the block
for (const [index, transactionData] of blockData.transactions.entries()) {
const transaction = new Transaction(transactionData, index, block.id);

queries.push({
statement:
'insert into indexer.transactions (_id, tx_hash, timestamp, data, block_id) values ($1, $2, $3, $4, $5) on conflict do nothing',
'INSERT INTO indexer.transactions (_id, tx_hash, timestamp, data, block_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING',
params: [
transaction.id,
transaction.transactionHash,
Expand All @@ -54,67 +67,146 @@ export default class NewAddBlockRange {
transaction.blockId,
],
});

// Process transaction accounts
const accounts = this.getAccounts(transactionData);
for (const accountHash of accounts) {
queries.push({
statement:
'insert into indexer.transactions_accounts (_id, block_id, tx_hash, account_hash) values ($1, $2, $3, $4) on conflict do nothing',
'INSERT INTO indexer.transactions_accounts (_id, block_id, tx_hash, account_hash) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING',
params: [
transaction.id,
transaction.blockId,
transaction.transactionHash,
accountHash,
],
});
uniqueAccountOwners.add(accountHash);
}

// Handle assets in transaction receipts
if (transaction.data?.status?.receipts) {
try {
await indexAsset.execute(transaction.data);
} catch (e: any) {
logger.error('Error fetching assets', e);
}
}

// Insert inputs and predicates
if (transactionData.inputs) {
for (const inputData of transactionData.inputs) {
queries.push({
statement:
'insert into indexer.inputs (transaction_id, data) values ($1, $2) on conflict do nothing',
'INSERT INTO indexer.inputs (transaction_id, data) VALUES ($1, $2) ON CONFLICT DO NOTHING',
params: [transaction.id, inputData],
});

const predicate = this.getPredicate(inputData);
if (predicate) {
queries.push({
statement:
'insert into indexer.predicates (address, bytecode) values ($1, $2) on conflict do nothing',
'INSERT INTO indexer.predicates (address, bytecode) VALUES ($1, $2) ON CONFLICT DO NOTHING',
params: [predicate.address, predicate.bytecode],
});
}
}
}

// Insert outputs and contracts
if (transactionData.outputs) {
for (const outputData of transactionData.outputs) {
queries.push({
statement:
'insert into indexer.outputs (transaction_id, data) values ($1, $2) on conflict do nothing',
'INSERT INTO indexer.outputs (transaction_id, data) VALUES ($1, $2) ON CONFLICT DO NOTHING',
params: [transaction.id, outputData],
});
}

const contractIds = this.getContractIds(transactionData.outputs);
for (const contractId of contractIds) {
const contract = (await client.sdk.contract({ id: contractId }))
.data.contract;
if (contract) {
queries.push({
statement:
'insert into indexer.contracts (contract_hash, data) values ($1, $2) on conflict do nothing',
'INSERT INTO indexer.contracts (contract_hash, data) VALUES ($1, $2) ON CONFLICT DO NOTHING',
params: [contract.id, contract],
});
}
}
}
}

// Process unique account owners for each block
for (const owner of uniqueAccountOwners) {
try {
const existingAccount = await this.accountDAO.getAccountById(owner);
const transactionCountIncrement = blockData.transactions.filter(
(tx) =>
tx.inputs?.some(
(input) =>
input.__typename === 'InputCoin' && input.owner === owner,
),
).length;

let newBalance: bigint;

if (existingAccount) {
// Update existing account
queries.push({
statement: `
UPDATE indexer.accounts
SET transaction_count = transaction_count + $1, recent_transaction_timestamp = $2
WHERE account_id = $3
`,
params: [transactionCountIncrement, block.timestamp, owner],
});

newBalance = await this.fetchBalance(owner);

queries.push({
statement: `
UPDATE indexer.accounts
SET balance = $1
WHERE account_id = $2
`,
params: [newBalance, owner],
});
} else {
// Create a new account entry
newBalance = await this.fetchBalance(owner);

const newAccount = AccountEntity.create({
account_id: owner,
balance: newBalance,
transactionCount: transactionCountIncrement,
first_transaction_timestamp: block.timestamp,
});

queries.push({
statement: `
INSERT INTO indexer.accounts (account_id, balance, transaction_count, first_transaction_timestamp, recent_transaction_timestamp)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING
`,
params: [
newAccount.account_id,
newAccount.balance,
newAccount.transactionCount,
block.timestamp,
block.timestamp,
],
});
}
} catch (err) {
console.error(`Error processing owner ${owner}:`, err);
}
}

await connection.executeTransaction(queries);
}

const end = performance.now();
const secs = Number.parseInt(`${(end - start) / 1000}`);
logger.info(`✅ Synced blocks: #${from} - #${to} (${secs}s)`);
Expand Down Expand Up @@ -187,6 +279,42 @@ export default class NewAddBlockRange {
}
return accounts;
}

async fetchBalance(owner: string): Promise<bigint> {
const response = await client.sdk.balance({
owner,
assetId:
'0xf8f8b6283d7fa5b672b530cbb84fcccb4ff8dc40f8176ef4544ddb1f1952ad07',
});
return BigInt(response.data.balance.amount);
}

async fetchAccountDataFromGraphQL(owner: string): Promise<any[]> {
const allBalances: any[] = [];
let hasNextPage = true;
let after: string | null = null;

while (hasNextPage) {
const response = await client.sdk.balances({
filter: { owner },
first: 1000,
after,
});

if (response.data?.balances?.nodes) {
const nodes = response.data.balances.nodes.map((node: any) => ({
amount: BigInt(node.amount),
assetId: node.assetId,
}));
allBalances.push(...nodes);
}

hasNextPage = response.data?.balances?.pageInfo?.hasNextPage || false;
after = response.data?.balances?.pageInfo?.endCursor || null;
}

return allBalances;
}
}

type Input = {
Expand Down
Loading
Loading