Skip to content

Commit

Permalink
Merge pull request #3 from ORIGYN-SA/feat/multiple-listeners
Browse files Browse the repository at this point in the history
added ability to filter incoming events for subscription, added ability to have multiple listener canisters
  • Loading branch information
ZhenyaUsenko authored Oct 18, 2022
2 parents e0e839d + 32138af commit cc2a70e
Show file tree
Hide file tree
Showing 16 changed files with 571 additions and 171 deletions.
4 changes: 3 additions & 1 deletion package-set.dhall
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
[
{ name = "base", repo = "https://github.com/dfinity/motoko-base.git", version = "c174fc4", dependencies = []: List Text },
{ name = "base", repo = "https://github.com/dfinity/motoko-base.git", version = "moc-0.7.0", dependencies = []: List Text },
{ name = "candy_0_1_9", repo = "https://github.com/aramakme/candy_library.git", version = "v0.1.9", dependencies = ["base"] },
{ name = "candy", repo = "https://github.com/aramakme/candy_library.git", version = "v0.1.9", dependencies = ["base"] },
{ name = "map_4_0_0", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v4.0.0", dependencies = ["base"] },
{ name = "map_8_0_0_alpha_5", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v8.0.0-alpha.5", dependencies = ["base"] },
{ name = "map", repo = "https://github.com/ZhenyaUsenko/motoko-hash-map.git", version = "v8.0.0-alpha.5", dependencies = ["base"] },
{ name = "candy_utils_0_2_0", repo = "https://github.com/ZhenyaUsenko/motoko-candy-utils.git", version = "v0.2.0", dependencies = ["base"] },
{ name = "candy_utils", repo = "https://github.com/ZhenyaUsenko/motoko-candy-utils.git", version = "v0.2.0", dependencies = ["base"] },
]
36 changes: 23 additions & 13 deletions src/main.mo
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import Admin "./modules/admin";
import Broadcast "./modules/broadcast";
import Candy "mo:candy/types";
import Cascade "./modules/cascade";
import Debug "mo:base/Debug";
import MigrationTypes "./migrations/types";
import Migrations "./migrations";
import Prim "mo:prim";
import Publish "./modules/publish";
import Stats "./modules/stats";
import Subscribe "./modules/subscribe";

shared (deployer) actor class EventSystem() {
Expand All @@ -16,22 +16,22 @@ shared (deployer) actor class EventSystem() {

stable var migrationState: MigrationTypes.State = #v0_0_0(#data);

migrationState := Migrations.migrate(migrationState, #v0_2_0(#id), {});
migrationState := Migrations.migrate(migrationState, #v0_3_0(#id), {});

let state = switch (migrationState) { case (#v0_2_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };
let state = switch (migrationState) { case (#v0_3_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

let PublishModule = Publish.init(state, deployer.caller);

let SubscribeModule = Subscribe.init(state, deployer.caller);

let StatsModule = Stats.init(state, deployer.caller);

let BroadcastModule = Broadcast.init(state, deployer.caller);

let AdminModule = Admin.init(state, deployer.caller);

let CascadeModule = Cascade.init(state, deployer.caller);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public shared (context) func registerPublication(eventName: Text, options: Publish.PublicationOptions): async () {
Expand All @@ -48,6 +48,10 @@ shared (deployer) actor class EventSystem() {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public shared (context) func registerSubscriber(caller: Principal, options: Subscribe.SubscriberOptions): async () {
ignore SubscribeModule.registerSubscriber(context.caller, options);
};

public shared (context) func subscribe(eventName: Text, options: Subscribe.SubscriptionOptions): async () {
ignore SubscribeModule.subscribe(context.caller, eventName, options);
};
Expand All @@ -60,26 +64,32 @@ shared (deployer) actor class EventSystem() {
SubscribeModule.requestMissedEvents(context.caller, eventName, options);
};

public shared (context) func confirmListener(subscriberId: Principal, allow: Bool) {
SubscribeModule.confirmListener(context.caller, subscriberId, allow);
};

public shared (context) func confirmEventProcessed(eventId: Nat): async () {
SubscribeModule.confirmEventProcessed(context.caller, eventId);
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public query (context) func fetchSubscribers(params: Admin.FetchSubscribersParams): async Admin.FetchSubscribersResponse {
AdminModule.fetchSubscribers(context.caller, params);
public shared (context) func getPublicationStats(options: Stats.StatsOptions): async Stats.Stats {
StatsModule.getPublicationStats(context.caller, options);
};

public query (context) func fetchEvents(params: Admin.FetchEventsParams): async Admin.FetchEventsResponse {
AdminModule.fetchEvents(context.caller, params);
public shared (context) func getSubscriptionStats(options: Stats.StatsOptions): async Stats.Stats {
StatsModule.getSubscriptionStats(context.caller, options);
};

public shared (context) func removeSubscribers(subscriberIds: [Principal]): async () {
AdminModule.removeSubscribers(context.caller, subscriberIds);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public query (context) func fetchSubscribers(params: Admin.FetchSubscribersParams): async Admin.FetchSubscribersResponse {
AdminModule.fetchSubscribers(context.caller, params);
};

public shared (context) func removeEvents(eventIds: [Nat]): async () {
AdminModule.removeEvents(context.caller, eventIds);
public query (context) func fetchEvents(params: Admin.FetchEventsParams): async Admin.FetchEventsResponse {
AdminModule.fetchEvents(context.caller, params);
};

public query (context) func getAdmins(): async [Principal] {
Expand Down
6 changes: 3 additions & 3 deletions src/migrations/00-02-00-publisher-entities/lib.mo
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ module {
subscriptions = Set.fromIter(PrevSet.keys(subscriber.subscriptions), Map.thash);
}));

let subscriptions = Map.new<Text, Map.Map<Principal, Types.Subscription>>(Map.thash);
let subscriptions = Map.new<Text, Types.SubscriptionGroup>(Map.thash);

for (subscriber in PrevMap.vals(state.subscribers)) for (eventName in PrevSet.keys(subscriber.subscriptions)) {
let subscriberId = subscriber.canisterId;

let subscriptionGroup = Map.update<Text, Map.Map<Principal, Types.Subscription>>(subscriptions, Map.thash, eventName, func(key, value) {
return Option.get<Map.Map<Principal, Types.Subscription>>(value, Map.new(Map.phash));
let subscriptionGroup = Map.update<Text, Types.SubscriptionGroup>(subscriptions, Map.thash, eventName, func(key, value) {
return Option.get<Types.SubscriptionGroup>(value, Map.new(Map.phash));
});

ignore Map.update<Principal, Types.Subscription>(subscriptionGroup, Map.phash, subscriberId, func(key, value) = Option.get(value, {
Expand Down
38 changes: 21 additions & 17 deletions src/migrations/00-02-00-publisher-entities/types.mo
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,47 @@ import Map "mo:map_8_0_0_alpha_5/Map";
import Set "mo:map_8_0_0_alpha_5/Set";

module {
public type Subscriber = {
public type Publisher = {
id: Principal;
createdAt: Nat64;
var activeSubscriptions: Nat8;
subscriptions: Set.Set<Text>;
var activePublications: Nat8;
publications: Set.Set<Text>;
};

public type Subscription = {
public type Publication = {
eventName: Text;
subscriberId: Principal;
publisherId: Principal;
createdAt: Nat64;
var skip: Nat8;
var skipped: Nat8;
var active: Bool;
var stopped: Bool;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
events: Set.Set<Nat>;
whitelist: Set.Set<Principal>;
};

public type Publisher = {
public type Subscriber = {
id: Principal;
createdAt: Nat64;
var activePublications: Nat8;
publications: Set.Set<Text>;
var activeSubscriptions: Nat8;
subscriptions: Set.Set<Text>;
};

public type Publication = {
public type Subscription = {
eventName: Text;
publisherId: Principal;
subscriberId: Principal;
createdAt: Nat64;
var skip: Nat8;
var skipped: Nat8;
var active: Bool;
var stopped: Bool;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
whitelist: Set.Set<Principal>;
events: Set.Set<Nat>;
};

public type Event = {
Expand All @@ -58,6 +58,10 @@ module {
subscribers: Map.Map<Principal, Nat8>;
};

public type PublicationGroup = Map.Map<Principal, Publication>;

public type SubscriptionGroup = Map.Map<Principal, Subscription>;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public type State = {
Expand All @@ -66,9 +70,9 @@ module {
var nextBroadcastTime: Nat64;
admins: Set.Set<Principal>;
publishers: Map.Map<Principal, Publisher>;
publications: Map.Map<Text, Map.Map<Principal, Publication>>;
publications: Map.Map<Text, PublicationGroup>;
subscribers: Map.Map<Principal, Subscriber>;
subscriptions: Map.Map<Text, Map.Map<Principal, Subscription>>;
subscriptions: Map.Map<Text, SubscriptionGroup>;
events: Map.Map<Nat, Event>;
};
};
100 changes: 100 additions & 0 deletions src/migrations/00-03-00-multiple-listeners/lib.mo
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import Debug "mo:base/Debug";
import Iter "mo:base/Iter";
import Map "mo:map_8_0_0_alpha_5/Map";
import Option "mo:base/Option";
import Set "mo:map_8_0_0_alpha_5/Set";
import MigrationTypes "../types";
import PrevTypes "../00-02-00-publisher-entities/types";
import Prim "mo:prim";
import Types "./types";

module {
public func upgrade(migrationState: MigrationTypes.State, args: MigrationTypes.Args): MigrationTypes.State {
let state = switch (migrationState) { case (#v0_2_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };

let subscribers = Map.map<Principal, PrevTypes.Subscriber, Types.Subscriber>(state.subscribers, func(key, subscriber) = {
id = subscriber.id;
createdAt = subscriber.createdAt;
var activeSubscriptions = subscriber.activeSubscriptions;
listeners = Set.fromIter([subscriber.id].vals(), Map.phash);
confirmedListeners = Set.fromIter([subscriber.id].vals(), Map.phash);
subscriptions = subscriber.subscriptions;
});

let subscriptions = Map.map<Text, PrevTypes.SubscriptionGroup, Types.SubscriptionGroup>(state.subscriptions, func(key, subscriptionGroup) {
return Map.map<Principal, PrevTypes.Subscription, Types.Subscription>(subscriptionGroup, func(key, subscription) = {
eventName = subscription.eventName;
subscriberId = subscription.subscriberId;
createdAt = subscription.createdAt;
var skip = subscription.skip;
var skipped = subscription.skipped;
var active = subscription.active;
var stopped = subscription.stopped;
var filter = null;
var filterPath = null;
var numberOfEvents = subscription.numberOfEvents;
var numberOfNotifications = subscription.numberOfNotifications;
var numberOfResendNotifications = subscription.numberOfResendNotifications;
var numberOfRequestedNotifications = subscription.numberOfRequestedNotifications;
var numberOfConfirmations = subscription.numberOfConfirmations;
events = subscription.events;
})
});

return #v0_3_0(#data({
var eventId = state.eventId;
var broadcastActive = state.broadcastActive;
var nextBroadcastTime = state.nextBroadcastTime;
admins = state.admins;
publishers = state.publishers;
publications = state.publications;
subscribers = subscribers;
subscriptions = subscriptions;
confirmedListeners = Map.new(Map.phash);
events = state.events;
}));
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public func downgrade(migrationState: MigrationTypes.State, args: MigrationTypes.Args): MigrationTypes.State {
let state = switch (migrationState) { case (#v0_3_0(#data(state))) state; case (_) Debug.trap("Unexpected migration state") };

let subscribers = Map.map<Principal, Types.Subscriber, PrevTypes.Subscriber>(state.subscribers, func((key, subscriber)) = {
id = subscriber.id;
createdAt = subscriber.createdAt;
var activeSubscriptions = subscriber.activeSubscriptions;
subscriptions = subscriber.subscriptions;
});

let subscriptions = Map.map<Text, Types.SubscriptionGroup, PrevTypes.SubscriptionGroup>(state.subscriptions, func(key, subscriptionGroup) {
return Map.map<Principal, Types.Subscription, PrevTypes.Subscription>(subscriptionGroup, func(key, subscription) = {
eventName = subscription.eventName;
subscriberId = subscription.subscriberId;
createdAt = subscription.createdAt;
var skip = subscription.skip;
var skipped = subscription.skipped;
var active = subscription.active;
var stopped = subscription.stopped;
var numberOfEvents = subscription.numberOfEvents;
var numberOfNotifications = subscription.numberOfNotifications;
var numberOfResendNotifications = subscription.numberOfResendNotifications;
var numberOfRequestedNotifications = subscription.numberOfRequestedNotifications;
var numberOfConfirmations = subscription.numberOfConfirmations;
events = subscription.events;
})
});

return #v0_2_0(#data({
var eventId = state.eventId;
var broadcastActive = state.broadcastActive;
var nextBroadcastTime = state.nextBroadcastTime;
admins = state.admins;
publishers = state.publishers;
publications = state.publications;
subscribers = subscribers;
subscriptions = subscriptions;
events = state.events;
}));
};
};
84 changes: 84 additions & 0 deletions src/migrations/00-03-00-multiple-listeners/types.mo
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import Candy "mo:candy_0_1_9/types";
import CandyUtils "mo:candy_utils_0_2_0/CandyUtils";
import Map "mo:map_8_0_0_alpha_5/Map";
import Set "mo:map_8_0_0_alpha_5/Set";

module {
public type Publisher = {
id: Principal;
createdAt: Nat64;
var activePublications: Nat8;
publications: Set.Set<Text>;
};

public type Publication = {
eventName: Text;
publisherId: Principal;
createdAt: Nat64;
var active: Bool;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
whitelist: Set.Set<Principal>;
};

public type Subscriber = {
id: Principal;
createdAt: Nat64;
var activeSubscriptions: Nat8;
listeners: Set.Set<Principal>;
confirmedListeners: Set.Set<Principal>;
subscriptions: Set.Set<Text>;
};

public type Subscription = {
eventName: Text;
subscriberId: Principal;
createdAt: Nat64;
var skip: Nat8;
var skipped: Nat8;
var active: Bool;
var stopped: Bool;
var filter: ?Text;
var filterPath: ?CandyUtils.Path;
var numberOfEvents: Nat64;
var numberOfNotifications: Nat64;
var numberOfResendNotifications: Nat64;
var numberOfRequestedNotifications: Nat64;
var numberOfConfirmations: Nat64;
events: Set.Set<Nat>;
};

public type Event = {
id: Nat;
eventName: Text;
publisherId: Principal;
payload: Candy.CandyValue;
createdAt: Nat64;
var nextBroadcastTime: Nat64;
var numberOfAttempts: Nat8;
resendRequests: Set.Set<Principal>;
subscribers: Map.Map<Principal, Nat8>;
};

public type PublicationGroup = Map.Map<Principal, Publication>;

public type SubscriptionGroup = Map.Map<Principal, Subscription>;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

public type State = {
var eventId: Nat;
var broadcastActive: Bool;
var nextBroadcastTime: Nat64;
admins: Set.Set<Principal>;
publishers: Map.Map<Principal, Publisher>;
publications: Map.Map<Text, PublicationGroup>;
subscribers: Map.Map<Principal, Subscriber>;
subscriptions: Map.Map<Text, SubscriptionGroup>;
confirmedListeners: Map.Map<Principal, Principal>;
events: Map.Map<Nat, Event>;
};
};
Loading

0 comments on commit cc2a70e

Please sign in to comment.