Skip to content

Commit

Permalink
fix: fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Jun 6, 2022
1 parent 748b477 commit aa29fa9
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 165 deletions.
4 changes: 2 additions & 2 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ class PolykeyAgent {
this.logger.info(`Started ${this.constructor.name}`);
} catch (e) {
this.logger.warn(`Failed Starting ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status?.beginStop({ pid: process.pid });
await this.sessionManager?.stop();
await this.notificationsManager?.stop();
Expand All @@ -706,7 +707,6 @@ class PolykeyAgent {
await this.keyManager?.stop();
await this.schema?.stop();
await this.status?.stop({});
this.events.removeAllListeners();
throw e;
}
}
Expand All @@ -716,6 +716,7 @@ class PolykeyAgent {
*/
public async stop() {
this.logger.info(`Stopping ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status.beginStop({ pid: process.pid });
await this.sessionManager.stop();
await this.notificationsManager.stop();
Expand All @@ -736,7 +737,6 @@ class PolykeyAgent {
await this.keyManager.stop();
await this.schema.stop();
await this.status.stop({});
this.events.removeAllListeners();
this.logger.info(`Stopped ${this.constructor.name}`);
}

Expand Down
2 changes: 1 addition & 1 deletion src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ class NodeConnectionManager {
);
} else {
try {
// FIXME: no tran neededawait this.nodeManager?.setNode(nodeId, nodeData.address);
await this.nodeManager?.setNode(nodeId, nodeData.address);
} catch (e) {
if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e;
}
Expand Down
119 changes: 53 additions & 66 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class NodeGraph {
},
this.nodeGraphBucketsDbPath,
)) {
const { nodeId } = nodesUtils.parseBucketsDbKey(key as unknown as Buffer);
const { nodeId } = nodesUtils.parseBucketsDbKey(key as Array<Buffer>);
yield [nodeId, nodeData];
}
}
Expand All @@ -224,45 +224,37 @@ class NodeGraph {

const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
const lastUpdatedPath = [...this.nodeGraphLastUpdatedDbPath, bucketKey];
const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey];
const nodeData = await tran.get<NodeData>([
...bucketPath,
nodesUtils.bucketDbKey(nodeId),
]);
const nodeIdKey = nodesUtils.bucketDbKey(nodeId);
const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey, nodeIdKey];
const nodeData = await tran.get<NodeData>(bucketPath);
if (nodeData != null) {
this.logger.debug(
`Updating node ${nodesUtils.encodeNodeId(
nodeId,
)} in bucket ${bucketIndex}`,
);
// If the node already exists we want to remove the old `lastUpdated`
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
nodeData.lastUpdated,
nodeId,
);
await tran.del([...lastUpdatedPath, lastUpdatedKey]);
const lastUpdatedKey = nodesUtils.lastUpdatedKey(nodeData.lastUpdated);
await tran.del([...lastUpdatedPath, lastUpdatedKey, nodeIdKey]);
} else {
this.logger.debug(
`Adding node ${nodesUtils.encodeNodeId(
nodeId,
)} to bucket ${bucketIndex}`,
);
// It didn't exist so we want to increment the bucket count
// It didn't exist, so we want to increment the bucket count
const count = await this.getBucketMetaProp(bucketIndex, 'count', tran);
await this.setBucketMetaProp(bucketIndex, 'count', count + 1, tran);
}
const lastUpdated = getUnixtime();
await tran.put([...bucketPath, nodesUtils.bucketDbKey(nodeId)], {
await tran.put(bucketPath, {
address: nodeAddress,
lastUpdated,
});
const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
lastUpdated,
nodeId,
);
const newLastUpdatedKey = nodesUtils.lastUpdatedKey(lastUpdated);
await tran.put(
[...lastUpdatedPath, newLastUpdatedKey],
nodesUtils.bucketDbKey(nodeId),
[...lastUpdatedPath, newLastUpdatedKey, nodeIdKey],
nodeIdKey,
true,
);
}
Expand All @@ -286,7 +278,7 @@ class NodeGraph {
bucketKey,
])) {
const { nodeId } = nodesUtils.parseLastUpdatedBucketDbKey(
key as unknown as Buffer,
key as Array<Buffer>,
);
oldestNodeIds.push(nodeId);
}
Expand All @@ -304,10 +296,8 @@ class NodeGraph {
const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey];
const lastUpdatedPath = [...this.nodeGraphLastUpdatedDbPath, bucketKey];
const nodeData = await tran.get<NodeData>([
...bucketPath,
nodesUtils.bucketDbKey(nodeId),
]);
const nodeIdKey = nodesUtils.bucketDbKey(nodeId);
const nodeData = await tran.get<NodeData>([...bucketPath, nodeIdKey]);
if (nodeData != null) {
this.logger.debug(
`Removing node ${nodesUtils.encodeNodeId(
Expand All @@ -316,12 +306,9 @@ class NodeGraph {
);
const count = await this.getBucketMetaProp(bucketIndex, 'count', tran);
await this.setBucketMetaProp(bucketIndex, 'count', count - 1, tran);
await tran.del([...bucketPath, nodesUtils.bucketDbKey(nodeId)]);
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
nodeData.lastUpdated,
nodeId,
);
await tran.del([...lastUpdatedPath, lastUpdatedKey]);
await tran.del([...bucketPath, nodeIdKey]);
const lastUpdatedKey = nodesUtils.lastUpdatedKey(nodeData.lastUpdated);
await tran.del([...lastUpdatedPath, lastUpdatedKey, nodeIdKey]);
}
}

Expand Down Expand Up @@ -359,7 +346,7 @@ class NodeGraph {
},
[...this.nodeGraphBucketsDbPath, bucketKey],
)) {
const nodeId = nodesUtils.parseBucketDbKey(key as unknown as Buffer);
const nodeId = nodesUtils.parseBucketDbKey(key[0] as Buffer);
bucket.push([nodeId, nodeData]);
}
if (sort === 'distance') {
Expand Down Expand Up @@ -434,7 +421,7 @@ class NodeGraph {
this.nodeGraphBucketsDbPath,
)) {
const { bucketIndex: bucketIndex_, nodeId } =
nodesUtils.parseBucketsDbKey(key as unknown as Buffer);
nodesUtils.parseBucketsDbKey(key as Array<Buffer>);
if (bucketIndex == null) {
// First entry of the first bucket
bucketIndex = bucketIndex_;
Expand Down Expand Up @@ -480,8 +467,8 @@ class NodeGraph {
this.nodeGraphLastUpdatedDbPath,
)) {
const { bucketIndex: bucketIndex_, nodeId } =
nodesUtils.parseLastUpdatedBucketsDbKey(key as unknown as Buffer);
bucketsDbIterator.seek(nodesUtils.bucketsDbKey(bucketIndex_, nodeId));
nodesUtils.parseLastUpdatedBucketsDbKey(key as Array<Buffer>);
bucketsDbIterator.seek([key[0], key[2]]);
// @ts-ignore
// eslint-disable-next-line
const iteratorResult = await bucketsDbIterator.next();
Expand Down Expand Up @@ -523,8 +510,10 @@ class NodeGraph {
);
}

const logger = this.logger.getChild('resetBuckets');
// Setup new space
const spaceNew = this.space === '0' ? '1' : '0';
logger.debug('new space: ' + spaceNew);
const nodeGraphMetaDbPathNew = [...this.nodeGraphDbPath, 'meta' + spaceNew];
const nodeGraphBucketsDbPathNew = [
...this.nodeGraphDbPath,
Expand All @@ -545,10 +534,16 @@ class NodeGraph {
this.nodeGraphBucketsDbPath,
)) {
// The key is a combined bucket key and node ID
const { nodeId } = nodesUtils.parseBucketsDbKey(key as unknown as Buffer);
const { bucketIndex: bucketIndexOld, nodeId } =
nodesUtils.parseBucketsDbKey(key as Array<Buffer>);
const nodeIdEncoded = nodesUtils.encodeNodeId(nodeId);
const nodeIdKey = nodesUtils.bucketDbKey(nodeId);
// If the new own node ID is one of the existing node IDs, it is just dropped
// We only map to the new bucket if it isn't one of the existing node IDs
if (nodeId.equals(nodeIdOwn)) {
logger.debug(
`nodeId ${nodeIdEncoded} from bucket ${bucketIndexOld} was identical to new NodeId and was dropped.`,
);
continue;
}
const bucketIndexNew = nodesUtils.bucketIndex(nodeIdOwn, nodeId);
Expand All @@ -560,36 +555,37 @@ class NodeGraph {
if (countNew < this.nodeBucketLimit) {
await tran.put([...metaPathNew, 'count'], countNew + 1);
} else {
let oldestIndexKey: Buffer | undefined = undefined;
let oldestIndexKey: Array<Buffer> | undefined = undefined;
let oldestNodeId: NodeId | undefined = undefined;
for await (const [key] of tran.iterator(
{
limit: 1,
},
indexPathNew,
)) {
oldestIndexKey = key as unknown as Buffer;
oldestIndexKey = key as Array<Buffer>;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(
key as unknown as Buffer,
key as Array<Buffer>,
));
}
await tran.del([
...bucketPathNew,
nodesUtils.bucketDbKey(oldestNodeId!),
]);
await tran.del([...indexPathNew, oldestIndexKey!]);
await tran.del([...indexPathNew, ...oldestIndexKey!]);
}
if (bucketIndexOld !== bucketIndexNew) {
logger.debug(
`nodeId ${nodeIdEncoded} moved ${bucketIndexOld}=>${bucketIndexNew}`,
);
} else {
logger.debug(`nodeId ${nodeIdEncoded} unchanged ${bucketIndexOld}`);
}
await tran.put([...bucketPathNew, nodeIdKey], nodeData);
const lastUpdatedKey = nodesUtils.lastUpdatedKey(nodeData.lastUpdated);
await tran.put(
[...bucketPathNew, nodesUtils.bucketDbKey(nodeId)],
nodeData,
);
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
nodeData.lastUpdated,
nodeId,
);
await tran.put(
[...indexPathNew, lastUpdatedKey],
nodesUtils.bucketDbKey(nodeId),
[...indexPathNew, lastUpdatedKey, nodeIdKey],
nodeIdKey,
true,
);
}
Expand Down Expand Up @@ -683,6 +679,8 @@ class NodeGraph {
* current node has less than k nodes in all of its buckets, in which case it
* returns all nodes it has knowledge of)
*/
// FIXME: this is still operating on assumptions from old code.
// I can't get the gt/lt to work on the iterator.
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getClosestNodes(
nodeId: NodeId,
Expand Down Expand Up @@ -721,55 +719,44 @@ class NodeGraph {
// We can just use `!(lexpack bucketId)` to start from
// Less than `!(bucketId 101)!` gets us buckets 100 and lower
// greater than `!(bucketId 99)!` gets up buckets 100 and greater
const prefix = Buffer.from([33]); // Code for `!` prefix
if (nodeIds.length < limit) {
// Just before target bucket
const bucketId = Buffer.from(nodesUtils.bucketKey(startingBucket));
const endKeyLower = Buffer.concat([prefix, bucketId, prefix]);
const bucketIdKey = Buffer.from(nodesUtils.bucketKey(startingBucket));
const remainingLimit = limit - nodeIds.length;
// Iterate over lower buckets
tran.iterator<NodeData>(
{
lt: endKeyLower,
limit: remainingLimit,
valueAsBuffer: false,
},
this.nodeGraphBucketsDbPath,
);
for await (const [key, nodeData] of tran.iterator<NodeData>(
{
lt: endKeyLower,
lt: [bucketIdKey, ''],
limit: remainingLimit,
valueAsBuffer: false,
},
this.nodeGraphBucketsDbPath,
)) {
const info = nodesUtils.parseBucketsDbKey(key as unknown as Buffer);
const info = nodesUtils.parseBucketsDbKey(key as Array<Buffer>);
nodeIds.push([info.nodeId, nodeData]);
}
}
if (nodeIds.length < limit) {
// Just after target bucket
const bucketId = Buffer.from(nodesUtils.bucketKey(startingBucket + 1));
const startKeyUpper = Buffer.concat([prefix, bucketId, prefix]);
const remainingLimit = limit - nodeIds.length;
// Iterate over ids further away
tran.iterator(
{
gt: startKeyUpper,
gt: [bucketId, ''],
limit: remainingLimit,
},
this.nodeGraphBucketsDbPath,
);
for await (const [key, nodeData] of tran.iterator<NodeData>(
{
gt: startKeyUpper,
gt: [bucketId, ''],
limit: remainingLimit,
valueAsBuffer: false,
},
this.nodeGraphBucketsDbPath,
)) {
const info = nodesUtils.parseBucketsDbKey(key as unknown as Buffer);
const info = nodesUtils.parseBucketsDbKey(key as Array<Buffer>);
nodeIds.push([info.nodeId, nodeData]);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ class NodeManager {
*/
public async getBucket(
bucketIndex: number,
tran: DBTransaction,
tran?: DBTransaction,
): Promise<NodeBucket | undefined> {
return await this.nodeGraph.getBucket(
bucketIndex,
Expand All @@ -400,7 +400,7 @@ class NodeManager {
* @param block - Flag for if the operation should block or utilize the async queue
* @param force - Flag for if we want to add the node without authenticating or if the bucket is full.
* This will drop the oldest node in favor of the new.
* @param timeout Connection timeout timeout
* @param timeout Connection timeout
* @param tran
*/
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
Expand Down
Loading

0 comments on commit aa29fa9

Please sign in to comment.