Skip to content

Commit

Permalink
Use RpcRequest in PubSubSubscriptionPlan (#3403)
Browse files Browse the repository at this point in the history
This PR changes the configs of the `executeRpcPubSubSubscriptionPlan` function such that it accepts a "Subscribe Request" as a `RpcRequest` object instead of a `subscribeMethodName` and `subscribeParams` duo.

This mean we can now use the `RpcRequestTransformer` provided in a more idiomatic way.

Note: A changeset for this PR is included in #3407.
  • Loading branch information
lorisleiva authored Oct 23, 2024
1 parent db144da commit 4614c57
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 33 deletions.
2 changes: 1 addition & 1 deletion packages/rpc-subscriptions-api/src/__tests__/index-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('createSolanaRpcSubscriptionsApi', () => {
});
expect(executeRpcPubSubSubscriptionPlan).toHaveBeenCalledWith(
expect.objectContaining({
subscribeMethodName: 'thingSubscribe',
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
}),
);
Expand Down
16 changes: 8 additions & 8 deletions packages/rpc-subscriptions-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,25 @@ type Config = RequestTransformerConfig;
function createSolanaRpcSubscriptionsApi_INTERNAL<TApi extends RpcSubscriptionsApiMethods>(
config?: Config,
): RpcSubscriptionsApi<TApi> {
const requestTransformer = getDefaultRequestTransformerForSolanaRpc(config);
const responseTransformer = getDefaultResponseTransformerForSolanaRpcSubscriptions({
allowedNumericKeyPaths: getAllowedNumericKeypaths(),
});
// TODO(loris): Replace with request transformer.
const parametersTransformer = <T extends unknown[]>(notificationName: string, params?: T) => {
return getDefaultRequestTransformerForSolanaRpc(config)({ methodName: notificationName, params })
.params as unknown[];
};
return createRpcSubscriptionsApi<TApi>({
getSubscriptionConfigurationHash({ notificationName, params }) {
return fastStableStringify([notificationName, params]);
},
planExecutor({ notificationName, params, ...rest }) {
const request = { methodName: notificationName, params };
const transformedRequest = requestTransformer(request);
return executeRpcPubSubSubscriptionPlan({
...rest,
responseTransformer,
subscribeMethodName: notificationName.replace(/Notifications$/, 'Subscribe'),
subscribeParams: parametersTransformer(notificationName, params),
unsubscribeMethodName: notificationName.replace(/Notifications$/, 'Unsubscribe'),
subscribeRequest: {
...transformedRequest,
methodName: transformedRequest.methodName.replace(/Notifications$/, 'Subscribe'),
},
unsubscribeMethodName: transformedRequest.methodName.replace(/Notifications$/, 'Unsubscribe'),
});
},
});
Expand Down
4 changes: 2 additions & 2 deletions packages/rpc-subscriptions-spec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ Subscription channels publish events on two channel names:

## Functions

### `executeRpcPubSubSubscriptionPlan({ channel, responseTransformer, signal, subscribeMethodName, subscribeParams, unsubscribeMethodName })`
### `executeRpcPubSubSubscriptionPlan({ channel, responseTransformer, signal, subscribeRequest, unsubscribeMethodName })`

Given a channel, this function executes the particular subscription plan required by the Solana JSON RPC Subscriptions API.

1. Calls the `subscribeMethodName` on the remote RPC
1. Calls the `subscribeRequest` on the remote RPC
2. Waits for a response containing the subscription id
3. Returns a `DataPublisher` that publishes notifications related to that subscriptions id, filtering out all others
4. Calls the `unsubscribeMethodName` on the remote RPC when the abort signal is fired.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
const publisherPromise = executeRpcPubSubSubscriptionPlan({
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
signal: abortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: [],
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
});
await expect(publisherPromise).rejects.toThrow();
Expand All @@ -56,8 +55,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
executeRpcPubSubSubscriptionPlan({
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
signal: abortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: [],
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
}).catch(() => {});
expect(mockChannel.on).toHaveBeenCalledWith('error', expect.any(Function), {
Expand All @@ -69,8 +67,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
executeRpcPubSubSubscriptionPlan({
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
signal: abortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: expectedParams,
subscribeRequest: { methodName: 'thingSubscribe', params: expectedParams },
unsubscribeMethodName: 'thingUnsubscribe',
}).catch(() => {});
expect(mockSend).toHaveBeenCalledWith(
Expand All @@ -91,8 +88,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
const publisherPromise = executeRpcPubSubSubscriptionPlan({
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
signal: abortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: [],
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
});
await expect(publisherPromise).rejects.toBe('o no');
Expand All @@ -110,8 +106,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
publisherPromise = executeRpcPubSubSubscriptionPlan({
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
signal: abortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: [],
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
});
});
Expand All @@ -135,8 +130,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
const publisherPromise = executeRpcPubSubSubscriptionPlan({
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
signal: abortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: [],
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
});
await Promise.resolve();
Expand All @@ -158,8 +152,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
responseTransformer: mockResponseTransformer,
signal: abortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: [],
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
});
await jest.runAllTimersAsync();
Expand Down Expand Up @@ -285,8 +278,7 @@ describe('executeRpcPubSubSubscriptionPlan', () => {
executeRpcPubSubSubscriptionPlan({
channel: mockChannel as RpcSubscriptionsChannel<unknown, unknown>,
signal: secondAbortController.signal,
subscribeMethodName: 'thingSubscribe',
subscribeParams: [],
subscribeRequest: { methodName: 'thingSubscribe', params: [] },
unsubscribeMethodName: 'thingUnsubscribe',
}).catch(() => {});
await jest.runAllTimersAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
SolanaError,
} from '@solana/errors';
import { safeRace } from '@solana/promises';
import { createRpcMessage, RpcResponseData } from '@solana/rpc-spec-types';
import { createRpcMessage, RpcRequest, RpcResponseData } from '@solana/rpc-spec-types';
import { DataPublisher } from '@solana/subscribable';
import { demultiplexDataPublisher } from '@solana/subscribable';

Expand All @@ -16,8 +16,7 @@ type Config<TNotification> = Readonly<{
channel: RpcSubscriptionsChannel<unknown, RpcNotification<TNotification> | RpcResponseData<RpcSubscriptionId>>;
responseTransformer?: <T>(response: unknown, notificationName: string) => T;
signal: AbortSignal;
subscribeMethodName: string;
subscribeParams?: unknown[];
subscribeRequest: RpcRequest;
unsubscribeMethodName: string;
}>;

Expand Down Expand Up @@ -98,8 +97,7 @@ export async function executeRpcPubSubSubscriptionPlan<TNotification>({
channel,
responseTransformer,
signal,
subscribeMethodName,
subscribeParams,
subscribeRequest,
unsubscribeMethodName,
}: Config<TNotification>): Promise<DataPublisher<RpcSubscriptionNotificationEvents<TNotification>>> {
let subscriptionId: number | undefined;
Expand Down Expand Up @@ -147,7 +145,7 @@ export async function executeRpcPubSubSubscriptionPlan<TNotification>({
* STEP 2
* Send the subscription request.
*/
const subscribePayload = createRpcMessage({ methodName: subscribeMethodName, params: subscribeParams });
const subscribePayload = createRpcMessage(subscribeRequest);
await channel.send(subscribePayload);
/**
* STEP 3
Expand Down

0 comments on commit 4614c57

Please sign in to comment.