Skip to content

Commit

Permalink
Add IManagementClient.GetShovelStatusesAsync() (api/shovels)
Browse files Browse the repository at this point in the history
  • Loading branch information
inikulshin authored Dec 25, 2023
1 parent b43357f commit bfd8f2c
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ namespace EasyNetQ.Management.Client
System.Threading.Tasks.Task<EasyNetQ.Management.Client.Model.PageResult<EasyNetQ.Management.Client.Model.Queue>> GetQueuesByPageAsync(EasyNetQ.Management.Client.Model.PageCriteria pageCriteria, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task<EasyNetQ.Management.Client.Model.PageResult<EasyNetQ.Management.Client.Model.Queue>> GetQueuesByPageAsync(string vhostName, EasyNetQ.Management.Client.Model.PageCriteria pageCriteria, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.QueueWithoutStats>> GetQueuesWithoutStatsAsync(System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.ShovelStatus>> GetShovelStatusesAsync(System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.ShovelStatus>> GetShovelStatusesAsync(string vhostName, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.TopicPermission>> GetTopicPermissionsAsync(System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task<EasyNetQ.Management.Client.Model.User> GetUserAsync(string userName, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.User>> GetUsersAsync(System.Threading.CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -139,6 +141,8 @@ namespace EasyNetQ.Management.Client
public System.Threading.Tasks.Task<EasyNetQ.Management.Client.Model.PageResult<EasyNetQ.Management.Client.Model.Queue>> GetQueuesByPageAsync(EasyNetQ.Management.Client.Model.PageCriteria pageCriteria, System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task<EasyNetQ.Management.Client.Model.PageResult<EasyNetQ.Management.Client.Model.Queue>> GetQueuesByPageAsync(string vhostName, EasyNetQ.Management.Client.Model.PageCriteria pageCriteria, System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.QueueWithoutStats>> GetQueuesWithoutStatsAsync(System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.ShovelStatus>> GetShovelStatusesAsync(System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.ShovelStatus>> GetShovelStatusesAsync(string vhostName, System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.TopicPermission>> GetTopicPermissionsAsync(System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task<EasyNetQ.Management.Client.Model.User> GetUserAsync(string userName, System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<EasyNetQ.Management.Client.Model.User>> GetUsersAsync(System.Threading.CancellationToken cancellationToken = default) { }
Expand Down Expand Up @@ -1250,6 +1254,50 @@ namespace EasyNetQ.Management.Client.Model
public int MsgRatesIncr { get; init; }
public System.Collections.Generic.IReadOnlyDictionary<string, string> ToQueryParameters() { }
}
public class ShovelStatus : System.IEquatable<EasyNetQ.Management.Client.Model.ShovelStatus>
{
public ShovelStatus(
string Name,
string Vhost,
string Node,
string Timestamp,
string Type,
string State,
string? SrcProtocol = null,
string? SrcUri = null,
string? SrcQueue = null,
string? SrcExchange = null,
string? SrcExchangeKey = null,
string? DestProtocol = null,
string? DestUri = null,
string? DestQueue = null,
string? DestExchange = null,
string? DestExchangeKey = null,
string? BlockedStatus = null,
string? Reason = null) { }
[System.Text.Json.Serialization.JsonIgnore]
public System.Collections.Generic.IReadOnlyDictionary<string, object?>? ExtensionData { get; set; }
[System.Text.Json.Serialization.JsonExtensionData]
public System.Collections.Generic.IDictionary<string, System.Text.Json.JsonElement>? JsonExtensionData { get; set; }
public string? BlockedStatus { get; init; }
public string? DestExchangeKey { get; init; }
public string? DestExchange { get; init; }
public string? DestProtocol { get; init; }
public string? DestQueue { get; init; }
public string? DestUri { get; init; }
public string Name { get; init; }
public string Node { get; init; }
public string? Reason { get; init; }
public string? SrcExchangeKey { get; init; }
public string? SrcExchange { get; init; }
public string? SrcProtocol { get; init; }
public string? SrcQueue { get; init; }
public string? SrcUri { get; init; }
public string State { get; init; }
public string Timestamp { get; init; }
public string Type { get; init; }
public string Vhost { get; init; }
}
public class SocketOpts : System.IEquatable<EasyNetQ.Management.Client.Model.SocketOpts>
{
public SocketOpts(int? Backlog = default, bool? Nodelay = default, bool? ExitOnClose = default) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1419,4 +1419,148 @@ await fixture.ManagementClient.CreateShovelAsync(
var shovel = await fixture.ManagementClient.GetShovelAsync(Vhost.Name, shovelName);
Assert.Contains(shovelName, shovel.Name);
}

[Fact]
public async Task Should_be_able_to_get_shovel_statuses()
{
var srcUri = new AmqpUri(fixture.Endpoint.Host, 5672, fixture.User, fixture.Password);
var destUri = new AmqpUri(fixture.Endpoint.Host, 5672, fixture.User, fixture.Password);

var shovelName = "exchange-shovel";
var parameterShovelValue = new ParameterShovelValue
(
SrcProtocol: AmqpProtocol.AMQP091,
SrcUri: srcUri.ToString(),
SrcExchange: "test-exchange-src",
SrcExchangeKey: "aaa",
SrcDeleteAfter: "never",
SrcPrefetchCount: 10,
DestProtocol: AmqpProtocol.AMQP091,
DestUri: destUri.ToString(),
DestExchange: "test-exchange-dest",
DestExchangeKey: "bbb",
DestAddForwardHeaders: false,
DestAddTimestampHeader: false,
AckMode: "on-confirm",
ReconnectDelay: 10
);

await fixture.ManagementClient.CreateShovelAsync(
vhostName: Vhost.Name,
shovelName,
parameterShovelValue
);

while (true)
{
var shovelStatuses = await fixture.ManagementClient.GetShovelStatusesAsync();
try
{
shovelStatuses.Should().ContainEquivalentOf(
new ShovelStatus(
Name: shovelName,
Vhost: Vhost.Name,
Node: "rabbit@easynetq",
Timestamp: default,
Type: "dynamic",
State: "starting"),
options => options.Excluding(ss => ss.Timestamp));
}
catch
{
shovelStatuses.Should().ContainEquivalentOf(
new ShovelStatus(
Name: shovelName,
Vhost: Vhost.Name,
Node: "rabbit@easynetq",
Timestamp: default,
Type: "dynamic",
State: "terminated",

Reason: "\"needed a restart\""
),
options => options.Excluding(ss => ss.Timestamp));
break;
}
}
}

[Fact]
public async Task Should_be_able_to_get_shovel_statuses_by_vhost()
{
await fixture.ManagementClient.CreateVhostAsync(TestVHost);
var vhost = await fixture.ManagementClient.GetVhostAsync(TestVHost);
vhost.Name.Should().Be(TestVHost);

var srcUri = new AmqpUri(fixture.Endpoint.Host, 5672, fixture.User, fixture.Password);
var destUri = new AmqpUri(fixture.Endpoint.Host, 5672, fixture.User, fixture.Password);

var shovelName = "queue-shovel-vhost";
var parameterShovelValue = new ParameterShovelValue
(
SrcProtocol: AmqpProtocol.AMQP091,
SrcUri: srcUri.ToString(),
SrcQueue: "test-queue-src",
SrcQueueArguments: new Dictionary<string, object?> { { "x-queue-mode", "lazy" } },
SrcDeleteAfter: "never",
SrcPrefetchCount: 10,
DestProtocol: AmqpProtocol.AMQP091,
DestUri: destUri.ToString(),
DestQueue: "test-queue-dest",
DestQueueArguments: new Dictionary<string, object?> { { "x-queue-mode", "lazy" } },
DestAddForwardHeaders: false,
DestAddTimestampHeader: false,
AckMode: "on-confirm",
ReconnectDelay: 10
);

await fixture.ManagementClient.CreateShovelAsync(
vhostName: vhost.Name,
shovelName,
parameterShovelValue
);

while (true)
{
var shovelStatuses = await fixture.ManagementClient.GetShovelStatusesAsync(vhost.Name);
try
{
shovelStatuses.Should().ContainEquivalentOf(
new ShovelStatus(
Name: shovelName,
Vhost: vhost.Name,
Node: "rabbit@easynetq",
Timestamp: default,
Type: "dynamic",
State: "starting"),
options => options.Excluding(ss => ss.Timestamp));
}
catch
{
shovelStatuses.Should().ContainEquivalentOf(
new ShovelStatus(
Name: shovelName,
Vhost: vhost.Name,
Node: "rabbit@easynetq",
Timestamp: default,
Type: "dynamic",
State: "running",

SrcProtocol: parameterShovelValue.SrcProtocol,
SrcUri: parameterShovelValue.SrcUri,
SrcQueue: parameterShovelValue.SrcQueue,
SrcExchange: parameterShovelValue.SrcExchange,
SrcExchangeKey: parameterShovelValue.SrcExchangeKey,
DestProtocol: parameterShovelValue.DestProtocol,
DestUri: parameterShovelValue.DestUri,
DestQueue: parameterShovelValue.DestQueue,
DestExchange: parameterShovelValue.DestExchange,
DestExchangeKey: parameterShovelValue.DestExchangeKey,
BlockedStatus: "running"
),
options => options.Excluding(ss => ss.Timestamp));
break;
}
}
}
}
26 changes: 26 additions & 0 deletions Source/EasyNetQ.Management.Client/IManagementClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -747,4 +747,30 @@ Task DeleteParameterAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task RebalanceQueuesAsync(CancellationToken cancellationToken = default);

/// <summary>
/// A list of all shovel statuses.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<IReadOnlyList<ShovelStatus>> GetShovelStatusesAsync(CancellationToken cancellationToken = default);

/// <summary>
/// A list of all shovel statuses for a virtual host.
/// </summary>
/// <param name="vhostName"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<IReadOnlyList<ShovelStatus>> GetShovelStatusesAsync(string vhostName, CancellationToken cancellationToken = default);

// Current implementation in RabbitMQ has a lot of bugs
//
// /// <summary>
// /// Get an individual shovel status by name.
// /// </summary>
// /// <param name="vhostName"></param>
// /// <param name="shovelName"></param>
// /// <param name="cancellationToken"></param>
// /// <returns></returns>
// Task<ShovelStatus> GetShovelStatusAsync(string vhostName, string shovelName, CancellationToken cancellationToken = default);
}
11 changes: 11 additions & 0 deletions Source/EasyNetQ.Management.Client/ManagementClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ManagementClient : IManagementClient
private static readonly RelativePath Definitions = Api / "definitions";
private static readonly RelativePath Health = Api / "health";
private static readonly RelativePath Rebalance = Api / "rebalance";
private static readonly RelativePath ShovelStatuses = Api / "shovels";

private static readonly Dictionary<string, string> GetQueuesWithoutStatsQueryParameters = new Dictionary<string, string> {
{ "disable_stats", "true" },
Expand Down Expand Up @@ -823,4 +824,14 @@ private HttpRequestMessage CreateRequest(

return mergedQueryParameters;
}

public Task<IReadOnlyList<ShovelStatus>> GetShovelStatusesAsync(CancellationToken cancellationToken = default)
{
return GetAsync<IReadOnlyList<ShovelStatus>>(ShovelStatuses, cancellationToken);
}

public Task<IReadOnlyList<ShovelStatus>> GetShovelStatusesAsync(string vhostName, CancellationToken cancellationToken = default)
{
return GetAsync<IReadOnlyList<ShovelStatus>>(ShovelStatuses / vhostName, cancellationToken);
}
}
41 changes: 41 additions & 0 deletions Source/EasyNetQ.Management.Client/Model/ShovelStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using EasyNetQ.Management.Client.Serialization;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace EasyNetQ.Management.Client.Model;

public record ShovelStatus
(
string Name,
string Vhost,
string Node,
[property: JsonConverter(typeof(DateTimeConverter))]
DateTime Timestamp,
string Type,
string State,

string? SrcProtocol = null,
string? SrcUri = null,
string? SrcQueue = null,
string? SrcExchange = null,
string? SrcExchangeKey = null,
string? DestProtocol = null,
string? DestUri = null,
string? DestQueue = null,
string? DestExchange = null,
string? DestExchangeKey = null,
string? BlockedStatus = null,

string? Reason = null
)
{
[JsonExtensionData()]
public IDictionary<string, JsonElement>? JsonExtensionData { get; set; }

[JsonIgnore()]
public IReadOnlyDictionary<string, object?>? ExtensionData
{
get { return JsonExtensionDataExtensions.ToExtensionData(JsonExtensionData); }
set { JsonExtensionData = JsonExtensionDataExtensions.ToJsonExtensionData(value); }
}
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Diagnostics;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace EasyNetQ.Management.Client.Serialization;

internal class DateTimeConverter : JsonConverter<DateTime>
{
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
Debug.Assert(typeToConvert == typeof(DateTime));
return DateTime.Parse(reader.GetString() ?? string.Empty);
}

public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
{
writer.WriteStringValue(value.ToString());
}
}

0 comments on commit bfd8f2c

Please sign in to comment.