Skip to content

Commit

Permalink
[ISSUE apache#7945] Make HAProxyMessageForwarder Scalable (apache#7946)
Browse files Browse the repository at this point in the history
  • Loading branch information
dingshuangxi888 authored Mar 19, 2024
1 parent 9b4f16a commit 1b63e8e
Showing 1 changed file with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ private void forwardHAProxyMessage(Channel inboundChannel, Channel outboundChann
protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws IllegalAccessException, DecoderException {
String sourceAddress = null, destinationAddress = null;
int sourcePort = 0, destinationPort = 0;
List<HAProxyTLV> haProxyTLVs = new ArrayList<>();

if (inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
if (ArrayUtils.isEmpty(attributes)) {
Expand All @@ -117,12 +115,6 @@ protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws Ille
if (attribute.key() == AttributeKeys.PROXY_PROTOCOL_SERVER_PORT) {
destinationPort = Integer.parseInt(attributeValue);
}
if (StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, attributeValue);
if (haProxyTLV != null) {
haProxyTLVs.add(haProxyTLV);
}
}
}
} else {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(inboundChannel);
Expand All @@ -137,10 +129,35 @@ protected HAProxyMessage buildHAProxyMessage(Channel inboundChannel) throws Ille
HAProxyProxiedProtocol proxiedProtocol = AclUtils.isColon(sourceAddress) ? HAProxyProxiedProtocol.TCP6 :
HAProxyProxiedProtocol.TCP4;

List<HAProxyTLV> haProxyTLVs = buildHAProxyTLV(inboundChannel);

return new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY,
proxiedProtocol, sourceAddress, destinationAddress, sourcePort, destinationPort, haProxyTLVs);
}

protected List<HAProxyTLV> buildHAProxyTLV(Channel inboundChannel) throws IllegalAccessException, DecoderException {
List<HAProxyTLV> result = new ArrayList<>();
if (!inboundChannel.hasAttr(AttributeKeys.PROXY_PROTOCOL_ADDR)) {
return result;
}
Attribute<?>[] attributes = (Attribute<?>[]) FieldUtils.readField(FIELD_ATTRIBUTE, inboundChannel);
if (ArrayUtils.isEmpty(attributes)) {
return result;
}
for (Attribute<?> attribute : attributes) {
String attributeKey = attribute.key().name();
if (!StringUtils.startsWith(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX)) {
continue;
}
String attributeValue = (String) attribute.get();
HAProxyTLV haProxyTLV = buildHAProxyTLV(attributeKey, attributeValue);
if (haProxyTLV != null) {
result.add(haProxyTLV);
}
}
return result;
}

protected HAProxyTLV buildHAProxyTLV(String attributeKey, String attributeValue) throws DecoderException {
String typeString = StringUtils.substringAfter(attributeKey, HAProxyConstants.PROXY_PROTOCOL_TLV_PREFIX);
ByteBuf byteBuf = Unpooled.buffer();
Expand Down

0 comments on commit 1b63e8e

Please sign in to comment.