diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index bc24ab83b..8b1a0266b 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -11,7 +11,12 @@ import type { TLSConfig, } from '../network/types'; import type { AgentServerManifest } from './agent/handlers'; -import type { NodeId, NodeIdString } from './types'; +import type { + AuthenticateNetworkForwardCallback, + AuthenticateNetworkReverseCallback, + NodeId, + NodeIdString +} from './types'; import { events as quicEvents, QUICServer, @@ -47,6 +52,7 @@ import * as networkUtils from '../network/utils'; import * as utils from '../utils'; import RateLimiter from '../utils/ratelimiter/RateLimiter'; import config from '../config'; +import {NodesAuthenticateConnectionMessage} from "@/nodes/agent/types"; type ConnectionAndTimer = { connection: NodeConnection; @@ -57,6 +63,11 @@ type ConnectionAndTimer = { type ConnectionsEntry = { activeConnection: string; connections: Record; + authenticatedForward: boolean; + authenticatedReverse: boolean; + authenticatedP: Promise; + authenticatedResolveP: (value: void) => void; + authenticatedRejectP: (reason?: any) => void; }; type ConnectionInfo = { @@ -76,6 +87,16 @@ const abortPendingConnectionsReason = Symbol( 'abort pending connections reason', ); +const timerCancellationReason = Symbol('timer cancellation reason'); + +const activePunchCancellationReason = Symbol( + 'active punch cancellation reason', +); + +const activeForwardAuthenticateCancellationReason = Symbol( + 'active forward authenticate cancellation reason', +); + /** * NodeConnectionManager is a server that manages all node connections. * It manages both initiated and received connections. @@ -178,6 +199,22 @@ class NodeConnectionManager { * This is mainly used to limit a single source node making too many requests through a relay. */ protected rateLimiter = new RateLimiter(60000, 20, 10, 1); + /** + * Used to track the active authentication RPC calls + */ + protected activeForwardAuthenticateCalls = new Map< + string, + PromiseCancellable + >(); + + /** + * Callback used to generate authentication data when making the authentication call + */ + protected authenticateForwardCallback: AuthenticateNetworkForwardCallback; + /** + * Callback used to authenticate the peer when processing an authentication request from the peer + */ + protected authenticateReverseCallback: AuthenticateNetworkReverseCallback; protected logger: Logger; protected keyRing: KeyRing; @@ -187,8 +224,8 @@ class NodeConnectionManager { protected quicServer: QUICServer; /** - * Data structure to store all NodeConnections. If a connection to a node n does - * not exist, no entry for n will exist in the map. Alternatively, if a + * Data structure to store all NodeConnections. If a connection to a node `N` does + * not exist, no entry for `N` will exist in the map. Alternatively, if a * connection is currently being instantiated by some thread, an entry will * exist in the map, but only with the lock (no connection object). Once a * connection is instantiated, the entry in the map is updated to include the @@ -244,7 +281,7 @@ class NodeConnectionManager { const connectionAndTimer = connectionsEntry.connections[connectionId]; if (connectionAndTimer == null) utils.never('should have a connection'); connectionAndTimer.usageCount += 1; - connectionAndTimer.timer?.cancel(); + connectionAndTimer.timer?.cancel(timerCancellationReason); connectionAndTimer.timer = null; void stream.closedP.finally(() => { connectionAndTimer.usageCount -= 1; @@ -262,6 +299,8 @@ class NodeConnectionManager { await this.destroyConnection(nodeId, false, connectionId), delay, }); + // Prevent unhandled exceptions when cancelling + connectionAndTimer.timer.catch(() => {}); } }); }; @@ -347,6 +386,159 @@ class NodeConnectionManager { } }; + public forwardAuthenticate( + nodeId: NodeId, + ctx?: Partial, + ): PromiseCancellable; + @timedCancellable( + true, + (nodeConnectionManager: NodeConnectionManager) => + nodeConnectionManager.connectionConnectTimeoutTime, + ) + public async forwardAuthenticate( + nodeId: NodeId, + @context ctx: ContextTimed, + ): Promise { + const targetNodeIdString = nodeId.toString() as NodeIdString; + const connectionsEntry = this.connections.get(targetNodeIdString); + if (connectionsEntry == null) { + throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound(); + } + // Need to make an authenticate request here. Get the connection and RPC. + try { + const authenticateMessage = await this.authenticateForwardCallback(ctx); + await this.withConnF(nodeId, async (conn) => { + await conn.rpcClient.methods.nodesAuthenticateConnection(authenticateMessage, ctx); + }); + } catch (e) { + // Handle authentication failures + await this.authenticateFail(nodeId, e); + // throw e; + } + connectionsEntry.authenticatedForward = true; + this.logger.warn( + `Node ${nodesUtils.encodeNodeId(nodeId)} has been forward authenticated`, + ); + if (connectionsEntry.authenticatedReverse) { + this.logger.warn( + `Node ${nodesUtils.encodeNodeId(nodeId)} has been fully authenticated`, + ); + connectionsEntry.authenticatedResolveP(); + } + } + + public handleReverseAuthenticate( + nodeId: NodeId, + message: NodesAuthenticateConnectionMessage, + ctx?: Partial, + ): PromiseCancellable; + @timedCancellable( + true, + (nodeConnectionManager: NodeConnectionManager) => + nodeConnectionManager.connectionConnectTimeoutTime, + ) + public async handleReverseAuthenticate( + nodeId: NodeId, + message: NodesAuthenticateConnectionMessage, + @context ctx: ContextTimed, + ): Promise { + const targetNodeIdString = nodeId.toString() as NodeIdString; + const connectionsEntry = this.connections.get(targetNodeIdString); + if (connectionsEntry == null) { + throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound(); + } + try { + // Should resolve without issue if authentication succeeds. + await this.authenticateReverseCallback(message, ctx); + connectionsEntry.authenticatedReverse = true; + } catch (e) { + await this.authenticateFail(nodeId, e); + // Throw back up the RPC + throw e; + } + this.logger.warn( + `Node ${nodesUtils.encodeNodeId(nodeId)} has been reverse authenticated`, + ); + if (connectionsEntry.authenticatedForward) { + this.logger.warn( + `Node ${nodesUtils.encodeNodeId(nodeId)} has been fully authenticated`, + ); + connectionsEntry.authenticatedResolveP(); + } + } + + // TODO: There's a race condition with this and awaiting authentication. There are cases where authentication can complete before we can even start awaiting authenctication. We need to handle this better. + // It's better handled by starting the wait for authentication and then initiating it but the ordering of this sucks. + // best thing for now is to allow awaiting the authentication before even triggering the connections. + /** + * Will initiate a forward authentication call and coalesce + */ + public initiateForwardAuthenticate(nodeId: NodeId) { + // Needs check the map if one is already running, otherwise it needs to start one and manage it. + const nodeIdString = nodeId.toString(); + const existingAuthenticate = + this.activeForwardAuthenticateCalls.get(nodeIdString); + // If it exists in the map then we don't need to start one and can just return + if (existingAuthenticate != null) return; + // Otherwise we need to start one and add it to the map + const forwardAuthenticateP = this.forwardAuthenticate(nodeId).finally( + () => { + this.activeForwardAuthenticateCalls.delete(nodeIdString); + }, + ); + // Prevent unhandled errors + forwardAuthenticateP.then( + () => {}, + () => {}, + ); + this.activeForwardAuthenticateCalls.set(nodeIdString, forwardAuthenticateP); + } + + /** + * Returns true if the connection has been authenticated + */ + public isAuthenticated(nodeId: NodeId): boolean { + const targetNodeIdString = nodeId.toString() as NodeIdString; + const connectionsEntry = this.connections.get(targetNodeIdString); + if (connectionsEntry == null) return false; + return ( + connectionsEntry.authenticatedForward && + connectionsEntry.authenticatedReverse + ); + } + + /** + * Returns a promise that resolves once the connection has authenticated, + * otherwise it rejects with the authentication failure + * @param nodeId + */ + public async isAuthenticatedP(nodeId: NodeId): Promise { + const targetNodeIdString = nodeId.toString() as NodeIdString; + const connectionsEntry = this.connections.get(targetNodeIdString); + if (connectionsEntry == null) { + throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound(); + } + try { + return await connectionsEntry.authenticatedP; + } catch (e) { + Error.captureStackTrace(e); + throw e; + } + } + + protected async authenticateFail(nodeId: NodeId, reason: any) { + this.logger.warn(`Authentication failed with ${reason.message}`); + const targetNodeIdString = nodeId.toString() as NodeIdString; + const connectionsEntry = this.connections.get(targetNodeIdString); + if (connectionsEntry == null) { + throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound(); + } + const rejectAuthentication = connectionsEntry.authenticatedRejectP; + // Trigger shutdown of the connections + await this.destroyConnection(nodeId, true); + rejectAuthentication(Error('TMP IMP authentication failed error', { cause: reason })); + }; + /** * Constructs the `NodeConnectionManager`. */ @@ -372,7 +564,8 @@ class NodeConnectionManager { .nodesConnectionHolePunchIntervalTime, rpcParserBufferSize = config.defaultsSystem.rpcParserBufferSize, rpcCallTimeoutTime = config.defaultsSystem.rpcCallTimeoutTime, - + authenticateForwardCallback = async () => { throw Error('TMP IMP no authentication callback provided')}, + authenticateReverseCallback = async () => { throw Error('TMP IMP no authentication callback provided')}, logger, }: { keyRing: KeyRing; @@ -388,6 +581,8 @@ class NodeConnectionManager { connectionHolePunchIntervalTime?: number; rpcParserBufferSize?: number; rpcCallTimeoutTime?: number; + authenticateForwardCallback?: AuthenticateNetworkForwardCallback; + authenticateReverseCallback?: AuthenticateNetworkReverseCallback; logger?: Logger; }) { this.logger = logger ?? new Logger(this.constructor.name); @@ -404,6 +599,8 @@ class NodeConnectionManager { this.connectionHolePunchIntervalTime = connectionHolePunchIntervalTime; this.rpcParserBufferSize = rpcParserBufferSize; this.rpcCallTimeoutTime = rpcCallTimeoutTime; + this.authenticateForwardCallback = authenticateForwardCallback; + this.authenticateReverseCallback = authenticateReverseCallback; const quicSocket = new QUICSocket({ resolveHostname: () => { @@ -571,25 +768,34 @@ class NodeConnectionManager { ); this.quicSocket.removeEventListener(EventAll.name, this.handleEventAll); - const destroyProms: Array> = []; + const destroyConnectionPs: Array> = []; + const cancelSignallingPs: Array | Promise> = + []; + const cancelAuthenticationPs: Array> = []; for (const [nodeId] of this.connections) { // It exists so we want to destroy it const destroyProm = this.destroyConnection( IdInternal.fromString(nodeId), force, ); - destroyProms.push(destroyProm); + destroyConnectionPs.push(destroyProm); } - await Promise.all(destroyProms); - const signallingProms: Array | Promise> = []; for (const [, activePunch] of this.activeHolePunchPs) { - signallingProms.push(activePunch); - activePunch.cancel(); + cancelSignallingPs.push(activePunch); + activePunch.cancel(activePunchCancellationReason); } for (const activeSignal of this.activeSignalFinalPs) { - signallingProms.push(activeSignal); + cancelSignallingPs.push(activeSignal); + } + for (const activeForwardAuthenticateCall of this.activeForwardAuthenticateCalls.values()) { + cancelAuthenticationPs.push(activeForwardAuthenticateCall); + activeForwardAuthenticateCall.cancel( + activeForwardAuthenticateCancellationReason, + ); } - await Promise.allSettled(signallingProms); + await Promise.all(destroyConnectionPs); + await Promise.allSettled(cancelSignallingPs); + await Promise.allSettled(cancelAuthenticationPs); await this.quicServer.stop({ force: true }); await this.quicSocket.stop({ force: true }); await this.rpcServer.stop({ force: true }); @@ -628,7 +834,7 @@ class NodeConnectionManager { // Increment usage count, and cancel timer connectionAndTimer.usageCount += 1; - connectionAndTimer.timer?.cancel(); + connectionAndTimer.timer?.cancel(timerCancellationReason); connectionAndTimer.timer = null; // Return tuple of [ResourceRelease, Resource] return [ @@ -648,9 +854,15 @@ class NodeConnectionManager { ); connectionAndTimer.timer = new Timer({ handler: async () => - await this.destroyConnection(targetNodeId, false), + await this.destroyConnection( + targetNodeId, + false, + connectionAndTimer.connection.connectionId, + ), delay, }); + // Prevent unhandled exceptions when cancelling + connectionAndTimer.timer.catch(() => {}); } }, connectionAndTimer.connection, @@ -924,11 +1136,28 @@ class NodeConnectionManager { await this.destroyConnection(nodeId, false, connectionId), delay: this.getStickyTimeoutValue(nodeId, true), }); + // Prevent unhandled exceptions when cancelling + newConnAndTimer.timer.catch(() => {}); + const { + p: authenticatedP, + resolveP: authenticatedResolveP, + rejectP: authenticatedRejectP, + } = utils.promise(); + // Prevent unhandled rejections + authenticatedP.then( + () => {}, + () => {}, + ); entry = { activeConnection: connectionId, connections: { [connectionId]: newConnAndTimer, }, + authenticatedForward: false, + authenticatedReverse: false, + authenticatedP, + authenticatedResolveP, + authenticatedRejectP, }; this.connections.set(nodeIdString, entry); } else { @@ -940,6 +1169,8 @@ class NodeConnectionManager { entry.activeConnection > connectionId, ), }); + // Prevent unhandled exceptions when cancelling + newConnAndTimer.timer.catch(() => {}); // Updating existing entry entry.connections[connectionId] = newConnAndTimer; // If the new connection ID is less than the old then replace it @@ -991,7 +1222,8 @@ class NodeConnectionManager { ); destroyPs.push(connAndTimer.connection.destroy({ force })); // Destroying TTL timer - if (connAndTimer.timer != null) connAndTimer.timer.cancel(); + connAndTimer.timer?.cancel(timerCancellationReason); + connAndTimer.timer = null; delete connections[connectionId]; } } @@ -1231,17 +1463,24 @@ class NodeConnectionManager { } const holePunchAttempt = new PromiseCancellable( async (res, rej, signal) => { - await semaphore!.withF(async () => { - this.holePunch(host, port, { signal }) - .finally(() => { - this.activeHolePunchPs.delete(id); - if (semaphore!.count === 0) { - this.activeHolePunchAddresses.delete(host); - } - }) - .then(res, rej); - }); + await semaphore! + .withF(async () => { + await this.holePunch(host, port, { signal }); + }) + .finally(() => { + this.activeHolePunchPs.delete(id); + if (semaphore!.count === 0) { + this.activeHolePunchAddresses.delete(host); + } + }) + .then(res, rej); }, + ).finally(() => { + this.activeHolePunchPs.delete(id); + }); + holePunchAttempt.then( + () => {}, + () => {}, ); this.activeHolePunchPs.set(id, holePunchAttempt); } diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 6ab4b5b8e..4d948198a 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -50,6 +50,7 @@ import * as nodesEvents from './events'; import * as nodesErrors from './errors'; import * as agentErrors from './agent/errors'; import NodeConnectionQueue from './NodeConnectionQueue'; +import { ErrorNodeManagerFindNodeFailed } from './errors'; import { assertClaimNetworkAuthority } from '../claims/payloads/claimNetworkAuthority'; import { assertClaimNetworkAccess } from '../claims/payloads/claimNetworkAccess'; import Token from '../tokens/Token'; @@ -643,7 +644,14 @@ class NodeManager { try { return await Promise.any([findBySignal, findByDirect, findByMDNS]); } catch (e) { - // FIXME: check error type and throw if not connection related failure + if (e instanceof AggregateError) { + for (const error of e.errors) { + // Checking if each error is an expected error + if (!(error instanceof ErrorNodeManagerFindNodeFailed)) throw e; + } + } else if (!(e instanceof ErrorNodeManagerFindNodeFailed)) { + throw e; + } return; } finally { abortController.abort(abortPendingConnectionsReason); diff --git a/src/nodes/agent/callers/index.ts b/src/nodes/agent/callers/index.ts index e213dbb4e..c88bf3da2 100644 --- a/src/nodes/agent/callers/index.ts +++ b/src/nodes/agent/callers/index.ts @@ -1,3 +1,4 @@ +import nodesAuthenticateConnection from './nodesAuthenticateConnection'; import nodesClaimsGet from './nodesClaimsGet'; import nodesClosestActiveConnectionsGet from './nodesClosestActiveConnectionsGet'; import nodesClosestLocalNodesGet from './nodesClosestLocalNodesGet'; @@ -15,6 +16,7 @@ import vaultsScan from './vaultsScan'; * Client manifest */ const manifestClient = { + nodesAuthenticateConnection, nodesClaimsGet, nodesClosestActiveConnectionsGet, nodesClosestLocalNodesGet, @@ -34,6 +36,7 @@ type AgentClientManifest = typeof manifestClient; export default manifestClient; export { + nodesAuthenticateConnection, nodesClaimsGet, nodesClosestActiveConnectionsGet, nodesClosestLocalNodesGet, diff --git a/src/nodes/agent/callers/nodesAuthenticateConnection.ts b/src/nodes/agent/callers/nodesAuthenticateConnection.ts new file mode 100644 index 000000000..d8814a60c --- /dev/null +++ b/src/nodes/agent/callers/nodesAuthenticateConnection.ts @@ -0,0 +1,12 @@ +import type { HandlerTypes } from '@matrixai/rpc'; +import type NodesAuthenticateConnection from '../handlers/NodesAuthenticateConnection'; +import { UnaryCaller } from '@matrixai/rpc'; + +type CallerTypes = HandlerTypes; + +const nodesAuthenticateConnection = new UnaryCaller< + CallerTypes['input'], + CallerTypes['output'] +>(); + +export default nodesAuthenticateConnection; diff --git a/src/nodes/agent/handlers/NodesAuthenticateConnection.ts b/src/nodes/agent/handlers/NodesAuthenticateConnection.ts new file mode 100644 index 000000000..4eacda884 --- /dev/null +++ b/src/nodes/agent/handlers/NodesAuthenticateConnection.ts @@ -0,0 +1,43 @@ +import type { + AgentRPCRequestParams, + AgentRPCResponseResult, + SuccessMessage, +} from '../types'; +import type NodeConnectionManager from '../../../nodes/NodeConnectionManager'; +import type { JSONValue } from '../../../types'; +import type { ContextTimed } from '@matrixai/contexts'; +import { UnaryHandler } from '@matrixai/rpc'; +import * as agentErrors from '../errors'; +import * as agentUtils from '../utils'; + +class NodesAuthenticateConnection extends UnaryHandler< + { + nodeConnectionManager: NodeConnectionManager; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public handle = async ( + _input, + _cancel, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise> => { + const { nodeConnectionManager } = this.container; + // Connections should always be validated + const requestingNodeId = agentUtils.nodeIdFromMeta(meta); + if (requestingNodeId == null) { + throw new agentErrors.ErrorAgentNodeIdMissing(); + } + await nodeConnectionManager.handleReverseAuthenticate( + requestingNodeId, + ctx, + ); + return { + type: 'success', + success: true, + }; + }; +} + +export default NodesAuthenticateConnection; diff --git a/src/nodes/agent/handlers/index.ts b/src/nodes/agent/handlers/index.ts index 1a212c510..4c0af86c5 100644 --- a/src/nodes/agent/handlers/index.ts +++ b/src/nodes/agent/handlers/index.ts @@ -8,6 +8,7 @@ import type NodeManager from '../../../nodes/NodeManager'; import type NodeConnectionManager from '../../../nodes/NodeConnectionManager'; import type NotificationsManager from '../../../notifications/NotificationsManager'; import type VaultManager from '../../../vaults/VaultManager'; +import NodesAuthenticateConnection from './NodesAuthenticateConnection'; import NodesClaimsGet from './NodesClaimsGet'; import NodesClosestActiveConnectionsGet from './NodesClosestActiveConnectionsGet'; import NodesClosestLocalNodesGet from './NodesClosestLocalNodesGet'; @@ -36,6 +37,7 @@ const manifestServer = (container: { vaultManager: VaultManager; }) => { return { + nodesAuthenticateConnection: new NodesAuthenticateConnection(container), nodesClaimsGet: new NodesClaimsGet(container), nodesClosestActiveConnectionsGet: new NodesClosestActiveConnectionsGet( container, @@ -57,6 +59,7 @@ type AgentServerManifest = ReturnType; export default manifestServer; export { + NodesAuthenticateConnection, NodesClaimsGet, NodesClosestActiveConnectionsGet, NodesClosestLocalNodesGet, diff --git a/src/nodes/agent/types.ts b/src/nodes/agent/types.ts index abec3d5a4..e4501cff0 100644 --- a/src/nodes/agent/types.ts +++ b/src/nodes/agent/types.ts @@ -8,7 +8,7 @@ import type { ClaimIdEncoded, NodeIdEncoded, VaultIdEncoded } from '../../ids'; import type { VaultAction, VaultName } from '../../vaults/types'; import type { SignedNotification } from '../../notifications/types'; import type { Host, Hostname, Port } from '../../network/types'; -import type { NodeContact } from '../../nodes/types'; +import type { NetworkId, NodeContact } from '../../nodes/types'; type AgentRPCRequestParams = JSONRPCRequestParams; @@ -77,6 +77,23 @@ type VaultsScanMessage = VaultInfo & { vaultPermissions: Array; }; +type SuccessMessage = { + type: 'success'; + success: boolean; +}; + +type NodesAuthenticateConnectionMessage = + | NodesAuthenticateConnectionMessageBasicPublic + | NodesAuthenticateConnectionMessageNone; + +type NodesAuthenticateConnectionMessageBasicPublic = { + type: 'NodesAuthenticateConnectionMessageBasicPublic'; + networkId: NetworkId; +} +type NodesAuthenticateConnectionMessageNone = { + type: 'NodesAuthenticateConnectionMessageNone'; +} + export type { AgentRPCRequestParams, AgentRPCResponseResult, @@ -91,4 +108,8 @@ export type { SignedNotificationEncoded, VaultInfo, VaultsScanMessage, + SuccessMessage, + NodesAuthenticateConnectionMessage, + NodesAuthenticateConnectionMessageBasicPublic, + NodesAuthenticateConnectionMessageNone, }; diff --git a/src/nodes/types.ts b/src/nodes/types.ts index 8add7555b..00302e240 100644 --- a/src/nodes/types.ts +++ b/src/nodes/types.ts @@ -1,6 +1,8 @@ import type { NodeId, NodeIdString, NodeIdEncoded } from '../ids/types'; import type { Host, Hostname, Port } from '../network/types'; import type { Opaque } from '../types'; +import {NodesAuthenticateConnectionMessage} from "@/nodes/agent/types"; +import {ContextTimed} from "@matrixai/contexts"; /** * Key indicating which space the NodeGraph is in @@ -71,6 +73,13 @@ enum ConnectionErrorReason { ForceClose = 'NodeConnection is forcing destruction', } +type NetworkId = string; +type AuthenticateNetworkForwardCallback = (ctx: ContextTimed) => Promise; +/** + * Callback should throw on authentication failure + */ +type AuthenticateNetworkReverseCallback = (message: NodesAuthenticateConnectionMessage, ctx: ContextTimed) => Promise; + export type { NodeId, NodeIdString, @@ -85,6 +94,9 @@ export type { NodeBucketMeta, NodeBucket, NodeGraphSpace, + NetworkId, + AuthenticateNetworkForwardCallback, + AuthenticateNetworkReverseCallback, }; export { ConnectionErrorCode, ConnectionErrorReason }; diff --git a/tests/nodes/NodeConnectionManager.test.ts b/tests/nodes/NodeConnectionManager.test.ts index cfd32f5de..f8635b862 100644 --- a/tests/nodes/NodeConnectionManager.test.ts +++ b/tests/nodes/NodeConnectionManager.test.ts @@ -13,6 +13,7 @@ import NodeConnectionManager from '@/nodes/NodeConnectionManager'; import NodesConnectionSignalFinal from '@/nodes/agent/handlers/NodesConnectionSignalFinal'; import NodesConnectionSignalInitial from '@/nodes/agent/handlers/NodesConnectionSignalInitial'; import * as utils from '@/utils'; +import NodesAuthenticateConnection from '@/nodes/agent/handlers/NodesAuthenticateConnection'; import * as nodesTestUtils from './utils'; import * as keysTestUtils from '../keys/utils'; import * as testsUtils from '../utils'; @@ -91,7 +92,13 @@ describe(`${NodeConnectionManager.name}`, () => { }, startOptions: { host: localHost, - agentService: () => dummyManifest, + agentService: (nodeConnectionManager) => { + return { + nodesAuthenticateConnection: new NodesAuthenticateConnection({ + nodeConnectionManager, + }), + } as AgentServerManifest; + }, }, logger: logger.getChild(`${NodeConnectionManager.name}Local`), }); @@ -103,12 +110,20 @@ describe(`${NodeConnectionManager.name}`, () => { }, startOptions: { host: localHost, - agentService: () => dummyManifest, + agentService: (nodeConnectionManager) => { + return { + nodesAuthenticateConnection: new NodesAuthenticateConnection({ + nodeConnectionManager, + }), + } as AgentServerManifest; + }, }, logger: logger.getChild(`${NodeConnectionManager.name}Peer1`), }); + logger.warn('----------------------------------------------'); }); afterEach(async () => { + logger.warn('----------------------------------------------'); await ncmLocal.nodeConnectionManager.stop({ force: true }); await ncmPeer1.nodeConnectionManager.stop({ force: true }); }); @@ -319,6 +334,7 @@ describe(`${NodeConnectionManager.name}`, () => { const connectionAndTimer = ncmLocal.nodeConnectionManager.getConnection( ncmPeer1.nodeId, ); + await ncmLocal.nodeConnectionManager.isAuthenticatedP(ncmPeer1.nodeId); await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, async () => { @@ -524,6 +540,7 @@ describe(`${NodeConnectionManager.name}`, () => { localHost, ncmPeer1.port, ); + await ncmLocal.nodeConnectionManager.isAuthenticatedP(ncmPeer1.nodeId); // Wait for timeout. await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, @@ -560,7 +577,6 @@ describe(`${NodeConnectionManager.name}`, () => { expect(connection.address.port).toBe( ncmPeer1.nodeConnectionManager.port, ); - expect(connection.usageCount).toBe(0); } }); test('stopping NodeConnectionManager should destroy all connections', async () => { @@ -645,6 +661,42 @@ describe(`${NodeConnectionManager.name}`, () => { ncmLocal.nodeConnectionManager.hasConnection(ncmPeer1.nodeId), ).toBeFalse(); }); + + // TODO: This is a temp test + test('can authenticate a connection', async () => { + await ncmLocal.nodeConnectionManager.createConnection( + [ncmPeer1.nodeId], + localHost, + ncmPeer1.port, + ); + // Should exist in the map now. + expect( + ncmLocal.nodeConnectionManager.hasConnection(ncmPeer1.nodeId), + ).toBeTrue(); + expect( + ncmLocal.nodeConnectionManager.isAuthenticated(ncmPeer1.nodeId), + ).toBeFalse(); + await ncmLocal.nodeConnectionManager.isAuthenticatedP(ncmPeer1.nodeId); + expect( + ncmLocal.nodeConnectionManager.isAuthenticated(ncmPeer1.nodeId), + ).toBeTrue(); + }); + test('can fail authentication', async () => { + await ncmLocal.nodeConnectionManager.createConnection( + [ncmPeer1.nodeId], + localHost, + ncmPeer1.port, + ); + // Should exist in the map now. + expect( + ncmLocal.nodeConnectionManager.isAuthenticated(ncmPeer1.nodeId), + ).toBeFalse(); + await expect(ncmLocal.nodeConnectionManager.isAuthenticatedP(ncmPeer1.nodeId)).rejects.not.toThrow(); + expect( + ncmLocal.nodeConnectionManager.isAuthenticated(ncmPeer1.nodeId), + ).toBeFalse(); + }); + test.todo('Cant make most RPC calls while unauthenticated'); }); describe('With 2 peers', () => { let ncmLocal: NCMState; @@ -659,7 +711,13 @@ describe(`${NodeConnectionManager.name}`, () => { }, startOptions: { host: localHost, - agentService: () => dummyManifest, + agentService: (nodeConnectionManager) => { + return { + nodesAuthenticateConnection: new NodesAuthenticateConnection({ + nodeConnectionManager, + }), + } as AgentServerManifest; + }, }, logger: logger.getChild(`${NodeConnectionManager.name}Local`), }); @@ -673,6 +731,9 @@ describe(`${NodeConnectionManager.name}`, () => { host: localHost, agentService: (nodeConnectionManager) => ({ + nodesAuthenticateConnection: new NodesAuthenticateConnection({ + nodeConnectionManager, + }), nodesConnectionSignalFinal: new NodesConnectionSignalFinal({ nodeConnectionManager, logger, @@ -694,6 +755,9 @@ describe(`${NodeConnectionManager.name}`, () => { host: localHost, agentService: (nodeConnectionManager) => ({ + nodesAuthenticateConnection: new NodesAuthenticateConnection({ + nodeConnectionManager, + }), nodesConnectionSignalFinal: new NodesConnectionSignalFinal({ nodeConnectionManager, logger, @@ -894,3 +958,17 @@ describe(`${NodeConnectionManager.name}`, () => { }); }); }); + +test('asd', async () => { + const timer = new Timer({ + delay: 1000, + }); + console.log('waiting'); + timer.cancel(Error('boi')); + timer.catch(() => {}); + await timer.then( + () => {}, + () => {}, + ); + console.log('done'); +});