From e8bf1b943416d75a33d69cbc33cd5b3725e7430a Mon Sep 17 00:00:00 2001 From: Luke Warren Date: Fri, 27 Jan 2023 16:13:58 +0200 Subject: [PATCH 1/3] fix: connection string invalid for queue limit > 100 --- PurpleExplorer/Helpers/QueueHelper.cs | 83 ++++++++++++++++++--------- 1 file changed, 57 insertions(+), 26 deletions(-) diff --git a/PurpleExplorer/Helpers/QueueHelper.cs b/PurpleExplorer/Helpers/QueueHelper.cs index 1e674da..0befaf3 100644 --- a/PurpleExplorer/Helpers/QueueHelper.cs +++ b/PurpleExplorer/Helpers/QueueHelper.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.ServiceBus.Management; using PurpleExplorer.Models; using Message = PurpleExplorer.Models.Message; using AzureMessage = Microsoft.Azure.ServiceBus.Message; @@ -13,6 +14,8 @@ namespace PurpleExplorer.Helpers; public class QueueHelper : BaseHelper, IQueueHelper { + private const int MaxQueueListFetchCountPerPage = 100; + private readonly AppSettings _appSettings; public QueueHelper(AppSettings appSettings) @@ -22,29 +25,15 @@ public QueueHelper(AppSettings appSettings) public async Task> GetQueues(ServiceBusConnectionString connectionString) { - IList queues = new List(); var client = GetManagementClient(connectionString); - var queuesInfo = await client.GetQueuesRuntimeInfoAsync(_appSettings.QueueListFetchCount); - await client.CloseAsync(); - - await Task.WhenAll(queuesInfo.Select(async queue => - { - var queueName = queue.Path; - - var newQueue = new ServiceBusQueue(queue) - { - Name = queueName - }; - - queues.Add(newQueue); - })); - + var queues = await GetQueues(client); + await client.CloseAsync(); return queues; } - + public async Task SendMessage(ServiceBusConnectionString connectionString, string queueName, string content) { - var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)}; + var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) }; await SendMessage(connectionString, queueName, message); } @@ -72,7 +61,7 @@ public async Task> GetDlqMessages(ServiceBusConnectionString conn return receivedMessages.Select(message => new Message(message, true)).ToList(); } - + public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string queue, Message message) { var receiver = GetMessageReceiver(connectionString, queue, ReceiveMode.PeekLock); @@ -95,7 +84,7 @@ public async Task DeadletterMessage(ServiceBusConnectionString connectionString, await receiver.CloseAsync(); } - + public async Task DeleteMessage(ServiceBusConnectionString connectionString, string queue, Message message, bool isDlq) { @@ -121,18 +110,19 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str await receiver.CloseAsync(); } - - private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string queue, long sequenceNumber) + + private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, + string queue, long sequenceNumber) { var deadletterPath = EntityNameHelper.FormatDeadLetterPath(queue); var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock); var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber); await receiver.CloseAsync(); - + return azureMessage; } - + public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string queue, Message message) { var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, queue, message.SequenceNumber); @@ -165,7 +155,7 @@ public async Task PurgeMessages(ServiceBusConnectionString connectionStrin await receiver.CloseAsync(); return purgedCount; } - + public async Task TransferDlqMessages(ServiceBusConnectionString connectionString, string queuePath) { var path = EntityNameHelper.FormatDeadLetterPath(queuePath); @@ -193,7 +183,7 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio } finally { - if (receiver != null) + if (receiver != null) await receiver.CloseAsync(); if (sender != null) @@ -202,4 +192,45 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio return transferredCount; } + + private async Task> GetQueues(ManagementClient client) + { + var queueInfos = new List(); + var numberOfPages = _appSettings.QueueListFetchCount / MaxQueueListFetchCountPerPage; + var remainder = _appSettings.QueueListFetchCount % (numberOfPages * MaxQueueListFetchCountPerPage); + + for (int pageCount = 0; pageCount < numberOfPages; pageCount++) + { + var numberToSkip = MaxQueueListFetchCountPerPage * pageCount; + var page = await client.GetQueuesRuntimeInfoAsync(MaxQueueListFetchCountPerPage, numberToSkip); + if (page.Any()) + { + queueInfos.AddRange(page); + } + else + { + return queueInfos + .Select(q => new ServiceBusQueue(q) + { + Name = q.Path + }).ToList(); + } + } + + if (remainder > 0) + { + var numberAlreadyFetched = numberOfPages > 0 + ? MaxQueueListFetchCountPerPage * numberOfPages + : 0; + var remainingItems = await client.GetQueuesRuntimeInfoAsync( + remainder, + numberAlreadyFetched); + queueInfos.AddRange(remainingItems); + } + + return queueInfos.Select(q => new ServiceBusQueue(q) + { + Name = q.Path + }).ToList(); + } } \ No newline at end of file From e46bf9fdf804250b9cccfee4fcde859d1ad3601e Mon Sep 17 00:00:00 2001 From: Luke Warren Date: Fri, 27 Jan 2023 16:14:15 +0200 Subject: [PATCH 2/3] fix: connection string invalid for topic limit > 100 --- PurpleExplorer/Helpers/ITopicHelper.cs | 3 - PurpleExplorer/Helpers/TopicHelper.cs | 107 ++++++++++++++++++------- 2 files changed, 76 insertions(+), 34 deletions(-) diff --git a/PurpleExplorer/Helpers/ITopicHelper.cs b/PurpleExplorer/Helpers/ITopicHelper.cs index 0e7b6cf..7ca8b12 100644 --- a/PurpleExplorer/Helpers/ITopicHelper.cs +++ b/PurpleExplorer/Helpers/ITopicHelper.cs @@ -10,9 +10,6 @@ public interface ITopicHelper { public Task GetNamespaceInfo(ServiceBusConnectionString connectionString); public Task> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString); - public Task GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions); - public Task> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath); - public Task GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName); public Task> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription); public Task> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName, string subscriptionName); public Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content); diff --git a/PurpleExplorer/Helpers/TopicHelper.cs b/PurpleExplorer/Helpers/TopicHelper.cs index f8e1212..568f4f7 100644 --- a/PurpleExplorer/Helpers/TopicHelper.cs +++ b/PurpleExplorer/Helpers/TopicHelper.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using AvaloniaEdit.Utils; using Message = PurpleExplorer.Models.Message; using AzureMessage = Microsoft.Azure.ServiceBus.Message; @@ -14,6 +15,7 @@ namespace PurpleExplorer.Helpers; public class TopicHelper : BaseHelper, ITopicHelper { + private const int MaxTopicListFetchCountPerPage = 100; private readonly AppSettings _appSettings; public TopicHelper(AppSettings appSettings) @@ -23,41 +25,76 @@ public TopicHelper(AppSettings appSettings) public async Task> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString) { - IList topics = new List(); var client = GetManagementClient(connectionString); - var busTopics = await client.GetTopicsAsync(_appSettings.TopicListFetchCount); + var topics = await GetTopicsWithSubscriptions(client); await client.CloseAsync(); + return topics; + } + + private async Task CreateTopicWithSubscriptions(ManagementClient client, TopicDescription topicDescription) + { + var topic = new ServiceBusTopic(topicDescription); + var subscriptions = await GetSubscriptions(client, topicDescription.Path); + topic.AddSubscriptions(subscriptions.ToArray()); + return topic; + } + + private async Task> GetTopicsWithSubscriptions(ManagementClient client) + { + var topicDescriptions = new List(); + var numberOfPages = _appSettings.TopicListFetchCount / MaxTopicListFetchCountPerPage; + var remainder = _appSettings.TopicListFetchCount % (numberOfPages * MaxTopicListFetchCountPerPage); - await Task.WhenAll(busTopics.Select(async topic => + for (int pageCount = 0; pageCount < numberOfPages; pageCount++) { - var newTopic = new ServiceBusTopic(topic); + var numberToSkip = MaxTopicListFetchCountPerPage * pageCount; + var page = await client.GetTopicsAsync(MaxTopicListFetchCountPerPage, numberToSkip); + if (page.Any()) + { + topicDescriptions.AddRange(page); + } + else + { + return (await Task.WhenAll(topicDescriptions + .Select(async topic => await CreateTopicWithSubscriptions(client, topic)))).ToList(); + } + } - var subscriptions = await GetSubscriptions(connectionString, newTopic.Name); - newTopic.AddSubscriptions(subscriptions.ToArray()); - topics.Add(newTopic); - })); + if (remainder > 0) + { + var numberAlreadyFetched = numberOfPages > 0 + ? MaxTopicListFetchCountPerPage * numberOfPages + : 0; + var remainingItems = await client.GetTopicsAsync( + remainder, + numberAlreadyFetched); + topicDescriptions.AddRange(remainingItems); + } - return topics; + var topics = await Task.WhenAll(topicDescriptions + .Select(async topic => await CreateTopicWithSubscriptions(client, topic))); + return topics.ToList(); } - public async Task GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions) + public async Task GetTopic(ServiceBusConnectionString connectionString, string topicPath, + bool retrieveSubscriptions) { var client = GetManagementClient(connectionString); var busTopics = await client.GetTopicAsync(topicPath); - await client.CloseAsync(); - var newTopic = new ServiceBusTopic(busTopics); if (retrieveSubscriptions) { - var subscriptions = await GetSubscriptions(connectionString, newTopic.Name); + var subscriptions = await GetSubscriptions(client, newTopic.Name); newTopic.AddSubscriptions(subscriptions.ToArray()); } - + + await client.CloseAsync(); return newTopic; } - - public async Task GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName) + + public async Task GetSubscription(ServiceBusConnectionString connectionString, + string topicPath, string subscriptionName) { var client = GetManagementClient(connectionString); var runtimeInfo = await client.GetSubscriptionRuntimeInfoAsync(topicPath, subscriptionName); @@ -66,12 +103,12 @@ public async Task GetSubscription(ServiceBusConnectionSt return new ServiceBusSubscription(runtimeInfo); } - public async Task> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath) + private async Task> GetSubscriptions( + ManagementClient client, + string topicPath) { IList subscriptions = new List(); - var client = GetManagementClient(connectionString); var topicSubscription = await client.GetSubscriptionsRuntimeInfoAsync(topicPath); - await client.CloseAsync(); foreach (var sub in topicSubscription) { @@ -81,7 +118,8 @@ public async Task> GetSubscriptions(ServiceBusConn return subscriptions; } - public async Task> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName, + public async Task> GetMessagesBySubscription(ServiceBusConnectionString connectionString, + string topicName, string subscriptionName) { var path = EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName); @@ -94,7 +132,8 @@ public async Task> GetMessagesBySubscription(ServiceBusConnection return result; } - public async Task> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription) + public async Task> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, + string subscription) { var path = EntityNameHelper.FormatSubscriptionPath(topic, subscription); var deadletterPath = EntityNameHelper.FormatDeadLetterPath(path); @@ -115,7 +154,7 @@ public async Task GetNamespaceInfo(ServiceBusConnectionString con public async Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content) { - var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)}; + var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) }; await SendMessage(connectionString, topicPath, message); } @@ -126,7 +165,8 @@ public async Task SendMessage(ServiceBusConnectionString connectionString, strin await topicClient.CloseAsync(); } - public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, Message message, bool isDlq) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); @@ -153,7 +193,8 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str await receiver.CloseAsync(); } - public async Task PurgeMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task PurgeMessages(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, bool isDlq) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); @@ -177,11 +218,12 @@ public async Task PurgeMessages(ServiceBusConnectionString connectionStrin return purgedCount; } - public async Task TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath) + public async Task TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); path = EntityNameHelper.FormatDeadLetterPath(path); - + long transferredCount = 0; MessageReceiver receiver = null; TopicClient sender = null; @@ -205,7 +247,7 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio } finally { - if (receiver != null) + if (receiver != null) await receiver.CloseAsync(); if (sender != null) @@ -215,7 +257,8 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio return transferredCount; } - private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string topicPath, + private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, + string topicPath, string subscriptionPath, long sequenceNumber) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); @@ -224,11 +267,12 @@ private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnec var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock); var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber); await receiver.CloseAsync(); - + return azureMessage; } - public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, Message message) { var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, topicPath, subscriptionPath, @@ -240,7 +284,8 @@ public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString await DeleteMessage(connectionString, topicPath, subscriptionPath, message, true); } - public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, Message message) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); From 4bd4fcdb031f2e4f78e1faa0cc874f69b8c89e1d Mon Sep 17 00:00:00 2001 From: Luke Warren Date: Mon, 30 Jan 2023 09:34:31 +0200 Subject: [PATCH 3/3] chore: move max per page constant to BaseHelper.cs --- PurpleExplorer/Helpers/BaseHelper.cs | 2 ++ PurpleExplorer/Helpers/QueueHelper.cs | 12 +++++------- PurpleExplorer/Helpers/TopicHelper.cs | 11 +++++------ 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/PurpleExplorer/Helpers/BaseHelper.cs b/PurpleExplorer/Helpers/BaseHelper.cs index 17fa0db..5cd1aef 100644 --- a/PurpleExplorer/Helpers/BaseHelper.cs +++ b/PurpleExplorer/Helpers/BaseHelper.cs @@ -9,6 +9,8 @@ namespace PurpleExplorer.Helpers; public abstract class BaseHelper { + protected const int MaxRequestItemsPerPage = 100; + protected ManagementClient GetManagementClient(ServiceBusConnectionString connectionString) { if (connectionString.UseManagedIdentity) diff --git a/PurpleExplorer/Helpers/QueueHelper.cs b/PurpleExplorer/Helpers/QueueHelper.cs index 0befaf3..53d51f3 100644 --- a/PurpleExplorer/Helpers/QueueHelper.cs +++ b/PurpleExplorer/Helpers/QueueHelper.cs @@ -14,8 +14,6 @@ namespace PurpleExplorer.Helpers; public class QueueHelper : BaseHelper, IQueueHelper { - private const int MaxQueueListFetchCountPerPage = 100; - private readonly AppSettings _appSettings; public QueueHelper(AppSettings appSettings) @@ -196,13 +194,13 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio private async Task> GetQueues(ManagementClient client) { var queueInfos = new List(); - var numberOfPages = _appSettings.QueueListFetchCount / MaxQueueListFetchCountPerPage; - var remainder = _appSettings.QueueListFetchCount % (numberOfPages * MaxQueueListFetchCountPerPage); + var numberOfPages = _appSettings.QueueListFetchCount / MaxRequestItemsPerPage; + var remainder = _appSettings.QueueListFetchCount % (numberOfPages * MaxRequestItemsPerPage); for (int pageCount = 0; pageCount < numberOfPages; pageCount++) { - var numberToSkip = MaxQueueListFetchCountPerPage * pageCount; - var page = await client.GetQueuesRuntimeInfoAsync(MaxQueueListFetchCountPerPage, numberToSkip); + var numberToSkip = MaxRequestItemsPerPage * pageCount; + var page = await client.GetQueuesRuntimeInfoAsync(MaxRequestItemsPerPage, numberToSkip); if (page.Any()) { queueInfos.AddRange(page); @@ -220,7 +218,7 @@ private async Task> GetQueues(ManagementClient client) if (remainder > 0) { var numberAlreadyFetched = numberOfPages > 0 - ? MaxQueueListFetchCountPerPage * numberOfPages + ? MaxRequestItemsPerPage * numberOfPages : 0; var remainingItems = await client.GetQueuesRuntimeInfoAsync( remainder, diff --git a/PurpleExplorer/Helpers/TopicHelper.cs b/PurpleExplorer/Helpers/TopicHelper.cs index 568f4f7..9774ee4 100644 --- a/PurpleExplorer/Helpers/TopicHelper.cs +++ b/PurpleExplorer/Helpers/TopicHelper.cs @@ -15,7 +15,6 @@ namespace PurpleExplorer.Helpers; public class TopicHelper : BaseHelper, ITopicHelper { - private const int MaxTopicListFetchCountPerPage = 100; private readonly AppSettings _appSettings; public TopicHelper(AppSettings appSettings) @@ -42,13 +41,13 @@ private async Task CreateTopicWithSubscriptions(ManagementClien private async Task> GetTopicsWithSubscriptions(ManagementClient client) { var topicDescriptions = new List(); - var numberOfPages = _appSettings.TopicListFetchCount / MaxTopicListFetchCountPerPage; - var remainder = _appSettings.TopicListFetchCount % (numberOfPages * MaxTopicListFetchCountPerPage); + var numberOfPages = _appSettings.TopicListFetchCount / MaxRequestItemsPerPage; + var remainder = _appSettings.TopicListFetchCount % (numberOfPages * MaxRequestItemsPerPage); for (int pageCount = 0; pageCount < numberOfPages; pageCount++) { - var numberToSkip = MaxTopicListFetchCountPerPage * pageCount; - var page = await client.GetTopicsAsync(MaxTopicListFetchCountPerPage, numberToSkip); + var numberToSkip = MaxRequestItemsPerPage * pageCount; + var page = await client.GetTopicsAsync(MaxRequestItemsPerPage, numberToSkip); if (page.Any()) { topicDescriptions.AddRange(page); @@ -63,7 +62,7 @@ private async Task> GetTopicsWithSubscriptions(ManagementC if (remainder > 0) { var numberAlreadyFetched = numberOfPages > 0 - ? MaxTopicListFetchCountPerPage * numberOfPages + ? MaxRequestItemsPerPage * numberOfPages : 0; var remainingItems = await client.GetTopicsAsync( remainder,