Skip to content

Commit

Permalink
[ISSUE apache#7988] Refector client trace (apache#7989)
Browse files Browse the repository at this point in the history
* [ISSUE apache#7988] Refector client trace

* build trace dispatcher in start method
* setNamespaceV2 for dispatcher
* disable trace for inner traceProducer
* fix tls
  • Loading branch information
drpmma authored Apr 2, 2024
1 parent bf24ffd commit 88e6447
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 92 deletions.
32 changes: 32 additions & 0 deletions client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ public class ClientConfig {

private boolean enableHeartbeatChannelEventListener = true;

/**
* The switch for message trace
*/
protected boolean enableTrace = true;

/**
* The name value of message trace topic. If not set, the default trace topic name will be used.
*/
protected String traceTopic;

public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
Expand Down Expand Up @@ -215,6 +225,8 @@ public void resetClientConfig(final ClientConfig cc) {
this.detectInterval = cc.detectInterval;
this.detectTimeout = cc.detectTimeout;
this.namespaceV2 = cc.namespaceV2;
this.enableTrace = cc.enableTrace;
this.traceTopic = cc.traceTopic;
}

public ClientConfig cloneClientConfig() {
Expand Down Expand Up @@ -245,6 +257,8 @@ public ClientConfig cloneClientConfig() {
cc.detectInterval = detectInterval;
cc.detectTimeout = detectTimeout;
cc.namespaceV2 = namespaceV2;
cc.enableTrace = enableTrace;
cc.traceTopic = traceTopic;
return cc;
}

Expand Down Expand Up @@ -474,6 +488,22 @@ public void setUseHeartbeatV2(boolean useHeartbeatV2) {
this.useHeartbeatV2 = useHeartbeatV2;
}

public boolean isEnableTrace() {
return enableTrace;
}

public void setEnableTrace(boolean enableTrace) {
this.enableTrace = enableTrace;
}

public String getTraceTopic() {
return traceTopic;
}

public void setTraceTopic(String traceTopic) {
this.traceTopic = traceTopic;
}

@Override
public String toString() {
return "ClientConfig{" +
Expand Down Expand Up @@ -505,6 +535,8 @@ public String toString() {
", sendLatencyEnable=" + sendLatencyEnable +
", startDetectorEnable=" + startDetectorEnable +
", enableHeartbeatChannelEventListener=" + enableHeartbeatChannelEventListener +
", enableTrace=" + enableTrace +
", traceTopic='" + traceTopic + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private TraceDispatcher traceDispatcher = null;

/**
* The flag for message trace
*/
private boolean enableMsgTrace = false;

/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic;
private RPCHook rpcHook;

/**
* Default constructor.
Expand Down Expand Up @@ -212,6 +204,7 @@ public DefaultLitePullConsumer(RPCHook rpcHook) {
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
Expand All @@ -226,6 +219,7 @@ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
Expand Down Expand Up @@ -592,15 +586,12 @@ public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}

public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}

private void setTraceDispatcher() {
if (isEnableMsgTrace()) {
if (enableTrace) {
try {
AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook);
traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
traceDispatcher.setNamespaceV2(namespaceV2);
this.traceDispatcher = traceDispatcher;
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
Expand All @@ -611,14 +602,18 @@ private void setTraceDispatcher() {
}

public String getCustomizedTraceTopic() {
return customizedTraceTopic;
return traceTopic;
}

public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.traceTopic = customizedTraceTopic;
}

public boolean isEnableMsgTrace() {
return enableMsgTrace;
return enableTrace;
}

public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
this.enableTrace = enableMsgTrace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
// force to use client rebalance
private boolean clientRebalance = true;

private RPCHook rpcHook = null;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -327,6 +329,7 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
Expand All @@ -353,6 +356,7 @@ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
Expand All @@ -369,18 +373,11 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
traceDispatcher = dispatcher;
this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand Down Expand Up @@ -419,6 +416,7 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
Expand All @@ -438,18 +436,11 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
traceDispatcher = dispatcher;
this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand All @@ -464,9 +455,6 @@ public void createTopic(String key, String newTopic, int queueNum, Map<String, S
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
}

/**
Expand Down Expand Up @@ -750,7 +738,21 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (enableTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
dispatcher.setNamespaceV2(namespaceV2);
traceDispatcher = dispatcher;
this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
if (null != traceDispatcher) {
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
}
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int backPressureForAsyncSendSize = 100 * 1024 * 1024;

private RPCHook rpcHook = null;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -202,6 +204,7 @@ public DefaultMQProducer(final String producerGroup) {
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
Expand Down Expand Up @@ -243,20 +246,8 @@ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, fin
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(producerGroup, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand Down Expand Up @@ -298,6 +289,7 @@ public DefaultMQProducer(final String namespace, final String producerGroup) {
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
Expand All @@ -318,27 +310,8 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
boolean enableMsgTrace, final String customizedTraceTopic) {
this(namespace, producerGroup, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}

@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
}

/**
Expand All @@ -356,7 +329,24 @@ public void start() throws MQClientException {
if (this.produceAccumulator != null) {
this.produceAccumulator.start();
}
if (enableTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
dispatcher.setNamespaceV2(this.namespaceV2);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
logger.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
if (null != traceDispatcher) {
if (traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
}
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;
private String namespaceV2;

public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
Expand Down Expand Up @@ -144,10 +145,20 @@ public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
this.hostConsumer = hostConsumer;
}

public String getNamespaceV2() {
return namespaceV2;
}

public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.setNamespaceV2(namespaceV2);
traceProducer.setEnableTrace(false);
traceProducer.start();
}
this.accessChannel = accessChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
new ConsumeMessageOpenTracingHookImpl(tracer));
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
// disable trace to let mock trace work
pushConsumer.setEnableTrace(false);

OffsetStore offsetStore = Mockito.mock(OffsetStore.class);
Mockito.when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(0L);
Expand Down
Loading

0 comments on commit 88e6447

Please sign in to comment.