From b7d0910676bd56144034943746809eacb8968001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=BA=E8=83=BD=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Thu, 11 Apr 2024 00:49:12 +0800 Subject: [PATCH] =?UTF-8?q?[improv]=E6=94=B9=E8=BF=9B=E9=98=BF=E9=87=8C?= =?UTF-8?q?=E4=BA=91rocketmq=E5=AF=B9=E6=8E=A5=E3=80=82=E5=9C=A8=E8=BF=87?= =?UTF-8?q?=E5=8E=BB=E4=B8=A4=E5=B9=B4=E6=97=B6=E9=97=B4=E9=87=8C=EF=BC=8C?= =?UTF-8?q?=E9=98=BF=E9=87=8C=E4=BA=91rocketmq=E5=81=9A=E4=BA=86=E5=8D=87?= =?UTF-8?q?=E7=BA=A7=EF=BC=8C=E5=AF=BC=E8=87=B4=E6=9F=90=E4=BA=9B=E6=8C=87?= =?UTF-8?q?=E4=BB=A4=E5=85=BC=E5=AE=B9=E6=80=A7=E6=B2=A1=E9=82=A3=E4=B9=88?= =?UTF-8?q?=E5=A5=BD=EF=BC=8C=E8=BF=99=E9=87=8C=E7=BB=9F=E4=B8=80=E5=81=9A?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E5=A4=84=E7=90=86=E3=80=82=E9=98=BF=E9=87=8C?= =?UTF-8?q?=E4=BA=91rmq=E7=9A=84=E7=BD=91=E7=BB=9C=E6=9E=B6=E6=9E=84?= =?UTF-8?q?=E9=9D=9E=E5=B8=B8=E7=89=B9=E6=AE=8A=EF=BC=8C=E5=9C=A8vpc?= =?UTF-8?q?=E5=86=85=E7=BD=91=E6=97=B6=EF=BC=8C=E5=B0=B1=E5=BD=93=E4=BD=9C?= =?UTF-8?q?=E6=99=AE=E9=80=9Armq=E4=BD=BF=E7=94=A8=EF=BC=8C=E6=B2=A1?= =?UTF-8?q?=E6=9C=89=E7=89=B9=E5=88=AB=E4=B9=8B=E5=A4=84=E3=80=82=E5=9C=A8?= =?UTF-8?q?=E5=85=AC=E7=BD=91=E6=97=B6=EF=BC=8C=E8=8E=B7=E5=8F=96=E5=BE=97?= =?UTF-8?q?=E5=88=B0=E7=9A=84broker=E5=AE=9E=E9=99=85=E4=B8=8A=E6=98=AF?= =?UTF-8?q?=E7=BD=91=E5=85=B3=EF=BC=8C=E7=84=B6=E5=90=8E=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E7=BB=84=E7=8A=B6=E6=80=81=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E5=BE=97=E5=88=B0=E7=9A=84=E5=8D=B4=E6=98=AF=E5=86=85=E7=BD=91?= =?UTF-8?q?broker=E7=8A=B6=E6=80=81=EF=BC=8C=E8=BF=99=E9=87=8C=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E4=BB=A3=E7=A0=81=E5=BC=BA=E8=A1=8C=E9=80=9A=E8=BF=87?= =?UTF-8?q?=EF=BC=8C=E4=BD=86=E6=98=AF=E6=B6=88=E8=B4=B9=E6=97=B6=E4=BB=8D?= =?UTF-8?q?=E7=84=B6=E5=BE=97=E5=88=B0=E4=B8=8D=E6=94=AF=E6=8C=81lite=20pu?= =?UTF-8?q?ll=E7=9A=84=E9=94=99=E8=AF=AF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- NewLife.RocketMQ/BrokerClient.cs | 8 +- NewLife.RocketMQ/Consumer.cs | 78 ++++------ NewLife.RocketMQ/MqBase.cs | 107 +++++++------ NewLife.RocketMQ/NameClient.cs | 3 + NewLife.RocketMQ/Producer.cs | 14 +- .../ConsumerStates/MessageQueueModel.cs | 53 ++++--- NewLife.RocketMQ/Protocol/MessageQueue.cs | 2 +- Test/Program.cs | 10 +- XUnitTestRocketMQ/AliyunIssuesTests.cs | 113 +++++++------- XUnitTestRocketMQ/AliyunTests.cs | 145 +++++++++--------- 10 files changed, 270 insertions(+), 263 deletions(-) diff --git a/NewLife.RocketMQ/BrokerClient.cs b/NewLife.RocketMQ/BrokerClient.cs index 2353d5d..c3189bb 100644 --- a/NewLife.RocketMQ/BrokerClient.cs +++ b/NewLife.RocketMQ/BrokerClient.cs @@ -99,15 +99,15 @@ public void Ping() // 生产者 和 消费者 略有不同 if (cfg is Producer pd) { - body.ProducerDataSet = new[] { + body.ProducerDataSet = [ new ProducerData { GroupName = pd.Group }, new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" }, - }; - body.ConsumerDataSet = new ConsumerData[] { }; + ]; + body.ConsumerDataSet = []; } else if (cfg is Consumer cm) { - body.ProducerDataSet = new[] { new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" } }; + body.ProducerDataSet = [new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" }]; body.ConsumerDataSet = cm.Data.ToArray(); } diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs index 29086db..c34eda3 100644 --- a/NewLife.RocketMQ/Consumer.cs +++ b/NewLife.RocketMQ/Consumer.cs @@ -80,50 +80,36 @@ protected override void Dispose(Boolean disposing) /// 启动 /// - public override Boolean Start() + protected override void OnStart() { - if (Active) return true; - WriteLog("正在准备消费 {0}", Topic); - using var span = Tracer?.NewSpan($"mq:{Topic}:Start"); - try + var list = Data; + if (list == null) { - var list = Data; - if (list == null) + // 建立消费者数据,用于心跳 + var sd = new SubscriptionData { - // 建立消费者数据,用于心跳 - var sd = new SubscriptionData - { - Topic = Topic, - TagsSet = Tags - }; - var cd = new ConsumerData - { - GroupName = Group, - ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET", - MessageModel = MessageModel.ToString().ToUpper(), - SubscriptionDataSet = new[] { sd }, - }; - - list = new List { cd }; - - Data = list; - } + Topic = Topic, + TagsSet = Tags + }; + var cd = new ConsumerData + { + GroupName = Group, + ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET", + MessageModel = MessageModel.ToString().ToUpper(), + SubscriptionDataSet = [sd], + }; - if (!base.Start()) return false; + list = [cd]; - // 默认自动开始调度 - if (AutoSchedule) StartSchedule(); + Data = list; } - catch (Exception ex) - { - span?.SetError(ex, null); - throw; - } + base.OnStart(); - return true; + // 默认自动开始调度 + if (AutoSchedule) StartSchedule(); } /// @@ -133,7 +119,7 @@ public override void Stop() { if (!Active) return; - using var span = Tracer?.NewSpan($"mq:{Topic}:Stop"); + using var span = Tracer?.NewSpan($"mq:{Name}:Stop"); try { // 停止并保存偏移 @@ -207,7 +193,7 @@ public async Task Pull(MessageQueue mq, Int64 offset, Int32 maxNums, else { pr.Status = PullStatus.Unknown; - Log.Warn("响应编号:{0} 响应备注:{1} 序列编号:{2} 序列偏移量:{3}", rs.Header.Code, rs.Header.Remark, mq.QueueId, offset); + Log.Warn("[{0}]{1} 序列编号:{2} 序列偏移量:{3}", (ResponseCode)rs.Header.Code, rs.Header.Remark, mq.QueueId, offset); } pr.Read(rs.Header?.ExtFields); @@ -331,7 +317,7 @@ public async Task> GetConsumers(String group = null) // 在所有Broker上查询 foreach (var item in Brokers) { - using var span = Tracer?.NewSpan($"mq:{Topic}:GetConsumers", item.Name); + using var span = Tracer?.NewSpan($"mq:{Name}:GetConsumers", item.Name); try { var bk = GetBroker(item.Name); @@ -476,7 +462,7 @@ private async Task DoPull(QueueStore st, CancellationToken cancellationToken) DefaultSpan.Current = null; // 性能埋点 - using var span = Tracer?.NewSpan($"mq:{Topic}:Consume", pr.Messages); + using var span = Tracer?.NewSpan($"mq:{Name}:Consume", pr.Messages); try { // 触发消费 @@ -512,7 +498,7 @@ private async Task DoPull(QueueStore st, CancellationToken cancellationToken) break; case PullStatus.Unknown: - Log.Error("未知响应类型消息序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId); + Log.Error("未知响应类型消息,序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId); break; default: break; @@ -587,7 +573,8 @@ private async Task PersistAll(IEnumerable stores) class QueueStore { - [XmlIgnore] public MessageQueue Queue { get; set; } + [XmlIgnore] + public MessageQueue Queue { get; set; } public Int64 Offset { get; set; } = -1; public Int64 CommitOffset { get; set; } = -1; @@ -602,6 +589,7 @@ class QueueStore /// public override Int32 GetHashCode() => (Queue == null ? 0 : Queue.GetHashCode()) ^ Offset.GetHashCode(); + public override String ToString() => Queue?.ToString(); #endregion } @@ -682,7 +670,7 @@ public async Task Rebalance() var str = dic.Join(";", e => $"{e.Key}[{e.Value}]"); WriteLog("消费重新平衡,当前消费者负责queue分片:{0}", str); - using var span = Tracer?.NewSpan($"mq:{Topic}:Rebalance", str); + using var span = Tracer?.NewSpan($"mq:{Name}:Rebalance", str); _Queues = rs.ToArray(); await InitOffsetAsync(); @@ -710,7 +698,7 @@ private async Task CheckGroup(Object state = null) if (_checking) return; _checking = true; - using var span = Tracer?.NewSpan($"mq:{Topic}:CheckGroup"); + using var span = Tracer?.NewSpan($"mq:{Name}:CheckGroup"); try { var rs = await Rebalance(); @@ -766,7 +754,8 @@ private async Task InitOffsetAsync(CancellationToken cancellationToken = default if (store.Offset >= 0) continue; var item = offsetTables.FirstOrDefault(t => t.Key.BrokerName == store.Queue.BrokerName && t.Key.QueueId == store.Queue.QueueId); - var offsetTable = item.Value; + //!! 阿里云公网版RocketMQ,消费者状态返回的是真正brokerName,而前面Broker得到的是网关名,导致这里无法匹配 + var offsetTable = item.Value ?? new OffsetWrapperModel(); if (neverConsumed) { var offset = 0L; @@ -937,10 +926,11 @@ private Command GetConsumerRunningInfo(Command cmd) ci.Properties = dic; var sd = new SubscriptionData { Topic = Topic, }; - ci.SubscriptionSet = new[] { sd }; + ci.SubscriptionSet = [sd]; var sb = new StringBuilder(); sb.Append('{'); + if (_Queues != null) { sb.Append("\"mqTable\":{"); for (var i = 0; i < _Queues.Length; i++) diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs index f165d5f..ea15684 100644 --- a/NewLife.RocketMQ/MqBase.cs +++ b/NewLife.RocketMQ/MqBase.cs @@ -10,42 +10,19 @@ namespace NewLife.RocketMQ.Client; public abstract class MqBase : DisposeBase { #region 属性 + /// 名称 + public String Name { get; set; } + /// 名称服务器地址 public String NameServerAddress { get; set; } - private String _group = "DEFAULT_PRODUCER"; /// 消费组 /// 阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group] - public String Group - { - get - { - // 阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group] - var ins = Aliyun?.InstanceId; - return ins.IsNullOrEmpty() ? _group : $"{ins}%{_group}"; - } - set - { - _group = value; - } - } + public String Group { get; set; } = "DEFAULT_PRODUCER"; - private String _topic = "TBW102"; /// 主题 /// 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic] - public String Topic - { - get - { - // 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic] - var ins = Aliyun?.InstanceId; - return ins.IsNullOrEmpty() ? _topic : $"{ins}%{_topic}"; - } - set - { - _topic = value; - } - } + public String Topic { get; set; } = "TBW102"; /// 本地IP地址 public String ClientIP { get; set; } = NetHelper.MyIP() + ""; @@ -91,6 +68,9 @@ public String Topic /// 性能跟踪 public ITracer Tracer { get; set; } = DefaultTracer.Instance; + private String _group; + private String _topic; + /// 名称服务器 protected NameClient _NameServer; #endregion @@ -143,7 +123,7 @@ protected override void Dispose(Boolean disposing) /// 友好字符串 /// - public override String ToString() => Group; + public override String ToString() => _group; #endregion #region 基础方法 @@ -151,28 +131,63 @@ protected override void Dispose(Boolean disposing) /// public virtual void Configure(MqSetting setting) { - NameServerAddress = setting.NameServer; - Topic = setting.Topic; - Group = setting.Group; + if (!setting.NameServer.IsNullOrEmpty()) NameServerAddress = setting.NameServer; + if (!setting.Topic.IsNullOrEmpty()) Topic = setting.Topic; + if (!setting.Group.IsNullOrEmpty()) Group = setting.Group; + + Aliyun ??= new AliyunOptions(); + if (!setting.Server.IsNullOrEmpty()) Aliyun.Server = setting.Server; + if (!setting.AccessKey.IsNullOrEmpty()) Aliyun.AccessKey = setting.AccessKey; + if (!setting.SecretKey.IsNullOrEmpty()) Aliyun.SecretKey = setting.SecretKey; + } + + /// 开始 + /// + public Boolean Start() + { + if (Active) return true; - if (!setting.Server.IsNullOrEmpty() && - !setting.AccessKey.IsNullOrEmpty()) + _group = Group; + _topic = Topic; + if (Name.IsNullOrEmpty()) Name = Topic; + + // 解析阿里云实例 + var aliyun = Aliyun; + if (aliyun != null && !aliyun.AccessKey.IsNullOrEmpty()) + { + var ns = NameServerAddress; + if (aliyun.InstanceId.IsNullOrEmpty() && !ns.IsNullOrEmpty() && ns.Contains("MQ_INST_")) + { + aliyun.InstanceId = ns.Substring("://", "."); + } + } + + using var span = Tracer?.NewSpan($"mq:{Name}:Start"); + try { - Aliyun = new AliyunOptions + // 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic] + var ins = Aliyun?.InstanceId; + if (!ins.IsNullOrEmpty()) { - Server = setting.Server, - AccessKey = setting.AccessKey, - SecretKey = setting.SecretKey, - }; + if (!Topic.StartsWith(ins)) Topic = $"{ins}%{Topic}"; + if (!Group.StartsWith(ins)) Group = $"{ins}%{Group}"; + } + + OnStart(); + } + catch (Exception ex) + { + span?.SetError(ex, null); + + throw; } + + return Active = true; } /// 开始 - /// - public virtual Boolean Start() + protected virtual void OnStart() { - if (Active) return true; - if (NameServerAddress.IsNullOrEmpty()) { // 获取阿里云ONS的名称服务器地址 @@ -190,7 +205,7 @@ public virtual Boolean Start() var client = new NameClient(ClientId, this) { - Name = Topic, + Name = Name, Tracer = Tracer, Log = Log }; @@ -204,8 +219,6 @@ public virtual Boolean Start() } _NameServer = client; - - return Active = true; } /// 停止 @@ -313,7 +326,7 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag order = false, }; - using var span = Tracer?.NewSpan($"mq:{Topic}:CreateTopic", header); + using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header); try { // 在所有Broker上创建Topic diff --git a/NewLife.RocketMQ/NameClient.cs b/NewLife.RocketMQ/NameClient.cs index 36efce5..1ff889c 100644 --- a/NewLife.RocketMQ/NameClient.cs +++ b/NewLife.RocketMQ/NameClient.cs @@ -46,6 +46,9 @@ protected override void Dispose(Boolean disposing) protected override void OnStart() { var cfg = Config; + if (cfg.NameServerAddress.IsNullOrEmpty()) + throw new ArgumentNullException(nameof(cfg.NameServerAddress), "未指定NameServer地址"); + var ss = cfg.NameServerAddress.Split(";"); var list = new List(); diff --git a/NewLife.RocketMQ/Producer.cs b/NewLife.RocketMQ/Producer.cs index 7c5d611..bb1a144 100644 --- a/NewLife.RocketMQ/Producer.cs +++ b/NewLife.RocketMQ/Producer.cs @@ -33,9 +33,9 @@ public class Producer : MqBase #region 基础方法 /// 启动 /// - public override Boolean Start() + protected override void OnStart() { - if (!base.Start()) return false; + base.OnStart(); LoadBalance ??= new WeightRoundRobin(); @@ -48,8 +48,6 @@ public override Boolean Start() LoadBalance.Ready = false; }; } - - return true; } #endregion @@ -72,7 +70,7 @@ public virtual SendResult Publish(Message message, MessageQueue queue, Int32 tim header.QueueId = mq.QueueId; // 性能埋点 - using var span = Tracer?.NewSpan($"mq:{Topic}:Publish", message.BodyString); + using var span = Tracer?.NewSpan($"mq:{Name}:Publish", message.BodyString); span?.AppendTag($"queue={mq}"); try { @@ -179,7 +177,7 @@ public virtual async Task PublishAsync(Message message, MessageQueue header.QueueId = mq.QueueId; // 性能埋点 - using var span = Tracer?.NewSpan($"mq:{Topic}:PublishAsync", message.BodyString); + using var span = Tracer?.NewSpan($"mq:{Name}:PublishAsync", message.BodyString); try { // 根据队列获取Broker客户端 @@ -260,7 +258,7 @@ public virtual SendResult PublishOneway(Message message, MessageQueue queue) header.QueueId = mq.QueueId; // 性能埋点 - using var span = Tracer?.NewSpan($"mq:{Topic}:PublishOneway", message.BodyString); + using var span = Tracer?.NewSpan($"mq:{Name}:PublishOneway", message.BodyString); try { // 根据队列获取Broker客户端 @@ -333,7 +331,7 @@ public virtual SendResult PublishDelay(Message message, MessageQueue queue, Dela header.QueueId = mq.QueueId; // 性能埋点 - using var span = Tracer?.NewSpan($"mq:{Topic}:PublishDelay", new { level, message.BodyString }); + using var span = Tracer?.NewSpan($"mq:{Name}:PublishDelay", new { level, message.BodyString }); try { // 根据队列获取Broker客户端 diff --git a/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs b/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs index 23ba90d..67cf1b3 100644 --- a/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs +++ b/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs @@ -1,33 +1,36 @@ -namespace NewLife.RocketMQ.Protocol.ConsumerStates +namespace NewLife.RocketMQ.Protocol.ConsumerStates; + +/// +/// 消息队列信息模型 +/// +public class MessageQueueModel { /// - /// 消息队列信息模型 + /// Broker服务器名称 /// - public class MessageQueueModel - { - /// - /// Broker服务器名称 - /// - public String BrokerName { get; set; } + public String BrokerName { get; set; } - /// - /// 队列编码 - /// - public Int32 QueueId { get; set; } + /// + /// 队列编码 + /// + public Int32 QueueId { get; set; } + + /// + /// 主题 + /// + public String Topic { get; set; } - /// - /// 主题 - /// - public String Topic { get; set; } + /// + /// 阿里版本返回字段 + /// + public Boolean MainQueue { get; set; } - /// - /// 阿里版本返回字段 - /// - public Boolean MainQueue { get; set; } + /// + /// 阿里版本返回字段 + /// + public Int32 QueueGroupId { get; set; } - /// - /// 阿里版本返回字段 - /// - public Int32 QueueGroupId { get; set; } - } + /// 已重载。 + /// + public override String ToString() => $"{BrokerName}[{QueueId}]"; } diff --git a/NewLife.RocketMQ/Protocol/MessageQueue.cs b/NewLife.RocketMQ/Protocol/MessageQueue.cs index b0e58a5..918881d 100644 --- a/NewLife.RocketMQ/Protocol/MessageQueue.cs +++ b/NewLife.RocketMQ/Protocol/MessageQueue.cs @@ -38,6 +38,6 @@ public override Int32 GetHashCode() #region 辅助 /// 友好字符串 /// - public override String ToString() => $"MessageQueue[Topic={Topic}, BrokerName={BrokerName}, QueueId={QueueId}]"; + public override String ToString() => $"{BrokerName}[{QueueId}]"; #endregion } \ No newline at end of file diff --git a/Test/Program.cs b/Test/Program.cs index c10c09a..af44b54 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -17,8 +17,8 @@ static void Main(String[] args) { XTrace.UseConsole(); - Test5(); - //TestAliyun(); + //Test5(); + TestAliyun(); Console.WriteLine("OK!"); Console.ReadKey(); @@ -84,6 +84,8 @@ static void Test2() static void TestAliyun() { + // 2024-04-10 对接阿里云RocketMQ v4测试通过。 + // 创建RocketMQ实例后,需要手工创建Topic和Group,并创建正确的AccessKey var consumer = new Consumer { Topic = "newlife_test_02", @@ -94,7 +96,7 @@ static void TestAliyun() { AccessKey = "LTAI5tKTGShu31C61xRARVC4", SecretKey = "a9oPwph1IcMGanWckzUOwOf3Ork8LO", - InstanceId = "MQ_INST_1827694722767531_BXxCwUhm", + //InstanceId = "MQ_INST_1827694722767531_BXxCwUhm", }, FromLastOffset = true, @@ -107,7 +109,7 @@ static void TestAliyun() consumer.OnConsume = OnConsume; - //consumer.Configure(MqSetting.Current); + consumer.Configure(MqSetting.Current); consumer.Start(); _consumer = consumer; diff --git a/XUnitTestRocketMQ/AliyunIssuesTests.cs b/XUnitTestRocketMQ/AliyunIssuesTests.cs index 0ed91a8..f0da090 100644 --- a/XUnitTestRocketMQ/AliyunIssuesTests.cs +++ b/XUnitTestRocketMQ/AliyunIssuesTests.cs @@ -7,76 +7,75 @@ using NewLife.RocketMQ.Protocol; using Xunit; -namespace XUnitTestRocketMQ +namespace XUnitTestRocketMQ; + +/// +/// 修复Issues调用阿里云版RocketMQ相关问题 +/// #35、#24 +/// +public class AliyunIssuesTests { - /// - /// 修复Issues调用阿里云版RocketMQ相关问题 - /// #35、#24 - /// - public class AliyunIssuesTests + private readonly String _testTopic = "newlife_test_01"; + private readonly String _testGroup = "GID_newlife_Group01"; + private static readonly AliyunOptions _aliyunOptions = new AliyunOptions() + { + AccessKey = "LTAIxxxxxxxxxxxxRARVC4", + SecretKey = "a9oPwxxxxxxxxxxx3OrxxLO", + Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet", + InstanceId = "MQ_INST_xxxxxxxxxxxx_AXxCwUhm" + }; + + [Fact] + public void ProducerForAliyun_Test() { - private readonly String _testTopic = "newlife_test_01"; - private readonly String _testGroup = "GID_newlife_Group01"; - private static readonly AliyunOptions _aliyunOptions = new AliyunOptions() + var producer = new Producer() { - AccessKey = "LTAIxxxxxxxxxxxxRARVC4", - SecretKey = "a9oPwxxxxxxxxxxx3OrxxLO", - Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet", - InstanceId = "MQ_INST_xxxxxxxxxxxx_AXxCwUhm" + Topic = _testTopic, + Aliyun = _aliyunOptions, + //NameServerAddress = "http://MQ_INST_xxxxxxxxxx_AXxCwUhm.mq-internet-access.mq-internet.aliyuncs.com:80", + //如果不用上面的默认Server地址,直接将NameServerAddress设为你自己的TCP公网接收点地址也是可以的 }; - [Fact] - public void ProducerForAliyun_Test() - { - var producer = new Producer() - { - Topic = _testTopic, - Aliyun = _aliyunOptions, - //NameServerAddress = "http://MQ_INST_xxxxxxxxxx_AXxCwUhm.mq-internet-access.mq-internet.aliyuncs.com:80", - //如果不用上面的默认Server地址,直接将NameServerAddress设为你自己的TCP公网接收点地址也是可以的 - }; + producer.Start(); - producer.Start(); + var pubResultList = new List(); + for (var i = 0; i < 2; i++) + { + var message = "大家好才是真的好!"; + var pubResult = producer.Publish(message, "newlife_test_tag"); + pubResultList.Add(pubResult.Status == SendStatus.SendOK); + } + Assert.True(pubResultList.All(t => true)); - var pubResultList = new List(); - for (var i = 0; i < 2; i++) - { - var message = "大家好才是真的好!"; - var pubResult = producer.Publish(message, "newlife_test_tag"); - pubResultList.Add(pubResult.Status == SendStatus.SendOK); - } - Assert.True(pubResultList.All(t => true)); + producer.Dispose(); + } - producer.Dispose(); - } + [Fact] + public void ConsumerForAliyun_Test() + { + var consumer = new Consumer() + { + Topic = _testTopic, + Aliyun = _aliyunOptions, + Group = _testGroup, + FromLastOffset = true, + BatchSize = 1, + }; - [Fact] - public void ConsumerForAliyun_Test() + consumer.OnConsume = OnConsume; + consumer.Start(); + Thread.Sleep(3000); + + static Boolean OnConsume(MessageQueue q, MessageExt[] ms) { - var consumer = new Consumer() - { - Topic = _testTopic, - Aliyun = _aliyunOptions, - Group = _testGroup, - FromLastOffset = true, - BatchSize = 1, - }; + Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length); - consumer.OnConsume = OnConsume; - consumer.Start(); - Thread.Sleep(3000); - - static Boolean OnConsume(MessageQueue q, MessageExt[] ms) + foreach (var item in ms.ToList()) { - Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length); - - foreach (var item in ms.ToList()) - { - Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】"); - } - - return true; + Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】"); } + + return true; } } } diff --git a/XUnitTestRocketMQ/AliyunTests.cs b/XUnitTestRocketMQ/AliyunTests.cs index 45e48c3..a871a80 100644 --- a/XUnitTestRocketMQ/AliyunTests.cs +++ b/XUnitTestRocketMQ/AliyunTests.cs @@ -9,107 +9,106 @@ using System.Threading.Tasks; using Xunit; -namespace XUnitTestRocketMQ +namespace XUnitTestRocketMQ; + +public class AliyunTests { - public class AliyunTests + private static void SetConfig(MqBase mq) { - private static void SetConfig(MqBase mq) - { - //mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"; - mq.Configure(MqSetting.Current); + //mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"; + mq.Configure(MqSetting.Current); - mq.Log = XTrace.Log; - } + mq.Log = XTrace.Log; + } - [Fact] - public void CreateTopic() + [Fact] + public void CreateTopic() + { + var mq = new Producer { - var mq = new Producer - { - //Topic = "nx_test", - }; - SetConfig(mq); + //Topic = "nx_test", + }; + SetConfig(mq); - mq.Start(); + mq.Start(); - // 创建topic时,start前不能指定topic,让其使用默认TBW102 - Assert.Equal("TBW102", mq.Topic); + // 创建topic时,start前不能指定topic,让其使用默认TBW102 + Assert.Equal("TBW102", mq.Topic); - mq.CreateTopic("nx_test", 2); - } + mq.CreateTopic("nx_test", 2); + } - [Fact] - static void ProduceTest() + [Fact] + static void ProduceTest() + { + using var mq = new Producer { - using var mq = new Producer - { - Topic = "test1", - }; - SetConfig(mq); + Topic = "test1", + }; + SetConfig(mq); - mq.Start(); + mq.Start(); - for (var i = 0; i < 10; i++) - { - var str = "学无先后达者为师" + i; - //var str = Rand.NextString(1337); + for (var i = 0; i < 10; i++) + { + var str = "学无先后达者为师" + i; + //var str = Rand.NextString(1337); - var sr = mq.Publish(str, "TagA", null); - } + var sr = mq.Publish(str, "TagA", null); } + } - [Fact] - static async Task ProduceAsyncTest() + [Fact] + static async Task ProduceAsyncTest() + { + using var mq = new Producer { - using var mq = new Producer - { - Topic = "test1", - }; - SetConfig(mq); + Topic = "test1", + }; + SetConfig(mq); - mq.Start(); + mq.Start(); - for (var i = 0; i < 10; i++) - { - var str = "学无先后达者为师" + i; - //var str = Rand.NextString(1337); + for (var i = 0; i < 10; i++) + { + var str = "学无先后达者为师" + i; + //var str = Rand.NextString(1337); - var sr = await mq.PublishAsync(str, "TagA", null); - } + var sr = await mq.PublishAsync(str, "TagA", null); } + } - private static Consumer _consumer; - [Fact] - static void ConsumeTest() + private static Consumer _consumer; + [Fact] + static void ConsumeTest() + { + var consumer = new Consumer { - var consumer = new Consumer - { - Topic = "test1", - Group = "test", + Topic = "test1", + Group = "test", - FromLastOffset = true, - BatchSize = 20, - }; - SetConfig(consumer); + FromLastOffset = true, + BatchSize = 20, + }; + SetConfig(consumer); - consumer.OnConsume = OnConsume; - consumer.Start(); + consumer.OnConsume = OnConsume; + consumer.Start(); - _consumer = consumer; - - Thread.Sleep(3000); - } + _consumer = consumer; - private static Boolean OnConsume(MessageQueue q, MessageExt[] ms) - { - Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length); + Thread.Sleep(3000); + } - foreach (var item in ms.ToList()) - { - Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】"); - } + private static Boolean OnConsume(MessageQueue q, MessageExt[] ms) + { + Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length); - return true; + foreach (var item in ms.ToList()) + { + Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】"); } + + return true; } } \ No newline at end of file