Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account Data Indexing #507

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/graphql/database/1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE indexer.accounts (
_id SERIAL PRIMARY KEY,
account_id character varying(66) NOT NULL UNIQUE,
transaction_count INTEGER NOT NULL DEFAULT 0,
data jsonb NOT NULL DEFAULT '{}',
first_transaction_timestamp timestamp without time zone NOT NULL,
recent_transaction_timestamp timestamp without time zone NOT NULL
);
CREATE UNIQUE INDEX ON indexer.accounts(_id);
CREATE UNIQUE INDEX ON indexer.accounts(account_id);
CREATE INDEX ON indexer.accounts(transaction_count);
CREATE INDEX ON indexer.accounts(recent_transaction_timestamp);
CREATE INDEX ON indexer.accounts(first_transaction_timestamp);
83 changes: 83 additions & 0 deletions packages/graphql/src/application/uc/NewAddBlockRange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import {
import Block from '~/infra/dao/Block';
import Transaction from '~/infra/dao/Transaction';
import { DatabaseConnection } from '~/infra/database/DatabaseConnection';
import { AccountEntity } from '../../domain/Account/AccountEntity';
import { AccountDAO } from '../../infra/dao/AccountDAO';

export default class NewAddBlockRange {
private accountDAO = new AccountDAO();
async execute(input: Input) {
const { from, to } = input;
logger.info(`🔗 Syncing blocks: #${from} - #${to}`);
Expand Down Expand Up @@ -104,6 +107,41 @@ export default class NewAddBlockRange {
}
}
}
// New code starts here: Fetch and save account data
const owners = this.extractUniqueOwners(blockData.transactions);
for (const owner of owners) {
// Fetch existing account if present
const existingAccount = await this.accountDAO.getAccountById(owner);
const transactionCountIncrement = blockData.transactions.filter((tx) =>
tx.inputs?.some(
(input) =>
input.__typename === 'InputCoin' && input.owner === owner,
),
).length;

let newData: any;

if (existingAccount) {
// Increment transaction count by the number of transactions found in the current range
await this.accountDAO.incrementTransactionCount(
owner,
transactionCountIncrement,
);

newData = await this.fetchAccountDataFromGraphQL(owner);

await this.accountDAO.updateAccountData(owner, newData);
} else {
newData = await this.fetchAccountDataFromGraphQL(owner);

const newAccount = AccountEntity.create({
account_id: owner,
transactionCount: transactionCountIncrement,
data: newData,
});
await this.accountDAO.save(newAccount);
}
}
await connection.executeTransaction(queries);
}
const end = performance.now();
Expand Down Expand Up @@ -178,6 +216,51 @@ export default class NewAddBlockRange {
}
return accounts;
}

// New method to extract unique owners
extractUniqueOwners(transactions: GQLTransaction[]): string[] {
const owners = new Set<string>();
for (const tx of transactions) {
if (tx.inputs) {
for (const input of tx.inputs) {
if (input.__typename === 'InputCoin' && input.owner) {
owners.add(input.owner);
}
}
}
}
return Array.from(owners);
}

// New method to fetch account data from GraphQL
async fetchAccountDataFromGraphQL(owner: string): Promise<any[]> {
const allBalances: any[] = [];
let hasNextPage = true;
let after: string | null = null;

while (hasNextPage) {
const response = await client.sdk.balances({
filter: { owner },
first: 1000, // Fetch 1000 records at a time
after, // Use the 'after' cursor for pagination
});

if (response.data?.balances?.nodes) {
// Map the nodes to the desired structure and append to allBalances
const nodes = response.data.balances.nodes.map((node: any) => ({
amount: BigInt(node.amount),
assetId: node.assetId,
}));
allBalances.push(...nodes);
}

// Check if there is a next page and update the 'after' cursor
hasNextPage = response.data?.balances?.pageInfo?.hasNextPage || false;
after = response.data?.balances?.pageInfo?.endCursor || null;
}

return allBalances;
}
}

type Input = {
Expand Down
64 changes: 64 additions & 0 deletions packages/graphql/src/domain/Account/AccountEntity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Hash256 } from '../../application/vo/Hash256';
import { Entity } from '../../core/Entity';
import { AccountData } from './vo/AccountData';
import { AccountModelID } from './vo/AccountModelID';

type AccountInputProps = {
account_id: Hash256;
data: AccountData;
transactionCount: number;
};

export class AccountEntity extends Entity<AccountInputProps, AccountModelID> {
// Adjust the constructor to not require an ID initially
static create(account: any) {
const account_id = Hash256.create(account.account_id);
const data = AccountData.create(account.data);
const transactionCount = account.transactionCount || 0;

const props: AccountInputProps = {
account_id,
data,
transactionCount,
};

// If _id is not provided, set it as undefined
const id = account._id ? AccountModelID.create(account._id) : undefined;

return new AccountEntity(props, id);
}

static toDBItem(account: AccountEntity): any {
return {
account_id: account.props.account_id.value(),
data: AccountEntity.serializeData(account.props.data.value()),
transaction_count: account.props.transactionCount,
};
}

static serializeData(data: any): string {
return JSON.stringify(data, (_, value) =>
typeof value === 'bigint' ? value.toString() : value,
);
}

get cursor() {
return this.id ? this.id.value() : null;
}

get id() {
return this._id;
}

get account_id() {
return this.props.account_id.value();
}

get transactionCount() {
return this.props.transactionCount;
}

get data() {
return this.props.data.value();
}
}
9 changes: 9 additions & 0 deletions packages/graphql/src/domain/Account/AccountModel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { integer, jsonb, pgTable, serial, text } from 'drizzle-orm/pg-core';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are not using Drizzle anymore, don't need to create the mapping classes


// Define the schema for the accounts table
export const AccountsTable = pgTable('accounts', {
_id: serial('_id').primaryKey(),
account_id: text('account_id').notNull().unique(),
transaction_count: integer('transaction_count').notNull().default(0),
data: jsonb('data').notNull().default({}),
});
24 changes: 24 additions & 0 deletions packages/graphql/src/domain/Account/vo/AccountData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { jsonb } from 'drizzle-orm/pg-core';
import { ValueObject } from '../../../core/ValueObject';

interface Props {
value: any;
}

export class AccountData extends ValueObject<Props> {
private constructor(props: Props) {
super(props);
}

static type() {
return jsonb('data').notNull();
}

static create(value: any) {
return new AccountData({ value });
}

value() {
return this.props.value;
}
}
20 changes: 20 additions & 0 deletions packages/graphql/src/domain/Account/vo/AccountModelID.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { integer } from 'drizzle-orm/pg-core';
import { Identifier } from '../../../core/Identifier';

export class AccountModelID extends Identifier<number> {
private constructor(id: number) {
super(id);
}

static type() {
return integer('_id').primaryKey();
}

static create(id: number): AccountModelID {
if (typeof id !== 'number' || Number.isNaN(id)) {
throw new Error('Invalid ID: ID must be a valid number.');
}

return new AccountModelID(id);
}
}
18 changes: 18 additions & 0 deletions packages/graphql/src/domain/Account/vo/AccountRef.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { ValueObject } from '../../../core/ValueObject';
interface Props {
value: number;
}

export class AccountRef extends ValueObject<Props> {
private constructor(props: Props) {
super(props);
}

static create(id: number) {
return new AccountRef({ value: id });
}

value() {
return this.props.value;
}
}
111 changes: 111 additions & 0 deletions packages/graphql/src/infra/dao/AccountDAO.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { AccountEntity } from '../../domain/Account/AccountEntity';
import { DatabaseConnection } from '../database/DatabaseConnection';

export class AccountDAO {
private databaseConnection: DatabaseConnection;

constructor() {
this.databaseConnection = DatabaseConnection.getInstance();
}

// Custom function to stringify BigInt values
private stringifyBigInt(data: any): string {
return JSON.stringify(data, (_key, value) =>
typeof value === 'bigint' ? value.toString() : value,
);
}

async save(account: AccountEntity) {
const accountData = AccountEntity.toDBItem(account);

const data = this.stringifyBigInt(accountData.data);

// Use raw SQL query to insert or update the account record
await this.databaseConnection.query(
`
INSERT INTO indexer.accounts (account_id, transaction_count, data, first_transaction_timestamp, recent_transaction_timestamp)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (account_id)
DO UPDATE SET
transaction_count = EXCLUDED.transaction_count,
data = EXCLUDED.data,
recent_transaction_timestamp = CASE
WHEN accounts.transaction_count <> EXCLUDED.transaction_count THEN EXCLUDED.recent_transaction_timestamp
ELSE accounts.recent_transaction_timestamp
END
`,
[
accountData.account_id,
accountData.transaction_count,
data,
accountData.first_transaction_timestamp || new Date().toISOString(),
new Date().toISOString(),
],
);
}

async getAccountById(id: string): Promise<AccountEntity | null> {
const result = await this.databaseConnection.query(
`
SELECT * FROM indexer.accounts WHERE account_id = $1
`,
[id],
);

return result.length ? AccountEntity.create(result[0]) : null;
}

async incrementTransactionCount(account_id: string, incrementBy = 1) {
await this.databaseConnection.query(
`
UPDATE indexer.accounts
SET transaction_count = transaction_count + $1,
recent_transaction_timestamp = $2
WHERE account_id = $3
`,
[incrementBy, new Date().toISOString(), account_id],
);
}

// Updated method to update account data with BigInt handling
async updateAccountData(account_id: string, newData: any) {
const data = this.stringifyBigInt(newData); // Use custom function for BigInt serialization

await this.databaseConnection.query(
`
UPDATE indexer.accounts
SET data = $1,
recent_transaction_timestamp = $2
WHERE account_id = $3
`,
[data, new Date().toISOString(), account_id],
);
}

async updateAccountTransactionCount(
account_id: string,
newTransactionCount: number,
) {
await this.databaseConnection.query(
`
UPDATE indexer.accounts
SET transaction_count = $1,
recent_transaction_timestamp = $2
WHERE account_id = $3
`,
[newTransactionCount, new Date().toISOString(), account_id],
);
}

// New method to get account data content
async getAccountDataContent(account_id: string): Promise<any | null> {
const result = await this.databaseConnection.query(
`
SELECT data FROM indexer.accounts WHERE account_id = $1
`,
[account_id],
);

return result.length ? result[0].data : null;
}
}