Skip to content

Commit

Permalink
add locks
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Sep 10, 2024
1 parent ca6832c commit 8693fff
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 18 deletions.
41 changes: 41 additions & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import typescriptEslint from "@typescript-eslint/eslint-plugin";
import globals from "globals";
import tsParser from "@typescript-eslint/parser";
import path from "node:path";
import { fileURLToPath } from "node:url";
import js from "@eslint/js";
import { FlatCompat } from "@eslint/eslintrc";

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const compat = new FlatCompat({
baseDirectory: __dirname,
recommendedConfig: js.configs.recommended,
allConfig: js.configs.all
});

export default [{
ignores: ["**/lib", "**/node_modules"],
}, ...compat.extends("eslint:recommended", "plugin:@typescript-eslint/recommended", "prettier"), {
plugins: {
"@typescript-eslint": typescriptEslint,
},

languageOptions: {
globals: {
...globals.browser,
},

parser: tsParser,
ecmaVersion: "latest",
sourceType: "module",
},

rules: {
indent: ["error", 4],
"linebreak-style": ["error", "unix"],
quotes: ["error", "double"],
semi: ["error", "always"],
"@typescript-eslint/no-unused-vars": "warn",
},
}];
52 changes: 43 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@rdfc/sds-storage-writer-ts",
"version": "1.0.0-alpha.0",
"version": "1.0.0-alpha.1",
"description": "An RDF-Connect processor to write SDS streams into a given storage system",
"main": "lib/index.js",
"scripts": {
Expand All @@ -27,6 +27,8 @@
"license": "MIT",
"devDependencies": {
"@ajuvercr/ts-transformer-inline-file": "^0.2.0",
"@eslint/eslintrc": "^3.1.0",
"@eslint/js": "^9.10.0",
"@jest/globals": "^29.7.0",
"@rdfc/js-runner": "^1.0.0-alpha.0",
"@types/n3": "^1.16.5",
Expand All @@ -35,6 +37,7 @@
"@typescript-eslint/parser": "^8.3.0",
"eslint": "^9.9.1",
"eslint-config-prettier": "^9.1.0",
"globals": "^15.9.0",
"husky": "^9.1.5",
"lint-staged": "^15.2.9",
"mongodb-memory-server": "^10.0.0",
Expand All @@ -45,6 +48,7 @@
},
"dependencies": {
"@treecg/types": "^0.4.6",
"async-await-mutex-lock": "^1.0.11",
"mongodb": "^6.8.0",
"n3": "^1.21.0",
"rdf-canonize": "^4.0.1",
Expand Down
33 changes: 25 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { getLoggerFor } from "./utils/logUtil";
import { Extract, Extractor, RdfThing } from "./extractor";
/* @ts-expect-error no type declaration available */
import canonize from "rdf-canonize";
import { Lock } from "async-await-mutex-lock";

const logger = getLoggerFor("ingest");

Expand Down Expand Up @@ -124,7 +125,7 @@ function emptyBuckets(
},
});
}

const lock = new Lock();
async function handleRecords(
extract: Extract,
collection: Collection<DataRecord>,
Expand All @@ -150,7 +151,12 @@ async function handleRecords(
}),
);

await collection.bulkWrite(bulkUpdate);
await lock.acquire("memberCollection");
try {
await collection.bulkWrite(bulkUpdate);
} finally {
lock.release("memberCollection");
}

// Add this payload as a member to the correct buckets
for (const rec of records) {
Expand Down Expand Up @@ -336,11 +342,17 @@ async function setup_metadata(
}

const ser = new Writer().quadsToString(streamMember);
await metaCollection.updateOne(
{ type: SDS.Stream, id: streamId.value },
{ $set: { value: ser } },
{ upsert: true },
);

await lock.acquire("metaCollection");
try {
await metaCollection.updateOne(
{ type: SDS.Stream, id: streamId.value },
{ $set: { value: ser } },
{ upsert: true },
);
} finally {
lock.release("metaCollection");
}
}
};

Expand Down Expand Up @@ -411,7 +423,12 @@ export async function ingest(
handleBuckets(extract, indexOperations);
await handleRelations(extract, indexOperations);

await indexCollection.bulkWrite(indexOperations);
await lock.acquire("indexCollection");
try {
await indexCollection.bulkWrite(indexOperations);
} finally {
lock.release("indexCollection");
}
});

logger.debug("Attached data handler");
Expand Down

0 comments on commit 8693fff

Please sign in to comment.