From 88e644756bab0ebc01feba53483d831d477f0627 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Tue, 2 Apr 2024 11:31:30 +0800 Subject: [PATCH] [ISSUE #7988] Refector client trace (#7989) * [ISSUE #7988] Refector client trace * build trace dispatcher in start method * setNamespaceV2 for dispatcher * disable trace for inner traceProducer * fix tls --- .../apache/rocketmq/client/ClientConfig.java | 32 ++++++++++ .../consumer/DefaultLitePullConsumer.java | 31 ++++------ .../consumer/DefaultMQPushConsumer.java | 48 ++++++++------- .../client/producer/DefaultMQProducer.java | 60 ++++++++----------- .../client/trace/AsyncTraceDispatcher.java | 11 ++++ .../DefaultMQConsumerWithOpenTracingTest.java | 2 + .../trace/DefaultMQConsumerWithTraceTest.java | 10 ++-- .../DefaultMQProducerWithOpenTracingTest.java | 2 + .../trace/DefaultMQProducerWithTraceTest.java | 13 ++-- ...nsactionMQProducerWithOpenTracingTest.java | 2 + .../TransactionMQProducerWithTraceTest.java | 5 +- 11 files changed, 124 insertions(+), 92 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 8a7beffc704..48c995301ae 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -98,6 +98,16 @@ public class ClientConfig { private boolean enableHeartbeatChannelEventListener = true; + /** + * The switch for message trace + */ + protected boolean enableTrace = true; + + /** + * The name value of message trace topic. If not set, the default trace topic name will be used. + */ + protected String traceTopic; + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -215,6 +225,8 @@ public void resetClientConfig(final ClientConfig cc) { this.detectInterval = cc.detectInterval; this.detectTimeout = cc.detectTimeout; this.namespaceV2 = cc.namespaceV2; + this.enableTrace = cc.enableTrace; + this.traceTopic = cc.traceTopic; } public ClientConfig cloneClientConfig() { @@ -245,6 +257,8 @@ public ClientConfig cloneClientConfig() { cc.detectInterval = detectInterval; cc.detectTimeout = detectTimeout; cc.namespaceV2 = namespaceV2; + cc.enableTrace = enableTrace; + cc.traceTopic = traceTopic; return cc; } @@ -474,6 +488,22 @@ public void setUseHeartbeatV2(boolean useHeartbeatV2) { this.useHeartbeatV2 = useHeartbeatV2; } + public boolean isEnableTrace() { + return enableTrace; + } + + public void setEnableTrace(boolean enableTrace) { + this.enableTrace = enableTrace; + } + + public String getTraceTopic() { + return traceTopic; + } + + public void setTraceTopic(String traceTopic) { + this.traceTopic = traceTopic; + } + @Override public String toString() { return "ClientConfig{" + @@ -505,6 +535,8 @@ public String toString() { ", sendLatencyEnable=" + sendLatencyEnable + ", startDetectorEnable=" + startDetectorEnable + ", enableHeartbeatChannelEventListener=" + enableHeartbeatChannelEventListener + + ", enableTrace=" + enableTrace + + ", traceTopic='" + traceTopic + '\'' + '}'; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index c193c6a42e4..3364df48f89 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -169,15 +169,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon */ private TraceDispatcher traceDispatcher = null; - /** - * The flag for message trace - */ - private boolean enableMsgTrace = false; - - /** - * The name value of message trace topic.If you don't config,you can use the default trace topic name. - */ - private String customizedTraceTopic; + private RPCHook rpcHook; /** * Default constructor. @@ -212,6 +204,7 @@ public DefaultLitePullConsumer(RPCHook rpcHook) { */ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { this.consumerGroup = consumerGroup; + this.rpcHook = rpcHook; this.enableStreamRequestType = true; defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); } @@ -226,6 +219,7 @@ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) { this.namespace = namespace; this.consumerGroup = consumerGroup; + this.rpcHook = rpcHook; this.enableStreamRequestType = true; defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); } @@ -592,15 +586,12 @@ public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } - public void setCustomizedTraceTopic(String customizedTraceTopic) { - this.customizedTraceTopic = customizedTraceTopic; - } - private void setTraceDispatcher() { - if (isEnableMsgTrace()) { + if (enableTrace) { try { - AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); + AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook); traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS()); + traceDispatcher.setNamespaceV2(namespaceV2); this.traceDispatcher = traceDispatcher; this.defaultLitePullConsumerImpl.registerConsumeMessageHook( new ConsumeMessageTraceHookImpl(traceDispatcher)); @@ -611,14 +602,18 @@ private void setTraceDispatcher() { } public String getCustomizedTraceTopic() { - return customizedTraceTopic; + return traceTopic; + } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.traceTopic = customizedTraceTopic; } public boolean isEnableMsgTrace() { - return enableMsgTrace; + return enableTrace; } public void setEnableMsgTrace(boolean enableMsgTrace) { - this.enableMsgTrace = enableMsgTrace; + this.enableTrace = enableMsgTrace; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 502c5ef184e..312f4632cab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -293,6 +293,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume // force to use client rebalance private boolean clientRebalance = true; + private RPCHook rpcHook = null; + /** * Default constructor. */ @@ -327,6 +329,7 @@ public DefaultMQPushConsumer(RPCHook rpcHook) { */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) { this.consumerGroup = consumerGroup; + this.rpcHook = rpcHook; this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } @@ -353,6 +356,7 @@ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; + this.rpcHook = rpcHook; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } @@ -369,18 +373,11 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { this.consumerGroup = consumerGroup; + this.rpcHook = rpcHook; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); - if (enableMsgTrace) { - try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); - dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl); - traceDispatcher = dispatcher; - this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher)); - } catch (Throwable e) { - log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); - } - } + this.enableTrace = enableMsgTrace; + this.traceTopic = customizedTraceTopic; } /** @@ -419,6 +416,7 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.namespace = namespace; + this.rpcHook = rpcHook; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } @@ -438,18 +436,11 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { this.consumerGroup = consumerGroup; this.namespace = namespace; + this.rpcHook = rpcHook; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); - if (enableMsgTrace) { - try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); - dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl); - traceDispatcher = dispatcher; - this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher)); - } catch (Throwable e) { - log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); - } - } + this.enableTrace = enableMsgTrace; + this.traceTopic = customizedTraceTopic; } /** @@ -464,9 +455,6 @@ public void createTopic(String key, String newTopic, int queueNum, Map fetchSubscribeMessageQueues(String topic) throws MQClie public void start() throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPushConsumerImpl.start(); + if (enableTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook); + dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl); + dispatcher.setNamespaceV2(namespaceV2); + traceDispatcher = dispatcher; + this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } if (null != traceDispatcher) { + if (traceDispatcher instanceof AsyncTraceDispatcher) { + ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(isUseTLS()); + } try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index cabe96ca7b7..0abf925a82a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -167,6 +167,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { */ private int backPressureForAsyncSendSize = 100 * 1024 * 1024; + private RPCHook rpcHook = null; + /** * Default constructor. */ @@ -202,6 +204,7 @@ public DefaultMQProducer(final String producerGroup) { */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { this.producerGroup = producerGroup; + this.rpcHook = rpcHook; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } @@ -243,20 +246,8 @@ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, fin public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { this(producerGroup, rpcHook); - //if client open the message trace feature - if (enableMsgTrace) { - try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); - dispatcher.setHostProducer(this.defaultMQProducerImpl); - traceDispatcher = dispatcher; - this.defaultMQProducerImpl.registerSendMessageHook( - new SendMessageTraceHookImpl(traceDispatcher)); - this.defaultMQProducerImpl.registerEndTransactionHook( - new EndTransactionTraceHookImpl(traceDispatcher)); - } catch (Throwable e) { - logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); - } - } + this.enableTrace = enableMsgTrace; + this.traceTopic = customizedTraceTopic; } /** @@ -298,6 +289,7 @@ public DefaultMQProducer(final String namespace, final String producerGroup) { public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { this.namespace = namespace; this.producerGroup = producerGroup; + this.rpcHook = rpcHook; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this); } @@ -318,27 +310,8 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC boolean enableMsgTrace, final String customizedTraceTopic) { this(namespace, producerGroup, rpcHook); //if client open the message trace feature - if (enableMsgTrace) { - try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); - dispatcher.setHostProducer(this.defaultMQProducerImpl); - traceDispatcher = dispatcher; - this.defaultMQProducerImpl.registerSendMessageHook( - new SendMessageTraceHookImpl(traceDispatcher)); - this.defaultMQProducerImpl.registerEndTransactionHook( - new EndTransactionTraceHookImpl(traceDispatcher)); - } catch (Throwable e) { - logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); - } - } - } - - @Override - public void setUseTLS(boolean useTLS) { - super.setUseTLS(useTLS); - if (traceDispatcher instanceof AsyncTraceDispatcher) { - ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS); - } + this.enableTrace = enableMsgTrace; + this.traceTopic = customizedTraceTopic; } /** @@ -356,7 +329,24 @@ public void start() throws MQClientException { if (this.produceAccumulator != null) { this.produceAccumulator.start(); } + if (enableTrace) { + try { + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic, rpcHook); + dispatcher.setHostProducer(this.defaultMQProducerImpl); + dispatcher.setNamespaceV2(this.namespaceV2); + traceDispatcher = dispatcher; + this.defaultMQProducerImpl.registerSendMessageHook( + new SendMessageTraceHookImpl(traceDispatcher)); + this.defaultMQProducerImpl.registerEndTransactionHook( + new EndTransactionTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + logger.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } if (null != traceDispatcher) { + if (traceDispatcher instanceof AsyncTraceDispatcher) { + ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(isUseTLS()); + } try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index ea423b71766..d44f22616f4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -78,6 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private volatile AccessChannel accessChannel = AccessChannel.LOCAL; private String group; private Type type; + private String namespaceV2; public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value @@ -144,10 +145,20 @@ public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) { this.hostConsumer = hostConsumer; } + public String getNamespaceV2() { + return namespaceV2; + } + + public void setNamespaceV2(String namespaceV2) { + this.namespaceV2 = namespaceV2; + } + public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException { if (isStarted.compareAndSet(false, true)) { traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); + traceProducer.setNamespaceV2(namespaceV2); + traceProducer.setEnableTrace(false); traceProducer.start(); } this.accessChannel = accessChannel; diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java index a39ae4a4ded..028445ef2d7 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java @@ -135,6 +135,8 @@ public PullResult answer(InvocationOnMock mock) throws Throwable { new ConsumeMessageOpenTracingHookImpl(tracer)); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); + // disable trace to let mock trace work + pushConsumer.setEnableTrace(false); OffsetStore offsetStore = Mockito.mock(OffsetStore.class); Mockito.when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(0L); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 60aa446bbe9..fc63cce1ce4 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -128,11 +128,9 @@ public void init() throws Exception { normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal, false, ""); customTraceTopicPushConsumer = new DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); + pushConsumer.setUseTLS(true); pushConsumer.setPullInterval(60 * 1000); - asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher(); - traceProducer = asyncTraceDispatcher.getTraceProducer(); - pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, @@ -157,6 +155,9 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, pushConsumer.start(); + asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher(); + traceProducer = asyncTraceDispatcher.getTraceProducer(); + mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory()); @@ -242,9 +243,6 @@ public ConsumeConcurrentlyStatus consumeMessage(List msgs, @Test public void testPushConsumerWithTraceTLS() { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true, null); - consumer.setUseTLS(true); - AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java index 8fbc70ea44f..9ce9d6b4941 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java @@ -88,6 +88,8 @@ public void init() throws Exception { new SendMessageOpenTracingHookImpl(tracer)); producer.setNamesrvAddr("127.0.0.1:9876"); message = new Message(topic, new byte[] {'a', 'b', 'c'}); + // disable trace to let mock trace work + producer.setEnableTrace(false); producer.start(); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index ee173351852..ed680d8e6cf 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -92,14 +92,14 @@ public void init() throws Exception { normalProducer.setNamesrvAddr("127.0.0.1:9877"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); message = new Message(topic, new byte[] {'a', 'b', 'c'}); - asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); - asyncTraceDispatcher.setTraceTopicName(customerTraceTopic); - asyncTraceDispatcher.getHostProducer(); - asyncTraceDispatcher.getHostConsumer(); - traceProducer = asyncTraceDispatcher.getTraceProducer(); + producer.setTraceTopic(customerTraceTopic); + producer.setUseTLS(true); producer.start(); + asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); + traceProducer = asyncTraceDispatcher.getTraceProducer(); + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); @@ -150,9 +150,6 @@ public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws Remotin @Test public void testProducerWithTraceTLS() { - DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true, null); - producer.setUseTLS(true); - AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java index 5646a17dbe6..5d4b81d16db 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java @@ -103,6 +103,8 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer)); producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer)); producer.setTransactionListener(transactionListener); + // disable trace to let mock trace work + producer.setEnableTrace(false); producer.setNamesrvAddr("127.0.0.1:9876"); message = new Message(topic, new byte[] {'a', 'b', 'c'}); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java index 8cf87444c0c..9f6036153bc 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java @@ -111,11 +111,12 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { producer.setNamesrvAddr("127.0.0.1:9876"); message = new Message(topic, new byte[] {'a', 'b', 'c'}); - asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); - traceProducer = asyncTraceDispatcher.getTraceProducer(); producer.start(); + asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); + traceProducer = asyncTraceDispatcher.getTraceProducer(); + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);