Skip to content

Commit

Permalink
fix: ClientService is changed to StartStop
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Oct 11, 2023
1 parent e0306a4 commit 493afa1
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 86 deletions.
153 changes: 73 additions & 80 deletions src/client/ClientService.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import type {
IdGen,
JSONRPCRequest,
JSONRPCResponse,
MiddlewareFactory,
ServerManifest,
} from '@matrixai/rpc';
import type { TLSConfig } from '../network/types';
import Logger from '@matrixai/logger';
import { StartStop } from '@matrixai/async-init/dist/StartStop';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import { running, status } from '@matrixai/async-init';
import { WebSocketServer, events as wsEvents } from '@matrixai/ws';
import { RPCServer } from '@matrixai/rpc';
import { middleware as rpcUtilsMiddleware } from '@matrixai/rpc';
import { RPCServer, middleware as rpcMiddleware } from '@matrixai/rpc';
import * as events from './events';
import * as errors from './errors';
import config from '../config';
import * as networkUtils from '../network/utils';
import config from '../config';

interface ClientService extends StartStop {}
@StartStop({
Expand All @@ -28,49 +28,68 @@ class ClientService {
protected webSocketServer: WebSocketServer;
protected logger: Logger;

protected handleEventWebSocketServerConnection = (
evt: wsEvents.EventWebSocketServerConnection,
) => {
const conn = evt.detail;
const streamHandler = (evt: wsEvents.EventWebSocketConnectionStream) => {
const stream = evt.detail;
if (!this.rpcServer[running] || this.rpcServer[status] === 'stopping') {
stream.cancel(Error('TMP RPCServer not running'));
return;
}
this.rpcServer.handleStream(stream);
};
conn.addEventListener(
wsEvents.EventWebSocketConnectionStream.name,
streamHandler,
);
conn.addEventListener(
wsEvents.EventWebSocketConnectionClose.name,
() => {
conn.removeEventListener(
wsEvents.EventWebSocketConnectionStream.name,
streamHandler,
);
},
{ once: true },
);
};

public constructor({
manifest,
tlsConfig,
options: {
middlewareFactory,
host = config.defaultsUser.clientServiceHost,
port = config.defaultsUser.clientServicePort,
keepAliveTimeoutTime = config.defaultsSystem.clientKeepAliveTimeoutTime,
keepAliveIntervalTime = config.defaultsSystem.clientKeepAliveIntervalTime,
rpcCallTimeoutTime = config.defaultsSystem.rpcCallTimeoutTime,
rpcParserBufferSize = config.defaultsSystem.rpcParserBufferSize,
},
logger = new Logger('ClientService Logger'),
middlewareFactory,
idGen = async () => null,
keepAliveTimeoutTime = config.defaultsSystem.clientKeepAliveTimeoutTime,
keepAliveIntervalTime = config.defaultsSystem.clientKeepAliveIntervalTime,
rpcCallTimeoutTime = config.defaultsSystem.rpcCallTimeoutTime,
rpcParserBufferSize = config.defaultsSystem.rpcParserBufferSize,
logger,
}: {
manifest: ServerManifest;
tlsConfig: TLSConfig;
options: {
middlewareFactory?: MiddlewareFactory<
JSONRPCRequest,
JSONRPCRequest,
JSONRPCResponse,
JSONRPCResponse
>;
host?: string;
port?: number;
keepAliveTimeoutTime?: number;
keepAliveIntervalTime?: number;
rpcCallTimeoutTime?: number;
rpcParserBufferSize?: number;
};
middlewareFactory?: MiddlewareFactory<
JSONRPCRequest,
JSONRPCRequest,
JSONRPCResponse,
JSONRPCResponse
>;
idGen?: IdGen;
keepAliveTimeoutTime?: number;
keepAliveIntervalTime?: number;
rpcCallTimeoutTime?: number;
rpcParserBufferSize?: number;
logger?: Logger;
}) {
this.logger = logger;
this.logger = logger ?? new Logger(this.constructor.name);
this.rpcServer = new RPCServer({
idGen: async () => null,
idGen,
handlerTimeoutTime: rpcCallTimeoutTime,
middlewareFactory: rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
// ClientUtilsMiddleware.middlewareServer(sessionManager, keyRing),
middlewareFactory: rpcMiddleware.defaultServerMiddlewareWrapper(
middlewareFactory,
rpcParserBufferSize,
),
fromError: networkUtils.fromError,
logger: logger.getChild(RPCServer.name),
logger: this.logger.getChild(RPCServer.name),
});
this.webSocketServer = new WebSocketServer({
config: {
Expand All @@ -79,69 +98,41 @@ class ClientService {
keepAliveIntervalTime,
keepAliveTimeoutTime,
},
logger: logger.getChild(WebSocketServer.name),
logger: this.logger.getChild(WebSocketServer.name),
});
}

@ready(new errors.ErrorClientServiceNotRunning())
public get host() {
return this.webSocketServer.host;
}

@ready(new errors.ErrorClientServiceNotRunning())
public get port() {
return this.webSocketServer.port;
}

public async start({
manifest,
options: {
host = config.defaultsUser.clientServiceHost,
port = config.defaultsUser.clientServicePort,
},
host = config.defaultsUser.clientServiceHost,
port = config.defaultsUser.clientServicePort,
}: {
manifest: ServerManifest;
options: {
host?: string;
port?: number;
};
host?: string;
port?: number;
}): Promise<void> {
this.logger.info(`Start ${this.constructor.name}`);
await this.rpcServer.start({ manifest });
this.webSocketServer.addEventListener(
wsEvents.EventWebSocketServerConnection.name,
this.handleEventWebSocketServerConnection,
);
await this.rpcServer.start({ manifest });

await this.webSocketServer.start({
host,
port,
});
this.logger.info(`Started ${this.constructor.name}`);
}
get host() {
return this.webSocketServer.host;
}

get port() {
return this.webSocketServer.port;
}
protected handleEventWebSocketServerConnection = (
evt: wsEvents.EventWebSocketServerConnection,
) => {
const conn = evt.detail;
const streamHandler = (evt: wsEvents.EventWebSocketConnectionStream) => {
const stream = evt.detail;
if (!this.rpcServer[running] || this.rpcServer[status] === 'stopping') {
stream.cancel(Error('TMP RPCServer not running'));
return;
}
this.rpcServer.handleStream(stream);
};
conn.addEventListener(
wsEvents.EventWebSocketConnectionStream.name,
streamHandler,
);
conn.addEventListener(
wsEvents.EventWebSocketConnectionClose.name,
() => {
conn.removeEventListener(
wsEvents.EventWebSocketConnectionStream.name,
streamHandler,
);
},
{ once: true },
);
};

public async stop({
force = false,
Expand All @@ -155,6 +146,8 @@ class ClientService {
await this.rpcServer.stop({ force: true });
this.logger.info(`Stopped ${this.constructor.name}`);
}

@ready(new errors.ErrorClientServiceNotRunning())
public setTlsConfig(tlsConfig: TLSConfig): void {
this.webSocketServer.updateConfig({
key: tlsConfig.keyPrivatePem,
Expand Down
6 changes: 0 additions & 6 deletions src/client/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,11 @@ class EventClientServiceStop extends EventsClientService {}

class EventClientServiceStopped extends EventsClientService {}

class EventClientServiceDestroy extends EventsClientService {}

class EventClientServiceDestroyed extends EventsClientService {}

export {
EventsClient,
EventsClientService,
EventClientServiceStart,
EventClientServiceStarted,
EventClientServiceStop,
EventClientServiceStopped,
EventClientServiceDestroy,
EventClientServiceDestroyed,
};

0 comments on commit 493afa1

Please sign in to comment.