diff --git a/package-set.dhall b/package-set.dhall index dd6cae0..4f2fbcb 100644 --- a/package-set.dhall +++ b/package-set.dhall @@ -1,8 +1,10 @@ [ - { name = "base", repo = "https://github.com/dfinity/motoko-base.git", version = "c174fc4", dependencies = []: List Text }, + { name = "base", repo = "https://github.com/dfinity/motoko-base.git", version = "moc-0.7.0", dependencies = []: List Text }, { name = "candy_0_1_9", repo = "https://github.com/aramakme/candy_library.git", version = "v0.1.9", dependencies = ["base"] }, { name = "candy", repo = "https://github.com/aramakme/candy_library.git", version = "v0.1.9", dependencies = ["base"] }, { name = "map_4_0_0", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v4.0.0", dependencies = ["base"] }, { name = "map_8_0_0_alpha_5", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v8.0.0-alpha.5", dependencies = ["base"] }, { name = "map", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v8.0.0-alpha.5", dependencies = ["base"] }, + { name = "candy_utils_0_2_0", repo = "https://github.com/ZhenyaUsenko/motoko-candy-utils.git", version = "v0.2.0", dependencies = ["base"] }, + { name = "candy_utils", repo = "https://github.com/ZhenyaUsenko/motoko-candy-utils.git", version = "v0.2.0", dependencies = ["base"] }, ] diff --git a/src/main.mo b/src/main.mo index ea45f1f..679ee8f 100644 --- a/src/main.mo +++ b/src/main.mo @@ -1,12 +1,12 @@ import Admin "./modules/admin"; import Broadcast "./modules/broadcast"; import Candy "mo:candy/types"; -import Cascade "./modules/cascade"; import Debug "mo:base/Debug"; import MigrationTypes "./migrations/types"; import Migrations "./migrations"; import Prim "mo:prim"; import Publish "./modules/publish"; +import Stats "./modules/stats"; import Subscribe "./modules/subscribe"; shared (deployer) actor class EventSystem() { @@ -16,9 +16,9 @@ shared (deployer) actor class EventSystem() { stable var migrationState: MigrationTypes.State = #v0_0_0(#data); - migrationState := Migrations.migrate(migrationState, #v0_2_0(#id), {}); + migrationState := Migrations.migrate(migrationState, #v0_3_0(#id), {}); - let state = switch (migrationState) { case (#v0_2_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") }; + let state = switch (migrationState) { case (#v0_3_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -26,12 +26,12 @@ shared (deployer) actor class EventSystem() { let SubscribeModule = Subscribe.init(state, deployer.caller); + let StatsModule = Stats.init(state, deployer.caller); + let BroadcastModule = Broadcast.init(state, deployer.caller); let AdminModule = Admin.init(state, deployer.caller); - let CascadeModule = Cascade.init(state, deployer.caller); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// public shared (context) func registerPublication(eventName: Text, options: Publish.PublicationOptions): async () { @@ -48,6 +48,10 @@ shared (deployer) actor class EventSystem() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + public shared (context) func registerSubscriber(caller: Principal, options: Subscribe.SubscriberOptions): async () { + ignore SubscribeModule.registerSubscriber(context.caller, options); + }; + public shared (context) func subscribe(eventName: Text, options: Subscribe.SubscriptionOptions): async () { ignore SubscribeModule.subscribe(context.caller, eventName, options); }; @@ -60,26 +64,32 @@ shared (deployer) actor class EventSystem() { SubscribeModule.requestMissedEvents(context.caller, eventName, options); }; + public shared (context) func confirmListener(subscriberId: Principal, allow: Bool) { + SubscribeModule.confirmListener(context.caller, subscriberId, allow); + }; + public shared (context) func confirmEventProcessed(eventId: Nat): async () { SubscribeModule.confirmEventProcessed(context.caller, eventId); }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public query (context) func fetchSubscribers(params: Admin.FetchSubscribersParams): async Admin.FetchSubscribersResponse { - AdminModule.fetchSubscribers(context.caller, params); + public shared (context) func getPublicationStats(options: Stats.StatsOptions): async Stats.Stats { + StatsModule.getPublicationStats(context.caller, options); }; - public query (context) func fetchEvents(params: Admin.FetchEventsParams): async Admin.FetchEventsResponse { - AdminModule.fetchEvents(context.caller, params); + public shared (context) func getSubscriptionStats(options: Stats.StatsOptions): async Stats.Stats { + StatsModule.getSubscriptionStats(context.caller, options); }; - public shared (context) func removeSubscribers(subscriberIds: [Principal]): async () { - AdminModule.removeSubscribers(context.caller, subscriberIds); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public query (context) func fetchSubscribers(params: Admin.FetchSubscribersParams): async Admin.FetchSubscribersResponse { + AdminModule.fetchSubscribers(context.caller, params); }; - public shared (context) func removeEvents(eventIds: [Nat]): async () { - AdminModule.removeEvents(context.caller, eventIds); + public query (context) func fetchEvents(params: Admin.FetchEventsParams): async Admin.FetchEventsResponse { + AdminModule.fetchEvents(context.caller, params); }; public query (context) func getAdmins(): async [Principal] { diff --git a/src/migrations/00-02-00-publisher-entities/lib.mo b/src/migrations/00-02-00-publisher-entities/lib.mo index 9688d9e..cf4d426 100644 --- a/src/migrations/00-02-00-publisher-entities/lib.mo +++ b/src/migrations/00-02-00-publisher-entities/lib.mo @@ -42,13 +42,13 @@ module { subscriptions = Set.fromIter(PrevSet.keys(subscriber.subscriptions), Map.thash); })); - let subscriptions = Map.new>(Map.thash); + let subscriptions = Map.new(Map.thash); for (subscriber in PrevMap.vals(state.subscribers)) for (eventName in PrevSet.keys(subscriber.subscriptions)) { let subscriberId = subscriber.canisterId; - let subscriptionGroup = Map.update>(subscriptions, Map.thash, eventName, func(key, value) { - return Option.get>(value, Map.new(Map.phash)); + let subscriptionGroup = Map.update(subscriptions, Map.thash, eventName, func(key, value) { + return Option.get(value, Map.new(Map.phash)); }); ignore Map.update(subscriptionGroup, Map.phash, subscriberId, func(key, value) = Option.get(value, { diff --git a/src/migrations/00-02-00-publisher-entities/types.mo b/src/migrations/00-02-00-publisher-entities/types.mo index b0236d8..bfd47cb 100644 --- a/src/migrations/00-02-00-publisher-entities/types.mo +++ b/src/migrations/00-02-00-publisher-entities/types.mo @@ -3,47 +3,47 @@ import Map "mo:map_8_0_0_alpha_5/Map"; import Set "mo:map_8_0_0_alpha_5/Set"; module { - public type Subscriber = { + public type Publisher = { id: Principal; createdAt: Nat64; - var activeSubscriptions: Nat8; - subscriptions: Set.Set; + var activePublications: Nat8; + publications: Set.Set; }; - public type Subscription = { + public type Publication = { eventName: Text; - subscriberId: Principal; + publisherId: Principal; createdAt: Nat64; - var skip: Nat8; - var skipped: Nat8; var active: Bool; - var stopped: Bool; var numberOfEvents: Nat64; var numberOfNotifications: Nat64; var numberOfResendNotifications: Nat64; var numberOfRequestedNotifications: Nat64; var numberOfConfirmations: Nat64; - events: Set.Set; + whitelist: Set.Set; }; - public type Publisher = { + public type Subscriber = { id: Principal; createdAt: Nat64; - var activePublications: Nat8; - publications: Set.Set; + var activeSubscriptions: Nat8; + subscriptions: Set.Set; }; - public type Publication = { + public type Subscription = { eventName: Text; - publisherId: Principal; + subscriberId: Principal; createdAt: Nat64; + var skip: Nat8; + var skipped: Nat8; var active: Bool; + var stopped: Bool; var numberOfEvents: Nat64; var numberOfNotifications: Nat64; var numberOfResendNotifications: Nat64; var numberOfRequestedNotifications: Nat64; var numberOfConfirmations: Nat64; - whitelist: Set.Set; + events: Set.Set; }; public type Event = { @@ -58,6 +58,10 @@ module { subscribers: Map.Map; }; + public type PublicationGroup = Map.Map; + + public type SubscriptionGroup = Map.Map; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// public type State = { @@ -66,9 +70,9 @@ module { var nextBroadcastTime: Nat64; admins: Set.Set; publishers: Map.Map; - publications: Map.Map>; + publications: Map.Map; subscribers: Map.Map; - subscriptions: Map.Map>; + subscriptions: Map.Map; events: Map.Map; }; }; diff --git a/src/migrations/00-03-00-multiple-listeners/lib.mo b/src/migrations/00-03-00-multiple-listeners/lib.mo new file mode 100644 index 0000000..a0e3604 --- /dev/null +++ b/src/migrations/00-03-00-multiple-listeners/lib.mo @@ -0,0 +1,100 @@ +import Debug "mo:base/Debug"; +import Iter "mo:base/Iter"; +import Map "mo:map_8_0_0_alpha_5/Map"; +import Option "mo:base/Option"; +import Set "mo:map_8_0_0_alpha_5/Set"; +import MigrationTypes "../types"; +import PrevTypes "../00-02-00-publisher-entities/types"; +import Prim "mo:prim"; +import Types "./types"; + +module { + public func upgrade(migrationState: MigrationTypes.State, args: MigrationTypes.Args): MigrationTypes.State { + let state = switch (migrationState) { case (#v0_2_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") }; + + let subscribers = Map.map(state.subscribers, func(key, subscriber) = { + id = subscriber.id; + createdAt = subscriber.createdAt; + var activeSubscriptions = subscriber.activeSubscriptions; + listeners = Set.fromIter([subscriber.id].vals(), Map.phash); + confirmedListeners = Set.fromIter([subscriber.id].vals(), Map.phash); + subscriptions = subscriber.subscriptions; + }); + + let subscriptions = Map.map(state.subscriptions, func(key, subscriptionGroup) { + return Map.map(subscriptionGroup, func(key, subscription) = { + eventName = subscription.eventName; + subscriberId = subscription.subscriberId; + createdAt = subscription.createdAt; + var skip = subscription.skip; + var skipped = subscription.skipped; + var active = subscription.active; + var stopped = subscription.stopped; + var filter = null; + var filterPath = null; + var numberOfEvents = subscription.numberOfEvents; + var numberOfNotifications = subscription.numberOfNotifications; + var numberOfResendNotifications = subscription.numberOfResendNotifications; + var numberOfRequestedNotifications = subscription.numberOfRequestedNotifications; + var numberOfConfirmations = subscription.numberOfConfirmations; + events = subscription.events; + }) + }); + + return #v0_3_0(#data({ + var eventId = state.eventId; + var broadcastActive = state.broadcastActive; + var nextBroadcastTime = state.nextBroadcastTime; + admins = state.admins; + publishers = state.publishers; + publications = state.publications; + subscribers = subscribers; + subscriptions = subscriptions; + confirmedListeners = Map.new(Map.phash); + events = state.events; + })); + }; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public func downgrade(migrationState: MigrationTypes.State, args: MigrationTypes.Args): MigrationTypes.State { + let state = switch (migrationState) { case (#v0_3_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") }; + + let subscribers = Map.map(state.subscribers, func((key, subscriber)) = { + id = subscriber.id; + createdAt = subscriber.createdAt; + var activeSubscriptions = subscriber.activeSubscriptions; + subscriptions = subscriber.subscriptions; + }); + + let subscriptions = Map.map(state.subscriptions, func(key, subscriptionGroup) { + return Map.map(subscriptionGroup, func(key, subscription) = { + eventName = subscription.eventName; + subscriberId = subscription.subscriberId; + createdAt = subscription.createdAt; + var skip = subscription.skip; + var skipped = subscription.skipped; + var active = subscription.active; + var stopped = subscription.stopped; + var numberOfEvents = subscription.numberOfEvents; + var numberOfNotifications = subscription.numberOfNotifications; + var numberOfResendNotifications = subscription.numberOfResendNotifications; + var numberOfRequestedNotifications = subscription.numberOfRequestedNotifications; + var numberOfConfirmations = subscription.numberOfConfirmations; + events = subscription.events; + }) + }); + + return #v0_2_0(#data({ + var eventId = state.eventId; + var broadcastActive = state.broadcastActive; + var nextBroadcastTime = state.nextBroadcastTime; + admins = state.admins; + publishers = state.publishers; + publications = state.publications; + subscribers = subscribers; + subscriptions = subscriptions; + events = state.events; + })); + }; +}; diff --git a/src/migrations/00-03-00-multiple-listeners/types.mo b/src/migrations/00-03-00-multiple-listeners/types.mo new file mode 100644 index 0000000..088ecee --- /dev/null +++ b/src/migrations/00-03-00-multiple-listeners/types.mo @@ -0,0 +1,84 @@ +import Candy "mo:candy_0_1_9/types"; +import CandyUtils "mo:candy_utils_0_2_0/CandyUtils"; +import Map "mo:map_8_0_0_alpha_5/Map"; +import Set "mo:map_8_0_0_alpha_5/Set"; + +module { + public type Publisher = { + id: Principal; + createdAt: Nat64; + var activePublications: Nat8; + publications: Set.Set; + }; + + public type Publication = { + eventName: Text; + publisherId: Principal; + createdAt: Nat64; + var active: Bool; + var numberOfEvents: Nat64; + var numberOfNotifications: Nat64; + var numberOfResendNotifications: Nat64; + var numberOfRequestedNotifications: Nat64; + var numberOfConfirmations: Nat64; + whitelist: Set.Set; + }; + + public type Subscriber = { + id: Principal; + createdAt: Nat64; + var activeSubscriptions: Nat8; + listeners: Set.Set; + confirmedListeners: Set.Set; + subscriptions: Set.Set; + }; + + public type Subscription = { + eventName: Text; + subscriberId: Principal; + createdAt: Nat64; + var skip: Nat8; + var skipped: Nat8; + var active: Bool; + var stopped: Bool; + var filter: ?Text; + var filterPath: ?CandyUtils.Path; + var numberOfEvents: Nat64; + var numberOfNotifications: Nat64; + var numberOfResendNotifications: Nat64; + var numberOfRequestedNotifications: Nat64; + var numberOfConfirmations: Nat64; + events: Set.Set; + }; + + public type Event = { + id: Nat; + eventName: Text; + publisherId: Principal; + payload: Candy.CandyValue; + createdAt: Nat64; + var nextBroadcastTime: Nat64; + var numberOfAttempts: Nat8; + resendRequests: Set.Set; + subscribers: Map.Map; + }; + + public type PublicationGroup = Map.Map; + + public type SubscriptionGroup = Map.Map; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public type State = { + var eventId: Nat; + var broadcastActive: Bool; + var nextBroadcastTime: Nat64; + admins: Set.Set; + publishers: Map.Map; + publications: Map.Map; + subscribers: Map.Map; + subscriptions: Map.Map; + confirmedListeners: Map.Map; + events: Map.Map; + }; +}; diff --git a/src/migrations/lib.mo b/src/migrations/lib.mo index 14cf2a1..35ceb40 100644 --- a/src/migrations/lib.mo +++ b/src/migrations/lib.mo @@ -1,16 +1,19 @@ import V0_1_0 "./00-01-00-initial"; import V0_2_0 "./00-02-00-publisher-entities"; +import V0_3_0 "./00-03-00-multiple-listeners"; import MigrationTypes "./types"; module { let upgrades = [ V0_1_0.upgrade, V0_2_0.upgrade, + V0_3_0.upgrade, ]; let downgrades = [ V0_1_0.downgrade, V0_2_0.downgrade, + V0_3_0.downgrade, ]; func getMigrationId(state: MigrationTypes.State): Nat { @@ -18,6 +21,7 @@ module { case (#v0_0_0(_)) 0; case (#v0_1_0(_)) 1; case (#v0_2_0(_)) 2; + case (#v0_3_0(_)) 3; }; }; diff --git a/src/migrations/types.mo b/src/migrations/types.mo index 0a4fe4a..697703b 100644 --- a/src/migrations/types.mo +++ b/src/migrations/types.mo @@ -1,8 +1,9 @@ import V0_1_0 "./00-01-00-initial/types"; import V0_2_0 "./00-02-00-publisher-entities/types"; +import V0_3_0 "./00-03-00-multiple-listeners/types"; module { - public let Current = V0_2_0; + public let Current = V0_3_0; public type Args = {}; @@ -10,5 +11,6 @@ module { #v0_0_0: { #id; #data: () }; #v0_1_0: { #id; #data: V0_1_0.State }; #v0_2_0: { #id; #data: V0_2_0.State }; + #v0_3_0: { #id; #data: V0_3_0.State }; }; }; \ No newline at end of file diff --git a/src/modules/admin.mo b/src/modules/admin.mo index eb7f993..0079d3c 100644 --- a/src/modules/admin.mo +++ b/src/modules/admin.mo @@ -1,5 +1,4 @@ import Array "mo:base/Array"; -import Cascade "./cascade"; import Candy "mo:candy/types"; import Debug "mo:base/Debug"; import Map "mo:map/Map"; @@ -75,14 +74,10 @@ module { public func init(state: State.State, deployer: Principal): { fetchSubscribers: (caller: Principal, params: FetchSubscribersParams) -> FetchSubscribersResponse; fetchEvents: (caller: Principal, params: FetchEventsParams) -> FetchEventsResponse; - removeSubscribers: (caller: Principal, subscriberIds: [Principal]) -> (); - removeEvents: (caller: Principal, eventIds: [Nat]) -> (); getAdmins: (caller: Principal) -> [Principal]; addAdmin: (caller: Principal, principalId: Principal) -> (); removeAdmin: (caller: Principal, principalId: Principal) -> (); } = object { - let { removeEventCascade; removeSubscriberCascade } = Cascade.init(state, deployer); - let { admins; subscribers; events } = state; ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -157,20 +152,6 @@ module { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public func removeSubscribers(caller: Principal, subscriberIds: [Principal]) { - if (not isAdmin(caller)) Debug.trap("Not authorized"); - - for (subscriberId in subscriberIds.vals()) removeSubscriberCascade(subscriberId); - }; - - public func removeEvents(caller: Principal, eventIds: [Nat]) { - if (not isAdmin(caller)) Debug.trap("Not authorized"); - - for (eventId in eventIds.vals()) removeEventCascade(eventId); - }; - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public func getAdmins(caller: Principal): [Principal] { if (not isAdmin(caller)) Debug.trap("Not authorized"); diff --git a/src/modules/broadcast.mo b/src/modules/broadcast.mo index 552e6b1..b402869 100644 --- a/src/modules/broadcast.mo +++ b/src/modules/broadcast.mo @@ -1,11 +1,10 @@ -import Cascade "./cascade"; +import Candy "mo:candy/types"; import Const "./const"; import Map "mo:map/Map"; import MigrationTypes "../migrations/types"; import Prim "mo:prim"; import Principal "mo:base/Principal"; import Set "mo:map/Set"; -import Subscribe "./subscribe"; import Utils "../utils/misc"; module { @@ -19,11 +18,15 @@ module { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + public type ListenerActor = actor { + handleEvent: (eventId: Nat, publisherId: Principal, eventName: Text, payload: Candy.CandyValue) -> (); + }; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + public func init(state: State.State, deployer: Principal): { broadcast: () -> async (); } = object { - let { removeEventCascade } = Cascade.init(state, deployer); - let { publications; subscribers; subscriptions; events } = state; ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -36,24 +39,25 @@ module { var iterActive = true; while (iterActive) await async label broadcastBatch { - for ((subscriberId, numberOfAttempts) in subscribersIter) if (numberOfAttempts <= event.numberOfAttempts) ignore do ?{ - let subscriptionGroup = Map.get(subscriptions, thash, event.eventName)!; - let subscription = Map.get(subscriptionGroup, phash, subscriberId)!; + for ((subscriberId, numberOfAttempts) in subscribersIter) if (numberOfAttempts <= event.numberOfAttempts) { + ignore do ?{ + let subscriptionGroup = Map.get(subscriptions, thash, event.eventName)!; + let subscription = Map.get(subscriptionGroup, phash, subscriberId)!; - if (subscription.active and not subscription.stopped) { - let subscriberActor: Subscribe.SubscriberActor = actor(Principal.toText(subscriberId)); + if (subscription.active and not subscription.stopped) { + let subscriber = Map.get(subscribers, phash, subscriberId)!; + let listenerId = Set.popFront(subscriber.confirmedListeners)!; + let listenerActor = actor(Principal.toText(listenerId)):ListenerActor; - subscriberActor.handleEvent(event.id, event.publisherId, event.eventName, event.payload); + Set.add(subscriber.confirmedListeners, phash, listenerId); + Set.delete(event.resendRequests, phash, subscriberId); - Map.set(event.subscribers, phash, subscriberId, numberOfAttempts +% 1); - Set.delete(event.resendRequests, phash, subscriberId); + listenerActor.handleEvent(event.id, event.publisherId, event.eventName, event.payload); - notificationsCount +%= 1; - subscription.numberOfNotifications +%= 1; + subscription.numberOfNotifications +%= 1; - if (numberOfAttempts > 0) subscription.numberOfResendNotifications +%= 1; + if (numberOfAttempts > 0) subscription.numberOfResendNotifications +%= 1; - ignore do ?{ let publicationGroup = Map.get(publications, thash, event.eventName)!; let publication = Map.get(publicationGroup, phash, event.publisherId)!; @@ -61,9 +65,13 @@ module { if (numberOfAttempts > 0) publication.numberOfResendNotifications +%= 1; }; - - if (notificationsCount % Const.BROADCAST_BATCH_SIZE == 0) break broadcastBatch; }; + + Map.set(event.subscribers, phash, subscriberId, numberOfAttempts +% 1); + + notificationsCount +%= 1; + + if (notificationsCount % Const.BROADCAST_BATCH_SIZE == 0) break broadcastBatch; }; iterActive := false; @@ -72,7 +80,14 @@ module { event.numberOfAttempts +%= 1; event.nextBroadcastTime := eventBroadcastStartTime + Const.RESEND_DELAY * 2 ** nat8ToNat64(event.numberOfAttempts -% 1); } else { - removeEventCascade(event.id); + Map.delete(events, nhash, event.id); + + for (subscriberId in Map.keys(event.subscribers)) ignore do ?{ + let subscriptionGroup = Map.get(subscriptions, thash, event.eventName)!; + let subscription = Map.get(subscriptionGroup, phash, subscriberId)!; + + Set.delete(subscription.events, nhash, event.id); + }; }; }; @@ -85,23 +100,20 @@ module { while (iterActive) await async label broadcastBatch { for (subscriberId in subscribersIter) { - let subscriberActor: Subscribe.SubscriberActor = actor(Principal.toText(subscriberId)); - - subscriberActor.handleEvent(event.id, event.publisherId, event.eventName, event.payload); - - Set.delete(event.resendRequests, phash, subscriberId); - - notificationsCount +%= 1; - ignore do ?{ let subscriptionGroup = Map.get(subscriptions, thash, event.eventName)!; let subscription = Map.get(subscriptionGroup, phash, subscriberId)!; + let subscriber = Map.get(subscribers, phash, subscriberId)!; + let listenerId = Set.popFront(subscriber.confirmedListeners)!; + let listenerActor = actor(Principal.toText(listenerId)):ListenerActor; + + Set.add(subscriber.confirmedListeners, phash, listenerId); + + listenerActor.handleEvent(event.id, event.publisherId, event.eventName, event.payload); subscription.numberOfNotifications +%= 1; subscription.numberOfRequestedNotifications +%= 1; - }; - ignore do ?{ let publicationGroup = Map.get(publications, thash, event.eventName)!; let publication = Map.get(publicationGroup, phash, event.publisherId)!; @@ -109,6 +121,10 @@ module { publication.numberOfRequestedNotifications +%= 1; }; + Set.delete(event.resendRequests, phash, subscriberId); + + notificationsCount +%= 1; + if (notificationsCount % Const.BROADCAST_BATCH_SIZE == 0) break broadcastBatch; }; diff --git a/src/modules/cascade.mo b/src/modules/cascade.mo deleted file mode 100644 index ecde3fb..0000000 --- a/src/modules/cascade.mo +++ /dev/null @@ -1,57 +0,0 @@ -import Map "mo:map/Map"; -import MigrationTypes "../migrations/types"; -import Set "mo:map/Set"; - -module { - let State = MigrationTypes.Current; - - let { nhash; thash; phash; lhash } = Map; - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public func init(state: State.State, deployer: Principal): { - removeEventCascade: (eventId: Nat) -> (); - removeSubscriberCascade: (subscriberId: Principal) -> (); - } = object { - let { subscribers; subscriptions; events } = state; - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public func removeEventCascade(eventId: Nat) { - ignore do ?{ - let event = Map.remove(events, nhash, eventId)!; - - for (subscriberId in Map.keys(event.subscribers)) ignore do ?{ - let subscriptionGroup = Map.get(subscriptions, thash, event.eventName)!; - let subscription = Map.get(subscriptionGroup, phash, subscriberId)!; - - Set.delete(subscription.events, nhash, eventId); - }; - }; - }; - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - public func removeSubscriberCascade(subscriberId: Principal) { - ignore do ?{ - let subscriber = Map.remove(subscribers, phash, subscriberId)!; - - for (eventName in Set.keys(subscriber.subscriptions)) ignore do ?{ - let subscriptionGroup = Map.get(subscriptions, thash, eventName)!; - let subscription = Map.remove(subscriptionGroup, phash, subscriberId)!; - - if (Map.size(subscriptionGroup) == 0) Map.delete(subscriptions, thash, eventName); - - for (eventId in Set.keys(subscription.events)) ignore do ?{ - let event = Map.get(events, nhash, eventId)!; - - Set.delete(event.resendRequests, phash, subscriberId); - Map.delete(event.subscribers, phash, subscriberId); - - if (Map.size(event.subscribers) == 0) removeEventCascade(eventId); - }; - }; - }; - }; - }; -}; diff --git a/src/modules/const.mo b/src/modules/const.mo index 8667825..9bf4724 100644 --- a/src/modules/const.mo +++ b/src/modules/const.mo @@ -18,4 +18,6 @@ module { public let PUBLICATIONS_LIMIT = 500; public let WHITELIST_LIMIT = 500; + + public let LISTENERS_LIMIT = 100; }; diff --git a/src/modules/publish.mo b/src/modules/publish.mo index 264fc6c..0be50f8 100644 --- a/src/modules/publish.mo +++ b/src/modules/publish.mo @@ -1,4 +1,5 @@ import Candy "mo:candy/types"; +import CandyUtils "mo:candy_utils/CandyUtils"; import Const "./const"; import Debug "mo:base/Debug"; import Map "mo:map/Map"; @@ -13,6 +14,8 @@ module { let { get = coalesce } = Option; + let { get } = CandyUtils; + let { nhash; thash; phash; lhash } = Map; let { time } = Prim; @@ -59,8 +62,8 @@ module { if (Set.size(publisher.publications) > Const.PUBLICATIONS_LIMIT) Debug.trap("Publications limit reached"); - let publicationGroup = Map.update>(publications, thash, eventName, func(key, value) { - return coalesce>(value, Map.new(phash)); + let publicationGroup = Map.update(publications, thash, eventName, func(key, value) { + return coalesce(value, Map.new(phash)); }); let publication = Map.update(publicationGroup, phash, caller, func(key, value) = coalesce(value, { @@ -85,10 +88,10 @@ module { for (option in options.vals()) switch (option) { case (#whitelist(principalIds)) { - Set.clear(publication.whitelist); - if (principalIds.size() > Const.WHITELIST_LIMIT) Debug.trap("Whitelist option length limit reached"); + Set.clear(publication.whitelist); + for (principalId in principalIds.vals()) Set.add(publication.whitelist, phash, principalId); }; @@ -146,22 +149,31 @@ module { let publication = registerPublication(caller, eventName, []); let subscriptionGroup = Map.get(subscriptions, thash, eventName)!; let eventSubscribers = Map.new(phash); - let subscriberIds = if (Set.size(publication.whitelist) > 0) Set.keys(publication.whitelist) else Map.keys(subscriptionGroup); + let subscriberIdsIter = if (Set.size(publication.whitelist) > 0) Set.keys(publication.whitelist) else Map.keys(subscriptionGroup); publication.numberOfEvents +%= 1; - for (subscriberId in subscriberIds) ignore do ?{ + for (subscriberId in subscriberIdsIter) ignore do ?{ let subscription = Map.get(subscriptionGroup, phash, subscriberId)!; - if (subscription.active) if (subscription.skipped >= subscription.skip) { - subscription.skipped := 0; + if (subscription.active) { + let filterPassed = switch (subscription.filterPath) { + case (?filterPath) switch (get(payload, filterPath)) { case (#Bool(bool)) bool; case (_) true }; + case (_) true; + }; + + if (filterPassed) { + if (subscription.skipped >= subscription.skip) { + subscription.skipped := 0; - Map.set(eventSubscribers, phash, subscriberId, 0:Nat8); - Set.add(subscription.events, nhash, state.eventId); + Map.set(eventSubscribers, phash, subscriberId, 0:Nat8); + Set.add(subscription.events, nhash, state.eventId); - subscription.numberOfEvents +%= 1; - } else { - subscription.skipped +%= 1; + subscription.numberOfEvents +%= 1; + } else { + subscription.skipped +%= 1; + }; + }; }; }; diff --git a/src/modules/stats.mo b/src/modules/stats.mo new file mode 100644 index 0000000..7ca265f --- /dev/null +++ b/src/modules/stats.mo @@ -0,0 +1,127 @@ +import Debug "mo:base/Debug"; +import Map "mo:map/Map"; +import MigrationTypes "../migrations/types"; +import Option "mo:base/Option"; +import Set "mo:map/Set"; + +module { + let State = MigrationTypes.Current; + + let { isNull } = Option; + + let { nhash; thash; phash; lhash } = Map; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + let StatsOptionsSize = 2; + + public type StatsOptions = [{ + #active: Bool; + #eventNames: [Text]; + }]; + + public type Stats = { + numberOfEvents: Nat64; + numberOfNotifications: Nat64; + numberOfResendNotifications: Nat64; + numberOfRequestedNotifications: Nat64; + numberOfConfirmations: Nat64; + }; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public func init(state: State.State, deployer: Principal): { + getPublicationStats: (caller: Principal, options: StatsOptions) -> Stats; + getSubscriptionStats: (caller: Principal, options: StatsOptions) -> Stats; + } = object { + let { publishers; publications; subscribers; subscriptions } = state; + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public func getPublicationStats(caller: Principal, options: StatsOptions): Stats { + if (options.size() > StatsOptionsSize) Debug.trap("Invalid number of options"); + + var numberOfEvents = 0:Nat64; + var numberOfNotifications = 0:Nat64; + var numberOfResendNotifications = 0:Nat64; + var numberOfRequestedNotifications = 0:Nat64; + var numberOfConfirmations = 0:Nat64; + + ignore do ?{ + let publisher = Map.get(publishers, phash, caller)!; + var eventNamesIter = Set.keys(publisher.publications); + var activeFilter = null:?Bool; + + for (option in options.vals()) switch (option) { + case (#active(active)) activeFilter := ?active; + case (#eventNames(eventNames)) eventNamesIter := eventNames.vals(); + }; + + for (eventName in eventNamesIter) ignore do ?{ + let publicationGroup = Map.get(publications, thash, eventName)!; + let publication = Map.get(publicationGroup, phash, caller)!; + + if (isNull(activeFilter) or publication.active == activeFilter!) { + numberOfEvents +%= publication.numberOfEvents; + numberOfNotifications +%= publication.numberOfNotifications; + numberOfResendNotifications +%= publication.numberOfResendNotifications; + numberOfRequestedNotifications +%= publication.numberOfRequestedNotifications; + numberOfConfirmations +%= publication.numberOfConfirmations; + }; + }; + }; + + return { + numberOfEvents; + numberOfNotifications; + numberOfResendNotifications; + numberOfRequestedNotifications; + numberOfConfirmations; + }; + }; + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public func getSubscriptionStats(caller: Principal, options: StatsOptions): Stats { + if (options.size() > StatsOptionsSize) Debug.trap("Invalid number of options"); + + var numberOfEvents = 0:Nat64; + var numberOfNotifications = 0:Nat64; + var numberOfResendNotifications = 0:Nat64; + var numberOfRequestedNotifications = 0:Nat64; + var numberOfConfirmations = 0:Nat64; + + ignore do ?{ + let subscriber = Map.get(subscribers, phash, caller)!; + var eventNamesIter = Set.keys(subscriber.subscriptions); + var activeFilter = null:?Bool; + + for (option in options.vals()) switch (option) { + case (#active(active)) activeFilter := ?active; + case (#eventNames(eventNames)) eventNamesIter := eventNames.vals(); + }; + + for (eventName in eventNamesIter) ignore do ?{ + let subscriptionGroup = Map.get(subscriptions, thash, eventName)!; + let subscription = Map.get(subscriptionGroup, phash, caller)!; + + if (isNull(activeFilter) or subscription.active == activeFilter!) { + numberOfEvents +%= subscription.numberOfEvents; + numberOfNotifications +%= subscription.numberOfNotifications; + numberOfResendNotifications +%= subscription.numberOfResendNotifications; + numberOfRequestedNotifications +%= subscription.numberOfRequestedNotifications; + numberOfConfirmations +%= subscription.numberOfConfirmations; + }; + }; + }; + + return { + numberOfEvents; + numberOfNotifications; + numberOfResendNotifications; + numberOfRequestedNotifications; + numberOfConfirmations; + }; + }; + }; +}; diff --git a/src/modules/subscribe.mo b/src/modules/subscribe.mo index af9dfa0..6e6dd21 100644 --- a/src/modules/subscribe.mo +++ b/src/modules/subscribe.mo @@ -1,5 +1,4 @@ -import Candy "mo:candy/types"; -import Cascade "./cascade"; +import CandyUtils "mo:candy_utils/CandyUtils"; import Const "./const"; import Debug "mo:base/Debug"; import Map "mo:map/Map"; @@ -13,7 +12,9 @@ import State "../migrations/00-01-00-initial/types"; module { let State = MigrationTypes.Current; - let { get = coalesce } = Option; + let { isNull; get = coalesce } = Option; + + let { path } = CandyUtils; let { nhash; thash; phash; lhash } = Map; @@ -21,17 +22,20 @@ module { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public type SubscriberActor = actor { - handleEvent: (eventId: Nat, publisherId: Principal, eventName: Text, payload: Candy.CandyValue) -> (); - }; + let SubscriberOptionsSize = 6; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + public type SubscriberOptions = [{ + #listeners: [Principal]; + #listenersAdd: [Principal]; + #listenersRemove: [Principal]; + }]; - let SubscriptionOptionsSize = 2; + let SubscriptionOptionsSize = 6; public type SubscriptionOptions = [{ #stopped: Bool; #skip: Nat8; + #filter: ?Text; }]; let UnsubscribeOptionsSize = 1; @@ -50,34 +54,86 @@ module { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// public func init(state: State.State, deployer: Principal): { + registerSubscriber: (caller: Principal, options: SubscriberOptions) -> State.Subscriber; subscribe: (caller: Principal, eventName: Text, options: SubscriptionOptions) -> State.Subscription; unsubscribe: (caller: Principal, eventName: Text, options: UnsubscribeOptions) -> (); requestMissedEvents: (caller: Principal, eventName: Text, options: MissedEventOptions) -> (); + confirmListener: (caller: Principal, subscriberId: Principal, allow: Bool) -> (); confirmEventProcessed: (caller: Principal, eventId: Nat) -> (); } = object { - let { removeEventCascade } = Cascade.init(state, deployer); - - let { publications; subscribers; subscriptions; events } = state; + let { publications; subscribers; subscriptions; confirmedListeners; events } = state; ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public func subscribe(caller: Principal, eventName: Text, options: SubscriptionOptions): State.Subscription { - if (eventName.size() > Const.EVENT_NAME_LENGTH_LIMIT) Debug.trap("Event name length limit reached"); - if (options.size() > SubscriptionOptionsSize) Debug.trap("Invalid number of options"); + public func registerSubscriber(caller: Principal, options: SubscriberOptions): State.Subscriber { + if (options.size() > SubscriberOptionsSize) Debug.trap("Invalid number of options"); let subscriber = Map.update(subscribers, phash, caller, func(key, value) = coalesce(value, { id = caller; createdAt = time(); var activeSubscriptions = 0:Nat8; + listeners = Set.fromIter([caller].vals(), phash); + confirmedListeners = Set.fromIter([caller].vals(), phash); subscriptions = Set.new(thash); })); + for (option in options.vals()) switch (option) { + case (#listeners(principalIds)) { + if (principalIds.size() > Const.LISTENERS_LIMIT) Debug.trap("Listeners option length limit reached"); + + Set.clear(subscriber.listeners); + Set.clear(subscriber.confirmedListeners); + + for (principalId in principalIds.vals()) ignore do ?{ + Set.add(subscriber.listeners, phash, principalId); + + if (principalId == caller or Map.get(confirmedListeners, phash, principalId)! == caller) { + Set.add(subscriber.confirmedListeners, phash, principalId); + }; + }; + }; + + case (#listenersAdd(principalIds)) { + if (principalIds.size() > Const.LISTENERS_LIMIT) Debug.trap("ListenersAdd option length limit reached"); + + for (principalId in principalIds.vals()) ignore do ?{ + Set.add(subscriber.listeners, phash, principalId); + + if (principalId == caller or Map.get(confirmedListeners, phash, principalId)! == caller) { + Set.add(subscriber.confirmedListeners, phash, principalId); + }; + }; + + if (Set.size(subscriber.listeners) > Const.LISTENERS_LIMIT) Debug.trap("Listeners length limit reached"); + }; + + case (#listenersRemove(principalIds)) { + if (principalIds.size() > Const.LISTENERS_LIMIT) Debug.trap("ListenersRemove option length limit reached"); + + for (principalId in principalIds.vals()) { + Set.delete(subscriber.listeners, phash, principalId); + Set.delete(subscriber.confirmedListeners, phash, principalId); + }; + }; + }; + + return subscriber; + }; + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + public func subscribe(caller: Principal, eventName: Text, options: SubscriptionOptions): State.Subscription { + if (eventName.size() > Const.EVENT_NAME_LENGTH_LIMIT) Debug.trap("Event name length limit reached"); + if (options.size() > SubscriptionOptionsSize) Debug.trap("Invalid number of options"); + + let subscriber = registerSubscriber(caller, []); + Set.add(subscriber.subscriptions, thash, eventName); if (Set.size(subscriber.subscriptions) > Const.SUBSCRIPTIONS_LIMIT) Debug.trap("Subscriptions limit reached"); - let subscriptionGroup = Map.update>(subscriptions, thash, eventName, func(key, value) { - return coalesce>(value, Map.new(phash)); + let subscriptionGroup = Map.update(subscriptions, thash, eventName, func(key, value) { + return coalesce(value, Map.new(phash)); }); let subscription = Map.update(subscriptionGroup, phash, caller, func(key, value) = coalesce(value, { @@ -88,6 +144,8 @@ module { var skipped = 0:Nat8; var active = false; var stopped = false; + var filter = null:?Text; + var filterPath = null:?CandyUtils.Path; var numberOfEvents = 0:Nat64; var numberOfNotifications = 0:Nat64; var numberOfResendNotifications = 0:Nat64; @@ -105,7 +163,13 @@ module { for (option in options.vals()) switch (option) { case (#stopped(stopped)) subscription.stopped := stopped; + case (#skip(skip)) subscription.skip := skip; + + case (#filter(filter)) { + subscription.filter := filter; + subscription.filterPath := do ?{ path(filter!) }; + }; }; return subscription; @@ -132,6 +196,15 @@ module { Set.delete(subscriber.subscriptions, thash, eventName); Map.delete(subscriptionGroup, phash, caller); + for (eventId in Set.keys(subscription.events)) ignore do ?{ + let event = Map.get(events, nhash, eventId)!; + + Set.delete(event.resendRequests, phash, caller); + Map.delete(event.subscribers, phash, caller); + + if (Map.size(event.subscribers) == 0) Map.delete(events, nhash, eventId); + }; + if (Map.size(subscriptionGroup) == 0) Map.delete(subscriptions, thash, eventName); }; }; @@ -169,13 +242,49 @@ module { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + public func confirmListener(caller: Principal, subscriberId: Principal, allow: Bool) { + if (caller == subscriberId) Debug.trap("Can not confirm self as listener"); + + if (allow) { + let prevSubscriberId = Map.put(confirmedListeners, phash, caller, subscriberId); + + ignore do ?{ + if (subscriberId != prevSubscriberId!) { + let prevSubscriber = Map.get(subscribers, phash, prevSubscriberId!)!; + + Set.delete(prevSubscriber.confirmedListeners, phash, caller); + }; + }; + + ignore do ?{ + if (isNull(prevSubscriberId) or subscriberId != prevSubscriberId!) { + let subscriber = Map.get(subscribers, phash, subscriberId)!; + + if (Set.has(subscriber.listeners, phash, caller)) Set.add(subscriber.confirmedListeners, phash, caller); + }; + }; + } else { + ignore do ?{ + if (Map.get(confirmedListeners, phash, caller)! == subscriberId) { + Map.delete(confirmedListeners, phash, caller); + + let subscriber = Map.get(subscribers, phash, subscriberId)!; + + Set.delete(subscriber.confirmedListeners, phash, caller); + }; + }; + }; + }; + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + public func confirmEventProcessed(caller: Principal, eventId: Nat) { ignore do ?{ let event = Map.get(events, nhash, eventId)!; ignore Map.remove(event.subscribers, phash, caller)!; - if (Map.size(event.subscribers) == 0) removeEventCascade(eventId); + Set.delete(event.resendRequests, phash, caller); ignore do ?{ let publicationGroup = Map.get(publications, thash, event.eventName)!; @@ -189,7 +298,11 @@ module { let subscription = Map.get(subscriptionGroup, phash, caller)!; subscription.numberOfConfirmations +%= 1; + + Set.delete(subscription.events, nhash, eventId); }; + + if (Map.size(event.subscribers) == 0) Map.delete(events, nhash, eventId); }; }; }; diff --git a/vessel.dhall b/vessel.dhall index e1d53f4..cd524f7 100644 --- a/vessel.dhall +++ b/vessel.dhall @@ -1,4 +1,4 @@ { - dependencies = ["base", "candy_0_1_9", "candy", "map_4_0_0", "map_8_0_0_alpha_5", "map"], + dependencies = ["base", "candy_0_1_9", "candy", "map_4_0_0", "map_8_0_0_alpha_5", "map", "candy_utils_0_2_0", "candy_utils"], compiler = None Text, }