Skip to content

Commit

Permalink
added ability to filter incoming events for subscription, added abili…
Browse files Browse the repository at this point in the history
…ty to have multiple listener canisters
  • Loading branch information
ZhenyaUsenko committed Oct 17, 2022
1 parent e0e839d commit 32138af
Show file tree
Hide file tree
Showing 16 changed files with 571 additions and 171 deletions.
4 changes: 3 additions & 1 deletion package-set.dhall
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
[
{ name = "base", repo = "https://github.com/dfinity/motoko-base.git", version = "c174fc4", dependencies = []: List Text },
{ name = "base", repo = "https://github.com/dfinity/motoko-base.git", version = "moc-0.7.0", dependencies = []: List Text },
{ name = "candy_0_1_9", repo = "https://github.com/aramakme/candy_library.git", version = "v0.1.9", dependencies = ["base"] },
{ name = "candy", repo = "https://github.com/aramakme/candy_library.git", version = "v0.1.9", dependencies = ["base"] },
{ name = "map_4_0_0", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v4.0.0", dependencies = ["base"] },
{ name = "map_8_0_0_alpha_5", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v8.0.0-alpha.5", dependencies = ["base"] },
{ name = "map", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v8.0.0-alpha.5", dependencies = ["base"] },
{ name = "candy_utils_0_2_0", repo = "https://github.com/ZhenyaUsenko/motoko-candy-utils.git", version = "v0.2.0", dependencies = ["base"] },
{ name = "candy_utils", repo = "https://github.com/ZhenyaUsenko/motoko-candy-utils.git", version = "v0.2.0", dependencies = ["base"] },
]
36 changes: 23 additions & 13 deletions src/main.mo
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import Admin "./modules/admin";
import Broadcast "./modules/broadcast";
import Candy "mo:candy/types";
import Cascade "./modules/cascade";
import Debug "mo:base/Debug";
import MigrationTypes "./migrations/types";
import Migrations "./migrations";
import Prim "mo:prim";
import Publish "./modules/publish";
import Stats "./modules/stats";
import Subscribe "./modules/subscribe";

shared (deployer) actor class EventSystem() {
Expand All @@ -16,22 +16,22 @@ shared (deployer) actor class EventSystem() {

stable var migrationState: MigrationTypes.State = #v0_0_0(#data);

migrationState := Migrations.migrate(migrationState, #v0_2_0(#id), {});
migrationState := Migrations.migrate(migrationState, #v0_3_0(#id), {});

let state = switch (migrationState) { case (#v0_2_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };
let state = switch (migrationState) { case (#v0_3_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

let PublishModule = Publish.init(state, deployer.caller);

let SubscribeModule = Subscribe.init(state, deployer.caller);

let StatsModule = Stats.init(state, deployer.caller);

let BroadcastModule = Broadcast.init(state, deployer.caller);

let AdminModule = Admin.init(state, deployer.caller);

let CascadeModule = Cascade.init(state, deployer.caller);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public shared (context) func registerPublication(eventName: Text, options: Publish.PublicationOptions): async () {
Expand All @@ -48,6 +48,10 @@ shared (deployer) actor class EventSystem() {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public shared (context) func registerSubscriber(caller: Principal, options: Subscribe.SubscriberOptions): async () {
ignore SubscribeModule.registerSubscriber(context.caller, options);
};

public shared (context) func subscribe(eventName: Text, options: Subscribe.SubscriptionOptions): async () {
ignore SubscribeModule.subscribe(context.caller, eventName, options);
};
Expand All @@ -60,26 +64,32 @@ shared (deployer) actor class EventSystem() {
SubscribeModule.requestMissedEvents(context.caller, eventName, options);
};

public shared (context) func confirmListener(subscriberId: Principal, allow: Bool) {
SubscribeModule.confirmListener(context.caller, subscriberId, allow);
};

public shared (context) func confirmEventProcessed(eventId: Nat): async () {
SubscribeModule.confirmEventProcessed(context.caller, eventId);
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public query (context) func fetchSubscribers(params: Admin.FetchSubscribersParams): async Admin.FetchSubscribersResponse {
AdminModule.fetchSubscribers(context.caller, params);
public shared (context) func getPublicationStats(options: Stats.StatsOptions): async Stats.Stats {
StatsModule.getPublicationStats(context.caller, options);
};

public query (context) func fetchEvents(params: Admin.FetchEventsParams): async Admin.FetchEventsResponse {
AdminModule.fetchEvents(context.caller, params);
public shared (context) func getSubscriptionStats(options: Stats.StatsOptions): async Stats.Stats {
StatsModule.getSubscriptionStats(context.caller, options);
};

public shared (context) func removeSubscribers(subscriberIds: [Principal]): async () {
AdminModule.removeSubscribers(context.caller, subscriberIds);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public query (context) func fetchSubscribers(params: Admin.FetchSubscribersParams): async Admin.FetchSubscribersResponse {
AdminModule.fetchSubscribers(context.caller, params);
};

public shared (context) func removeEvents(eventIds: [Nat]): async () {
AdminModule.removeEvents(context.caller, eventIds);
public query (context) func fetchEvents(params: Admin.FetchEventsParams): async Admin.FetchEventsResponse {
AdminModule.fetchEvents(context.caller, params);
};

public query (context) func getAdmins(): async [Principal] {
Expand Down
6 changes: 3 additions & 3 deletions src/migrations/00-02-00-publisher-entities/lib.mo
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ module {
subscriptions = Set.fromIter(PrevSet.keys(subscriber.subscriptions), Map.thash);
}));

let subscriptions = Map.new<Text, Map.Map<Principal, Types.Subscription>>(Map.thash);
let subscriptions = Map.new<Text, Types.SubscriptionGroup>(Map.thash);

for (subscriber in PrevMap.vals(state.subscribers)) for (eventName in PrevSet.keys(subscriber.subscriptions)) {
let subscriberId = subscriber.canisterId;

let subscriptionGroup = Map.update<Text, Map.Map<Principal, Types.Subscription>>(subscriptions, Map.thash, eventName, func(key, value) {
return Option.get<Map.Map<Principal, Types.Subscription>>(value, Map.new(Map.phash));
let subscriptionGroup = Map.update<Text, Types.SubscriptionGroup>(subscriptions, Map.thash, eventName, func(key, value) {
return Option.get<Types.SubscriptionGroup>(value, Map.new(Map.phash));
});

ignore Map.update<Principal, Types.Subscription>(subscriptionGroup, Map.phash, subscriberId, func(key, value) = Option.get(value, {
Expand Down
38 changes: 21 additions & 17 deletions src/migrations/00-02-00-publisher-entities/types.mo
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,47 @@ import Map "mo:map_8_0_0_alpha_5/Map";
import Set "mo:map_8_0_0_alpha_5/Set";

module {
public type Subscriber = {
public type Publisher = {
id: Principal;
createdAt: Nat64;
var activeSubscriptions: Nat8;
subscriptions: Set.Set<Text>;
var activePublications: Nat8;
publications: Set.Set<Text>;
};

public type Subscription = {
public type Publication = {
eventName: Text;
subscriberId: Principal;
publisherId: Principal;
createdAt: Nat64;
var skip: Nat8;
var skipped: Nat8;
var active: Bool;
var stopped: Bool;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
events: Set.Set<Nat>;
whitelist: Set.Set<Principal>;
};

public type Publisher = {
public type Subscriber = {
id: Principal;
createdAt: Nat64;
var activePublications: Nat8;
publications: Set.Set<Text>;
var activeSubscriptions: Nat8;
subscriptions: Set.Set<Text>;
};

public type Publication = {
public type Subscription = {
eventName: Text;
publisherId: Principal;
subscriberId: Principal;
createdAt: Nat64;
var skip: Nat8;
var skipped: Nat8;
var active: Bool;
var stopped: Bool;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
whitelist: Set.Set<Principal>;
events: Set.Set<Nat>;
};

public type Event = {
Expand All @@ -58,6 +58,10 @@ module {
subscribers: Map.Map<Principal, Nat8>;
};

public type PublicationGroup = Map.Map<Principal, Publication>;

public type SubscriptionGroup = Map.Map<Principal, Subscription>;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public type State = {
Expand All @@ -66,9 +70,9 @@ module {
var nextBroadcastTime: Nat64;
admins: Set.Set<Principal>;
publishers: Map.Map<Principal, Publisher>;
publications: Map.Map<Text, Map.Map<Principal, Publication>>;
publications: Map.Map<Text, PublicationGroup>;
subscribers: Map.Map<Principal, Subscriber>;
subscriptions: Map.Map<Text, Map.Map<Principal, Subscription>>;
subscriptions: Map.Map<Text, SubscriptionGroup>;
events: Map.Map<Nat, Event>;
};
};
100 changes: 100 additions & 0 deletions src/migrations/00-03-00-multiple-listeners/lib.mo
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import Debug "mo:base/Debug";
import Iter "mo:base/Iter";
import Map "mo:map_8_0_0_alpha_5/Map";
import Option "mo:base/Option";
import Set "mo:map_8_0_0_alpha_5/Set";
import MigrationTypes "../types";
import PrevTypes "../00-02-00-publisher-entities/types";
import Prim "mo:prim";
import Types "./types";

module {
public func upgrade(migrationState: MigrationTypes.State, args: MigrationTypes.Args): MigrationTypes.State {
let state = switch (migrationState) { case (#v0_2_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };

let subscribers = Map.map<Principal, PrevTypes.Subscriber, Types.Subscriber>(state.subscribers, func(key, subscriber) = {
id = subscriber.id;
createdAt = subscriber.createdAt;
var activeSubscriptions = subscriber.activeSubscriptions;
listeners = Set.fromIter([subscriber.id].vals(), Map.phash);
confirmedListeners = Set.fromIter([subscriber.id].vals(), Map.phash);
subscriptions = subscriber.subscriptions;
});

let subscriptions = Map.map<Text, PrevTypes.SubscriptionGroup, Types.SubscriptionGroup>(state.subscriptions, func(key, subscriptionGroup) {
return Map.map<Principal, PrevTypes.Subscription, Types.Subscription>(subscriptionGroup, func(key, subscription) = {
eventName = subscription.eventName;
subscriberId = subscription.subscriberId;
createdAt = subscription.createdAt;
var skip = subscription.skip;
var skipped = subscription.skipped;
var active = subscription.active;
var stopped = subscription.stopped;
var filter = null;
var filterPath = null;
var numberOfEvents = subscription.numberOfEvents;
var numberOfNotifications = subscription.numberOfNotifications;
var numberOfResendNotifications = subscription.numberOfResendNotifications;
var numberOfRequestedNotifications = subscription.numberOfRequestedNotifications;
var numberOfConfirmations = subscription.numberOfConfirmations;
events = subscription.events;
})
});

return #v0_3_0(#data({
var eventId = state.eventId;
var broadcastActive = state.broadcastActive;
var nextBroadcastTime = state.nextBroadcastTime;
admins = state.admins;
publishers = state.publishers;
publications = state.publications;
subscribers = subscribers;
subscriptions = subscriptions;
confirmedListeners = Map.new(Map.phash);
events = state.events;
}));
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public func downgrade(migrationState: MigrationTypes.State, args: MigrationTypes.Args): MigrationTypes.State {
let state = switch (migrationState) { case (#v0_3_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };

let subscribers = Map.map<Principal, Types.Subscriber, PrevTypes.Subscriber>(state.subscribers, func((key, subscriber)) = {
id = subscriber.id;
createdAt = subscriber.createdAt;
var activeSubscriptions = subscriber.activeSubscriptions;
subscriptions = subscriber.subscriptions;
});

let subscriptions = Map.map<Text, Types.SubscriptionGroup, PrevTypes.SubscriptionGroup>(state.subscriptions, func(key, subscriptionGroup) {
return Map.map<Principal, Types.Subscription, PrevTypes.Subscription>(subscriptionGroup, func(key, subscription) = {
eventName = subscription.eventName;
subscriberId = subscription.subscriberId;
createdAt = subscription.createdAt;
var skip = subscription.skip;
var skipped = subscription.skipped;
var active = subscription.active;
var stopped = subscription.stopped;
var numberOfEvents = subscription.numberOfEvents;
var numberOfNotifications = subscription.numberOfNotifications;
var numberOfResendNotifications = subscription.numberOfResendNotifications;
var numberOfRequestedNotifications = subscription.numberOfRequestedNotifications;
var numberOfConfirmations = subscription.numberOfConfirmations;
events = subscription.events;
})
});

return #v0_2_0(#data({
var eventId = state.eventId;
var broadcastActive = state.broadcastActive;
var nextBroadcastTime = state.nextBroadcastTime;
admins = state.admins;
publishers = state.publishers;
publications = state.publications;
subscribers = subscribers;
subscriptions = subscriptions;
events = state.events;
}));
};
};
84 changes: 84 additions & 0 deletions src/migrations/00-03-00-multiple-listeners/types.mo
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import Candy "mo:candy_0_1_9/types";
import CandyUtils "mo:candy_utils_0_2_0/CandyUtils";
import Map "mo:map_8_0_0_alpha_5/Map";
import Set "mo:map_8_0_0_alpha_5/Set";

module {
public type Publisher = {
id: Principal;
createdAt: Nat64;
var activePublications: Nat8;
publications: Set.Set<Text>;
};

public type Publication = {
eventName: Text;
publisherId: Principal;
createdAt: Nat64;
var active: Bool;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
whitelist: Set.Set<Principal>;
};

public type Subscriber = {
id: Principal;
createdAt: Nat64;
var activeSubscriptions: Nat8;
listeners: Set.Set<Principal>;
confirmedListeners: Set.Set<Principal>;
subscriptions: Set.Set<Text>;
};

public type Subscription = {
eventName: Text;
subscriberId: Principal;
createdAt: Nat64;
var skip: Nat8;
var skipped: Nat8;
var active: Bool;
var stopped: Bool;
var filter: ?Text;
var filterPath: ?CandyUtils.Path;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
events: Set.Set<Nat>;
};

public type Event = {
id: Nat;
eventName: Text;
publisherId: Principal;
payload: Candy.CandyValue;
createdAt: Nat64;
var nextBroadcastTime: Nat64;
var numberOfAttempts: Nat8;
resendRequests: Set.Set<Principal>;
subscribers: Map.Map<Principal, Nat8>;
};

public type PublicationGroup = Map.Map<Principal, Publication>;

public type SubscriptionGroup = Map.Map<Principal, Subscription>;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public type State = {
var eventId: Nat;
var broadcastActive: Bool;
var nextBroadcastTime: Nat64;
admins: Set.Set<Principal>;
publishers: Map.Map<Principal, Publisher>;
publications: Map.Map<Text, PublicationGroup>;
subscribers: Map.Map<Principal, Subscriber>;
subscriptions: Map.Map<Text, SubscriptionGroup>;
confirmedListeners: Map.Map<Principal, Principal>;
events: Map.Map<Nat, Event>;
};
};
Loading

0 comments on commit 32138af

Please sign in to comment.