Skip to content

Commit

Permalink
Removed additional logs (#265)
Browse files Browse the repository at this point in the history
* Added registerListener to start_hub

Signed-off-by: HimajaDhanyamraju2 <[email protected]>

* Removed extra logs

Signed-off-by: HimajaDhanyamraju2 <[email protected]>

* Removed additional logs

Signed-off-by: HimajaDhanyamraju2 <[email protected]>

---------

Signed-off-by: HimajaDhanyamraju2 <[email protected]>
Co-authored-by: HimajaDhanyamraju2 <[email protected]>
  • Loading branch information
HimajaDhanyamraju2 and HimajaDhanyamraju2 authored May 30, 2024
1 parent afcaf13 commit d211bcd
Showing 1 changed file with 0 additions and 26 deletions.
26 changes: 0 additions & 26 deletions hub/start_hub.bal
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,8 @@ public function main() returns error? {
check httpListener.attach(healthCheckService, "hub/actuator/health");
websubhub:Listener hubListener = check new (httpListener);
check hubListener.attach(hubService, "hub");
log:printInfo("Starting the hub...");
websubhub:Error? websubError = check hubListener.'start();
runtime:registerListener(hubListener);
if websubError is websubhub:Error {
log:printInfo("Hub started with error: ", hubStatus = websubError.toString());
} else {
log:printInfo("Hub started : ", hubStatus = "No error");
}
}


Expand All @@ -80,12 +74,10 @@ function validateConfigs() returns boolean|error {
}
return error("Found error in decoding the encryption key. Please set valid base64 encoded bytes as encryption key to proceed.");
}
log:printInfo("Configs validated and returning true...");
return true;
}

function syncRegsisteredTopicsCache() {
log:printInfo("syncRegisteredTopicsCache started...");
do {
while true {
websubhub:TopicRegistration[]? persistedTopics = check getPersistedTopics();
Expand All @@ -101,11 +93,9 @@ function syncRegsisteredTopicsCache() {
log:printError("Error occurred while gracefully closing kafka-consumer", err = result.message());
}
}
log:printInfo("Exiting syncRegisteredTopicsCache method...");
}

function getPersistedTopics() returns websubhub:TopicRegistration[]|error? {
log:printInfo("Entered getPersistedTopics method...");
kafka:BytesConsumerRecord[] records = check conn:registeredTopicsConsumer->poll(config:POLLING_INTERVAL);
if records.length() > 0 {
kafka:BytesConsumerRecord lastRecord = records.pop();
Expand All @@ -117,24 +107,20 @@ function getPersistedTopics() returns websubhub:TopicRegistration[]|error? {
return lastPersistedData;
}
}
log:printInfo("Exiting getPersistedTopics method...");
return;
}

function deSerializeTopicsMessage(string lastPersistedData) returns websubhub:TopicRegistration[]|error {
log:printInfo("Entered deSerializeTopicsMessage method...");
websubhub:TopicRegistration[] currentTopics = [];
json[] payload = <json[]>check value:fromJsonString(lastPersistedData);
foreach var data in payload {
websubhub:TopicRegistration topic = check data.cloneWithType(websubhub:TopicRegistration);
currentTopics.push(topic);
}
log:printInfo("Exiting deSerializeTopicsMessage method...");
return currentTopics;
}

function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
log:printInfo("Entered refreshTopicCache method...");
lock {
registeredTopicsCache.removeAll();
}
Expand All @@ -144,11 +130,9 @@ function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) {
registeredTopicsCache[topicName] = topic.cloneReadOnly();
}
}
log:printInfo("Exiting refreshTopicCache method...");
}

function syncSubscribersCache() {
log:printInfo("Entered syncSubscribersCache method...");
do {
while true {
websubhub:VerifiedSubscription[]? persistedSubscribers = check getPersistedSubscribers();
Expand All @@ -165,11 +149,9 @@ function syncSubscribersCache() {
log:printError("Error occurred while gracefully closing kafka-consumer", err = result.message());
}
}
log:printInfo("Exiting syncSubscribersCache method...");
}

function getPersistedSubscribers() returns websubhub:VerifiedSubscription[]|error? {
log:printInfo("Entered getPersistedSubscribers method...");
kafka:BytesConsumerRecord[] records = check conn:subscribersConsumer->poll(config:POLLING_INTERVAL);
if records.length() > 0 {
kafka:BytesConsumerRecord lastRecord = records.pop();
Expand All @@ -181,36 +163,30 @@ function getPersistedSubscribers() returns websubhub:VerifiedSubscription[]|erro
return lastPersistedData;
}
}
log:printInfo("Exiting getPersistedSubscribers method...");
return;
}

function deSerializeSubscribersMessage(string lastPersistedData) returns websubhub:VerifiedSubscription[]|error {
log:printInfo("Entered deSerializeSubscribersMessage method...");
websubhub:VerifiedSubscription[] currentSubscriptions = [];
json[] payload = <json[]>check value:fromJsonString(lastPersistedData);
foreach var data in payload {
websubhub:VerifiedSubscription subscription = check data.cloneWithType(websubhub:VerifiedSubscription);
currentSubscriptions.push(subscription);
}
log:printInfo("Exiting deSerializeSubscribersMessage method...");
return currentSubscriptions;
}

function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubscribers) {
log:printInfo("Entered refreshSubscribersCache method...");
final readonly & string[] subscriberIds = persistedSubscribers.'map(sub => util:generateSubscriberId(sub.hubTopic, sub.hubCallback)).cloneReadOnly();
lock {
string[] unsubscribedSubscribers = subscribersCache.keys().filter('key => subscriberIds.indexOf('key) is ());
foreach var sub in unsubscribedSubscribers {
_ = subscribersCache.removeIfHasKey(sub);
}
}
log:printInfo("Exiting refreshSubscribersCache method...");
}

function startMissingSubscribers(websubhub:VerifiedSubscription[] persistedSubscribers) returns error? {
log:printInfo("Entered startMissingSubscribers method...");
foreach var subscriber in persistedSubscribers {
string topicName = util:sanitizeTopicName(subscriber.hubTopic);
string subscriberId = util:generateSubscriberId(subscriber.hubTopic, subscriber.hubCallback);
Expand Down Expand Up @@ -245,7 +221,6 @@ function startMissingSubscribers(websubhub:VerifiedSubscription[] persistedSubsc
},
timeout: config:MESSAGE_DELIVERY_TIMEOUT
});
log:printInfo("Calling pollForNewUpdates method...");
_ = @strand { thread: "any" } start pollForNewUpdates(hubClientEp, consumerEp, topicName, subscriberId, subscriber.hubCallback);
}
}
Expand Down Expand Up @@ -293,7 +268,6 @@ isolated function isValidConsumer(string topicName, string subscriberId, string
}

isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp, kafka:Consumer consumerEp, string topic, string callback) returns error? {
log:printInfo("Entered NotifySubscribers method...");
foreach kafka:BytesConsumerRecord kafkaRecord in records {
websubhub:ContentDistributionMessage|error message = deSerializeKafkaRecord(kafkaRecord);

Expand Down

0 comments on commit d211bcd

Please sign in to comment.