Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
inikulshin authored Dec 14, 2023
1 parent eaffc33 commit fbd414b
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public async Task Should_be_able_to_create_all_the_definitions_in_a_policy()
const long maxLengthBytes = 5000;
const Overflow overflow = Overflow.RejectPublish;
uint? consumerTimeout = fixture.RabbitmqVersion >= new Version("3.12") ? 3600000 : null;
Dictionary<string, object> extensionData = new Dictionary<string, object> { { "max-in-memory-length", 1000000 } };

await fixture.ManagementClient.CreatePolicyAsync(
new Policy(
Expand Down Expand Up @@ -265,7 +266,7 @@ await fixture.ManagementClient.CreatePolicyAsync(
MaxLengthBytes: maxLengthBytes,
Overflow: overflow,
ConsumerTimeout: consumerTimeout
),
) { ExtensionData = extensionData },
Priority: priority
)
);
Expand All @@ -292,7 +293,8 @@ await fixture.ManagementClient.CreatePolicyAsync(
&& p.Definition.MaxLength == maxLength
&& p.Definition.MaxLengthBytes == maxLengthBytes
&& p.Definition.Overflow == overflow
&& p.Definition.ConsumerTimeout == consumerTimeout)
&& p.Definition.ConsumerTimeout == consumerTimeout
&& p.Definition.ExtensionData.Keys.Order().SequenceEqual(extensionData.Keys.Order()))
);
}

Expand Down Expand Up @@ -1207,7 +1209,28 @@ public async Task Should_get_permissions()
public async Task Should_get_queues()
{
await CreateTestQueue(TestQueue);
(await fixture.ManagementClient.GetQueuesAsync()).Count.Should().BeGreaterThan(0);
while (true)
{
var queues = await fixture.ManagementClient.GetQueuesAsync();
queues.Should().NotBeNullOrEmpty();
if (queues[0].State != null)
{
queues[0].ExtensionData.Should().NotBeNullOrEmpty();
break;
}
else
{
queues[0].ExtensionData.Should().BeNull();
}
}
}

[Fact]
public async Task Should_get_queues_without_stats()
{
await CreateTestQueue(TestQueue);
var queues = await fixture.ManagementClient.GetQueuesWithoutStatsAsync();
queues.Count.Should().BeGreaterThan(0);
}


Expand Down
134 changes: 117 additions & 17 deletions Source/EasyNetQ.Management.Client/ManagementClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public static Overview GetOverview(
.GetResult();
}


/// <summary>
/// A list of nodes in the RabbitMQ cluster.
/// </summary>
Expand Down Expand Up @@ -74,6 +73,24 @@ public static IReadOnlyList<Connection> GetConnections(
.GetResult();
}

/// <summary>
/// A list of all open connections on the specified VHost.
/// </summary>
/// <param name="client"></param>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Connection> GetConnections(
this IManagementClient client,
string vhostName,
CancellationToken cancellationToken = default
)
{
return client.GetConnectionsAsync(vhostName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all open channels.
/// </summary>
Expand All @@ -91,7 +108,25 @@ public static IReadOnlyList<Channel> GetChannels(
}

/// <summary>
/// A list of all open channels.
/// A list of all open channels for the given connection.
/// </summary>
/// <param name="client"></param>
/// <param name="connectionName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Channel> GetChannels(
this IManagementClient client,
string connectionName,
CancellationToken cancellationToken = default
)
{
return client.GetChannelsAsync(connectionName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all open channels for the given connection.
/// </summary>
/// <param name="client"></param>
/// <param name="connection"></param>
Expand All @@ -103,6 +138,24 @@ public static Task<IReadOnlyList<Channel>> GetChannelsAsync(
CancellationToken cancellationToken = default
) => client.GetChannelsAsync(connection.Name, cancellationToken);

/// <summary>
/// A list of all open channels for the given connection.
/// </summary>
/// <param name="client"></param>
/// <param name="connection"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Channel> GetChannels(
this IManagementClient client,
Connection connection,
CancellationToken cancellationToken = default
)
{
return client.GetChannelsAsync(connection, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// Gets the channel. This returns more detail, including consumers than the GetChannels method.
/// </summary>
Expand Down Expand Up @@ -139,6 +192,24 @@ public static IReadOnlyList<Exchange> GetExchanges(
.GetResult();
}

/// <summary>
/// A list of all exchanges for a virtual host.
/// </summary>
/// <param name="client"></param>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Exchange> GetExchanges(
this IManagementClient client,
string vhostName,
CancellationToken cancellationToken = default
)
{
return client.GetExchangesAsync(vhostName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all queues.
/// </summary>
Expand All @@ -155,6 +226,24 @@ public static IReadOnlyList<Queue> GetQueues(
.GetResult();
}

/// <summary>
/// A list of all queues for a virtual host.
/// </summary>
/// <param name="client"></param>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<Queue> GetQueues(
this IManagementClient client,
string vhostName,
CancellationToken cancellationToken = default
)
{
return client.GetQueuesAsync(vhostName, cancellationToken)
.GetAwaiter()
.GetResult();
}

/// <summary>
/// A list of all queues for a virtual host.
/// </summary>
Expand Down Expand Up @@ -239,6 +328,17 @@ public static PageResult<Queue> GetQueuesByPage(
.GetResult();
}

/// <summary>
/// A list of all queues without stats.
/// </summary>
/// <param name="client"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static IReadOnlyList<QueueWithoutStats> GetQueuesWithoutStats(
this IManagementClient client,
CancellationToken cancellationToken = default
) => client.GetQueuesWithoutStatsAsync(cancellationToken).GetAwaiter().GetResult();

/// <summary>
/// A list of all bindings.
/// </summary>
Expand Down Expand Up @@ -1658,13 +1758,13 @@ public static Parameter GetShovel(
string vhostName,
string shovelName,
CancellationToken cancellationToken = default
)
{
return client.GetParameterAsync(vhostName, "shovel", shovelName, cancellationToken)
.GetAwaiter()
.GetResult();
)
{
return client.GetParameterAsync(vhostName, "shovel", shovelName, cancellationToken)
.GetAwaiter()
.GetResult();
}


/// <summary>
/// Creates a federation upstream in a specific vhost
/// </summary>
Expand All @@ -1680,7 +1780,7 @@ public static Task CreateFederationUpstreamAsync(
ParameterFederationValue federationUpstreamDescription,
CancellationToken cancellationToken = default
) => client.CreateParameterAsync("federation-upstream", vhostName, federationUpstreamName, federationUpstreamDescription, cancellationToken);


/// <summary>
/// Creates a federation upstream in a specific vhost
/// </summary>
Expand All @@ -1695,11 +1795,11 @@ public static void CreateFederationUpstream(
string federationUpstreamName,
ParameterFederationValue federationUpstreamDescription,
CancellationToken cancellationToken = default
)
{
)
{
client.CreateParameterAsync("federation-upstream", vhostName, federationUpstreamName, federationUpstreamDescription, cancellationToken)
.GetAwaiter()
.GetResult();
.GetResult();
}

/// <summary>
Expand Down Expand Up @@ -1728,10 +1828,10 @@ public static Parameter GetFederationUpstream(
string vhostName,
string federationUpstreamName,
CancellationToken cancellationToken = default
)
{
return client.GetParameterAsync(vhostName, "federation-upstream", federationUpstreamName, cancellationToken)
.GetAwaiter()
.GetResult();
)
{
return client.GetParameterAsync(vhostName, "federation-upstream", federationUpstreamName, cancellationToken)
.GetAwaiter()
.GetResult();
}
}

0 comments on commit fbd414b

Please sign in to comment.