Skip to content

Commit

Permalink
fix: rpc support multiple params
Browse files Browse the repository at this point in the history
  • Loading branch information
wzhudev committed Dec 10, 2024
1 parent d6415f9 commit 9159bc5
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions packages/rpc/src/services/rpc/rpc.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

/* eslint-disable ts/no-explicit-any */

import type { Subscription } from 'rxjs';
import { RxDisposable } from '@univerjs/core';
import { BehaviorSubject, firstValueFrom, isObservable, Observable, of } from 'rxjs';
Expand All @@ -23,9 +25,7 @@ import { filter, take, takeUntil } from 'rxjs/operators';

/** This protocol is for transferring data from the two peer univer instance running in different locations. */
export interface IMessageProtocol {
// eslint-disable-next-line ts/no-explicit-any
send(message: any): void;
// eslint-disable-next-line ts/no-explicit-any
onMessage: Observable<any>;
}

Expand All @@ -36,10 +36,8 @@ export interface IMessageProtocol {
* event sources are usually provided by the same service or controller.
*/
export interface IChannel {
// eslint-disable-next-line ts/no-explicit-any
call<T>(method: string, args?: any): Promise<T>;
// eslint-disable-next-line ts/no-explicit-any
subscribe<T>(event: string, args?: any): Observable<T>;
call<T>(method: string, ...args: any[]): Promise<T>;
subscribe<T>(event: string, ...args: any[]): Observable<T>;
}

/**
Expand All @@ -57,10 +55,10 @@ export function fromModule(module: unknown): IChannel {
// const observables = new Map<string, Observable<any>>();

return new (class implements IChannel {
call<T>(method: string, args?: any): Promise<T> {
call<T>(method: string, args: any[]): Promise<T> {
const target = handler[method];
if (typeof target === 'function') {
let res = target.apply(handler, [args]);
let res = args ? target.apply(handler, args) : target.call(handler);
if (!(res instanceof Promise)) {
res = Promise.resolve(res);
}
Expand All @@ -70,10 +68,10 @@ export function fromModule(module: unknown): IChannel {
throw new Error(`[RPC]: method not found for ${method}!`);
}

subscribe<T>(eventMethod: string, args: any): Observable<T> {
subscribe<T>(eventMethod: string, ...args: any[]): Observable<T> {
const target = handler[eventMethod];
if (typeof target === 'function') {
const res = target.apply(handler, args);
const res = args ? target.call(handler, ...args) : target.call(handler);
if (!isObservable(res)) {
return of(res);
}
Expand Down Expand Up @@ -104,11 +102,11 @@ export function toModule<T extends object>(channel: IChannel): T {
return function (...args: any[]) {
const isObservable = propertyIsEventSource(propKey);
if (isObservable) {
const observable = channel.subscribe(propKey, args[0]);
const observable = channel.subscribe(propKey, ...args);
return observable;
}

return channel.call(propKey, args[0]);
return channel.call(propKey, ...args);
};
},
});
Expand Down Expand Up @@ -153,7 +151,7 @@ interface IRPCRequest {
type: RequestType;
channelName: string;
method: string;
args?: any;
args?: any[];
}

enum ResponseType {
Expand Down Expand Up @@ -208,7 +206,7 @@ export class ChannelClient extends RxDisposable implements IChannelClient {
const self = this;

return {
call(method: string, args?: any) {
call(method: string, ...args: any[]) {
if (self._disposed) {
return Promise.reject();
}
Expand All @@ -234,7 +232,7 @@ export class ChannelClient extends RxDisposable implements IChannelClient {
);
}

private async _remoteCall(channelName: string, method: string, args?: any): Promise<any> {
private async _remoteCall(channelName: string, method: string, args: any[]): Promise<any> {
await this._whenReady();

const sequence = ++this._lastRequestCounter;
Expand Down Expand Up @@ -267,7 +265,7 @@ export class ChannelClient extends RxDisposable implements IChannelClient {
});
}

private _remoteSubscribe(channelName: string, method: string, args?: any): Observable<any> {
private _remoteSubscribe(channelName: string, method: string, args: any[]): Observable<any> {
return new Observable((subscriber) => {
let sequence: number = -1;
this._whenReady().then(() => {
Expand Down Expand Up @@ -388,7 +386,7 @@ export class ChannelServer extends RxDisposable implements IChannelServer {
if (!channel) {
throw new Error(`[ChannelServer]: Channel ${channelName} not found!`);
}
promise = channel.call(method, args);
promise = args ? channel.call(method, args) : channel.call(method);
} catch (err: unknown) {
promise = Promise.reject(err);
}
Expand Down

0 comments on commit 9159bc5

Please sign in to comment.