From 1433726cbf25023502ed39a1a329001fa2981342 Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Thu, 29 Feb 2024 10:59:33 +0500 Subject: [PATCH 1/9] add schema entities and mappings to process set node operational status protobuf message --- CHANGELOG.md | 4 + ...06586630-Data.js => 1709153133577-Data.js} | 10 +- generated/schema.graphql | 230 +++++++++++++++++- package-lock.json | 60 ++--- package.json | 4 +- schema/events.graphql | 22 +- schema/storage.graphql | 50 ++++ schema/workingGroups.graphql | 5 + src/mappings/storage/metadata.ts | 103 +++++++- src/mappings/storage/utils.ts | 125 +++++++++- src/mappings/utils.ts | 5 + src/mappings/workingGroups/index.ts | 172 +++++++++++++ src/processor.ts | 25 +- typegen.json | 24 +- 14 files changed, 787 insertions(+), 52 deletions(-) rename db/migrations/{1705406586630-Data.js => 1709153133577-Data.js} (92%) create mode 100644 schema/workingGroups.graphql create mode 100644 src/mappings/workingGroups/index.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 475847b..28391ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 1.5.0 + +- Updates mappings to process `NodeOperationalStatusMetadata` protobuf message. This metaprotocol message enables both storage/distribution workers & leads to set the operational status of the nodes. + # 1.4.4 diff --git a/db/migrations/1705406586630-Data.js b/db/migrations/1709153133577-Data.js similarity index 92% rename from db/migrations/1705406586630-Data.js rename to db/migrations/1709153133577-Data.js index d4a4225..f13e9a4 100644 --- a/db/migrations/1705406586630-Data.js +++ b/db/migrations/1709153133577-Data.js @@ -1,5 +1,5 @@ -module.exports = class Data1705406586630 { - name = 'Data1705406586630' +module.exports = class Data1709153133577 { + name = 'Data1709153133577' async up(db) { await db.query(`CREATE TABLE "next_entity_id" ("entity_name" character varying NOT NULL, "next_id" bigint NOT NULL, CONSTRAINT "PK_09a3b40db622a65096e7344d7ae" PRIMARY KEY ("entity_name"))`) @@ -22,13 +22,14 @@ module.exports = class Data1705406586630 { await db.query(`CREATE INDEX "IDX_ff8014300b8039dbaed764f51b" ON "storage_data_object" ("storage_bag_id") `) await db.query(`CREATE TABLE "video_subtitle" ("id" character varying NOT NULL, "mime_type" text NOT NULL, "asset_id" character varying, CONSTRAINT "PK_2ac3e585fc608e673e7fbf94d8e" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_b6eabfb8de4128b28d73681020" ON "video_subtitle" ("asset_id") `) - await db.query(`CREATE TABLE "storage_bucket_operator_metadata" ("id" character varying NOT NULL, "storage_bucket_id" character varying NOT NULL, "node_endpoint" text, "node_location" jsonb, "extra" text, CONSTRAINT "StorageBucketOperatorMetadata_storageBucket" UNIQUE ("storage_bucket_id") DEFERRABLE INITIALLY DEFERRED, CONSTRAINT "REL_7beffc9530b3f307bc1169cb52" UNIQUE ("storage_bucket_id"), CONSTRAINT "PK_9846a397400ae1a39b21fbd02d4" PRIMARY KEY ("id"))`) + await db.query(`CREATE TABLE "storage_bucket_operator_metadata" ("id" character varying NOT NULL, "storage_bucket_id" character varying NOT NULL, "node_endpoint" text, "node_location" jsonb, "node_operational_status" jsonb, "extra" text, CONSTRAINT "StorageBucketOperatorMetadata_storageBucket" UNIQUE ("storage_bucket_id") DEFERRABLE INITIALLY DEFERRED, CONSTRAINT "REL_7beffc9530b3f307bc1169cb52" UNIQUE ("storage_bucket_id"), CONSTRAINT "PK_9846a397400ae1a39b21fbd02d4" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_7beffc9530b3f307bc1169cb52" ON "storage_bucket_operator_metadata" ("storage_bucket_id") `) await db.query(`CREATE TABLE "distribution_bucket_family_metadata" ("id" character varying NOT NULL, "family_id" character varying NOT NULL, "region" text, "description" text, "areas" jsonb, "latency_test_targets" text array, CONSTRAINT "DistributionBucketFamilyMetadata_family" UNIQUE ("family_id") DEFERRABLE INITIALLY DEFERRED, CONSTRAINT "REL_dd93ca0ea24f3e7a02f11c4c14" UNIQUE ("family_id"), CONSTRAINT "PK_df7a270835bb313d3ef17bdee2f" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_dd93ca0ea24f3e7a02f11c4c14" ON "distribution_bucket_family_metadata" ("family_id") `) await db.query(`CREATE INDEX "IDX_5510d3b244a63d6ec702faa426" ON "distribution_bucket_family_metadata" ("region") `) - await db.query(`CREATE TABLE "distribution_bucket_operator_metadata" ("id" character varying NOT NULL, "distirbution_bucket_operator_id" character varying NOT NULL, "node_endpoint" text, "node_location" jsonb, "extra" text, CONSTRAINT "DistributionBucketOperatorMetadata_distirbutionBucketOperator" UNIQUE ("distirbution_bucket_operator_id") DEFERRABLE INITIALLY DEFERRED, CONSTRAINT "REL_69ec9bdc975b95f7dff94a7106" UNIQUE ("distirbution_bucket_operator_id"), CONSTRAINT "PK_9bbecaa12f30e3826922688274f" PRIMARY KEY ("id"))`) + await db.query(`CREATE TABLE "distribution_bucket_operator_metadata" ("id" character varying NOT NULL, "distirbution_bucket_operator_id" character varying NOT NULL, "node_endpoint" text, "node_location" jsonb, "node_operational_status" jsonb, "extra" text, CONSTRAINT "DistributionBucketOperatorMetadata_distirbutionBucketOperator" UNIQUE ("distirbution_bucket_operator_id") DEFERRABLE INITIALLY DEFERRED, CONSTRAINT "REL_69ec9bdc975b95f7dff94a7106" UNIQUE ("distirbution_bucket_operator_id"), CONSTRAINT "PK_9bbecaa12f30e3826922688274f" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_69ec9bdc975b95f7dff94a7106" ON "distribution_bucket_operator_metadata" ("distirbution_bucket_operator_id") `) + await db.query(`CREATE TABLE "worker" ("id" character varying NOT NULL, CONSTRAINT "PK_dc8175fa0e34ce7a39e4ec73b94" PRIMARY KEY ("id"))`) await db.query(`ALTER TABLE "storage_bucket_bag" ADD CONSTRAINT "FK_791e2f82e3919ffcef8712aa1b9" FOREIGN KEY ("storage_bucket_id") REFERENCES "storage_bucket"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) await db.query(`ALTER TABLE "storage_bucket_bag" ADD CONSTRAINT "FK_aaf00b2c7d0cba49f97da14fbba" FOREIGN KEY ("bag_id") REFERENCES "storage_bag"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) await db.query(`ALTER TABLE "distribution_bucket_operator" ADD CONSTRAINT "FK_678dc5427cdde0cd4fef2c07a43" FOREIGN KEY ("distribution_bucket_id") REFERENCES "distribution_bucket"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) @@ -70,6 +71,7 @@ module.exports = class Data1705406586630 { await db.query(`DROP INDEX "public"."IDX_5510d3b244a63d6ec702faa426"`) await db.query(`DROP TABLE "distribution_bucket_operator_metadata"`) await db.query(`DROP INDEX "public"."IDX_69ec9bdc975b95f7dff94a7106"`) + await db.query(`DROP TABLE "worker"`) await db.query(`ALTER TABLE "storage_bucket_bag" DROP CONSTRAINT "FK_791e2f82e3919ffcef8712aa1b9"`) await db.query(`ALTER TABLE "storage_bucket_bag" DROP CONSTRAINT "FK_aaf00b2c7d0cba49f97da14fbba"`) await db.query(`ALTER TABLE "distribution_bucket_operator" DROP CONSTRAINT "FK_678dc5427cdde0cd4fef2c07a43"`) diff --git a/generated/schema.graphql b/generated/schema.graphql index ccae9c0..9831953 100644 --- a/generated/schema.graphql +++ b/generated/schema.graphql @@ -373,6 +373,9 @@ type DistributionBucketOperatorMetadata { """Optional node location metadata""" nodeLocation: NodeLocationMetadata + """Optional node operational status""" + nodeOperationalStatus: NodeOperationalStatus + """Additional information about the node/operator""" extra: String } @@ -417,6 +420,26 @@ enum DistributionBucketOperatorMetadataOrderByInput { nodeLocation_city_DESC nodeLocation_city_ASC_NULLS_FIRST nodeLocation_city_DESC_NULLS_LAST + nodeOperationalStatus_rationale_ASC + nodeOperationalStatus_rationale_DESC + nodeOperationalStatus_rationale_ASC_NULLS_FIRST + nodeOperationalStatus_rationale_DESC_NULLS_LAST + nodeOperationalStatus_forced_ASC + nodeOperationalStatus_forced_DESC + nodeOperationalStatus_forced_ASC_NULLS_FIRST + nodeOperationalStatus_forced_DESC_NULLS_LAST + nodeOperationalStatus_from_ASC + nodeOperationalStatus_from_DESC + nodeOperationalStatus_from_ASC_NULLS_FIRST + nodeOperationalStatus_from_DESC_NULLS_LAST + nodeOperationalStatus_to_ASC + nodeOperationalStatus_to_DESC + nodeOperationalStatus_to_ASC_NULLS_FIRST + nodeOperationalStatus_to_DESC_NULLS_LAST + nodeOperationalStatus_isTypeOf_ASC + nodeOperationalStatus_isTypeOf_DESC + nodeOperationalStatus_isTypeOf_ASC_NULLS_FIRST + nodeOperationalStatus_isTypeOf_DESC_NULLS_LAST extra_ASC extra_DESC extra_ASC_NULLS_FIRST @@ -462,6 +485,8 @@ input DistributionBucketOperatorMetadataWhereInput { nodeEndpoint_not_endsWith: String nodeLocation_isNull: Boolean nodeLocation: NodeLocationMetadataWhereInput + nodeOperationalStatus_isNull: Boolean + nodeOperationalStatus: NodeOperationalStatusWhereInput extra_isNull: Boolean extra_eq: String extra_not_eq: String @@ -651,6 +676,14 @@ input DistributionBucketWhereInput { OR: [DistributionBucketWhereInput!] } +type DistributionNodeOperationalStatusSetEvent { + """Distribution bucket operator""" + bucketOperator: DistributionBucketOperator + + """Operational status that was set""" + operationalStatus: NodeOperationalStatus! +} + type Event { """{blockNumber}-{indexInBlock}""" id: String! @@ -671,7 +704,7 @@ type Event { data: EventData! } -union EventData = MetaprotocolTransactionStatusEventData | DataObjectDeletedEventData +union EventData = MetaprotocolTransactionStatusEventData | DataObjectDeletedEventData | StorageNodeOperationalStatusSetEvent | DistributionNodeOperationalStatusSetEvent input EventDataWhereInput { result_isNull: Boolean @@ -693,6 +726,12 @@ input EventDataWhereInput { dataObjectId_not_startsWith: String dataObjectId_endsWith: String dataObjectId_not_endsWith: String + storageBucket_isNull: Boolean + storageBucket: StorageBucketWhereInput + operationalStatus_isNull: Boolean + operationalStatus: NodeOperationalStatusWhereInput + bucketOperator_isNull: Boolean + bucketOperator: DistributionBucketOperatorWhereInput isTypeOf_isNull: Boolean isTypeOf_eq: String isTypeOf_not_eq: String @@ -975,6 +1014,113 @@ input NodeLocationMetadataWhereInput { coordinates: GeoCoordinatesWhereInput } +union NodeOperationalStatus = NodeOperationalStatusNormal | NodeOperationalStatusNoService | NodeOperationalStatusNoServiceFrom | NodeOperationalStatusNoServiceDuring + +type NodeOperationalStatusNormal { + """Reason why node was set to this state""" + rationale: String +} + +type NodeOperationalStatusNoService { + """ + Whether the state was set by lead (true) or by the operator (false), it is + meant to prevent worker from unilaterally reversing. + """ + forced: Boolean! + + """Reason why node was set to this state""" + rationale: String +} + +type NodeOperationalStatusNoServiceDuring { + """ + Whether the state was set by lead (true) or by the operator (false), it is + meant to prevent worker from unilaterally reversing. + """ + forced: Boolean! + + """The time from which the bucket would have to no service""" + from: DateTime! + + """The time until which the bucket would have to no service""" + to: DateTime! + + """Reason why node was set to this state""" + rationale: String +} + +type NodeOperationalStatusNoServiceFrom { + """ + Whether the state was set by lead (true) or by the operator (false), it is + meant to prevent worker from unilaterally reversing. + """ + forced: Boolean! + + """The time from which the bucket would have to no service""" + from: DateTime! + + """Reason why node was set to this state""" + rationale: String +} + +input NodeOperationalStatusWhereInput { + rationale_isNull: Boolean + rationale_eq: String + rationale_not_eq: String + rationale_gt: String + rationale_gte: String + rationale_lt: String + rationale_lte: String + rationale_in: [String!] + rationale_not_in: [String!] + rationale_contains: String + rationale_not_contains: String + rationale_containsInsensitive: String + rationale_not_containsInsensitive: String + rationale_startsWith: String + rationale_not_startsWith: String + rationale_endsWith: String + rationale_not_endsWith: String + forced_isNull: Boolean + forced_eq: Boolean + forced_not_eq: Boolean + from_isNull: Boolean + from_eq: DateTime + from_not_eq: DateTime + from_gt: DateTime + from_gte: DateTime + from_lt: DateTime + from_lte: DateTime + from_in: [DateTime!] + from_not_in: [DateTime!] + to_isNull: Boolean + to_eq: DateTime + to_not_eq: DateTime + to_gt: DateTime + to_gte: DateTime + to_lt: DateTime + to_lte: DateTime + to_in: [DateTime!] + to_not_in: [DateTime!] + isTypeOf_isNull: Boolean + isTypeOf_eq: String + isTypeOf_not_eq: String + isTypeOf_gt: String + isTypeOf_gte: String + isTypeOf_lt: String + isTypeOf_lte: String + isTypeOf_in: [String!] + isTypeOf_not_in: [String!] + isTypeOf_contains: String + isTypeOf_not_contains: String + isTypeOf_containsInsensitive: String + isTypeOf_not_containsInsensitive: String + isTypeOf_startsWith: String + isTypeOf_not_startsWith: String + isTypeOf_endsWith: String + isTypeOf_not_endsWith: String +} + type PageInfo { hasNextPage: Boolean! hasPreviousPage: Boolean! @@ -1035,6 +1181,10 @@ type Query { distributionBucketFamilyById(id: String!): DistributionBucketFamily distributionBucketFamilyByUniqueInput(where: WhereIdInput!): DistributionBucketFamily @deprecated(reason: "Use distributionBucketFamilyById") distributionBucketFamiliesConnection(orderBy: [DistributionBucketFamilyOrderByInput!]!, after: String, first: Int, where: DistributionBucketFamilyWhereInput): DistributionBucketFamiliesConnection! + workers(where: WorkerWhereInput, orderBy: [WorkerOrderByInput!], offset: Int, limit: Int): [Worker!]! + workerById(id: String!): Worker + workerByUniqueInput(where: WhereIdInput!): Worker @deprecated(reason: "Use workerById") + workersConnection(orderBy: [WorkerOrderByInput!]!, after: String, first: Int, where: WorkerWhereInput): WorkersConnection! squidStatus: SquidStatus squidVersion: SquidVersion! } @@ -1376,6 +1526,9 @@ type StorageBucketOperatorMetadata { """Optional node location metadata""" nodeLocation: NodeLocationMetadata + """Optional node operational status""" + nodeOperationalStatus: NodeOperationalStatus + """Additional information about the node/operator""" extra: String } @@ -1432,6 +1585,26 @@ enum StorageBucketOperatorMetadataOrderByInput { nodeLocation_city_DESC nodeLocation_city_ASC_NULLS_FIRST nodeLocation_city_DESC_NULLS_LAST + nodeOperationalStatus_rationale_ASC + nodeOperationalStatus_rationale_DESC + nodeOperationalStatus_rationale_ASC_NULLS_FIRST + nodeOperationalStatus_rationale_DESC_NULLS_LAST + nodeOperationalStatus_forced_ASC + nodeOperationalStatus_forced_DESC + nodeOperationalStatus_forced_ASC_NULLS_FIRST + nodeOperationalStatus_forced_DESC_NULLS_LAST + nodeOperationalStatus_from_ASC + nodeOperationalStatus_from_DESC + nodeOperationalStatus_from_ASC_NULLS_FIRST + nodeOperationalStatus_from_DESC_NULLS_LAST + nodeOperationalStatus_to_ASC + nodeOperationalStatus_to_DESC + nodeOperationalStatus_to_ASC_NULLS_FIRST + nodeOperationalStatus_to_DESC_NULLS_LAST + nodeOperationalStatus_isTypeOf_ASC + nodeOperationalStatus_isTypeOf_DESC + nodeOperationalStatus_isTypeOf_ASC_NULLS_FIRST + nodeOperationalStatus_isTypeOf_DESC_NULLS_LAST extra_ASC extra_DESC extra_ASC_NULLS_FIRST @@ -1477,6 +1650,8 @@ input StorageBucketOperatorMetadataWhereInput { nodeEndpoint_not_endsWith: String nodeLocation_isNull: Boolean nodeLocation: NodeLocationMetadataWhereInput + nodeOperationalStatus_isNull: Boolean + nodeOperationalStatus: NodeOperationalStatusWhereInput extra_isNull: Boolean extra_eq: String extra_not_eq: String @@ -1860,6 +2035,14 @@ input StorageDataObjectWhereInput { OR: [StorageDataObjectWhereInput!] } +type StorageNodeOperationalStatusSetEvent { + """Storage Bucket""" + storageBucket: StorageBucket! + + """Operational status that was set""" + operationalStatus: NodeOperationalStatus! +} + type VideoSubtitle { """(videoId)-{type}-{language}""" id: String! @@ -1966,3 +2149,48 @@ input WhereIdInput { id: String! } +type Worker { + """Worker id ({workingGroupName}-{workerId})""" + id: String! +} + +type WorkerEdge { + node: Worker! + cursor: String! +} + +enum WorkerOrderByInput { + id_ASC + id_DESC + id_ASC_NULLS_FIRST + id_DESC_NULLS_LAST +} + +type WorkersConnection { + edges: [WorkerEdge!]! + pageInfo: PageInfo! + totalCount: Int! +} + +input WorkerWhereInput { + id_isNull: Boolean + id_eq: String + id_not_eq: String + id_gt: String + id_gte: String + id_lt: String + id_lte: String + id_in: [String!] + id_not_in: [String!] + id_contains: String + id_not_contains: String + id_containsInsensitive: String + id_not_containsInsensitive: String + id_startsWith: String + id_not_startsWith: String + id_endsWith: String + id_not_endsWith: String + AND: [WorkerWhereInput!] + OR: [WorkerWhereInput!] +} + diff --git a/package-lock.json b/package-lock.json index c0a684b..3d53d48 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,15 +1,15 @@ { "name": "storage-squid", - "version": "1.4.4", + "version": "1.5.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "storage-squid", - "version": "1.4.4", + "version": "1.5.0", "hasInstallScript": true, "dependencies": { - "@joystream/js": "^1.4.0", + "@joystream/js": "^1.9.0", "@opentelemetry/api": "^1.4.1", "@opentelemetry/auto-instrumentations-node": "0.37.0", "@opentelemetry/core": "1.13.0", @@ -3361,11 +3361,11 @@ "integrity": "sha512-CtzORUwWTTOTqfVtHaKRJ0I1kNQd1bpn3sUh8I3nJDVY+5/M/Oe1DnEWzPQvqq/xPIIkzzzIP7mfCoAjFRvDhg==" }, "node_modules/@joystream/js": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.4.0.tgz", - "integrity": "sha512-kEiKPIhsuk4B2vteAZoFYoZ+slyAiDXu1hDKMBddt1AKWfD1qxrHKAuF2cBkQIN4KpeNEJrRK9vy8G3glvaDvQ==", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.9.0.tgz", + "integrity": "sha512-C0vSKID6BHvfwEKqvfRWkls1aVSE18i/OqzasVywPS8GRG2Kp5H7F6Yp/K5tpqbA3mZXbR2zNvxTmtHUO/V1LA==", "dependencies": { - "@joystream/metadata-protobuf": "^2.8.1", + "@joystream/metadata-protobuf": "^2.14.0", "@joystream/types": "^2.0.0", "@polkadot/util-crypto": "9.5.1", "axios": "^1.2.1", @@ -3376,7 +3376,7 @@ "protobufjs": "^6.11.3" }, "engines": { - "node": ">=14.0.0", + "node": ">=14.18.0", "yarn": "^1.22.15" } }, @@ -3386,9 +3386,9 @@ "integrity": "sha512-GKSNGeNAtw8IryjjkhZxuKB3JzlcLTwjtiQCHKvqQet81I93kXslhDQruGI/QsddO83mcDToBVy7GqGS/zYf/A==" }, "node_modules/@joystream/metadata-protobuf": { - "version": "2.8.1", - "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.8.1.tgz", - "integrity": "sha512-ckxvQP3RC8gKCJWU1xpXosxEfgFcChgaIncy06AZfn50x6+9mFEsxQTTogN7b1g1T026oSFYZMDq52tkBw2Zew==", + "version": "2.14.0", + "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.14.0.tgz", + "integrity": "sha512-1/FgUW6mGt3t+p0k7/KdKnRQPU0HghY4MJTSkip1SCTMICZWoDuSADJuM8nlh2Qdu9kqvodqtndKjz057b0t3g==", "dependencies": { "@types/iso-3166-2": "^1.0.0", "@types/long": "^4.0.1", @@ -7338,9 +7338,9 @@ } }, "node_modules/@types/iso-3166-2": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/@types/iso-3166-2/-/iso-3166-2-1.0.0.tgz", - "integrity": "sha512-DYDyoRyPyxBeI9bYoTXLfsOZH12m1anrWEj9LU5Sl9rgsJ4soJMYf/7byozM+64Smn6/a1i079eQLGuPykwaHQ==" + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@types/iso-3166-2/-/iso-3166-2-1.0.3.tgz", + "integrity": "sha512-jOqKFSfZYVJ5ARkXH2V2RrEhLvFds1RVrFGRKKxqLU3jjExlf5LtqdFgXNd19KAMRhgsZSNjedeLDVYlGUemZg==" }, "node_modules/@types/js-yaml": { "version": "4.0.5", @@ -14492,9 +14492,9 @@ } }, "node_modules/protobufjs": { - "version": "6.11.3", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.3.tgz", - "integrity": "sha512-xL96WDdCZYdU7Slin569tFX712BxsxslWwAfAhCYjQKGTq7dAU91Lomy6nLLhh/dyGhk/YH4TwTSRxTzhuHyZg==", + "version": "6.11.4", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz", + "integrity": "sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw==", "hasInstallScript": true, "dependencies": { "@protobufjs/aspromise": "^1.1.2", @@ -19220,11 +19220,11 @@ "integrity": "sha512-CtzORUwWTTOTqfVtHaKRJ0I1kNQd1bpn3sUh8I3nJDVY+5/M/Oe1DnEWzPQvqq/xPIIkzzzIP7mfCoAjFRvDhg==" }, "@joystream/js": { - "version": "1.4.0", - "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.4.0.tgz", - "integrity": "sha512-kEiKPIhsuk4B2vteAZoFYoZ+slyAiDXu1hDKMBddt1AKWfD1qxrHKAuF2cBkQIN4KpeNEJrRK9vy8G3glvaDvQ==", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.9.0.tgz", + "integrity": "sha512-C0vSKID6BHvfwEKqvfRWkls1aVSE18i/OqzasVywPS8GRG2Kp5H7F6Yp/K5tpqbA3mZXbR2zNvxTmtHUO/V1LA==", "requires": { - "@joystream/metadata-protobuf": "^2.8.1", + "@joystream/metadata-protobuf": "^2.14.0", "@joystream/types": "0.20.5", "@polkadot/util-crypto": "9.5.1", "axios": "^1.2.1", @@ -19243,9 +19243,9 @@ } }, "@joystream/metadata-protobuf": { - "version": "2.8.1", - "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.8.1.tgz", - "integrity": "sha512-ckxvQP3RC8gKCJWU1xpXosxEfgFcChgaIncy06AZfn50x6+9mFEsxQTTogN7b1g1T026oSFYZMDq52tkBw2Zew==", + "version": "2.14.0", + "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.14.0.tgz", + "integrity": "sha512-1/FgUW6mGt3t+p0k7/KdKnRQPU0HghY4MJTSkip1SCTMICZWoDuSADJuM8nlh2Qdu9kqvodqtndKjz057b0t3g==", "requires": { "@types/iso-3166-2": "^1.0.0", "@types/long": "^4.0.1", @@ -22253,9 +22253,9 @@ } }, "@types/iso-3166-2": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/@types/iso-3166-2/-/iso-3166-2-1.0.0.tgz", - "integrity": "sha512-DYDyoRyPyxBeI9bYoTXLfsOZH12m1anrWEj9LU5Sl9rgsJ4soJMYf/7byozM+64Smn6/a1i079eQLGuPykwaHQ==" + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@types/iso-3166-2/-/iso-3166-2-1.0.3.tgz", + "integrity": "sha512-jOqKFSfZYVJ5ARkXH2V2RrEhLvFds1RVrFGRKKxqLU3jjExlf5LtqdFgXNd19KAMRhgsZSNjedeLDVYlGUemZg==" }, "@types/js-yaml": { "version": "4.0.5", @@ -27577,9 +27577,9 @@ "integrity": "sha512-vGrhOavPSTz4QVNuBNdcNXePNdNMaO1xj9yBeH1ScQPjk/rhg9sSlCXPhMkFuaNNW/syTvYqsnbIJxMBfRbbag==" }, "protobufjs": { - "version": "6.11.3", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.3.tgz", - "integrity": "sha512-xL96WDdCZYdU7Slin569tFX712BxsxslWwAfAhCYjQKGTq7dAU91Lomy6nLLhh/dyGhk/YH4TwTSRxTzhuHyZg==", + "version": "6.11.4", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz", + "integrity": "sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw==", "requires": { "@protobufjs/aspromise": "^1.1.2", "@protobufjs/base64": "^1.1.2", diff --git a/package.json b/package.json index f9e15fd..1c15b89 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "storage-squid", - "version": "1.4.4", + "version": "1.5.0", "engines": { "node": ">=16" }, @@ -26,7 +26,7 @@ "@joystream/types": "0.20.5" }, "dependencies": { - "@joystream/js": "^1.4.0", + "@joystream/js": "^1.9.0", "@opentelemetry/api": "^1.4.1", "@opentelemetry/auto-instrumentations-node": "0.37.0", "@opentelemetry/core": "1.13.0", diff --git a/schema/events.graphql b/schema/events.graphql index 61ee217..779e231 100644 --- a/schema/events.graphql +++ b/schema/events.graphql @@ -18,7 +18,11 @@ type Event @entity { data: EventData! } -union EventData = MetaprotocolTransactionStatusEventData | DataObjectDeletedEventData +union EventData = + MetaprotocolTransactionStatusEventData + | DataObjectDeletedEventData + | StorageNodeOperationalStatusSetEvent + | DistributionNodeOperationalStatusSetEvent type MetaprotocolTransactionResultOK { phantom: Int @@ -41,3 +45,19 @@ type DataObjectDeletedEventData { "Runtime ID of deleted the deleted object" dataObjectId: ID! } + +type StorageNodeOperationalStatusSetEvent { + "Storage Bucket" + storageBucket: StorageBucket! + + "Operational status that was set" + operationalStatus: NodeOperationalStatus! +} + +type DistributionNodeOperationalStatusSetEvent { + "Distribution bucket operator" + bucketOperator: DistributionBucketOperator + + "Operational status that was set" + operationalStatus: NodeOperationalStatus! +} diff --git a/schema/storage.graphql b/schema/storage.graphql index ec0ffb2..e75a85e 100644 --- a/schema/storage.graphql +++ b/schema/storage.graphql @@ -62,6 +62,50 @@ type NodeLocationMetadata { coordinates: GeoCoordinates } +type NodeOperationalStatusNormal { + "Reason why node was set to this state" + rationale: String +} + +type NodeOperationalStatusNoService { + "Whether the state was set by lead (true) or by the operator (false), it is meant to prevent worker from unilaterally reversing." + forced: Boolean! + + "Reason why node was set to this state" + rationale: String +} + +type NodeOperationalStatusNoServiceFrom { + "Whether the state was set by lead (true) or by the operator (false), it is meant to prevent worker from unilaterally reversing." + forced: Boolean! + + "The time from which the bucket would have to no service" + from: DateTime! + + "Reason why node was set to this state" + rationale: String +} + +type NodeOperationalStatusNoServiceDuring { + "Whether the state was set by lead (true) or by the operator (false), it is meant to prevent worker from unilaterally reversing." + forced: Boolean! + + "The time from which the bucket would have to no service" + from: DateTime! + + "The time until which the bucket would have to no service" + to: DateTime! + + "Reason why node was set to this state" + rationale: String +} + +union NodeOperationalStatus = + NodeOperationalStatusNormal + | NodeOperationalStatusNoService + | NodeOperationalStatusNoServiceFrom + | NodeOperationalStatusNoServiceDuring + type StorageBucketOperatorMetadata @entity { id: ID! @@ -74,6 +118,9 @@ type StorageBucketOperatorMetadata @entity { "Optional node location metadata" nodeLocation: NodeLocationMetadata + "Optional node operational status" + nodeOperationalStatus: NodeOperationalStatus + "Additional information about the node/operator" extra: String } @@ -233,6 +280,9 @@ type DistributionBucketOperatorMetadata @entity { "Optional node location metadata" nodeLocation: NodeLocationMetadata + "Optional node operational status" + nodeOperationalStatus: NodeOperationalStatus + "Additional information about the node/operator" extra: String } diff --git a/schema/workingGroups.graphql b/schema/workingGroups.graphql new file mode 100644 index 0000000..208143e --- /dev/null +++ b/schema/workingGroups.graphql @@ -0,0 +1,5 @@ +# Working Groups +type Worker @entity { + "Worker id ({workingGroupName}-{workerId})" + id: ID! +} diff --git a/src/mappings/storage/metadata.ts b/src/mappings/storage/metadata.ts index acbb7c6..3e61e7a 100644 --- a/src/mappings/storage/metadata.ts +++ b/src/mappings/storage/metadata.ts @@ -4,7 +4,9 @@ import { IDistributionBucketOperatorMetadata, IGeographicalArea, INodeLocationMetadata, + INodeOperationalStatusMetadata, IStorageBucketOperatorMetadata, + NodeOperationalStatusMetadata, } from '@joystream/metadata-protobuf' import { DecodedMetadataObject } from '@joystream/metadata-protobuf/types' import { @@ -13,22 +15,27 @@ import { isValidCountryCode, isValidSubdivisionCode, } from '@joystream/metadata-protobuf/utils' +import _ from 'lodash' +import { Logger } from '../../logger' import { - StorageBucketOperatorMetadata, - NodeLocationMetadata, - GeoCoordinates, + Continent, DistributionBucketFamilyMetadata, + DistributionBucketOperatorMetadata, + GeoCoordinates, + GeographicalArea, GeographicalAreaContinent, - Continent, GeographicalAreaCountry, GeographicalAreaSubdivistion, - DistributionBucketOperatorMetadata, - GeographicalArea, + NodeLocationMetadata, + NodeOperationalStatus, + NodeOperationalStatusNoService, + NodeOperationalStatusNoServiceDuring, + NodeOperationalStatusNoServiceFrom, + NodeOperationalStatusNormal, + StorageBucketOperatorMetadata, } from '../../model' -import { invalidMetadata } from '../utils' import { EntityManagerOverlay, Flat } from '../../utils/overlay' -import { Logger } from '../../logger' -import _ from 'lodash' +import { invalidMetadata } from '../utils' export const protobufContinentToGraphlContinent: { [key in GeographicalAreaProto.Continent]: Continent @@ -60,6 +67,9 @@ export async function processStorageOperatorMetadata( if (isSet(metadataUpdate.location)) { processNodeLocationMetadata(operatorMetadata, metadataUpdate.location) } + if (isSet(metadataUpdate.operationalStatus)) { + processNodeOperationalStatusMetadata('worker', undefined, metadataUpdate.operationalStatus) + } if (isSet(metadataUpdate.extra)) { operatorMetadata.extra = metadataUpdate.extra || null } @@ -103,6 +113,78 @@ function processNodeLocationMetadata( } } +export function processNodeOperationalStatusMetadata( + actorContext: 'lead' | 'worker', + currentStatus: NodeOperationalStatus | null | undefined, + meta: INodeOperationalStatusMetadata +): NodeOperationalStatus { + const isCurrentStatusForced = + currentStatus && + (currentStatus instanceof NodeOperationalStatusNoService || + currentStatus instanceof NodeOperationalStatusNoServiceFrom || + currentStatus instanceof NodeOperationalStatusNoServiceDuring) && + currentStatus.forced + + // if current state is forced by lead, then prevent worker from unilaterally reversing. + if (isCurrentStatusForced && actorContext === 'worker') { + return currentStatus + } + + // Validate date formats + let validatedNoServiceFrom: Date | undefined + let validatedNoServiceTo: Date | undefined + + try { + if (meta.noServiceFrom) { + new Date(meta.noServiceFrom).toISOString() + validatedNoServiceFrom = new Date(meta.noServiceFrom) + } + } catch (error) { + invalidMetadata(NodeOperationalStatusMetadata, `Invalid date format for "noServiceFrom"`, { + decodedMessage: meta, + }) + } + + try { + if (meta.noServiceTo) { + new Date(meta.noServiceTo).toISOString() + validatedNoServiceTo = new Date(meta.noServiceTo) + } + } catch (error) { + invalidMetadata(NodeOperationalStatusMetadata, `Invalid date format for "noServiceTo"`, { + decodedMessage: meta, + }) + } + + // set node state to NoService + if (meta.status === NodeOperationalStatusMetadata.OperationalStatus.NO_SERVICE) { + if (validatedNoServiceFrom && validatedNoServiceTo) { + const status = new NodeOperationalStatusNoServiceDuring() + status.rationale = meta.rationale + status.forced = actorContext === 'lead' + status.from = validatedNoServiceFrom + status.to = validatedNoServiceTo + return status + } else if (validatedNoServiceFrom && !validatedNoServiceTo) { + const status = new NodeOperationalStatusNoServiceFrom() + status.rationale = meta.rationale + status.forced = actorContext === 'lead' + status.from = validatedNoServiceFrom + return status + } else if (!validatedNoServiceFrom && !validatedNoServiceTo) { + const status = new NodeOperationalStatusNoService() + status.rationale = meta.rationale + status.forced = actorContext === 'lead' + return status + } + } + + // Default operational status of the node + const status = new NodeOperationalStatusNormal() + status.rationale = meta.rationale + return status +} + export async function processDistributionOperatorMetadata( overlay: EntityManagerOverlay, operatorId: string, @@ -121,6 +203,9 @@ export async function processDistributionOperatorMetadata( if (isSet(metadataUpdate.location)) { processNodeLocationMetadata(operatorMetadata, metadataUpdate.location) } + if (isSet(metadataUpdate.operationalStatus)) { + processNodeOperationalStatusMetadata('worker', undefined, metadataUpdate.operationalStatus) + } if (isSet(metadataUpdate.extra)) { operatorMetadata.extra = metadataUpdate.extra || null } diff --git a/src/mappings/storage/utils.ts b/src/mappings/storage/utils.ts index 61983d8..89cd7ea 100644 --- a/src/mappings/storage/utils.ts +++ b/src/mappings/storage/utils.ts @@ -1,8 +1,11 @@ +import { ISetNodeOperationalStatus, SetNodeOperationalStatus } from '@joystream/metadata-protobuf' +import { isSet } from '@joystream/metadata-protobuf/utils' import { hexToString } from '@polkadot/util' import { DataObjectDeletedEventData, DistributionBucketOperator, DistributionBucketOperatorMetadata, + DistributionNodeOperationalStatusSetEvent, Event, StorageBag, StorageBagOwner, @@ -10,8 +13,12 @@ import { StorageBagOwnerCouncil, StorageBagOwnerMember, StorageBagOwnerWorkingGroup, + StorageBucket, + StorageBucketOperatorMetadata, StorageDataObject, + StorageNodeOperationalStatusSetEvent, VideoSubtitle, + Worker, } from '../../model' import { Block } from '../../processor' import { @@ -23,7 +30,8 @@ import { } from '../../types/v1000' import { criticalError } from '../../utils/misc' import { EntityManagerOverlay, Flat, RepositoryOverlay } from '../../utils/overlay' -import { genericEventFields } from '../utils' +import { genericEventFields, invalidMetadata } from '../utils' +import { processNodeOperationalStatusMetadata } from './metadata' export function getDynamicBagId(bagId: DynamicBagIdType): string { if (bagId.__kind === 'Channel') { @@ -206,3 +214,118 @@ export async function deleteDataObjectsByIds( subtitlesRepository.remove(...currentSubtitles.flat()) await deleteDataObjects(overlay, block, indexInBlock, extrinsicHash, objects) } + +export async function processSetNodeOperationalStatusMessage( + overlay: EntityManagerOverlay, + block: Block, + indexInBlock: number, + extrinsicHash: string | undefined, + workingGroup: 'storageWorkingGroup' | 'distributionWorkingGroup', + actorContext: 'lead' | 'worker', + meta: ISetNodeOperationalStatus +): Promise { + const workerId = Number(meta.workerId) + const bucketId = String(meta.bucketId) + + if ((await overlay.getRepository(Worker).getById(`${workingGroup}-${workerId}`)) === undefined) { + return invalidMetadata( + SetNodeOperationalStatus, + `The worker ${workerId} does not exist in the ${workingGroup} working group` + ) + } + + // Update the operational status of Storage node + if (workingGroup === 'storageWorkingGroup') { + const storageBucket = await overlay.getRepository(StorageBucket).getById(bucketId) + if (!storageBucket) { + return invalidMetadata( + SetNodeOperationalStatus, + `The storage bucket ${bucketId} does not exist` + ) + } else if (storageBucket.operatorStatus.isTypeOf !== 'StorageBucketOperatorStatusActive') { + return invalidMetadata( + SetNodeOperationalStatus, + `The storage bucket ${bucketId} is not active` + ) + } else if (storageBucket.operatorStatus.workerId !== workerId) { + return invalidMetadata( + SetNodeOperationalStatus, + `The worker ${workerId} is not the operator of the storage bucket ${bucketId}` + ) + } + + // create metadata entity if it does not exist already + const metadataEntity = + (await overlay.getRepository(StorageBucketOperatorMetadata).getById(bucketId)) || + overlay + .getRepository(StorageBucketOperatorMetadata) + .new({ id: bucketId, storageBucketId: bucketId }) + + if (isSet(meta.operationalStatus)) { + metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( + actorContext, + metadataEntity.nodeOperationalStatus, + meta.operationalStatus + ) + } + + // event processing + + const operationalStatusSetEvent = new StorageNodeOperationalStatusSetEvent({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + storageBucket: storageBucket.id, + operationalStatus: metadataEntity.nodeOperationalStatus || undefined, + }) + + overlay.getRepository(Event).new({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + data: operationalStatusSetEvent, + }) + } + + // Update the operational status of Distribution node + if (workingGroup === 'distributionWorkingGroup') { + const distributionOperatorId = `${bucketId}-${workerId}` + const operator = await overlay + .getRepository(DistributionBucketOperator) + .getById(distributionOperatorId) + + if (!operator) { + return invalidMetadata( + SetNodeOperationalStatus, + `The distribution bucket operator ${distributionOperatorId} does not exist` + ) + } + + // create metadata entity if it does not exist already + const metadataEntity = + (await overlay + .getRepository(DistributionBucketOperatorMetadata) + .getById(distributionOperatorId)) || + overlay.getRepository(DistributionBucketOperatorMetadata).new({ + id: distributionOperatorId, + distirbutionBucketOperatorId: distributionOperatorId, + }) + + if (isSet(meta.operationalStatus)) { + metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( + actorContext, + metadataEntity.nodeOperationalStatus, + meta.operationalStatus + ) + } + + // event processing + + const operationalStatusSetEvent = new DistributionNodeOperationalStatusSetEvent({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + bucketOperator: operator.id, + operationalStatus: metadataEntity.nodeOperationalStatus || undefined, + }) + + overlay.getRepository(Event).new({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + data: operationalStatusSetEvent, + }) + } +} diff --git a/src/mappings/utils.ts b/src/mappings/utils.ts index 738bbd0..5e12382 100644 --- a/src/mappings/utils.ts +++ b/src/mappings/utils.ts @@ -104,3 +104,8 @@ export function backwardCompatibleMetaID(block: Block, indexInBlock: number) { export function u8aToBytes(array?: Uint8Array | null): Bytes { return createType('Bytes', array ? u8aToHex(array) : '') } + +export function toLowerFirstLetter(str: string) { + if (!str) return '' // Return an empty string if str is falsy + return str.charAt(0).toLowerCase() + str.slice(1) +} diff --git a/src/mappings/workingGroups/index.ts b/src/mappings/workingGroups/index.ts new file mode 100644 index 0000000..8ad9349 --- /dev/null +++ b/src/mappings/workingGroups/index.ts @@ -0,0 +1,172 @@ +import { RemarkMetadataAction } from '@joystream/metadata-protobuf' +import { Worker } from '../../model' +import { Block, EventHandlerContext } from '../../processor' +import { criticalError } from '../../utils/misc' +import { EntityManagerOverlay } from '../../utils/overlay' +import { processSetNodeOperationalStatusMessage } from '../storage/utils' +import { deserializeMetadataStr, invalidMetadata, toLowerFirstLetter } from '../utils' + +export async function processWorkingGroupsOpeningFilledEvent({ + overlay, + event, + eventDecoder, +}: EventHandlerContext< + 'StorageWorkingGroup.OpeningFilled' | 'DistributionWorkingGroup.OpeningFilled' +>) { + const [, applicationIdToWorkerIdMap, applicationIdsSet] = eventDecoder.v1000.decode(event) + const [workingGroupName] = event.name.split('.') + + const getWorkerIdByApplicationId = (applicationId: bigint): bigint => { + const tuple = applicationIdToWorkerIdMap.find(([appId, _]) => appId === applicationId) + if (!tuple) { + criticalError( + `Worker id for application id ${applicationId} not found in applicationIdToWorkerIdMap` + ) + } + return tuple[1] + } + + applicationIdsSet.forEach((applicationId) => { + const workerId = getWorkerIdByApplicationId(applicationId) + overlay.getRepository(Worker).new({ + id: `${toLowerFirstLetter(workingGroupName)}-${workerId}`, + }) + }) +} + +export async function processWorkingGroupsWorkerTerminatedOrExitedEvent({ + overlay, + event, + eventDecoder, +}: EventHandlerContext< + | 'StorageWorkingGroup.TerminatedWorker' + | 'StorageWorkingGroup.TerminatedLeader' + | 'StorageWorkingGroup.WorkerExited' + | 'DistributionWorkingGroup.TerminatedWorker' + | 'DistributionWorkingGroup.TerminatedLeader' + | 'DistributionWorkingGroup.WorkerExited' +>) { + const decoded = eventDecoder.v1000.decode(event) + + let workerId: bigint + + if (typeof decoded === 'bigint') { + workerId = decoded + } else { + workerId = decoded[0] + } + + // Get the working group name + const [workingGroupName] = event.name.split('.') + + // Remove the worker + overlay.getRepository(Worker).remove(`${toLowerFirstLetter(workingGroupName)}-${workerId}`) +} + +export async function processStorageWorkingGroupLeadRemarkedEvent({ + overlay, + block, + event, + indexInBlock, + extrinsicHash, + eventDecoder, +}: EventHandlerContext<'StorageWorkingGroup.LeadRemarked'>) { + const [metadataBytes] = eventDecoder.v1000.decode(event) + + await applyWorkingGroupsRemark( + overlay, + block, + indexInBlock, + extrinsicHash, + 'storageWorkingGroup', + 'lead', + metadataBytes + ) +} + +export async function processStorageWorkingGroupWorkerRemarkedEvent({ + overlay, + block, + event, + indexInBlock, + extrinsicHash, + eventDecoder, +}: EventHandlerContext<'StorageWorkingGroup.WorkerRemarked'>) { + const [, metadataBytes] = eventDecoder.v1000.decode(event) + + await applyWorkingGroupsRemark( + overlay, + block, + indexInBlock, + extrinsicHash, + 'storageWorkingGroup', + 'worker', + metadataBytes + ) +} + +export async function processDistributionWorkingGroupLeadRemarkedEvent({ + overlay, + block, + event, + indexInBlock, + extrinsicHash, + eventDecoder, +}: EventHandlerContext<'DistributionWorkingGroup.LeadRemarked'>) { + const [metadataBytes] = eventDecoder.v1000.decode(event) + await applyWorkingGroupsRemark( + overlay, + block, + indexInBlock, + extrinsicHash, + 'distributionWorkingGroup', + 'lead', + metadataBytes + ) +} + +export async function processDistributionWorkingGroupWorkerRemarkedEvent({ + overlay, + block, + event, + indexInBlock, + extrinsicHash, + eventDecoder, +}: EventHandlerContext<'DistributionWorkingGroup.WorkerRemarked'>) { + const [, metadataBytes] = eventDecoder.v1000.decode(event) + await applyWorkingGroupsRemark( + overlay, + block, + indexInBlock, + extrinsicHash, + 'distributionWorkingGroup', + 'worker', + metadataBytes + ) +} + +async function applyWorkingGroupsRemark( + overlay: EntityManagerOverlay, + block: Block, + indexInBlock: number, + extrinsicHash: string | undefined, + workingGroup: 'storageWorkingGroup' | 'distributionWorkingGroup', + actorContext: 'lead' | 'worker', + metadataBytes: string +): Promise { + const metadata = deserializeMetadataStr(RemarkMetadataAction, metadataBytes) + + if (metadata?.setNodeOperationalStatus) { + await processSetNodeOperationalStatusMessage( + overlay, + block, + indexInBlock, + extrinsicHash, + workingGroup, + actorContext, + metadata.setNodeOperationalStatus + ) + } else { + return invalidMetadata(RemarkMetadataAction, 'Unsupported remarked action') + } +} diff --git a/src/processor.ts b/src/processor.ts index b9ad307..bd7cb6e 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -40,6 +40,14 @@ import { processStorageOperatorMetadataSetEvent, processVoucherChangedEvent, } from './mappings/storage' +import { + processDistributionWorkingGroupLeadRemarkedEvent, + processDistributionWorkingGroupWorkerRemarkedEvent, + processStorageWorkingGroupLeadRemarkedEvent, + processStorageWorkingGroupWorkerRemarkedEvent, + processWorkingGroupsOpeningFilledEvent, + processWorkingGroupsWorkerTerminatedOrExitedEvent, +} from './mappings/workingGroups' import { events } from './types' import { EntityManagerOverlay } from './utils/overlay' @@ -57,7 +65,10 @@ type MapModuleEvents = { string as `${Capitalize}.${Capitalize}`]: typeof events[Module][Event] } -type EventsMap = MapModuleEvents<'content'> & MapModuleEvents<'storage'> +type EventsMap = MapModuleEvents<'content'> & + MapModuleEvents<'storage'> & + MapModuleEvents<'storageWorkingGroup'> & + MapModuleEvents<'distributionWorkingGroup'> type EventNames = keyof EventsMap export type EventHandlerContext = { @@ -110,6 +121,18 @@ const eventHandlers: EventHandlers = { 'Storage.DistributionBucketFamilyCreated': processDistributionBucketFamilyCreatedEvent, 'Storage.DistributionBucketFamilyMetadataSet': processDistributionBucketFamilyMetadataSetEvent, 'Storage.DistributionBucketFamilyDeleted': processDistributionBucketFamilyDeletedEvent, + 'StorageWorkingGroup.OpeningFilled': processWorkingGroupsOpeningFilledEvent, + 'StorageWorkingGroup.TerminatedWorker': processWorkingGroupsWorkerTerminatedOrExitedEvent, + 'StorageWorkingGroup.TerminatedLeader': processWorkingGroupsWorkerTerminatedOrExitedEvent, + 'StorageWorkingGroup.WorkerExited': processWorkingGroupsWorkerTerminatedOrExitedEvent, + 'StorageWorkingGroup.LeadRemarked': processStorageWorkingGroupLeadRemarkedEvent, + 'StorageWorkingGroup.WorkerRemarked': processStorageWorkingGroupWorkerRemarkedEvent, + 'DistributionWorkingGroup.OpeningFilled': processWorkingGroupsOpeningFilledEvent, + 'DistributionWorkingGroup.TerminatedWorker': processWorkingGroupsWorkerTerminatedOrExitedEvent, + 'DistributionWorkingGroup.TerminatedLeader': processWorkingGroupsWorkerTerminatedOrExitedEvent, + 'DistributionWorkingGroup.WorkerExited': processWorkingGroupsWorkerTerminatedOrExitedEvent, + 'DistributionWorkingGroup.LeadRemarked': processDistributionWorkingGroupLeadRemarkedEvent, + 'DistributionWorkingGroup.WorkerRemarked': processDistributionWorkingGroupWorkerRemarkedEvent, } const eventNames = Object.keys(eventHandlers) diff --git a/typegen.json b/typegen.json index a67a894..75ecb50 100644 --- a/typegen.json +++ b/typegen.json @@ -38,9 +38,27 @@ "DistributionBucketFamilyCreated", "DistributionBucketFamilyMetadataSet", "DistributionBucketFamilyDeleted" - ], - "calls": [], - "storage": [] + ] + }, + "StorageWorkingGroup": { + "events": [ + "WorkerRemarked", + "LeadRemarked", + "OpeningFilled", + "TerminatedWorker", + "TerminatedLeader", + "WorkerExited" + ] + }, + "DistributionWorkingGroup": { + "events": [ + "WorkerRemarked", + "LeadRemarked", + "OpeningFilled", + "TerminatedWorker", + "TerminatedLeader", + "WorkerExited" + ] } } } From fae93f634c538639ebb6bf94821dd246bcb98dfc Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Thu, 29 Feb 2024 14:08:38 +0500 Subject: [PATCH 2/9] change 'workerId' type in graphql schema from Int to BigInt --- ...53133577-Data.js => 1709195273064-Data.js} | 8 +-- generated/schema.graphql | 54 ++++++++++++------- schema/storage.graphql | 6 +-- schema/workingGroups.graphql | 3 ++ src/mappings/storage/index.ts | 8 +-- 5 files changed, 49 insertions(+), 30 deletions(-) rename db/migrations/{1709153133577-Data.js => 1709195273064-Data.js} (98%) diff --git a/db/migrations/1709153133577-Data.js b/db/migrations/1709195273064-Data.js similarity index 98% rename from db/migrations/1709153133577-Data.js rename to db/migrations/1709195273064-Data.js index f13e9a4..1042554 100644 --- a/db/migrations/1709153133577-Data.js +++ b/db/migrations/1709195273064-Data.js @@ -1,5 +1,5 @@ -module.exports = class Data1709153133577 { - name = 'Data1709153133577' +module.exports = class Data1709195273064 { + name = 'Data1709195273064' async up(db) { await db.query(`CREATE TABLE "next_entity_id" ("entity_name" character varying NOT NULL, "next_id" bigint NOT NULL, CONSTRAINT "PK_09a3b40db622a65096e7344d7ae" PRIMARY KEY ("entity_name"))`) @@ -10,7 +10,7 @@ module.exports = class Data1709153133577 { await db.query(`CREATE INDEX "IDX_aaf00b2c7d0cba49f97da14fbb" ON "storage_bucket_bag" ("bag_id") `) await db.query(`CREATE INDEX "IDX_4c475f6c9300284b095859eec3" ON "storage_bucket_bag" ("storage_bucket_id", "bag_id") `) await db.query(`CREATE TABLE "distribution_bucket_family" ("id" character varying NOT NULL, CONSTRAINT "PK_8cb7454d1ec34b0d3bb7ecdee4e" PRIMARY KEY ("id"))`) - await db.query(`CREATE TABLE "distribution_bucket_operator" ("id" character varying NOT NULL, "distribution_bucket_id" character varying, "worker_id" integer NOT NULL, "status" character varying(7) NOT NULL, CONSTRAINT "PK_03b87e6e972f414bab94c142285" PRIMARY KEY ("id"))`) + await db.query(`CREATE TABLE "distribution_bucket_operator" ("id" character varying NOT NULL, "distribution_bucket_id" character varying, "worker_id" numeric NOT NULL, "status" character varying(7) NOT NULL, CONSTRAINT "PK_03b87e6e972f414bab94c142285" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_678dc5427cdde0cd4fef2c07a4" ON "distribution_bucket_operator" ("distribution_bucket_id") `) await db.query(`CREATE TABLE "distribution_bucket" ("id" character varying NOT NULL, "family_id" character varying, "bucket_index" integer NOT NULL, "accepting_new_bags" boolean NOT NULL, "distributing" boolean NOT NULL, CONSTRAINT "PK_c90d25fff461f2f5fa9082e2fb7" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_8cb7454d1ec34b0d3bb7ecdee4" ON "distribution_bucket" ("family_id") `) @@ -29,7 +29,7 @@ module.exports = class Data1709153133577 { await db.query(`CREATE INDEX "IDX_5510d3b244a63d6ec702faa426" ON "distribution_bucket_family_metadata" ("region") `) await db.query(`CREATE TABLE "distribution_bucket_operator_metadata" ("id" character varying NOT NULL, "distirbution_bucket_operator_id" character varying NOT NULL, "node_endpoint" text, "node_location" jsonb, "node_operational_status" jsonb, "extra" text, CONSTRAINT "DistributionBucketOperatorMetadata_distirbutionBucketOperator" UNIQUE ("distirbution_bucket_operator_id") DEFERRABLE INITIALLY DEFERRED, CONSTRAINT "REL_69ec9bdc975b95f7dff94a7106" UNIQUE ("distirbution_bucket_operator_id"), CONSTRAINT "PK_9bbecaa12f30e3826922688274f" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_69ec9bdc975b95f7dff94a7106" ON "distribution_bucket_operator_metadata" ("distirbution_bucket_operator_id") `) - await db.query(`CREATE TABLE "worker" ("id" character varying NOT NULL, CONSTRAINT "PK_dc8175fa0e34ce7a39e4ec73b94" PRIMARY KEY ("id"))`) + await db.query(`CREATE TABLE "worker" ("id" character varying NOT NULL, "runtime_id" numeric NOT NULL, CONSTRAINT "PK_dc8175fa0e34ce7a39e4ec73b94" PRIMARY KEY ("id"))`) await db.query(`ALTER TABLE "storage_bucket_bag" ADD CONSTRAINT "FK_791e2f82e3919ffcef8712aa1b9" FOREIGN KEY ("storage_bucket_id") REFERENCES "storage_bucket"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) await db.query(`ALTER TABLE "storage_bucket_bag" ADD CONSTRAINT "FK_aaf00b2c7d0cba49f97da14fbba" FOREIGN KEY ("bag_id") REFERENCES "storage_bag"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) await db.query(`ALTER TABLE "distribution_bucket_operator" ADD CONSTRAINT "FK_678dc5427cdde0cd4fef2c07a43" FOREIGN KEY ("distribution_bucket_id") REFERENCES "distribution_bucket"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) diff --git a/generated/schema.graphql b/generated/schema.graphql index 9831953..6ea51f9 100644 --- a/generated/schema.graphql +++ b/generated/schema.graphql @@ -347,7 +347,7 @@ type DistributionBucketOperator { distributionBucket: DistributionBucket! """ID of the distribution group worker""" - workerId: Int! + workerId: BigInt! """Current operator status""" status: DistributionBucketOperatorStatus! @@ -583,14 +583,14 @@ input DistributionBucketOperatorWhereInput { distributionBucket_isNull: Boolean distributionBucket: DistributionBucketWhereInput workerId_isNull: Boolean - workerId_eq: Int - workerId_not_eq: Int - workerId_gt: Int - workerId_gte: Int - workerId_lt: Int - workerId_lte: Int - workerId_in: [Int!] - workerId_not_in: [Int!] + workerId_eq: BigInt + workerId_not_eq: BigInt + workerId_gt: BigInt + workerId_gte: BigInt + workerId_lt: BigInt + workerId_lte: BigInt + workerId_in: [BigInt!] + workerId_not_in: [BigInt!] status_isNull: Boolean status_eq: DistributionBucketOperatorStatus status_not_eq: DistributionBucketOperatorStatus @@ -1676,12 +1676,12 @@ input StorageBucketOperatorMetadataWhereInput { union StorageBucketOperatorStatus = StorageBucketOperatorStatusMissing | StorageBucketOperatorStatusInvited | StorageBucketOperatorStatusActive type StorageBucketOperatorStatusActive { - workerId: Int! + workerId: BigInt! transactorAccountId: String! } type StorageBucketOperatorStatusInvited { - workerId: Int! + workerId: BigInt! } type StorageBucketOperatorStatusMissing { @@ -1699,14 +1699,14 @@ input StorageBucketOperatorStatusWhereInput { phantom_in: [Int!] phantom_not_in: [Int!] workerId_isNull: Boolean - workerId_eq: Int - workerId_not_eq: Int - workerId_gt: Int - workerId_gte: Int - workerId_lt: Int - workerId_lte: Int - workerId_in: [Int!] - workerId_not_in: [Int!] + workerId_eq: BigInt + workerId_not_eq: BigInt + workerId_gt: BigInt + workerId_gte: BigInt + workerId_lt: BigInt + workerId_lte: BigInt + workerId_in: [BigInt!] + workerId_not_in: [BigInt!] transactorAccountId_isNull: Boolean transactorAccountId_eq: String transactorAccountId_not_eq: String @@ -2152,6 +2152,9 @@ input WhereIdInput { type Worker { """Worker id ({workingGroupName}-{workerId})""" id: String! + + """WorkerId in specific working group module""" + runtimeId: BigInt! } type WorkerEdge { @@ -2164,6 +2167,10 @@ enum WorkerOrderByInput { id_DESC id_ASC_NULLS_FIRST id_DESC_NULLS_LAST + runtimeId_ASC + runtimeId_DESC + runtimeId_ASC_NULLS_FIRST + runtimeId_DESC_NULLS_LAST } type WorkersConnection { @@ -2190,6 +2197,15 @@ input WorkerWhereInput { id_not_startsWith: String id_endsWith: String id_not_endsWith: String + runtimeId_isNull: Boolean + runtimeId_eq: BigInt + runtimeId_not_eq: BigInt + runtimeId_gt: BigInt + runtimeId_gte: BigInt + runtimeId_lt: BigInt + runtimeId_lte: BigInt + runtimeId_in: [BigInt!] + runtimeId_not_in: [BigInt!] AND: [WorkerWhereInput!] OR: [WorkerWhereInput!] } diff --git a/schema/storage.graphql b/schema/storage.graphql index e75a85e..4bcf152 100644 --- a/schema/storage.graphql +++ b/schema/storage.graphql @@ -3,11 +3,11 @@ type StorageBucketOperatorStatusMissing { } type StorageBucketOperatorStatusInvited { - workerId: Int! + workerId: BigInt! } type StorageBucketOperatorStatusActive { - workerId: Int! + workerId: BigInt! transactorAccountId: String! } @@ -300,7 +300,7 @@ type DistributionBucketOperator @entity { distributionBucket: DistributionBucket! "ID of the distribution group worker" - workerId: Int! + workerId: BigInt! "Current operator status" status: DistributionBucketOperatorStatus! diff --git a/schema/workingGroups.graphql b/schema/workingGroups.graphql index 208143e..c15776d 100644 --- a/schema/workingGroups.graphql +++ b/schema/workingGroups.graphql @@ -2,4 +2,7 @@ type Worker @entity { "Worker id ({workingGroupName}-{workerId})" id: ID! + + "WorkerId in specific working group module" + runtimeId: BigInt! } diff --git a/src/mappings/storage/index.ts b/src/mappings/storage/index.ts index 7d56dc1..0ae6cc7 100644 --- a/src/mappings/storage/index.ts +++ b/src/mappings/storage/index.ts @@ -60,7 +60,7 @@ export async function processStorageBucketCreatedEvent({ }) if (invitedWorkerId !== undefined) { storageBucket.operatorStatus = new StorageBucketOperatorStatusInvited({ - workerId: Number(invitedWorkerId), + workerId: invitedWorkerId, }) } else { storageBucket.operatorStatus = new StorageBucketOperatorStatusMissing() @@ -101,7 +101,7 @@ export async function processStorageBucketInvitationAcceptedEvent({ .getRepository(StorageBucket) .getByIdOrFail(bucketId.toString()) storageBucket.operatorStatus = new StorageBucketOperatorStatusActive({ - workerId: Number(workerId), + workerId, transactorAccountId: toAddress(hexToU8a(transactorAccountId)), }) } @@ -129,7 +129,7 @@ export async function processStorageBucketOperatorInvitedEvent({ .getRepository(StorageBucket) .getByIdOrFail(bucketId.toString()) storageBucket.operatorStatus = new StorageBucketOperatorStatusInvited({ - workerId: Number(workerId), + workerId, }) } @@ -482,7 +482,7 @@ export function processDistributionBucketOperatorInvitedEvent({ id: distributionOperatorId(bucketId, workerId), distributionBucketId: distributionBucketId(bucketId), status: DistributionBucketOperatorStatus.INVITED, - workerId: Number(workerId), + workerId, }) } From c4569504653406f1e13b40e852adc878fe1f5e57 Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Thu, 29 Feb 2024 14:16:36 +0500 Subject: [PATCH 3/9] ensure that only bucket operator can se the bucket's operational status --- src/mappings/storage/utils.ts | 40 ++++++++++----- src/mappings/workingGroups/index.ts | 76 ++++++++++------------------- src/processor.ts | 14 +++--- 3 files changed, 61 insertions(+), 69 deletions(-) diff --git a/src/mappings/storage/utils.ts b/src/mappings/storage/utils.ts index 89cd7ea..5855688 100644 --- a/src/mappings/storage/utils.ts +++ b/src/mappings/storage/utils.ts @@ -221,17 +221,27 @@ export async function processSetNodeOperationalStatusMessage( indexInBlock: number, extrinsicHash: string | undefined, workingGroup: 'storageWorkingGroup' | 'distributionWorkingGroup', - actorContext: 'lead' | 'worker', + actor: Worker | undefined, // worker who sent the message, undefined if it's a lead meta: ISetNodeOperationalStatus ): Promise { - const workerId = Number(meta.workerId) - const bucketId = String(meta.bucketId) - - if ((await overlay.getRepository(Worker).getById(`${workingGroup}-${workerId}`)) === undefined) { - return invalidMetadata( - SetNodeOperationalStatus, - `The worker ${workerId} does not exist in the ${workingGroup} working group` - ) + let workerId: bigint + const bucketId = meta.bucketId || '' + + // Get the bucket's worker id + if (actor) { + workerId = actor.runtimeId + } else { + const maybeWorker = await overlay + .getRepository(Worker) + .getById(`${workingGroup}-${meta.workerId}`) + + if (!maybeWorker) { + return invalidMetadata( + SetNodeOperationalStatus, + `The worker ${meta.workerId} does not exist in the ${workingGroup} working group` + ) + } + workerId = maybeWorker.runtimeId } // Update the operational status of Storage node @@ -247,7 +257,8 @@ export async function processSetNodeOperationalStatusMessage( SetNodeOperationalStatus, `The storage bucket ${bucketId} is not active` ) - } else if (storageBucket.operatorStatus.workerId !== workerId) { + // If the actor is a worker, check if the worker is the operator of the storage bucket + } else if (actor && storageBucket.operatorStatus.workerId !== actor.runtimeId) { return invalidMetadata( SetNodeOperationalStatus, `The worker ${workerId} is not the operator of the storage bucket ${bucketId}` @@ -263,7 +274,7 @@ export async function processSetNodeOperationalStatusMessage( if (isSet(meta.operationalStatus)) { metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( - actorContext, + actor ? 'worker' : 'lead', metadataEntity.nodeOperationalStatus, meta.operationalStatus ) @@ -295,6 +306,11 @@ export async function processSetNodeOperationalStatusMessage( SetNodeOperationalStatus, `The distribution bucket operator ${distributionOperatorId} does not exist` ) + } else if (actor && operator.workerId !== actor.runtimeId) { + return invalidMetadata( + SetNodeOperationalStatus, + `The worker ${workerId} is not the operator of the distribution bucket ${bucketId}` + ) } // create metadata entity if it does not exist already @@ -309,7 +325,7 @@ export async function processSetNodeOperationalStatusMessage( if (isSet(meta.operationalStatus)) { metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( - actorContext, + actor ? 'worker' : 'lead', metadataEntity.nodeOperationalStatus, meta.operationalStatus ) diff --git a/src/mappings/workingGroups/index.ts b/src/mappings/workingGroups/index.ts index 8ad9349..ed8b255 100644 --- a/src/mappings/workingGroups/index.ts +++ b/src/mappings/workingGroups/index.ts @@ -30,6 +30,7 @@ export async function processWorkingGroupsOpeningFilledEvent({ const workerId = getWorkerIdByApplicationId(applicationId) overlay.getRepository(Worker).new({ id: `${toLowerFirstLetter(workingGroupName)}-${workerId}`, + runtimeId: workerId, }) }) } @@ -63,84 +64,61 @@ export async function processWorkingGroupsWorkerTerminatedOrExitedEvent({ overlay.getRepository(Worker).remove(`${toLowerFirstLetter(workingGroupName)}-${workerId}`) } -export async function processStorageWorkingGroupLeadRemarkedEvent({ +export async function processWorkingGroupsLeadRemarkedEvent({ overlay, block, event, indexInBlock, extrinsicHash, eventDecoder, -}: EventHandlerContext<'StorageWorkingGroup.LeadRemarked'>) { +}: EventHandlerContext< + 'StorageWorkingGroup.LeadRemarked' | 'DistributionWorkingGroup.LeadRemarked' +>) { const [metadataBytes] = eventDecoder.v1000.decode(event) + // Get the working group name + const [workingGroup] = event.name.split('.') + const workingGroupName = toLowerFirstLetter(workingGroup) + await applyWorkingGroupsRemark( overlay, block, indexInBlock, extrinsicHash, - 'storageWorkingGroup', - 'lead', + workingGroupName as 'storageWorkingGroup' | 'distributionWorkingGroup', + undefined, metadataBytes ) } -export async function processStorageWorkingGroupWorkerRemarkedEvent({ +export async function processWorkingGroupsWorkerRemarkedEvent({ overlay, block, event, indexInBlock, extrinsicHash, eventDecoder, -}: EventHandlerContext<'StorageWorkingGroup.WorkerRemarked'>) { - const [, metadataBytes] = eventDecoder.v1000.decode(event) +}: EventHandlerContext< + 'StorageWorkingGroup.WorkerRemarked' | 'DistributionWorkingGroup.WorkerRemarked' +>) { + const [workerId, metadataBytes] = eventDecoder.v1000.decode(event) - await applyWorkingGroupsRemark( - overlay, - block, - indexInBlock, - extrinsicHash, - 'storageWorkingGroup', - 'worker', - metadataBytes - ) -} + // Get the working group name + const [workingGroup] = event.name.split('.') + const workingGroupName = toLowerFirstLetter(workingGroup) -export async function processDistributionWorkingGroupLeadRemarkedEvent({ - overlay, - block, - event, - indexInBlock, - extrinsicHash, - eventDecoder, -}: EventHandlerContext<'DistributionWorkingGroup.LeadRemarked'>) { - const [metadataBytes] = eventDecoder.v1000.decode(event) - await applyWorkingGroupsRemark( - overlay, - block, - indexInBlock, - extrinsicHash, - 'distributionWorkingGroup', - 'lead', - metadataBytes - ) -} + // Get the worker + const worker = await overlay + .getRepository(Worker) + .getByIdOrFail(`${workingGroupName}-${workerId}`) -export async function processDistributionWorkingGroupWorkerRemarkedEvent({ - overlay, - block, - event, - indexInBlock, - extrinsicHash, - eventDecoder, -}: EventHandlerContext<'DistributionWorkingGroup.WorkerRemarked'>) { - const [, metadataBytes] = eventDecoder.v1000.decode(event) await applyWorkingGroupsRemark( overlay, block, indexInBlock, extrinsicHash, - 'distributionWorkingGroup', - 'worker', + workingGroupName as 'storageWorkingGroup' | 'distributionWorkingGroup', + worker, metadataBytes ) } @@ -151,7 +129,7 @@ async function applyWorkingGroupsRemark( indexInBlock: number, extrinsicHash: string | undefined, workingGroup: 'storageWorkingGroup' | 'distributionWorkingGroup', - actorContext: 'lead' | 'worker', + actor: Worker | undefined, metadataBytes: string ): Promise { const metadata = deserializeMetadataStr(RemarkMetadataAction, metadataBytes) @@ -163,7 +141,7 @@ async function applyWorkingGroupsRemark( indexInBlock, extrinsicHash, workingGroup, - actorContext, + actor, metadata.setNodeOperationalStatus ) } else { diff --git a/src/processor.ts b/src/processor.ts index bd7cb6e..1b51ceb 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -41,11 +41,9 @@ import { processVoucherChangedEvent, } from './mappings/storage' import { - processDistributionWorkingGroupLeadRemarkedEvent, - processDistributionWorkingGroupWorkerRemarkedEvent, - processStorageWorkingGroupLeadRemarkedEvent, - processStorageWorkingGroupWorkerRemarkedEvent, + processWorkingGroupsLeadRemarkedEvent, processWorkingGroupsOpeningFilledEvent, + processWorkingGroupsWorkerRemarkedEvent, processWorkingGroupsWorkerTerminatedOrExitedEvent, } from './mappings/workingGroups' import { events } from './types' @@ -125,14 +123,14 @@ const eventHandlers: EventHandlers = { 'StorageWorkingGroup.TerminatedWorker': processWorkingGroupsWorkerTerminatedOrExitedEvent, 'StorageWorkingGroup.TerminatedLeader': processWorkingGroupsWorkerTerminatedOrExitedEvent, 'StorageWorkingGroup.WorkerExited': processWorkingGroupsWorkerTerminatedOrExitedEvent, - 'StorageWorkingGroup.LeadRemarked': processStorageWorkingGroupLeadRemarkedEvent, - 'StorageWorkingGroup.WorkerRemarked': processStorageWorkingGroupWorkerRemarkedEvent, + 'StorageWorkingGroup.LeadRemarked': processWorkingGroupsLeadRemarkedEvent, + 'StorageWorkingGroup.WorkerRemarked': processWorkingGroupsWorkerRemarkedEvent, 'DistributionWorkingGroup.OpeningFilled': processWorkingGroupsOpeningFilledEvent, 'DistributionWorkingGroup.TerminatedWorker': processWorkingGroupsWorkerTerminatedOrExitedEvent, 'DistributionWorkingGroup.TerminatedLeader': processWorkingGroupsWorkerTerminatedOrExitedEvent, 'DistributionWorkingGroup.WorkerExited': processWorkingGroupsWorkerTerminatedOrExitedEvent, - 'DistributionWorkingGroup.LeadRemarked': processDistributionWorkingGroupLeadRemarkedEvent, - 'DistributionWorkingGroup.WorkerRemarked': processDistributionWorkingGroupWorkerRemarkedEvent, + 'DistributionWorkingGroup.LeadRemarked': processWorkingGroupsLeadRemarkedEvent, + 'DistributionWorkingGroup.WorkerRemarked': processWorkingGroupsWorkerRemarkedEvent, } const eventNames = Object.keys(eventHandlers) From 611616e7e54e3f4e90fea88c58f7371bad3a21b3 Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Wed, 20 Mar 2024 17:03:14 +0500 Subject: [PATCH 4/9] address requested changes --- generated/schema.graphql | 2 +- schema/events.graphql | 2 +- src/mappings/storage/metadata.ts | 12 ++++++-- src/mappings/storage/utils.ts | 52 ++++++++++++++++++-------------- 4 files changed, 42 insertions(+), 26 deletions(-) diff --git a/generated/schema.graphql b/generated/schema.graphql index 6ea51f9..b47745a 100644 --- a/generated/schema.graphql +++ b/generated/schema.graphql @@ -678,7 +678,7 @@ input DistributionBucketWhereInput { type DistributionNodeOperationalStatusSetEvent { """Distribution bucket operator""" - bucketOperator: DistributionBucketOperator + bucketOperator: DistributionBucketOperator! """Operational status that was set""" operationalStatus: NodeOperationalStatus! diff --git a/schema/events.graphql b/schema/events.graphql index 779e231..84a4ec9 100644 --- a/schema/events.graphql +++ b/schema/events.graphql @@ -56,7 +56,7 @@ type StorageNodeOperationalStatusSetEvent { type DistributionNodeOperationalStatusSetEvent { "Distribution bucket operator" - bucketOperator: DistributionBucketOperator + bucketOperator: DistributionBucketOperator! "Operational status that was set" operationalStatus: NodeOperationalStatus! diff --git a/src/mappings/storage/metadata.ts b/src/mappings/storage/metadata.ts index 3e61e7a..cd81116 100644 --- a/src/mappings/storage/metadata.ts +++ b/src/mappings/storage/metadata.ts @@ -68,7 +68,11 @@ export async function processStorageOperatorMetadata( processNodeLocationMetadata(operatorMetadata, metadataUpdate.location) } if (isSet(metadataUpdate.operationalStatus)) { - processNodeOperationalStatusMetadata('worker', undefined, metadataUpdate.operationalStatus) + processNodeOperationalStatusMetadata( + 'worker', + operatorMetadata.nodeOperationalStatus, + metadataUpdate.operationalStatus + ) } if (isSet(metadataUpdate.extra)) { operatorMetadata.extra = metadataUpdate.extra || null @@ -204,7 +208,11 @@ export async function processDistributionOperatorMetadata( processNodeLocationMetadata(operatorMetadata, metadataUpdate.location) } if (isSet(metadataUpdate.operationalStatus)) { - processNodeOperationalStatusMetadata('worker', undefined, metadataUpdate.operationalStatus) + processNodeOperationalStatusMetadata( + 'worker', + operatorMetadata.nodeOperationalStatus, + metadataUpdate.operationalStatus + ) } if (isSet(metadataUpdate.extra)) { operatorMetadata.extra = metadataUpdate.extra || null diff --git a/src/mappings/storage/utils.ts b/src/mappings/storage/utils.ts index 5855688..4616ad3 100644 --- a/src/mappings/storage/utils.ts +++ b/src/mappings/storage/utils.ts @@ -273,25 +273,29 @@ export async function processSetNodeOperationalStatusMessage( .new({ id: bucketId, storageBucketId: bucketId }) if (isSet(meta.operationalStatus)) { + const currentNodeOperationalStatusType = metadataEntity.nodeOperationalStatus?.isTypeOf + metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( actor ? 'worker' : 'lead', metadataEntity.nodeOperationalStatus, meta.operationalStatus ) - } - // event processing + // event processing - const operationalStatusSetEvent = new StorageNodeOperationalStatusSetEvent({ - ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), - storageBucket: storageBucket.id, - operationalStatus: metadataEntity.nodeOperationalStatus || undefined, - }) + if (currentNodeOperationalStatusType !== metadataEntity.nodeOperationalStatus?.isTypeOf) { + const operationalStatusSetEvent = new StorageNodeOperationalStatusSetEvent({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + storageBucket: storageBucket.id, + operationalStatus: metadataEntity.nodeOperationalStatus || undefined, + }) - overlay.getRepository(Event).new({ - ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), - data: operationalStatusSetEvent, - }) + overlay.getRepository(Event).new({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + data: operationalStatusSetEvent, + }) + } + } } // Update the operational status of Distribution node @@ -324,24 +328,28 @@ export async function processSetNodeOperationalStatusMessage( }) if (isSet(meta.operationalStatus)) { + const currentNodeOperationalStatusType = metadataEntity.nodeOperationalStatus?.isTypeOf + metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( actor ? 'worker' : 'lead', metadataEntity.nodeOperationalStatus, meta.operationalStatus ) - } - // event processing + // event processing - const operationalStatusSetEvent = new DistributionNodeOperationalStatusSetEvent({ - ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), - bucketOperator: operator.id, - operationalStatus: metadataEntity.nodeOperationalStatus || undefined, - }) + if (currentNodeOperationalStatusType !== metadataEntity.nodeOperationalStatus?.isTypeOf) { + const operationalStatusSetEvent = new DistributionNodeOperationalStatusSetEvent({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + bucketOperator: operator.id, + operationalStatus: metadataEntity.nodeOperationalStatus || undefined, + }) - overlay.getRepository(Event).new({ - ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), - data: operationalStatusSetEvent, - }) + overlay.getRepository(Event).new({ + ...genericEventFields(overlay, block, indexInBlock, extrinsicHash), + data: operationalStatusSetEvent, + }) + } + } } } From f9b2706e1102740cf12ee631dcede44a8358b87e Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Wed, 20 Mar 2024 23:09:27 +0500 Subject: [PATCH 5/9] removed Worker entity & mappings --- generated/schema.graphql | 65 -------------------------- schema/workingGroups.graphql | 8 ---- src/mappings/storage/utils.ts | 34 ++++---------- src/mappings/workingGroups/index.ts | 71 ++--------------------------- src/processor.ts | 10 ---- typegen.json | 18 +------- 6 files changed, 13 insertions(+), 193 deletions(-) delete mode 100644 schema/workingGroups.graphql diff --git a/generated/schema.graphql b/generated/schema.graphql index b47745a..70864b3 100644 --- a/generated/schema.graphql +++ b/generated/schema.graphql @@ -1181,10 +1181,6 @@ type Query { distributionBucketFamilyById(id: String!): DistributionBucketFamily distributionBucketFamilyByUniqueInput(where: WhereIdInput!): DistributionBucketFamily @deprecated(reason: "Use distributionBucketFamilyById") distributionBucketFamiliesConnection(orderBy: [DistributionBucketFamilyOrderByInput!]!, after: String, first: Int, where: DistributionBucketFamilyWhereInput): DistributionBucketFamiliesConnection! - workers(where: WorkerWhereInput, orderBy: [WorkerOrderByInput!], offset: Int, limit: Int): [Worker!]! - workerById(id: String!): Worker - workerByUniqueInput(where: WhereIdInput!): Worker @deprecated(reason: "Use workerById") - workersConnection(orderBy: [WorkerOrderByInput!]!, after: String, first: Int, where: WorkerWhereInput): WorkersConnection! squidStatus: SquidStatus squidVersion: SquidVersion! } @@ -2149,64 +2145,3 @@ input WhereIdInput { id: String! } -type Worker { - """Worker id ({workingGroupName}-{workerId})""" - id: String! - - """WorkerId in specific working group module""" - runtimeId: BigInt! -} - -type WorkerEdge { - node: Worker! - cursor: String! -} - -enum WorkerOrderByInput { - id_ASC - id_DESC - id_ASC_NULLS_FIRST - id_DESC_NULLS_LAST - runtimeId_ASC - runtimeId_DESC - runtimeId_ASC_NULLS_FIRST - runtimeId_DESC_NULLS_LAST -} - -type WorkersConnection { - edges: [WorkerEdge!]! - pageInfo: PageInfo! - totalCount: Int! -} - -input WorkerWhereInput { - id_isNull: Boolean - id_eq: String - id_not_eq: String - id_gt: String - id_gte: String - id_lt: String - id_lte: String - id_in: [String!] - id_not_in: [String!] - id_contains: String - id_not_contains: String - id_containsInsensitive: String - id_not_containsInsensitive: String - id_startsWith: String - id_not_startsWith: String - id_endsWith: String - id_not_endsWith: String - runtimeId_isNull: Boolean - runtimeId_eq: BigInt - runtimeId_not_eq: BigInt - runtimeId_gt: BigInt - runtimeId_gte: BigInt - runtimeId_lt: BigInt - runtimeId_lte: BigInt - runtimeId_in: [BigInt!] - runtimeId_not_in: [BigInt!] - AND: [WorkerWhereInput!] - OR: [WorkerWhereInput!] -} - diff --git a/schema/workingGroups.graphql b/schema/workingGroups.graphql deleted file mode 100644 index c15776d..0000000 --- a/schema/workingGroups.graphql +++ /dev/null @@ -1,8 +0,0 @@ -# Working Groups -type Worker @entity { - "Worker id ({workingGroupName}-{workerId})" - id: ID! - - "WorkerId in specific working group module" - runtimeId: BigInt! -} diff --git a/src/mappings/storage/utils.ts b/src/mappings/storage/utils.ts index 4616ad3..67d7f35 100644 --- a/src/mappings/storage/utils.ts +++ b/src/mappings/storage/utils.ts @@ -18,7 +18,6 @@ import { StorageDataObject, StorageNodeOperationalStatusSetEvent, VideoSubtitle, - Worker, } from '../../model' import { Block } from '../../processor' import { @@ -221,29 +220,11 @@ export async function processSetNodeOperationalStatusMessage( indexInBlock: number, extrinsicHash: string | undefined, workingGroup: 'storageWorkingGroup' | 'distributionWorkingGroup', - actor: Worker | undefined, // worker who sent the message, undefined if it's a lead + runtimeWorkerId: bigint | undefined, // ID of the worker who sent the message, undefined if it's the Lead meta: ISetNodeOperationalStatus ): Promise { - let workerId: bigint const bucketId = meta.bucketId || '' - // Get the bucket's worker id - if (actor) { - workerId = actor.runtimeId - } else { - const maybeWorker = await overlay - .getRepository(Worker) - .getById(`${workingGroup}-${meta.workerId}`) - - if (!maybeWorker) { - return invalidMetadata( - SetNodeOperationalStatus, - `The worker ${meta.workerId} does not exist in the ${workingGroup} working group` - ) - } - workerId = maybeWorker.runtimeId - } - // Update the operational status of Storage node if (workingGroup === 'storageWorkingGroup') { const storageBucket = await overlay.getRepository(StorageBucket).getById(bucketId) @@ -258,10 +239,10 @@ export async function processSetNodeOperationalStatusMessage( `The storage bucket ${bucketId} is not active` ) // If the actor is a worker, check if the worker is the operator of the storage bucket - } else if (actor && storageBucket.operatorStatus.workerId !== actor.runtimeId) { + } else if (runtimeWorkerId && storageBucket.operatorStatus.workerId !== runtimeWorkerId) { return invalidMetadata( SetNodeOperationalStatus, - `The worker ${workerId} is not the operator of the storage bucket ${bucketId}` + `The worker ${runtimeWorkerId} is not the operator of the storage bucket ${bucketId}` ) } @@ -276,7 +257,7 @@ export async function processSetNodeOperationalStatusMessage( const currentNodeOperationalStatusType = metadataEntity.nodeOperationalStatus?.isTypeOf metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( - actor ? 'worker' : 'lead', + runtimeWorkerId ? 'worker' : 'lead', metadataEntity.nodeOperationalStatus, meta.operationalStatus ) @@ -300,6 +281,7 @@ export async function processSetNodeOperationalStatusMessage( // Update the operational status of Distribution node if (workingGroup === 'distributionWorkingGroup') { + const workerId = runtimeWorkerId ? runtimeWorkerId.toString() : meta.workerId || '' const distributionOperatorId = `${bucketId}-${workerId}` const operator = await overlay .getRepository(DistributionBucketOperator) @@ -310,10 +292,10 @@ export async function processSetNodeOperationalStatusMessage( SetNodeOperationalStatus, `The distribution bucket operator ${distributionOperatorId} does not exist` ) - } else if (actor && operator.workerId !== actor.runtimeId) { + } else if (runtimeWorkerId && operator.workerId !== runtimeWorkerId) { return invalidMetadata( SetNodeOperationalStatus, - `The worker ${workerId} is not the operator of the distribution bucket ${bucketId}` + `The worker ${runtimeWorkerId} is not the operator of the distribution bucket ${bucketId}` ) } @@ -331,7 +313,7 @@ export async function processSetNodeOperationalStatusMessage( const currentNodeOperationalStatusType = metadataEntity.nodeOperationalStatus?.isTypeOf metadataEntity.nodeOperationalStatus = processNodeOperationalStatusMetadata( - actor ? 'worker' : 'lead', + runtimeWorkerId ? 'worker' : 'lead', metadataEntity.nodeOperationalStatus, meta.operationalStatus ) diff --git a/src/mappings/workingGroups/index.ts b/src/mappings/workingGroups/index.ts index ed8b255..c8008c7 100644 --- a/src/mappings/workingGroups/index.ts +++ b/src/mappings/workingGroups/index.ts @@ -1,69 +1,9 @@ import { RemarkMetadataAction } from '@joystream/metadata-protobuf' -import { Worker } from '../../model' import { Block, EventHandlerContext } from '../../processor' -import { criticalError } from '../../utils/misc' import { EntityManagerOverlay } from '../../utils/overlay' import { processSetNodeOperationalStatusMessage } from '../storage/utils' import { deserializeMetadataStr, invalidMetadata, toLowerFirstLetter } from '../utils' -export async function processWorkingGroupsOpeningFilledEvent({ - overlay, - event, - eventDecoder, -}: EventHandlerContext< - 'StorageWorkingGroup.OpeningFilled' | 'DistributionWorkingGroup.OpeningFilled' ->) { - const [, applicationIdToWorkerIdMap, applicationIdsSet] = eventDecoder.v1000.decode(event) - const [workingGroupName] = event.name.split('.') - - const getWorkerIdByApplicationId = (applicationId: bigint): bigint => { - const tuple = applicationIdToWorkerIdMap.find(([appId, _]) => appId === applicationId) - if (!tuple) { - criticalError( - `Worker id for application id ${applicationId} not found in applicationIdToWorkerIdMap` - ) - } - return tuple[1] - } - - applicationIdsSet.forEach((applicationId) => { - const workerId = getWorkerIdByApplicationId(applicationId) - overlay.getRepository(Worker).new({ - id: `${toLowerFirstLetter(workingGroupName)}-${workerId}`, - runtimeId: workerId, - }) - }) -} - -export async function processWorkingGroupsWorkerTerminatedOrExitedEvent({ - overlay, - event, - eventDecoder, -}: EventHandlerContext< - | 'StorageWorkingGroup.TerminatedWorker' - | 'StorageWorkingGroup.TerminatedLeader' - | 'StorageWorkingGroup.WorkerExited' - | 'DistributionWorkingGroup.TerminatedWorker' - | 'DistributionWorkingGroup.TerminatedLeader' - | 'DistributionWorkingGroup.WorkerExited' ->) { - const decoded = eventDecoder.v1000.decode(event) - - let workerId: bigint - - if (typeof decoded === 'bigint') { - workerId = decoded - } else { - workerId = decoded[0] - } - - // Get the working group name - const [workingGroupName] = event.name.split('.') - - // Remove the worker - overlay.getRepository(Worker).remove(`${toLowerFirstLetter(workingGroupName)}-${workerId}`) -} - export async function processWorkingGroupsLeadRemarkedEvent({ overlay, block, @@ -107,18 +47,13 @@ export async function processWorkingGroupsWorkerRemarkedEvent({ const [workingGroup] = event.name.split('.') const workingGroupName = toLowerFirstLetter(workingGroup) - // Get the worker - const worker = await overlay - .getRepository(Worker) - .getByIdOrFail(`${workingGroupName}-${workerId}`) - await applyWorkingGroupsRemark( overlay, block, indexInBlock, extrinsicHash, workingGroupName as 'storageWorkingGroup' | 'distributionWorkingGroup', - worker, + workerId, metadataBytes ) } @@ -129,7 +64,7 @@ async function applyWorkingGroupsRemark( indexInBlock: number, extrinsicHash: string | undefined, workingGroup: 'storageWorkingGroup' | 'distributionWorkingGroup', - actor: Worker | undefined, + workerId: bigint | undefined, metadataBytes: string ): Promise { const metadata = deserializeMetadataStr(RemarkMetadataAction, metadataBytes) @@ -141,7 +76,7 @@ async function applyWorkingGroupsRemark( indexInBlock, extrinsicHash, workingGroup, - actor, + workerId, metadata.setNodeOperationalStatus ) } else { diff --git a/src/processor.ts b/src/processor.ts index 1b51ceb..7fa215e 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -42,9 +42,7 @@ import { } from './mappings/storage' import { processWorkingGroupsLeadRemarkedEvent, - processWorkingGroupsOpeningFilledEvent, processWorkingGroupsWorkerRemarkedEvent, - processWorkingGroupsWorkerTerminatedOrExitedEvent, } from './mappings/workingGroups' import { events } from './types' import { EntityManagerOverlay } from './utils/overlay' @@ -119,16 +117,8 @@ const eventHandlers: EventHandlers = { 'Storage.DistributionBucketFamilyCreated': processDistributionBucketFamilyCreatedEvent, 'Storage.DistributionBucketFamilyMetadataSet': processDistributionBucketFamilyMetadataSetEvent, 'Storage.DistributionBucketFamilyDeleted': processDistributionBucketFamilyDeletedEvent, - 'StorageWorkingGroup.OpeningFilled': processWorkingGroupsOpeningFilledEvent, - 'StorageWorkingGroup.TerminatedWorker': processWorkingGroupsWorkerTerminatedOrExitedEvent, - 'StorageWorkingGroup.TerminatedLeader': processWorkingGroupsWorkerTerminatedOrExitedEvent, - 'StorageWorkingGroup.WorkerExited': processWorkingGroupsWorkerTerminatedOrExitedEvent, 'StorageWorkingGroup.LeadRemarked': processWorkingGroupsLeadRemarkedEvent, 'StorageWorkingGroup.WorkerRemarked': processWorkingGroupsWorkerRemarkedEvent, - 'DistributionWorkingGroup.OpeningFilled': processWorkingGroupsOpeningFilledEvent, - 'DistributionWorkingGroup.TerminatedWorker': processWorkingGroupsWorkerTerminatedOrExitedEvent, - 'DistributionWorkingGroup.TerminatedLeader': processWorkingGroupsWorkerTerminatedOrExitedEvent, - 'DistributionWorkingGroup.WorkerExited': processWorkingGroupsWorkerTerminatedOrExitedEvent, 'DistributionWorkingGroup.LeadRemarked': processWorkingGroupsLeadRemarkedEvent, 'DistributionWorkingGroup.WorkerRemarked': processWorkingGroupsWorkerRemarkedEvent, } diff --git a/typegen.json b/typegen.json index 75ecb50..0db14fd 100644 --- a/typegen.json +++ b/typegen.json @@ -41,24 +41,10 @@ ] }, "StorageWorkingGroup": { - "events": [ - "WorkerRemarked", - "LeadRemarked", - "OpeningFilled", - "TerminatedWorker", - "TerminatedLeader", - "WorkerExited" - ] + "events": ["WorkerRemarked", "LeadRemarked"] }, "DistributionWorkingGroup": { - "events": [ - "WorkerRemarked", - "LeadRemarked", - "OpeningFilled", - "TerminatedWorker", - "TerminatedLeader", - "WorkerExited" - ] + "events": ["WorkerRemarked", "LeadRemarked"] } } } From 3ed2fddacbceca0aab1c3bb3b399de56fa0778db Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Wed, 27 Mar 2024 06:24:22 +0500 Subject: [PATCH 6/9] update mappings based on new protobuf message types for NodeOperationalStatus --- generated/schema.graphql | 46 ++++++------- schema/storage.graphql | 6 +- src/mappings/storage/metadata.ts | 112 +++++++++++++++---------------- src/mappings/utils.ts | 12 ++++ 4 files changed, 93 insertions(+), 83 deletions(-) diff --git a/generated/schema.graphql b/generated/schema.graphql index 70864b3..6991db6 100644 --- a/generated/schema.graphql +++ b/generated/schema.graphql @@ -432,10 +432,10 @@ enum DistributionBucketOperatorMetadataOrderByInput { nodeOperationalStatus_from_DESC nodeOperationalStatus_from_ASC_NULLS_FIRST nodeOperationalStatus_from_DESC_NULLS_LAST - nodeOperationalStatus_to_ASC - nodeOperationalStatus_to_DESC - nodeOperationalStatus_to_ASC_NULLS_FIRST - nodeOperationalStatus_to_DESC_NULLS_LAST + nodeOperationalStatus_until_ASC + nodeOperationalStatus_until_DESC + nodeOperationalStatus_until_ASC_NULLS_FIRST + nodeOperationalStatus_until_DESC_NULLS_LAST nodeOperationalStatus_isTypeOf_ASC nodeOperationalStatus_isTypeOf_DESC nodeOperationalStatus_isTypeOf_ASC_NULLS_FIRST @@ -1014,7 +1014,7 @@ input NodeLocationMetadataWhereInput { coordinates: GeoCoordinatesWhereInput } -union NodeOperationalStatus = NodeOperationalStatusNormal | NodeOperationalStatusNoService | NodeOperationalStatusNoServiceFrom | NodeOperationalStatusNoServiceDuring +union NodeOperationalStatus = NodeOperationalStatusNormal | NodeOperationalStatusNoService | NodeOperationalStatusNoServiceFrom | NodeOperationalStatusNoServiceUntil type NodeOperationalStatusNormal { """Reason why node was set to this state""" @@ -1032,7 +1032,7 @@ type NodeOperationalStatusNoService { rationale: String } -type NodeOperationalStatusNoServiceDuring { +type NodeOperationalStatusNoServiceFrom { """ Whether the state was set by lead (true) or by the operator (false), it is meant to prevent worker from unilaterally reversing. @@ -1042,14 +1042,11 @@ type NodeOperationalStatusNoServiceDuring { """The time from which the bucket would have to no service""" from: DateTime! - """The time until which the bucket would have to no service""" - to: DateTime! - """Reason why node was set to this state""" rationale: String } -type NodeOperationalStatusNoServiceFrom { +type NodeOperationalStatusNoServiceUntil { """ Whether the state was set by lead (true) or by the operator (false), it is meant to prevent worker from unilaterally reversing. @@ -1059,6 +1056,9 @@ type NodeOperationalStatusNoServiceFrom { """The time from which the bucket would have to no service""" from: DateTime! + """The time until which the bucket would have to no service""" + until: DateTime! + """Reason why node was set to this state""" rationale: String } @@ -1093,15 +1093,15 @@ input NodeOperationalStatusWhereInput { from_lte: DateTime from_in: [DateTime!] from_not_in: [DateTime!] - to_isNull: Boolean - to_eq: DateTime - to_not_eq: DateTime - to_gt: DateTime - to_gte: DateTime - to_lt: DateTime - to_lte: DateTime - to_in: [DateTime!] - to_not_in: [DateTime!] + until_isNull: Boolean + until_eq: DateTime + until_not_eq: DateTime + until_gt: DateTime + until_gte: DateTime + until_lt: DateTime + until_lte: DateTime + until_in: [DateTime!] + until_not_in: [DateTime!] isTypeOf_isNull: Boolean isTypeOf_eq: String isTypeOf_not_eq: String @@ -1593,10 +1593,10 @@ enum StorageBucketOperatorMetadataOrderByInput { nodeOperationalStatus_from_DESC nodeOperationalStatus_from_ASC_NULLS_FIRST nodeOperationalStatus_from_DESC_NULLS_LAST - nodeOperationalStatus_to_ASC - nodeOperationalStatus_to_DESC - nodeOperationalStatus_to_ASC_NULLS_FIRST - nodeOperationalStatus_to_DESC_NULLS_LAST + nodeOperationalStatus_until_ASC + nodeOperationalStatus_until_DESC + nodeOperationalStatus_until_ASC_NULLS_FIRST + nodeOperationalStatus_until_DESC_NULLS_LAST nodeOperationalStatus_isTypeOf_ASC nodeOperationalStatus_isTypeOf_DESC nodeOperationalStatus_isTypeOf_ASC_NULLS_FIRST diff --git a/schema/storage.graphql b/schema/storage.graphql index 4bcf152..64dad14 100644 --- a/schema/storage.graphql +++ b/schema/storage.graphql @@ -86,7 +86,7 @@ type NodeOperationalStatusNoServiceFrom { rationale: String } -type NodeOperationalStatusNoServiceDuring { +type NodeOperationalStatusNoServiceUntil { "Whether the state was set by lead (true) or by the operator (false), it is meant to prevent worker from unilaterally reversing." forced: Boolean! @@ -94,7 +94,7 @@ type NodeOperationalStatusNoServiceDuring { from: DateTime! "The time until which the bucket would have to no service" - to: DateTime! + until: DateTime! "Reason why node was set to this state" rationale: String @@ -104,7 +104,7 @@ union NodeOperationalStatus = NodeOperationalStatusNormal | NodeOperationalStatusNoService | NodeOperationalStatusNoServiceFrom - | NodeOperationalStatusNoServiceDuring + | NodeOperationalStatusNoServiceUntil type StorageBucketOperatorMetadata @entity { id: ID! diff --git a/src/mappings/storage/metadata.ts b/src/mappings/storage/metadata.ts index cd81116..ddeba17 100644 --- a/src/mappings/storage/metadata.ts +++ b/src/mappings/storage/metadata.ts @@ -4,9 +4,10 @@ import { IDistributionBucketOperatorMetadata, IGeographicalArea, INodeLocationMetadata, - INodeOperationalStatusMetadata, + INodeOperationalStatus, IStorageBucketOperatorMetadata, - NodeOperationalStatusMetadata, + NodeOperationalStatusNoServiceFrom as NodeOperationalStatusNoServiceFromMetadata, + NodeOperationalStatusNoServiceUntil as NodeOperationalStatusNoServiceUntilMetadata, } from '@joystream/metadata-protobuf' import { DecodedMetadataObject } from '@joystream/metadata-protobuf/types' import { @@ -29,13 +30,13 @@ import { NodeLocationMetadata, NodeOperationalStatus, NodeOperationalStatusNoService, - NodeOperationalStatusNoServiceDuring, NodeOperationalStatusNoServiceFrom, + NodeOperationalStatusNoServiceUntil, NodeOperationalStatusNormal, StorageBucketOperatorMetadata, } from '../../model' import { EntityManagerOverlay, Flat } from '../../utils/overlay' -import { invalidMetadata } from '../utils' +import { invalidMetadata, parseDateStr } from '../utils' export const protobufContinentToGraphlContinent: { [key in GeographicalAreaProto.Continent]: Continent @@ -68,7 +69,7 @@ export async function processStorageOperatorMetadata( processNodeLocationMetadata(operatorMetadata, metadataUpdate.location) } if (isSet(metadataUpdate.operationalStatus)) { - processNodeOperationalStatusMetadata( + operatorMetadata.nodeOperationalStatus = processNodeOperationalStatusMetadata( 'worker', operatorMetadata.nodeOperationalStatus, metadataUpdate.operationalStatus @@ -120,13 +121,13 @@ function processNodeLocationMetadata( export function processNodeOperationalStatusMetadata( actorContext: 'lead' | 'worker', currentStatus: NodeOperationalStatus | null | undefined, - meta: INodeOperationalStatusMetadata -): NodeOperationalStatus { + meta: INodeOperationalStatus +): NodeOperationalStatus | null | undefined { const isCurrentStatusForced = currentStatus && (currentStatus instanceof NodeOperationalStatusNoService || currentStatus instanceof NodeOperationalStatusNoServiceFrom || - currentStatus instanceof NodeOperationalStatusNoServiceDuring) && + currentStatus instanceof NodeOperationalStatusNoServiceUntil) && currentStatus.forced // if current state is forced by lead, then prevent worker from unilaterally reversing. @@ -134,59 +135,56 @@ export function processNodeOperationalStatusMetadata( return currentStatus } - // Validate date formats - let validatedNoServiceFrom: Date | undefined - let validatedNoServiceTo: Date | undefined - - try { - if (meta.noServiceFrom) { - new Date(meta.noServiceFrom).toISOString() - validatedNoServiceFrom = new Date(meta.noServiceFrom) - } - } catch (error) { - invalidMetadata(NodeOperationalStatusMetadata, `Invalid date format for "noServiceFrom"`, { - decodedMessage: meta, - }) + // For status type Normal + if (meta.normal) { + const status = new NodeOperationalStatusNormal() + status.rationale = meta.normal.rationale + return status } - - try { - if (meta.noServiceTo) { - new Date(meta.noServiceTo).toISOString() - validatedNoServiceTo = new Date(meta.noServiceTo) - } - } catch (error) { - invalidMetadata(NodeOperationalStatusMetadata, `Invalid date format for "noServiceTo"`, { - decodedMessage: meta, - }) + // For status type NoService + else if (meta.noService) { + const status = new NodeOperationalStatusNoService() + status.rationale = meta.noService.rationale + status.forced = actorContext === 'lead' + return status } + // For status type NoServiceFrom + else if (meta.noServiceFrom) { + const from = parseDateStr(meta.noServiceFrom.from) + if (!from) { + invalidMetadata( + NodeOperationalStatusNoServiceFromMetadata, + `Invalid date format for "noServiceFrom"`, + { decodedMessage: meta.noServiceFrom } + ) + return currentStatus + } - // set node state to NoService - if (meta.status === NodeOperationalStatusMetadata.OperationalStatus.NO_SERVICE) { - if (validatedNoServiceFrom && validatedNoServiceTo) { - const status = new NodeOperationalStatusNoServiceDuring() - status.rationale = meta.rationale - status.forced = actorContext === 'lead' - status.from = validatedNoServiceFrom - status.to = validatedNoServiceTo - return status - } else if (validatedNoServiceFrom && !validatedNoServiceTo) { - const status = new NodeOperationalStatusNoServiceFrom() - status.rationale = meta.rationale - status.forced = actorContext === 'lead' - status.from = validatedNoServiceFrom - return status - } else if (!validatedNoServiceFrom && !validatedNoServiceTo) { - const status = new NodeOperationalStatusNoService() - status.rationale = meta.rationale - status.forced = actorContext === 'lead' - return status + const status = new NodeOperationalStatusNoServiceFrom() + status.rationale = meta.noServiceFrom.rationale + status.forced = actorContext === 'lead' + status.from = from + return status + } + // For status type NoServiceUntil + else if (meta.noServiceUntil) { + const from = meta.noServiceUntil.from ? parseDateStr(meta.noServiceUntil.from) : new Date() + const until = parseDateStr(meta.noServiceUntil.until) + if (!from || !until) { + invalidMetadata( + NodeOperationalStatusNoServiceUntilMetadata, + `Invalid date format for "noServiceUntil"`, + { decodedMessage: meta.noServiceUntil } + ) + return currentStatus } + const status = new NodeOperationalStatusNoServiceUntil() + status.rationale = meta.noServiceUntil.rationale + status.forced = actorContext === 'lead' + status.from = from + status.until = until + return status } - - // Default operational status of the node - const status = new NodeOperationalStatusNormal() - status.rationale = meta.rationale - return status } export async function processDistributionOperatorMetadata( @@ -208,7 +206,7 @@ export async function processDistributionOperatorMetadata( processNodeLocationMetadata(operatorMetadata, metadataUpdate.location) } if (isSet(metadataUpdate.operationalStatus)) { - processNodeOperationalStatusMetadata( + operatorMetadata.nodeOperationalStatus = processNodeOperationalStatusMetadata( 'worker', operatorMetadata.nodeOperationalStatus, metadataUpdate.operationalStatus diff --git a/src/mappings/utils.ts b/src/mappings/utils.ts index 5e12382..3435df3 100644 --- a/src/mappings/utils.ts +++ b/src/mappings/utils.ts @@ -109,3 +109,15 @@ export function toLowerFirstLetter(str: string) { if (!str) return '' // Return an empty string if str is falsy return str.charAt(0).toLowerCase() + str.slice(1) } + +export function parseDateStr(date: string): Date | undefined { + try { + if (date) { + const dateObj = new Date(date) + dateObj.toISOString() // Throws an error if the date is invalid + return dateObj + } + } catch (error) { + console.error(`Invalid date format:`, date) + } +} From 692abf5c5c0a634cef4bb79bf1a114d5b9d41466 Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Wed, 27 Mar 2024 08:09:36 +0500 Subject: [PATCH 7/9] improve dates validation for setting nodes operational status --- src/mappings/storage/metadata.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/mappings/storage/metadata.ts b/src/mappings/storage/metadata.ts index ddeba17..dd349eb 100644 --- a/src/mappings/storage/metadata.ts +++ b/src/mappings/storage/metadata.ts @@ -151,10 +151,12 @@ export function processNodeOperationalStatusMetadata( // For status type NoServiceFrom else if (meta.noServiceFrom) { const from = parseDateStr(meta.noServiceFrom.from) - if (!from) { + + // Date must be in the future + if (!from || from < new Date()) { invalidMetadata( NodeOperationalStatusNoServiceFromMetadata, - `Invalid date format for "noServiceFrom"`, + `Invalid date for "noServiceFrom"`, { decodedMessage: meta.noServiceFrom } ) return currentStatus @@ -170,10 +172,12 @@ export function processNodeOperationalStatusMetadata( else if (meta.noServiceUntil) { const from = meta.noServiceUntil.from ? parseDateStr(meta.noServiceUntil.from) : new Date() const until = parseDateStr(meta.noServiceUntil.until) - if (!from || !until) { + + // Dates must be in the future and "until" must be after "from" + if (!from || !until || from < new Date() || from > until) { invalidMetadata( NodeOperationalStatusNoServiceUntilMetadata, - `Invalid date format for "noServiceUntil"`, + `Invalid date/s for "noServiceUntil"`, { decodedMessage: meta.noServiceUntil } ) return currentStatus From 0f0ac7da1a8aaf55d0f123d5069c30477a208fd3 Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Mon, 15 Apr 2024 21:52:14 +0500 Subject: [PATCH 8/9] update '@joystream/js' dependency --- package-lock.json | 34 +++++++++++++++++----------------- package.json | 2 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3d53d48..77e4efa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "1.5.0", "hasInstallScript": true, "dependencies": { - "@joystream/js": "^1.9.0", + "@joystream/js": "^1.12.0", "@opentelemetry/api": "^1.4.1", "@opentelemetry/auto-instrumentations-node": "0.37.0", "@opentelemetry/core": "1.13.0", @@ -3361,13 +3361,13 @@ "integrity": "sha512-CtzORUwWTTOTqfVtHaKRJ0I1kNQd1bpn3sUh8I3nJDVY+5/M/Oe1DnEWzPQvqq/xPIIkzzzIP7mfCoAjFRvDhg==" }, "node_modules/@joystream/js": { - "version": "1.9.0", - "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.9.0.tgz", - "integrity": "sha512-C0vSKID6BHvfwEKqvfRWkls1aVSE18i/OqzasVywPS8GRG2Kp5H7F6Yp/K5tpqbA3mZXbR2zNvxTmtHUO/V1LA==", + "version": "1.12.0", + "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.12.0.tgz", + "integrity": "sha512-aRDP1PEcQDU5lNaxHt501/P5LKfgB8VfD4dsjdaDYruls8wQFIg6/EEctjn0bT4HxY14EKK27hrQX00Nu1Ip/A==", "dependencies": { - "@joystream/metadata-protobuf": "^2.14.0", - "@joystream/types": "^2.0.0", - "@polkadot/util-crypto": "9.5.1", + "@joystream/metadata-protobuf": "^2.16.0", + "@joystream/types": "^4.4.0", + "@polkadot/util-crypto": "^12.6.2", "axios": "^1.2.1", "buffer": "^6.0.3", "lodash": "^4.17.21", @@ -3386,9 +3386,9 @@ "integrity": "sha512-GKSNGeNAtw8IryjjkhZxuKB3JzlcLTwjtiQCHKvqQet81I93kXslhDQruGI/QsddO83mcDToBVy7GqGS/zYf/A==" }, "node_modules/@joystream/metadata-protobuf": { - "version": "2.14.0", - "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.14.0.tgz", - "integrity": "sha512-1/FgUW6mGt3t+p0k7/KdKnRQPU0HghY4MJTSkip1SCTMICZWoDuSADJuM8nlh2Qdu9kqvodqtndKjz057b0t3g==", + "version": "2.16.0", + "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.16.0.tgz", + "integrity": "sha512-I5xg0Ko4fWPWFAQlA/QocAd7KXKdGxD4bIWc1dWxrKzxhlCe4zk6/iujGY2pH+2Nhtr5GjcKSmNoZ0/Cih9PXw==", "dependencies": { "@types/iso-3166-2": "^1.0.0", "@types/long": "^4.0.1", @@ -19220,11 +19220,11 @@ "integrity": "sha512-CtzORUwWTTOTqfVtHaKRJ0I1kNQd1bpn3sUh8I3nJDVY+5/M/Oe1DnEWzPQvqq/xPIIkzzzIP7mfCoAjFRvDhg==" }, "@joystream/js": { - "version": "1.9.0", - "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.9.0.tgz", - "integrity": "sha512-C0vSKID6BHvfwEKqvfRWkls1aVSE18i/OqzasVywPS8GRG2Kp5H7F6Yp/K5tpqbA3mZXbR2zNvxTmtHUO/V1LA==", + "version": "1.12.0", + "resolved": "https://registry.npmjs.org/@joystream/js/-/js-1.12.0.tgz", + "integrity": "sha512-aRDP1PEcQDU5lNaxHt501/P5LKfgB8VfD4dsjdaDYruls8wQFIg6/EEctjn0bT4HxY14EKK27hrQX00Nu1Ip/A==", "requires": { - "@joystream/metadata-protobuf": "^2.14.0", + "@joystream/metadata-protobuf": "^2.16.0", "@joystream/types": "0.20.5", "@polkadot/util-crypto": "9.5.1", "axios": "^1.2.1", @@ -19243,9 +19243,9 @@ } }, "@joystream/metadata-protobuf": { - "version": "2.14.0", - "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.14.0.tgz", - "integrity": "sha512-1/FgUW6mGt3t+p0k7/KdKnRQPU0HghY4MJTSkip1SCTMICZWoDuSADJuM8nlh2Qdu9kqvodqtndKjz057b0t3g==", + "version": "2.16.0", + "resolved": "https://registry.npmjs.org/@joystream/metadata-protobuf/-/metadata-protobuf-2.16.0.tgz", + "integrity": "sha512-I5xg0Ko4fWPWFAQlA/QocAd7KXKdGxD4bIWc1dWxrKzxhlCe4zk6/iujGY2pH+2Nhtr5GjcKSmNoZ0/Cih9PXw==", "requires": { "@types/iso-3166-2": "^1.0.0", "@types/long": "^4.0.1", diff --git a/package.json b/package.json index 1c15b89..7889cad 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,7 @@ "@joystream/types": "0.20.5" }, "dependencies": { - "@joystream/js": "^1.9.0", + "@joystream/js": "^1.12.0", "@opentelemetry/api": "^1.4.1", "@opentelemetry/auto-instrumentations-node": "0.37.0", "@opentelemetry/core": "1.13.0", From 9985bcbb7fc7b41a05da476048bd26108af66a02 Mon Sep 17 00:00:00 2001 From: Zeeshan Akram <97m.zeeshan@gmail.com> Date: Mon, 15 Apr 2024 22:00:57 +0500 Subject: [PATCH 9/9] regenerate db migrations --- .../{1709195273064-Data.js => 1713200416940-Data.js} | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) rename db/migrations/{1709195273064-Data.js => 1713200416940-Data.js} (97%) diff --git a/db/migrations/1709195273064-Data.js b/db/migrations/1713200416940-Data.js similarity index 97% rename from db/migrations/1709195273064-Data.js rename to db/migrations/1713200416940-Data.js index 1042554..078aec6 100644 --- a/db/migrations/1709195273064-Data.js +++ b/db/migrations/1713200416940-Data.js @@ -1,5 +1,5 @@ -module.exports = class Data1709195273064 { - name = 'Data1709195273064' +module.exports = class Data1713200416940 { + name = 'Data1713200416940' async up(db) { await db.query(`CREATE TABLE "next_entity_id" ("entity_name" character varying NOT NULL, "next_id" bigint NOT NULL, CONSTRAINT "PK_09a3b40db622a65096e7344d7ae" PRIMARY KEY ("entity_name"))`) @@ -29,7 +29,6 @@ module.exports = class Data1709195273064 { await db.query(`CREATE INDEX "IDX_5510d3b244a63d6ec702faa426" ON "distribution_bucket_family_metadata" ("region") `) await db.query(`CREATE TABLE "distribution_bucket_operator_metadata" ("id" character varying NOT NULL, "distirbution_bucket_operator_id" character varying NOT NULL, "node_endpoint" text, "node_location" jsonb, "node_operational_status" jsonb, "extra" text, CONSTRAINT "DistributionBucketOperatorMetadata_distirbutionBucketOperator" UNIQUE ("distirbution_bucket_operator_id") DEFERRABLE INITIALLY DEFERRED, CONSTRAINT "REL_69ec9bdc975b95f7dff94a7106" UNIQUE ("distirbution_bucket_operator_id"), CONSTRAINT "PK_9bbecaa12f30e3826922688274f" PRIMARY KEY ("id"))`) await db.query(`CREATE INDEX "IDX_69ec9bdc975b95f7dff94a7106" ON "distribution_bucket_operator_metadata" ("distirbution_bucket_operator_id") `) - await db.query(`CREATE TABLE "worker" ("id" character varying NOT NULL, "runtime_id" numeric NOT NULL, CONSTRAINT "PK_dc8175fa0e34ce7a39e4ec73b94" PRIMARY KEY ("id"))`) await db.query(`ALTER TABLE "storage_bucket_bag" ADD CONSTRAINT "FK_791e2f82e3919ffcef8712aa1b9" FOREIGN KEY ("storage_bucket_id") REFERENCES "storage_bucket"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) await db.query(`ALTER TABLE "storage_bucket_bag" ADD CONSTRAINT "FK_aaf00b2c7d0cba49f97da14fbba" FOREIGN KEY ("bag_id") REFERENCES "storage_bag"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) await db.query(`ALTER TABLE "distribution_bucket_operator" ADD CONSTRAINT "FK_678dc5427cdde0cd4fef2c07a43" FOREIGN KEY ("distribution_bucket_id") REFERENCES "distribution_bucket"("id") ON DELETE NO ACTION ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED`) @@ -71,7 +70,6 @@ module.exports = class Data1709195273064 { await db.query(`DROP INDEX "public"."IDX_5510d3b244a63d6ec702faa426"`) await db.query(`DROP TABLE "distribution_bucket_operator_metadata"`) await db.query(`DROP INDEX "public"."IDX_69ec9bdc975b95f7dff94a7106"`) - await db.query(`DROP TABLE "worker"`) await db.query(`ALTER TABLE "storage_bucket_bag" DROP CONSTRAINT "FK_791e2f82e3919ffcef8712aa1b9"`) await db.query(`ALTER TABLE "storage_bucket_bag" DROP CONSTRAINT "FK_aaf00b2c7d0cba49f97da14fbba"`) await db.query(`ALTER TABLE "distribution_bucket_operator" DROP CONSTRAINT "FK_678dc5427cdde0cd4fef2c07a43"`)