Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Connection Limit Invalid for Queue/Topic Limits Greater than 100 #73

Merged
merged 3 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions PurpleExplorer/Helpers/BaseHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace PurpleExplorer.Helpers;

public abstract class BaseHelper
{
protected const int MaxRequestItemsPerPage = 100;

protected ManagementClient GetManagementClient(ServiceBusConnectionString connectionString)
{
if (connectionString.UseManagedIdentity)
Expand Down
3 changes: 0 additions & 3 deletions PurpleExplorer/Helpers/ITopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ public interface ITopicHelper
{
public Task<NamespaceInfo> GetNamespaceInfo(ServiceBusConnectionString connectionString);
public Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString);
public Task<ServiceBusTopic> GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions);
public Task<IList<ServiceBusSubscription>> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath);
public Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName);
public Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription);
public Task<IList<Models.Message>> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName, string subscriptionName);
public Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content);
Expand Down
81 changes: 55 additions & 26 deletions PurpleExplorer/Helpers/QueueHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus.Management;
using PurpleExplorer.Models;
using Message = PurpleExplorer.Models.Message;
using AzureMessage = Microsoft.Azure.ServiceBus.Message;
Expand All @@ -22,29 +23,15 @@ public QueueHelper(AppSettings appSettings)

public async Task<IList<ServiceBusQueue>> GetQueues(ServiceBusConnectionString connectionString)
{
IList<ServiceBusQueue> queues = new List<ServiceBusQueue>();
var client = GetManagementClient(connectionString);
var queuesInfo = await client.GetQueuesRuntimeInfoAsync(_appSettings.QueueListFetchCount);
await client.CloseAsync();

await Task.WhenAll(queuesInfo.Select(async queue =>
{
var queueName = queue.Path;

var newQueue = new ServiceBusQueue(queue)
{
Name = queueName
};

queues.Add(newQueue);
}));

var queues = await GetQueues(client);
await client.CloseAsync();
return queues;
}

public async Task SendMessage(ServiceBusConnectionString connectionString, string queueName, string content)
{
var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)};
var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) };
await SendMessage(connectionString, queueName, message);
}

Expand Down Expand Up @@ -72,7 +59,7 @@ public async Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString conn

return receivedMessages.Select(message => new Message(message, true)).ToList();
}

public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string queue, Message message)
{
var receiver = GetMessageReceiver(connectionString, queue, ReceiveMode.PeekLock);
Expand All @@ -95,7 +82,7 @@ public async Task DeadletterMessage(ServiceBusConnectionString connectionString,

await receiver.CloseAsync();
}

public async Task DeleteMessage(ServiceBusConnectionString connectionString, string queue,
Message message, bool isDlq)
{
Expand All @@ -121,18 +108,19 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str

await receiver.CloseAsync();
}

private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string queue, long sequenceNumber)

private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString,
string queue, long sequenceNumber)
{
var deadletterPath = EntityNameHelper.FormatDeadLetterPath(queue);

var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock);
var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber);
await receiver.CloseAsync();

return azureMessage;
}

public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string queue, Message message)
{
var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, queue, message.SequenceNumber);
Expand Down Expand Up @@ -165,7 +153,7 @@ public async Task<long> PurgeMessages(ServiceBusConnectionString connectionStrin
await receiver.CloseAsync();
return purgedCount;
}

public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectionString, string queuePath)
{
var path = EntityNameHelper.FormatDeadLetterPath(queuePath);
Expand Down Expand Up @@ -193,7 +181,7 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio
}
finally
{
if (receiver != null)
if (receiver != null)
await receiver.CloseAsync();

if (sender != null)
Expand All @@ -202,4 +190,45 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio

return transferredCount;
}

private async Task<List<ServiceBusQueue>> GetQueues(ManagementClient client)
{
var queueInfos = new List<QueueRuntimeInfo>();
var numberOfPages = _appSettings.QueueListFetchCount / MaxRequestItemsPerPage;
var remainder = _appSettings.QueueListFetchCount % (numberOfPages * MaxRequestItemsPerPage);

for (int pageCount = 0; pageCount < numberOfPages; pageCount++)
{
var numberToSkip = MaxRequestItemsPerPage * pageCount;
var page = await client.GetQueuesRuntimeInfoAsync(MaxRequestItemsPerPage, numberToSkip);
if (page.Any())
{
queueInfos.AddRange(page);
}
else
{
return queueInfos
.Select(q => new ServiceBusQueue(q)
{
Name = q.Path
}).ToList();
}
}

if (remainder > 0)
{
var numberAlreadyFetched = numberOfPages > 0
? MaxRequestItemsPerPage * numberOfPages
: 0;
var remainingItems = await client.GetQueuesRuntimeInfoAsync(
remainder,
numberAlreadyFetched);
queueInfos.AddRange(remainingItems);
}

return queueInfos.Select(q => new ServiceBusQueue(q)
{
Name = q.Path
}).ToList();
}
}
106 changes: 75 additions & 31 deletions PurpleExplorer/Helpers/TopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AvaloniaEdit.Utils;
using Message = PurpleExplorer.Models.Message;
using AzureMessage = Microsoft.Azure.ServiceBus.Message;

Expand All @@ -23,41 +24,76 @@ public TopicHelper(AppSettings appSettings)

public async Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString)
{
IList<ServiceBusTopic> topics = new List<ServiceBusTopic>();
var client = GetManagementClient(connectionString);
var busTopics = await client.GetTopicsAsync(_appSettings.TopicListFetchCount);
var topics = await GetTopicsWithSubscriptions(client);
await client.CloseAsync();
return topics;
}

private async Task<ServiceBusTopic> CreateTopicWithSubscriptions(ManagementClient client, TopicDescription topicDescription)
{
var topic = new ServiceBusTopic(topicDescription);
var subscriptions = await GetSubscriptions(client, topicDescription.Path);
topic.AddSubscriptions(subscriptions.ToArray());
return topic;
}

private async Task<List<ServiceBusTopic>> GetTopicsWithSubscriptions(ManagementClient client)
{
var topicDescriptions = new List<TopicDescription>();
var numberOfPages = _appSettings.TopicListFetchCount / MaxRequestItemsPerPage;
var remainder = _appSettings.TopicListFetchCount % (numberOfPages * MaxRequestItemsPerPage);

await Task.WhenAll(busTopics.Select(async topic =>
for (int pageCount = 0; pageCount < numberOfPages; pageCount++)
{
var newTopic = new ServiceBusTopic(topic);
var numberToSkip = MaxRequestItemsPerPage * pageCount;
var page = await client.GetTopicsAsync(MaxRequestItemsPerPage, numberToSkip);
if (page.Any())
{
topicDescriptions.AddRange(page);
}
else
{
return (await Task.WhenAll(topicDescriptions
.Select(async topic => await CreateTopicWithSubscriptions(client, topic)))).ToList();
}
}

var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
topics.Add(newTopic);
}));
if (remainder > 0)
{
var numberAlreadyFetched = numberOfPages > 0
? MaxRequestItemsPerPage * numberOfPages
: 0;
var remainingItems = await client.GetTopicsAsync(
remainder,
numberAlreadyFetched);
topicDescriptions.AddRange(remainingItems);
}

return topics;
var topics = await Task.WhenAll(topicDescriptions
.Select(async topic => await CreateTopicWithSubscriptions(client, topic)));
return topics.ToList();
}

public async Task<ServiceBusTopic> GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions)
public async Task<ServiceBusTopic> GetTopic(ServiceBusConnectionString connectionString, string topicPath,
bool retrieveSubscriptions)
{
var client = GetManagementClient(connectionString);
var busTopics = await client.GetTopicAsync(topicPath);
await client.CloseAsync();

var newTopic = new ServiceBusTopic(busTopics);

if (retrieveSubscriptions)
{
var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
var subscriptions = await GetSubscriptions(client, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
}


await client.CloseAsync();
return newTopic;
}

public async Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName)

public async Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionString connectionString,
string topicPath, string subscriptionName)
{
var client = GetManagementClient(connectionString);
var runtimeInfo = await client.GetSubscriptionRuntimeInfoAsync(topicPath, subscriptionName);
Expand All @@ -66,12 +102,12 @@ public async Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionSt
return new ServiceBusSubscription(runtimeInfo);
}

public async Task<IList<ServiceBusSubscription>> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath)
private async Task<IList<ServiceBusSubscription>> GetSubscriptions(
ManagementClient client,
string topicPath)
{
IList<ServiceBusSubscription> subscriptions = new List<ServiceBusSubscription>();
var client = GetManagementClient(connectionString);
var topicSubscription = await client.GetSubscriptionsRuntimeInfoAsync(topicPath);
await client.CloseAsync();

foreach (var sub in topicSubscription)
{
Expand All @@ -81,7 +117,8 @@ public async Task<IList<ServiceBusSubscription>> GetSubscriptions(ServiceBusConn
return subscriptions;
}

public async Task<IList<Message>> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName,
public async Task<IList<Message>> GetMessagesBySubscription(ServiceBusConnectionString connectionString,
string topicName,
string subscriptionName)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName);
Expand All @@ -94,7 +131,8 @@ public async Task<IList<Message>> GetMessagesBySubscription(ServiceBusConnection
return result;
}

public async Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription)
public async Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString connectionString, string topic,
string subscription)
{
var path = EntityNameHelper.FormatSubscriptionPath(topic, subscription);
var deadletterPath = EntityNameHelper.FormatDeadLetterPath(path);
Expand All @@ -115,7 +153,7 @@ public async Task<NamespaceInfo> GetNamespaceInfo(ServiceBusConnectionString con

public async Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content)
{
var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)};
var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) };
await SendMessage(connectionString, topicPath, message);
}

Expand All @@ -126,7 +164,8 @@ public async Task SendMessage(ServiceBusConnectionString connectionString, strin
await topicClient.CloseAsync();
}

public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
Message message, bool isDlq)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand All @@ -153,7 +192,8 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str
await receiver.CloseAsync();
}

public async Task<long> PurgeMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task<long> PurgeMessages(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
bool isDlq)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand All @@ -177,11 +217,12 @@ public async Task<long> PurgeMessages(ServiceBusConnectionString connectionStrin
return purgedCount;
}

public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath)
public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
path = EntityNameHelper.FormatDeadLetterPath(path);

long transferredCount = 0;
MessageReceiver receiver = null;
TopicClient sender = null;
Expand All @@ -205,7 +246,7 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio
}
finally
{
if (receiver != null)
if (receiver != null)
await receiver.CloseAsync();

if (sender != null)
Expand All @@ -215,7 +256,8 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio
return transferredCount;
}

private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string topicPath,
private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString,
string topicPath,
string subscriptionPath, long sequenceNumber)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand All @@ -224,11 +266,12 @@ private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnec
var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock);
var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber);
await receiver.CloseAsync();

return azureMessage;
}

public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
Message message)
{
var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, topicPath, subscriptionPath,
Expand All @@ -240,7 +283,8 @@ public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString
await DeleteMessage(connectionString, topicPath, subscriptionPath, message, true);
}

public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
Message message)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand Down