Skip to content

Commit

Permalink
balance and block accesses subs working
Browse files Browse the repository at this point in the history
  • Loading branch information
robertlincecum authored and rileystephens28 committed Oct 22, 2024
1 parent 41e0967 commit bb283bf
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 26 deletions.
4 changes: 2 additions & 2 deletions src/_tests/integration/sendquai.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ describe('Test sending Quai', function () {
from: wallet.address,
};
console.log(`Sending quai to: ${receiverAddress}`);
provider.on({ address: receiverAddress }, () => {
console.log('Received quai');
provider.on({ type: 'balance', address: receiverAddress }, (balance) => {
console.log(`Received quai on address ${receiverAddress}. New Balance is ${balance}`);
});
const tx = (await wallet.sendTransaction(txObj)) as QuaiTransactionResponse;
//wait 2 seconds
Expand Down
25 changes: 13 additions & 12 deletions src/providers/abstract-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,19 @@ async function getSubscription(_event: ProviderEvent, zone?: Zone): Promise<Subs
return { type: 'orphan', tag: getTag('orphan', event), filter: copy(event), zone };
}

if ((<any>_event).topics || Array.isArray((<any>_event).address)) {
if ((<any>_event).type && (<any>_event).address) {
const address = formatMixedCaseChecksumAddress(
isHexString((<any>_event).address) ? (<any>_event).address : await resolveAddress((<any>_event).address),
);
const filter = <AccessesFilter>{
type: (<any>_event).type,
address: address,
};
if (!zone) {
zone = toZone(address.slice(0, 4));
}
return { filter, tag: getTag('accesses', filter), type: 'accesses', zone };
} else if ((<any>_event).topics || (<any>_event).address) {
const event = <EventFilter>_event;

const filter: EventFilter = {
Expand Down Expand Up @@ -418,17 +430,6 @@ async function getSubscription(_event: ProviderEvent, zone?: Zone): Promise<Subs
}

return { filter, tag: getTag('event', filter), type: 'event', zone };
} else if ((<any>_event).address) {
const address = formatMixedCaseChecksumAddress(
isHexString((<any>_event).address) ? (<any>_event).address : await resolveAddress((<any>_event).address),
);
const filter = <AccessesFilter>{
address: address,
};
if (!zone) {
zone = toZone(address.slice(0, 4));
}
return { filter, tag: getTag('accesses', filter), type: 'accesses', zone };
}

assertArgument(false, 'unknown ProviderEvent', 'event', _event);
Expand Down
13 changes: 8 additions & 5 deletions src/providers/provider-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ export class SocketBlockSubscriber extends SocketSubscriber {
* @category Providers
*/
export class SocketAccessesSubscriber extends SocketSubscriber {
#logFilter: string;
#accessesFilter: string;

get logFilter(): EventFilter {
return JSON.parse(this.#logFilter);
get accessesFilter(): AccessesFilter {
return JSON.parse(this.#accessesFilter);
}
/**
* Creates a new **SocketBlockSubscriber**.
Expand All @@ -203,7 +203,7 @@ export class SocketAccessesSubscriber extends SocketSubscriber {
*/
constructor(provider: SocketProvider, filter: AccessesFilter, zone: Zone) {
super(provider, ['accesses', filter.address], zone);
this.#logFilter = JSON.stringify(filter);
this.#accessesFilter = JSON.stringify(filter);
}

/**
Expand All @@ -215,7 +215,10 @@ export class SocketAccessesSubscriber extends SocketSubscriber {
* @returns {Promise<void>}
*/
async _emit(provider: SocketProvider, message: any): Promise<void> {
provider.emit(this.logFilter, this.zone, message);
if (this.accessesFilter.type === 'balance') {
message = await provider.getBalance(this.accessesFilter.address);
}
provider.emit(this.accessesFilter, this.zone, message);
}
}

Expand Down
30 changes: 23 additions & 7 deletions src/providers/provider-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface WebSocketLike {
onopen: null | ((...args: Array<any>) => any);
onmessage: null | ((...args: Array<any>) => any);
onerror: null | ((...args: Array<any>) => any);
onclose: null | ((...args: Array<any>) => any);

readyState: number;

Expand Down Expand Up @@ -116,6 +117,20 @@ export class WebSocketProvider extends SocketProvider {
// @TODO: now what? Attempt reconnect?
}
};

// @TODO: implement onclose
// websocket.onclose = () => {
// console.log('WebSocket closed. Attempting to reconnect...');
// setTimeout(() => {
// const baseUrl = websocket.url.split(':').slice(0, 2).join(':');
// const shardSuffix = this._getOption('usePathing') ? `/${fromShard(shard, 'nickname')}` : `:${port}`;
// const newWebSocket = this.createWebSocket(baseUrl, shardSuffix);
// this.initWebSocket(newWebSocket, shard, port);
// this.#websockets.push(newWebSocket);
// this._urlMap.set(shard, newWebSocket);
// }, 500); // Reconnect after 5 seconds
// };

websocket.onmessage = (message: { data: string }) => {
this._processMessage(message.data);
};
Expand All @@ -139,6 +154,12 @@ export class WebSocketProvider extends SocketProvider {
}
}

createWebSocket = (baseUrl: string, suffix: string): WebSocketLike => {
const tempWs = new _WebSocket(`${baseUrl}${suffix}`);
return tempWs as WebSocketLike;
// wait 2 minutes
};

/**
* Initialize the URL map with WebSocket connections.
*
Expand All @@ -152,11 +173,6 @@ export class WebSocketProvider extends SocketProvider {
this._urlMap.clear();
try {
const primeSuffix = this._getOption('usePathing') ? `/${fromShard(Shard.Prime, 'nickname')}` : ':8001';
const createWebSocket = (baseUrl: string, suffix: string): WebSocketLike => {
const tempWs = new _WebSocket(`${baseUrl}${suffix}`);
return tempWs as WebSocketLike;
// wait 2 minutes
};

const initShardWebSockets = async (baseUrl: string) => {
const shards = await this._getRunningLocations(Shard.Prime, true);
Expand All @@ -168,7 +184,7 @@ export class WebSocketProvider extends SocketProvider {
? `/${fromShard(shardEnum, 'nickname')}`
: `:${port}`;
const shardUrl = baseUrl.split(':').slice(0, 2).join(':');
const websocket = createWebSocket(shardUrl, shardSuffix);
const websocket = this.createWebSocket(shardUrl, shardSuffix);
this.initWebSocket(websocket, shardEnum);
this.#websockets.push(websocket);
this._urlMap.set(shardEnum, websocket);
Expand All @@ -185,7 +201,7 @@ export class WebSocketProvider extends SocketProvider {
if (Array.isArray(urls)) {
for (const url of urls) {
const baseUrl = `${url.split(':')[0]}:${url.split(':')[1]}`;
const primeWebsocket = createWebSocket(baseUrl, primeSuffix);
const primeWebsocket = this.createWebSocket(baseUrl, primeSuffix);
this.initWebSocket(primeWebsocket, Shard.Prime);
this.#websockets.push(primeWebsocket);
this._urlMap.set(Shard.Prime, primeWebsocket);
Expand Down
3 changes: 3 additions & 0 deletions src/providers/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2627,12 +2627,15 @@ export interface EventFilter {
nodeLocation?: NodeLocation;
}

export type AccessesType = 'block' | 'balance';

/**
* An **AccessesFilter** allows efficiently filtering accesses (state uses) using address.
*
* @category Providers
*/
export interface AccessesFilter {
type: AccessesType;
address: AddressLike;
}

Expand Down

0 comments on commit bb283bf

Please sign in to comment.