diff --git a/hub/start_hub.bal b/hub/start_hub.bal index 4175565..c04b8d1 100644 --- a/hub/start_hub.bal +++ b/hub/start_hub.bal @@ -61,14 +61,8 @@ public function main() returns error? { check httpListener.attach(healthCheckService, "hub/actuator/health"); websubhub:Listener hubListener = check new (httpListener); check hubListener.attach(hubService, "hub"); - log:printInfo("Starting the hub..."); websubhub:Error? websubError = check hubListener.'start(); runtime:registerListener(hubListener); - if websubError is websubhub:Error { - log:printInfo("Hub started with error: ", hubStatus = websubError.toString()); - } else { - log:printInfo("Hub started : ", hubStatus = "No error"); - } } @@ -80,12 +74,10 @@ function validateConfigs() returns boolean|error { } return error("Found error in decoding the encryption key. Please set valid base64 encoded bytes as encryption key to proceed."); } - log:printInfo("Configs validated and returning true..."); return true; } function syncRegsisteredTopicsCache() { - log:printInfo("syncRegisteredTopicsCache started..."); do { while true { websubhub:TopicRegistration[]? persistedTopics = check getPersistedTopics(); @@ -101,11 +93,9 @@ function syncRegsisteredTopicsCache() { log:printError("Error occurred while gracefully closing kafka-consumer", err = result.message()); } } - log:printInfo("Exiting syncRegisteredTopicsCache method..."); } function getPersistedTopics() returns websubhub:TopicRegistration[]|error? { - log:printInfo("Entered getPersistedTopics method..."); kafka:BytesConsumerRecord[] records = check conn:registeredTopicsConsumer->poll(config:POLLING_INTERVAL); if records.length() > 0 { kafka:BytesConsumerRecord lastRecord = records.pop(); @@ -117,24 +107,20 @@ function getPersistedTopics() returns websubhub:TopicRegistration[]|error? { return lastPersistedData; } } - log:printInfo("Exiting getPersistedTopics method..."); return; } function deSerializeTopicsMessage(string lastPersistedData) returns websubhub:TopicRegistration[]|error { - log:printInfo("Entered deSerializeTopicsMessage method..."); websubhub:TopicRegistration[] currentTopics = []; json[] payload = check value:fromJsonString(lastPersistedData); foreach var data in payload { websubhub:TopicRegistration topic = check data.cloneWithType(websubhub:TopicRegistration); currentTopics.push(topic); } - log:printInfo("Exiting deSerializeTopicsMessage method..."); return currentTopics; } function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) { - log:printInfo("Entered refreshTopicCache method..."); lock { registeredTopicsCache.removeAll(); } @@ -144,11 +130,9 @@ function refreshTopicCache(websubhub:TopicRegistration[] persistedTopics) { registeredTopicsCache[topicName] = topic.cloneReadOnly(); } } - log:printInfo("Exiting refreshTopicCache method..."); } function syncSubscribersCache() { - log:printInfo("Entered syncSubscribersCache method..."); do { while true { websubhub:VerifiedSubscription[]? persistedSubscribers = check getPersistedSubscribers(); @@ -165,11 +149,9 @@ function syncSubscribersCache() { log:printError("Error occurred while gracefully closing kafka-consumer", err = result.message()); } } - log:printInfo("Exiting syncSubscribersCache method..."); } function getPersistedSubscribers() returns websubhub:VerifiedSubscription[]|error? { - log:printInfo("Entered getPersistedSubscribers method..."); kafka:BytesConsumerRecord[] records = check conn:subscribersConsumer->poll(config:POLLING_INTERVAL); if records.length() > 0 { kafka:BytesConsumerRecord lastRecord = records.pop(); @@ -181,24 +163,20 @@ function getPersistedSubscribers() returns websubhub:VerifiedSubscription[]|erro return lastPersistedData; } } - log:printInfo("Exiting getPersistedSubscribers method..."); return; } function deSerializeSubscribersMessage(string lastPersistedData) returns websubhub:VerifiedSubscription[]|error { - log:printInfo("Entered deSerializeSubscribersMessage method..."); websubhub:VerifiedSubscription[] currentSubscriptions = []; json[] payload = check value:fromJsonString(lastPersistedData); foreach var data in payload { websubhub:VerifiedSubscription subscription = check data.cloneWithType(websubhub:VerifiedSubscription); currentSubscriptions.push(subscription); } - log:printInfo("Exiting deSerializeSubscribersMessage method..."); return currentSubscriptions; } function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubscribers) { - log:printInfo("Entered refreshSubscribersCache method..."); final readonly & string[] subscriberIds = persistedSubscribers.'map(sub => util:generateSubscriberId(sub.hubTopic, sub.hubCallback)).cloneReadOnly(); lock { string[] unsubscribedSubscribers = subscribersCache.keys().filter('key => subscriberIds.indexOf('key) is ()); @@ -206,11 +184,9 @@ function refreshSubscribersCache(websubhub:VerifiedSubscription[] persistedSubsc _ = subscribersCache.removeIfHasKey(sub); } } - log:printInfo("Exiting refreshSubscribersCache method..."); } function startMissingSubscribers(websubhub:VerifiedSubscription[] persistedSubscribers) returns error? { - log:printInfo("Entered startMissingSubscribers method..."); foreach var subscriber in persistedSubscribers { string topicName = util:sanitizeTopicName(subscriber.hubTopic); string subscriberId = util:generateSubscriberId(subscriber.hubTopic, subscriber.hubCallback); @@ -245,7 +221,6 @@ function startMissingSubscribers(websubhub:VerifiedSubscription[] persistedSubsc }, timeout: config:MESSAGE_DELIVERY_TIMEOUT }); - log:printInfo("Calling pollForNewUpdates method..."); _ = @strand { thread: "any" } start pollForNewUpdates(hubClientEp, consumerEp, topicName, subscriberId, subscriber.hubCallback); } } @@ -293,7 +268,6 @@ isolated function isValidConsumer(string topicName, string subscriberId, string } isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp, kafka:Consumer consumerEp, string topic, string callback) returns error? { - log:printInfo("Entered NotifySubscribers method..."); foreach kafka:BytesConsumerRecord kafkaRecord in records { websubhub:ContentDistributionMessage|error message = deSerializeKafkaRecord(kafkaRecord);