From 006817ea918d48c83b2fb0f1318a8a3b339f9528 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Sat, 5 Oct 2024 01:14:36 +0800 Subject: [PATCH] =?UTF-8?q?v2.6.2024.1004=20=E6=94=AF=E6=8C=81RocketMQ=20v?= =?UTF-8?q?5.3=EF=BC=8C=E5=9C=A8=E5=85=AC=E7=BD=91=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E9=80=9A=E8=BF=87=E3=80=82=E9=BB=98=E8=AE=A4=E5=86=85=E7=BD=91?= =?UTF-8?q?broker=E5=9C=B0=E5=9D=80=E6=9B=BF=E6=8D=A2=E4=B8=BA=E5=85=AC?= =?UTF-8?q?=E7=BD=91=E5=9C=B0=E5=9D=80=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- NewLife.RocketMQ/MqBase.cs | 22 +++++++- NewLife.RocketMQ/NewLife.RocketMQ.csproj | 2 +- NewLife.RocketMQ/Protocol/Command.cs | 4 +- NewLife.RocketMQ/Protocol/MessageExt.cs | 2 +- Test/Program.cs | 48 +++++++++++++---- XUnitTestRocketMQ/BasicTest.cs | 36 +++++++++++++ XUnitTestRocketMQ/CommandTests.cs | 4 -- XUnitTestRocketMQ/ConsumerTests.cs | 66 ++++++++++++------------ XUnitTestRocketMQ/ProducerTests.cs | 64 +++++++++++------------ 9 files changed, 162 insertions(+), 86 deletions(-) create mode 100644 XUnitTestRocketMQ/BasicTest.cs diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs index e3b0eb2..b00c2eb 100644 --- a/NewLife.RocketMQ/MqBase.cs +++ b/NewLife.RocketMQ/MqBase.cs @@ -2,6 +2,7 @@ using System.Diagnostics; using System.Reflection; using NewLife.Log; +using NewLife.Net; using NewLife.RocketMQ.Protocol; using NewLife.Serialization; @@ -267,8 +268,21 @@ protected BrokerClient GetBroker(String name) { if (_Brokers.TryGetValue(name, out client)) return client; + // broker可能在内网,转为公网地址 + var uri = new NetUri(NameServerAddress.Split(";").FirstOrDefault()); + var addrs = bk.Addresses.ToArray(); + for (var i = 0; i < addrs.Length; i++) + { + var addr = addrs[i]; + if (addr.StartsWithIgnoreCase("10.", "192.", "172.")) + { + var p = addr.IndexOf(':'); + addrs[i] = p > 0 ? uri.Host + addr[p..] : uri.Host; + } + } + // 实例化客户端 - client = CreateBroker(bk.Name, bk.Addresses); + client = CreateBroker(bk.Name, addrs); client.Start(); @@ -316,7 +330,7 @@ protected virtual BrokerClient CreateBroker(String name, String[] addrs) /// 主题 /// 队列数 /// - public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0) + public virtual Int32 CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0) { var header = new { @@ -330,6 +344,7 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag order = false, }; + var count = 0; using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header); try { @@ -341,6 +356,7 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag { var bk = GetBroker(item.Name); var rs = bk.Invoke(RequestCode.UPDATE_AND_CREATE_TOPIC, null, header); + if (rs != null && rs.Header.Code == (Int32)ResponseCode.SUCCESS) count++; } catch (Exception ex) { @@ -354,6 +370,8 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag throw; } + + return count; } #endregion diff --git a/NewLife.RocketMQ/NewLife.RocketMQ.csproj b/NewLife.RocketMQ/NewLife.RocketMQ.csproj index 374e69e..f140267 100644 --- a/NewLife.RocketMQ/NewLife.RocketMQ.csproj +++ b/NewLife.RocketMQ/NewLife.RocketMQ.csproj @@ -59,7 +59,7 @@ - + diff --git a/NewLife.RocketMQ/Protocol/Command.cs b/NewLife.RocketMQ/Protocol/Command.cs index 7b59e13..51a36c2 100644 --- a/NewLife.RocketMQ/Protocol/Command.cs +++ b/NewLife.RocketMQ/Protocol/Command.cs @@ -258,9 +258,9 @@ public IPacket ToPacket() { var ms = new MemoryStream(); Write(ms, null); - ms.Position = 0; - return new Packet(ms); + ms.Position = 0; + return new ArrayPacket(ms); } /// 创建响应 diff --git a/NewLife.RocketMQ/Protocol/MessageExt.cs b/NewLife.RocketMQ/Protocol/MessageExt.cs index 8dd6c82..dba3528 100644 --- a/NewLife.RocketMQ/Protocol/MessageExt.cs +++ b/NewLife.RocketMQ/Protocol/MessageExt.cs @@ -121,7 +121,7 @@ public Boolean Read(Stream stream, Object context = null) ms.Write(port.GetBytes(false)); ms.Write(CommitLogOffset.GetBytes(false)); - MsgId = ms.Put(true).ToHex(0, 16); + MsgId = ms.Return(true).ToHex(0, 16); return true; } diff --git a/Test/Program.cs b/Test/Program.cs index af44b54..55fe83d 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -17,8 +17,8 @@ static void Main(String[] args) { XTrace.UseConsole(); - //Test5(); - TestAliyun(); + Test1(); + //TestAliyun(); Console.WriteLine("OK!"); Console.ReadKey(); @@ -26,35 +26,61 @@ static void Main(String[] args) static void Test1() { - var mq = new Producer + XTrace.WriteLine(""); + XTrace.WriteLine("创建生产者……"); + var producer = new Producer { Topic = "nx_test", - NameServerAddress = "127.0.0.1:9876", + NameServerAddress = "rocketmq.newlifex.com:9876", Log = XTrace.Log, }; - mq.Configure(MqSetting.Current); - mq.Start(); + producer.Configure(MqSetting.Current); + producer.Start(); //mq.CreateTopic("nx_test", 2); - for (var i = 0; i < 1000_000; i++) + XTrace.WriteLine(""); + XTrace.WriteLine("创建消费者……"); + var consumer = new Consumer + { + Topic = producer.Topic, + Group = "test", + NameServerAddress = producer.NameServerAddress, + + FromLastOffset = false, + //SkipOverStoredMsgCount = 0, + //BatchSize = 20, + + Log = XTrace.Log, + ClientLog = XTrace.Log, + }; + + consumer.OnConsume = OnConsume; + + consumer.Configure(MqSetting.Current); + consumer.Start(); + Thread.Sleep(1000); + + XTrace.WriteLine(""); + XTrace.WriteLine("发布测试消息……"); + for (var i = 0; i < 10; i++) { var str = "学无先后达者为师" + i; //var str = Rand.NextString(1337); - var sr = mq.Publish(str, "TagA", null); + var sr = producer.Publish(str, "TagA", null); //Console.WriteLine("[{0}] {1} {2} {3}", sr.Queue.BrokerName, sr.Queue.QueueId, sr.MsgId, sr.QueueOffset); // 阿里云发送消息不能过快,否则报错“服务不可用” - Thread.Sleep(100); + Thread.Sleep(500); } Console.WriteLine("完成"); - mq.Dispose(); + producer.Dispose(); } private static Consumer _consumer; @@ -121,7 +147,7 @@ private static Boolean OnConsume(MessageQueue q, MessageExt[] ms) foreach (var item in ms.ToList()) { - Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr(null, 0, 64)}】"); + Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime().ToFullString()}】,内容【{item.Body.ToStr(null, 0, 64)}】"); } return true; diff --git a/XUnitTestRocketMQ/BasicTest.cs b/XUnitTestRocketMQ/BasicTest.cs new file mode 100644 index 0000000..b290acb --- /dev/null +++ b/XUnitTestRocketMQ/BasicTest.cs @@ -0,0 +1,36 @@ +using System; +using System.Linq; +using NewLife; +using NewLife.Log; +using NewLife.RocketMQ; +using Xunit; + +// 所有测试用例放入一个汇编级集合,除非单独指定Collection特性 +[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)] + +namespace XUnitTestRocketMQ; + +[Collection("Basic")] +public class BasicTest +{ + private static MqSetting _config; + public static MqSetting GetConfig() + { + if (_config != null) return _config; + lock (typeof(BasicTest)) + { + if (_config != null) return _config; + + var set = MqSetting.Current; + if (set.IsNew) + { + set.NameServer = "rocketmq.newlifex.com:9876"; + set.Save(); + } + + XTrace.WriteLine("RocketMQ配置:{0}", set.NameServer); + + return _config = set; + } + } +} diff --git a/XUnitTestRocketMQ/CommandTests.cs b/XUnitTestRocketMQ/CommandTests.cs index de6c7c1..e5b6724 100644 --- a/XUnitTestRocketMQ/CommandTests.cs +++ b/XUnitTestRocketMQ/CommandTests.cs @@ -1,9 +1,5 @@ using System; -using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using NewLife; using NewLife.Data; using NewLife.RocketMQ.Protocol; diff --git a/XUnitTestRocketMQ/ConsumerTests.cs b/XUnitTestRocketMQ/ConsumerTests.cs index 030109e..91f9987 100644 --- a/XUnitTestRocketMQ/ConsumerTests.cs +++ b/XUnitTestRocketMQ/ConsumerTests.cs @@ -7,49 +7,49 @@ using System.Threading; using Xunit; -namespace XUnitTestRocketMQ +namespace XUnitTestRocketMQ; + +public class ConsumerTests { - public class ConsumerTests + private static Consumer _consumer; + [Fact] + public static void ConsumeTest() { - private static Consumer _consumer; - [Fact] - static void ConsumeTest() + var set = BasicTest.GetConfig(); + var consumer = new Consumer { - var consumer = new Consumer - { - Topic = "nx_test", - Group = "test", - NameServerAddress = "127.0.0.1:9876", - - FromLastOffset = true, - BatchSize = 20, + Topic = "nx_test", + Group = "test", + NameServerAddress = set.NameServer, - Log = XTrace.Log, - }; + FromLastOffset = true, + BatchSize = 20, - consumer.OnConsume = OnConsume; - consumer.Start(); + Log = XTrace.Log, + }; - _consumer = consumer; + consumer.OnConsume = OnConsume; + consumer.Start(); - Thread.Sleep(3000); - //foreach (var item in consumer.Clients) - //{ - // var rs = item.GetRuntimeInfo(); - // Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]); - //} - } + _consumer = consumer; - private static Boolean OnConsume(MessageQueue q, MessageExt[] ms) - { - Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length); + Thread.Sleep(3000); + //foreach (var item in consumer.Clients) + //{ + // var rs = item.GetRuntimeInfo(); + // Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]); + //} + } - foreach (var item in ms.ToList()) - { - Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】"); - } + private static Boolean OnConsume(MessageQueue q, MessageExt[] ms) + { + Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length); - return true; + foreach (var item in ms.ToList()) + { + Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】"); } + + return true; } } \ No newline at end of file diff --git a/XUnitTestRocketMQ/ProducerTests.cs b/XUnitTestRocketMQ/ProducerTests.cs index 2e5d813..cae2e22 100644 --- a/XUnitTestRocketMQ/ProducerTests.cs +++ b/XUnitTestRocketMQ/ProducerTests.cs @@ -1,52 +1,52 @@ using NewLife.Log; using NewLife.RocketMQ; -using System; -using System.Linq; using Xunit; -namespace XUnitTestRocketMQ +namespace XUnitTestRocketMQ; + +public class ProducerTests { - public class ProducerTests + [Fact] + public void CreateTopic() { - [Fact] - public void CreateTopic() + var set = BasicTest.GetConfig(); + var mq = new Producer { - var mq = new Producer - { - //Topic = "nx_test", - NameServerAddress = "127.0.0.1:9876", + //Topic = "nx_test", + NameServerAddress = set.NameServer, - Log = XTrace.Log, - }; + Log = XTrace.Log, + }; - mq.Start(); + mq.Start(); - // 创建topic时,start前不能指定topic,让其使用默认TBW102 - Assert.Equal("TBW102", mq.Topic); + // 创建topic时,start前不能指定topic,让其使用默认TBW102 + Assert.Equal("TBW102", mq.Topic); - mq.CreateTopic("nx_test", 2); - } + var rs = mq.CreateTopic("nx_test", 2); + Assert.True(rs > 0); + } - [Fact] - static void ProduceTest() + [Fact] + public static void ProduceTest() + { + var set = BasicTest.GetConfig(); + using var mq = new Producer { - using var mq = new Producer - { - Topic = "nx_test", - NameServerAddress = "127.0.0.1:9876", + Topic = "nx_test", + NameServerAddress = set.NameServer, - Log = XTrace.Log, - }; + Log = XTrace.Log, + }; - mq.Start(); + mq.Start(); - for (var i = 0; i < 10; i++) - { - var str = "学无先后达者为师" + i; - //var str = Rand.NextString(1337); + for (var i = 0; i < 10; i++) + { + var str = "学无先后达者为师" + i; + //var str = Rand.NextString(1337); - var sr = mq.Publish(str, "TagA", null); - } + var sr = mq.Publish(str, "TagA", null); } } } \ No newline at end of file