Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: merge staging to master #851

Merged
merged 3 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "polykey",
"version": "1.16.2",
"version": "1.16.3",
"homepage": "https://polykey.com",
"author": "Matrix AI",
"contributors": [
Expand Down
4 changes: 2 additions & 2 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ class PolykeyAgent {
if (nodeId == null) {
utils.never(`failed to decode NodeId "${nodeIdEncoded}"`);
}
const setNodeProm = this.nodeManager.setNode(
const setNodeP = this.nodeManager.setNode(
nodeId,
optionsDefaulted.seedNodes[nodeIdEncoded],
{
Expand All @@ -816,7 +816,7 @@ class PolykeyAgent {
},
true,
);
setNodeProms.push(setNodeProm);
setNodeProms.push(setNodeP);
}
await Promise.all(setNodeProms);
await this.nodeGraph.start({ fresh });
Expand Down
3 changes: 2 additions & 1 deletion src/audit/Audit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class Audit {
promise.cancel(new auditErrors.ErrorAuditNotRunning());
}
}
await Promise.all([...this.taskPromises]).catch(() => {});
await Promise.allSettled([...this.taskPromises]);
this.logger.info(`Stopped ${this.constructor.name}`);
}

Expand Down Expand Up @@ -333,6 +333,7 @@ class Audit {
resolveBlockP = resolveP;
blockPSignal = signal;
});
void blockP.catch(() => {});
this.taskPromises.add(blockP);

const iterator = this.getAuditEvents(topicPath, {
Expand Down
7 changes: 4 additions & 3 deletions src/identities/IdentitiesManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ class IdentitiesManager {
);
}
// Create identity claim on our node
const publishedClaimProm = promise<IdentitySignedClaim>();
const { p: publishedClaimP, resolveP: publishedClaimResolveP } =
promise<IdentitySignedClaim>();
await this.db.withTransactionF((tran) =>
this.sigchain.addClaim(
{
Expand All @@ -281,7 +282,7 @@ class IdentitiesManager {
identityId,
claim,
);
publishedClaimProm.resolveP(identitySignedClaim);
publishedClaimResolveP(identitySignedClaim);
// Append the ProviderIdentityClaimId to the token
const payload: ClaimLinkIdentity = {
...claim.payload,
Expand All @@ -294,7 +295,7 @@ class IdentitiesManager {
tran,
),
);
const publishedClaim = await publishedClaimProm.p;
const publishedClaim = await publishedClaimP;
// Publish claim on identity
const issNodeInfo = {
nodeId: this.keyRing.getNodeId(),
Expand Down
34 changes: 19 additions & 15 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,25 +571,25 @@ class NodeConnectionManager {
);
this.quicSocket.removeEventListener(EventAll.name, this.handleEventAll);

const destroyProms: Array<Promise<void>> = [];
const destroyPs: Array<Promise<void>> = [];
for (const [nodeId] of this.connections) {
// It exists so we want to destroy it
const destroyProm = this.destroyConnection(
const destroyP = this.destroyConnection(
IdInternal.fromString<NodeId>(nodeId),
force,
);
destroyProms.push(destroyProm);
destroyPs.push(destroyP);
}
await Promise.all(destroyProms);
const signallingProms: Array<PromiseCancellable<void> | Promise<void>> = [];
await Promise.all(destroyPs);
const signallingPs: Array<PromiseCancellable<void> | Promise<void>> = [];
for (const [, activePunch] of this.activeHolePunchPs) {
signallingProms.push(activePunch);
signallingPs.push(activePunch);
activePunch.cancel();
}
for (const activeSignal of this.activeSignalFinalPs) {
signallingProms.push(activeSignal);
signallingPs.push(activeSignal);
}
await Promise.allSettled(signallingProms);
await Promise.allSettled(signallingPs);
await this.quicServer.stop({ force: true });
await this.quicSocket.stop({ force: true });
await this.rpcServer.stop({ force: true });
Expand Down Expand Up @@ -1113,13 +1113,13 @@ class NodeConnectionManager {
): Promise<void> {
// We need to send a random data packet to the target until the process times out or a connection is established
let ended = false;
const endedProm = utils.promise();
const { p: endedP, resolveP: endedResolveP } = utils.promise();
if (ctx.signal.aborted) {
endedProm.resolveP();
endedResolveP();
}
const onAbort = () => {
ended = true;
endedProm.resolveP();
endedResolveP();
ctx.signal.removeEventListener('abort', onAbort);
};
ctx.signal.addEventListener('abort', onAbort);
Expand All @@ -1134,7 +1134,7 @@ class NodeConnectionManager {
await this.quicSocket
.send(Buffer.from(message), port, host)
.catch(() => {});
await Promise.race([utils.sleep(delay), endedProm.p]);
await Promise.race([utils.sleep(delay), endedP]);
if (ended) break;
delay *= 2;
}
Expand Down Expand Up @@ -1243,6 +1243,8 @@ class NodeConnectionManager {
});
},
);
// Prevent promise rejection leak
void holePunchAttempt.catch(() => {});
this.activeHolePunchPs.set(id, holePunchAttempt);
}

Expand Down Expand Up @@ -1299,7 +1301,7 @@ class NodeConnectionManager {
this.keyRing.keyPair,
data,
);
const connProm = this.withConnF(targetNodeId, async (conn) => {
const connectionSignalP = this.withConnF(targetNodeId, async (conn) => {
const client = conn.getClient();
await client.methods.nodesConnectionSignalFinal({
sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId),
Expand All @@ -1325,9 +1327,11 @@ class NodeConnectionManager {
},
)
.finally(() => {
this.activeSignalFinalPs.delete(connProm);
this.activeSignalFinalPs.delete(connectionSignalP);
});
this.activeSignalFinalPs.add(connProm);
// Preventing promise rejection leak.
connectionSignalP.catch(() => {});
this.activeSignalFinalPs.add(connectionSignalP);
return {
host,
port,
Expand Down
1 change: 1 addition & 0 deletions src/nodes/NodeConnectionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ export class NodeConnectionQueue {
ctx: ContextCancellable,
): Promise<void> {
const abortP = utils.signalPromise(ctx.signal);
void abortP.catch(() => {});
try {
while (
!this.connectionMade &&
Expand Down
20 changes: 11 additions & 9 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ class NodeManager {
const task = await this.updateRefreshBucketDelay(i, 0, false);
refreshBuckets.push(task.promise());
}
const signalProm = utils.signalPromise(ctx.signal);
await Promise.race([Promise.all(refreshBuckets), signalProm]).finally(
const signalP = utils.signalPromise(ctx.signal);
await Promise.race([Promise.all(refreshBuckets), signalP]).finally(
async () => {
// Clean up signal promise when done
signalProm.cancel();
await signalProm;
signalP.cancel();
await signalP;
},
);
};
Expand Down Expand Up @@ -1424,8 +1424,9 @@ class NodeManager {
};

// Now we want to send our own claim signed
const halfSignedClaimProm = utils.promise<SignedTokenEncoded>();
const claimProm = this.sigchain.addClaim(
const { p: halfSignedClaimP, resolveP: halfSignedClaimResolveP } =
utils.promise<SignedTokenEncoded>();
const claimP = this.sigchain.addClaim(
{
typ: 'ClaimLinkNode',
iss: nodesUtils.encodeNodeId(requestingNodeId),
Expand All @@ -1436,7 +1437,7 @@ class NodeManager {
const halfSignedClaim = token.toSigned();
const halfSignedClaimEncoded =
claimsUtils.generateSignedClaim(halfSignedClaim);
halfSignedClaimProm.resolveP(halfSignedClaimEncoded);
halfSignedClaimResolveP(halfSignedClaimEncoded);
const readStatus = await input.next();
if (readStatus.done) {
throw new claimsErrors.ErrorEmptyStream();
Expand All @@ -1462,10 +1463,11 @@ class NodeManager {
return fullySignedToken;
},
);
void claimP.catch(() => {});
yield {
signedTokenEncoded: await halfSignedClaimProm.p,
signedTokenEncoded: await halfSignedClaimP,
};
const [, claim] = await claimProm;
const [, claim] = await claimP;
// With the claim created we want to add it to the gestalt graph
const issNodeInfo = {
nodeId: requestingNodeId,
Expand Down
4 changes: 3 additions & 1 deletion src/nodes/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ function generateRandomNodeIdForBucket(
* This is generally used to check the connection has failed
* before cleaning it up.
*/
// FIXME: need to include ErrorQUICConnectionIdleTimeout error in this list.
function isConnectionError(e): boolean {
return (
e instanceof nodesErrors.ErrorNodeConnectionDestroyed ||
Expand All @@ -323,7 +324,8 @@ function isConnectionError(e): boolean {
e instanceof quicErrors.ErrorQUICConnectionPeer ||
e instanceof quicErrors.ErrorQUICConnectionLocal ||
e instanceof quicErrors.ErrorQUICConnectionNotRunning ||
e instanceof quicErrors.ErrorQUICConnectionStopping
e instanceof quicErrors.ErrorQUICConnectionStopping ||
e instanceof quicErrors.ErrorQUICConnectionIdleTimeout
);
}

Expand Down
Loading