Skip to content

Commit

Permalink
[ISSUE apache#7699] Refector NamespaceRpcHook (apache#7769)
Browse files Browse the repository at this point in the history
* [ISSUE apache#7699] Refector NamespaceRpcHook

* fix
  • Loading branch information
drpmma authored Jan 23, 2024
1 parent 6d75134 commit 5262358
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
Expand Down Expand Up @@ -906,7 +908,7 @@ public void lockBatchMQAsync(
final LockBatchRequestBody requestBody,
final long timeoutMillis,
final LockCallback callback) throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader());

request.setBody(requestBody.encode());
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
Expand Down Expand Up @@ -945,7 +947,7 @@ public void unlockBatchMQAsync(
final UnlockBatchRequestBody requestBody,
final long timeoutMillis,
final UnlockCallback callback) throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader());

request.setBody(requestBody.encode());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
Expand All @@ -195,6 +196,7 @@
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
Expand Down Expand Up @@ -1613,7 +1615,7 @@ public Set<MessageQueue> lockBatchMQ(
final String addr,
final LockBatchRequestBody requestBody,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader());

request.setBody(requestBody.encode());
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand All @@ -1637,7 +1639,7 @@ public void unlockBatchMQ(
final long timeoutMillis,
final boolean oneway
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader());

request.setBody(requestBody.encode());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
Expand All @@ -77,6 +78,7 @@
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;

Expand Down Expand Up @@ -543,7 +545,7 @@ public CompletableFuture<Long> searchOffset(String brokerAddr, SearchOffsetReque

public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String brokerAddr,
LockBatchRequestBody requestBody, long timeoutMillis) {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader());
request.setBody(requestBody.encode());
return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> {
CompletableFuture<Set<MessageQueue>> future0 = new CompletableFuture<>();
Expand All @@ -565,7 +567,7 @@ public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String brokerA
public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr,
UnlockBatchRequestBody requestBody, long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader());
request.setBody(requestBody.encode());
try {
this.getRemotingClient().invokeOneway(brokerAddr, request, timeoutMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;

public class NamespaceRpcHook implements RPCHook {
private final ClientConfig clientConfig;
Expand All @@ -33,13 +32,9 @@ public NamespaceRpcHook(ClientConfig clientConfig) {

@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
CommandCustomHeader customHeader = request.readCustomHeader();
if (customHeader instanceof RpcRequestHeader) {
RpcRequestHeader requestHeader = (RpcRequestHeader) customHeader;
if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) {
requestHeader.setNamespaced(true);
requestHeader.setNamespace(clientConfig.getNamespaceV2());
}
if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) {
request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD, "true");
request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD, clientConfig.getNamespaceV2());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.client.rpchook;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
Expand All @@ -39,8 +40,8 @@ public void testDoBeforeRequestWithNamespace() {
PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
namespaceRpcHook.doBeforeRequest("", request);
assertThat(pullMessageRequestHeader.getNamespaced()).isTrue();
assertThat(pullMessageRequestHeader.getNamespace()).isEqualTo(namespace);
assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD)).isEqualTo("true");
assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD)).isEqualTo(namespace);
}

@Test
Expand All @@ -50,7 +51,6 @@ public void testDoBeforeRequestWithoutNamespace() {
PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
namespaceRpcHook.doBeforeRequest("", request);
assertThat(pullMessageRequestHeader.getNamespaced()).isNull();
assertThat(pullMessageRequestHeader.getNamespace()).isNull();
assertThat(request.getExtFields()).isNull();
}
}
2 changes: 2 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/MixAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public class MixAll {
public static final String ROCKETMQ_ZONE_MODE_PROPERTY = "rocketmq.zone.mode";
public static final String ZONE_NAME = "__ZONE_NAME";
public static final String ZONE_MODE = "__ZONE_MODE";
public final static String RPC_REQUEST_HEADER_NAMESPACED_FIELD = "nsd";
public final static String RPC_REQUEST_HEADER_NAMESPACE_FIELD = "ns";

private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.remoting.protocol.header;

import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;

public class LockBatchMqRequestHeader extends RpcRequestHeader {
@Override
public void checkFields() throws RemotingCommandException {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.remoting.protocol.header;

import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;

public class UnlockBatchMqRequestHeader extends RpcRequestHeader {
@Override
public void checkFields() throws RemotingCommandException {

}
}

0 comments on commit 5262358

Please sign in to comment.