From 1b63e8e1981501f5cad761a4474cfc719052fac2 Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 19 Mar 2024 10:34:27 +0800 Subject: [PATCH] [ISSUE #7945] Make HAProxyMessageForwarder Scalable (#7946) --- .../http2proxy/HAProxyMessageForwarder.java | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java index 6764dbf03b5..39d7057bddd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/HAProxyMessageForwarder.java @@ -89,8 +89,6 @@ private void forwardHAProxyMessage(Channel inboundChannel, Channel outboundChann protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws IllegalAccessException, DecoderException { String sourceAddress = null, destinationAddress = null; int sourcePort = 0, destinationPort = 0; - List haProxyTLVs = new ArrayList<>(); - if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { Attribute[] attributes = (Attribute[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel); if (ArrayUtils.isEmpty(attributes)) { @@ -117,12 +115,6 @@ protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws Ille if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) { destinationPort = Integer.parseInt(attributeValue); } - if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) { - HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, attributeValue); - if (haProxyTLV != null) { - haProxyTLVs.add(haProxyTLV); - } - } } } else { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(inboundChannel); @@ -137,10 +129,35 @@ protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws Ille HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 : HAProxyProxiedProtocol.TCP4; + List haProxyTLVs = buildHAProxyTLV(inboundChannel); + return new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs); } + protected List buildHAProxyTLV(Channel inboundChannel) throws IllegalAccessException, DecoderException { + List result = new ArrayList<>(); + if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) { + return result; + } + Attribute[] attributes = (Attribute[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel); + if (ArrayUtils.isEmpty(attributes)) { + return result; + } + for (Attribute attribute : attributes) { + String attributeKey = attribute.key().name(); + if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) { + continue; + } + String attributeValue = (String) attribute.get(); + HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, attributeValue); + if (haProxyTLV != null) { + result.add(haProxyTLV); + } + } + return result; + } + protected HAProxyTLV buildHAProxyTLV(String attributeKey, String attributeValue) throws DecoderException { String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX); ByteBuf byteBuf = Unpooled.buffer();