From e1d481a5bb8ec9ec1dc038f37f0e845f2a2f5255 Mon Sep 17 00:00:00 2001 From: dakirily Date: Thu, 1 Feb 2024 16:14:54 +0100 Subject: [PATCH] QPID-8666: [Broker-J] Broker plugin jdbc-provider-bone replacement --- .../apache/qpid/server/model/BrokerModel.java | 5 +- ...onfigurationStoreUpgraderAndRecoverer.java | 5 ++ .../BrokerStoreUpgraderAndRecoverer.java | 51 +++++++++++- .../qpid/server/store/UpgraderHelper.java | 50 +++++++++++ .../VirtualHostStoreUpgraderAndRecoverer.java | 37 +++++++++ .../BrokerStoreUpgraderAndRecovererTest.java | 83 +++++++++++++++++-- ...tualHostStoreUpgraderAndRecovererTest.java | 77 +++++++++++++++++ 7 files changed, 296 insertions(+), 12 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java index 68cedf31d7..4e774b5148 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java @@ -84,9 +84,12 @@ public final class BrokerModel extends Model * * 9.0 * Introduced PublishProducer as a child of Exchange and PointToPointProducer as child of Queue + * + * 9.1 + * Replaced JDBC connection pool provider from BoneCP to HikariCP */ public static final int MODEL_MAJOR_VERSION = 9; - public static final int MODEL_MINOR_VERSION = 0; + public static final int MODEL_MINOR_VERSION = 1; public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION; private static final Model MODEL_INSTANCE = new BrokerModel(); private final Map, Class> _parents = new HashMap<>(); diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java index ea5842523b..924f48d568 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java @@ -31,6 +31,11 @@ abstract class AbstractConfigurationStoreUpgraderAndRecoverer { + protected static final String BROKER = "Broker"; + protected static final String VIRTUALHOST = "VirtualHost"; + protected static final String JDBC_VIRTUALHOST_TYPE = "JDBC"; + protected static final String CONTEXT = "context"; + private final Map _upgraders = new HashMap<>(); private final String _initialVersion; diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java index c78884cbe8..319aaaf9ba 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java @@ -70,6 +70,7 @@ public BrokerStoreUpgraderAndRecoverer(SystemConfig systemConfig) register(new Upgrader_7_0_to_7_1()); register(new Upgrader_7_1_to_8_0()); register(new Upgrader_8_0_to_9_0()); + register(new Upgrader_9_0_to_9_1()); } private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase @@ -761,6 +762,47 @@ public void complete() } } + private static class Upgrader_9_0_to_9_1 extends StoreUpgraderPhase + { + public Upgrader_9_0_to_9_1() + { + super("modelVersion", "9.0", "9.1"); + } + + @Override + public void configuredObject(final ConfiguredObjectRecord record) + { + if (BROKER.equals(record.getType())) + { + upgradeRootRecord(record); + } + + final Map attributes = record.getAttributes(); + + if (attributes == null) + { + return; + } + + if (!(VIRTUALHOST.equals(record.getType()) && JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type")))) + { + return; + } + + if (attributes.containsKey(CONTEXT)) + { + final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record); + getUpdateMap().put(updatedRecord.getId(), updatedRecord); + } + } + + @Override + public void complete() + { + + } + } + private static class VirtualHostEntryUpgrader { Map _messageStoreToNodeTransformers = Map.of("DERBY", new AttributesTransformer(). @@ -805,10 +847,11 @@ private static class VirtualHostEntryUpgrader addAttributeTransformer("jdbcBytesForBlob", addContextVar("qpid.jdbcstore.useBytesForBlob")). addAttributeTransformer("jdbcBlobType", addContextVar("qpid.jdbcstore.blobType")). addAttributeTransformer("jdbcVarbinaryType", addContextVar("qpid.jdbcstore.varBinaryType")). - addAttributeTransformer("maximumPoolSize", - addContextVar("qpid.jdbcstore.hikaricp.maximumPoolSize")). - addAttributeTransformer("minimumIdle", - addContextVar("qpid.jdbcstore.hikaricp.minimumIdle")), + addAttributeTransformer("partitionCount", addContextVar("qpid.jdbcstore.bonecp.partitionCount")). + addAttributeTransformer("maxConnectionsPerPartition", + addContextVar("qpid.jdbcstore.bonecp.maxConnectionsPerPartition")). + addAttributeTransformer("minConnectionsPerPartition", + addContextVar("qpid.jdbcstore.bonecp.minConnectionsPerPartition")), "BDB_HA", new AttributesTransformer(). addAttributeTransformer("id", copyAttribute()). addAttributeTransformer("createdTime", copyAttribute()). diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java b/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java index 4428d67bbf..3569138697 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java @@ -26,6 +26,22 @@ public class UpgraderHelper { + static final String CONTEXT = "context"; + + static final String CP_TYPE = "connectionPoolType"; + static final String BONECP = "BONECP"; + static final String HIKARICP = "HIKARICP"; + + static final String PARTITION_COUNT_PARAM = "qpid.jdbcstore.bonecp.partitionCount"; + static final String MAX_POOL_SIZE_OLD_PARAM = "qpid.jdbcstore.bonecp.maxConnectionsPerPartition"; + static final String MIN_IDLE_OLD_PARAM = "qpid.jdbcstore.bonecp.minConnectionsPerPartition"; + + static final String MAX_POOL_SIZE_PARAM = "qpid.jdbcstore.hikaricp.maximumPoolSize"; + static final String MIN_IDLE_PARAM = "qpid.jdbcstore.hikaricp.minimumIdle"; + + static final Map RENAME_MAPPING = Map.of(MAX_POOL_SIZE_OLD_PARAM, MAX_POOL_SIZE_PARAM, + MIN_IDLE_OLD_PARAM, MIN_IDLE_PARAM); + public static final Map MODEL9_MAPPING_FOR_RENAME_TO_ALLOW_DENY_CONTEXT_VARIABLES = new HashMap<>(); static { @@ -57,4 +73,38 @@ public static Map reverse(Map map) { return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); } + + /** Upgrades connection pool from BoneCP to HikariCP (model version 9.0 to 9.1) */ + public static ConfiguredObjectRecord upgradeConnectionPool(final ConfiguredObjectRecord record) + { + final Map attributes = record.getAttributes(); + + final Object contextObject = attributes.get(CONTEXT); + + if (contextObject instanceof Map) + { + final Map context = (Map) contextObject; + final Map newContext = UpgraderHelper.renameContextVariables(context, RENAME_MAPPING); + + if (BONECP.equals(attributes.get(CP_TYPE))) + { + final int partitionCount = newContext.get(PARTITION_COUNT_PARAM) != null + ? Integer.parseInt(newContext.remove(PARTITION_COUNT_PARAM)) : 0; + final int maximumPoolSize = newContext.get(MAX_POOL_SIZE_PARAM) != null && partitionCount != 0 + ? Integer.parseInt(newContext.get(MAX_POOL_SIZE_PARAM)) * partitionCount : 40; + final int minIdle = newContext.get(MIN_IDLE_PARAM) != null && partitionCount != 0 + ? Integer.parseInt(newContext.get(MIN_IDLE_PARAM)) * partitionCount : 20; + newContext.put(MAX_POOL_SIZE_PARAM, String.valueOf(maximumPoolSize)); + newContext.put(MIN_IDLE_PARAM, String.valueOf(minIdle)); + } + final Map updatedAttributes = new HashMap<>(record.getAttributes()); + if (BONECP.equals(attributes.get(CP_TYPE))) + { + updatedAttributes.put(CP_TYPE, HIKARICP); + } + updatedAttributes.put(CONTEXT, newContext); + return new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()); + } + return record; + } } diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java index 2a3bc71393..f9068494e1 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java @@ -74,6 +74,7 @@ public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode virtualHostNode) register(new Upgrader_7_0_to_7_1()); register(new Upgrader_7_1_to_8_0()); register(new Upgrader_8_0_to_9_0()); + register(new Upgrader_9_0_to_9_1()); Map defaultExchangeIds = new HashMap<>(); for (String exchangeName : DEFAULT_EXCHANGES.keySet()) @@ -1109,6 +1110,42 @@ public void complete() } } + private static class Upgrader_9_0_to_9_1 extends StoreUpgraderPhase + { + public Upgrader_9_0_to_9_1() + { + super("modelVersion", "9.0", "9.1"); + } + + @Override + public void configuredObject(final ConfiguredObjectRecord record) + { + final Map attributes = record.getAttributes(); + + if (attributes == null) + { + return; + } + + if (!(VIRTUALHOST.equals(record.getType()) && JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type")))) + { + return; + } + + if (attributes.containsKey(CONTEXT)) + { + final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record); + getUpdateMap().put(updatedRecord.getId(), updatedRecord); + } + } + + @Override + public void complete() + { + + } + } + public boolean upgradeAndRecover(final DurableConfigurationStore durableConfigurationStore, final ConfiguredObjectRecord... initialRecords) { diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java index cc34fd03c4..b344e14c2a 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java @@ -41,6 +41,8 @@ import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -90,17 +92,18 @@ public void setUp() throws Exception } @Test - public void testUpgradeVirtualHostWithJDBCStoreAndHikariCPPool() + public void testUpgradeVirtualHostWithJDBCStoreAndBoneCPPool() { final Map hostAttributes = ImmutableMap.builder() .put("name", VIRTUALHOST_NAME) .put("modelVersion", "0.4") - .put("connectionPool", "HIKARICP") + .put("connectionPool", "BONECP") .put("connectionURL", "jdbc:derby://localhost:1527/tmp/vh/test;create=true") .put("createdBy", VIRTUALHOST_CREATED_BY) .put("createdTime", VIRTUALHOST_CREATE_TIME) - .put("maximumPoolSize", 7) - .put("minimumIdle", 6) + .put("maxConnectionsPerPartition", 7) + .put("minConnectionsPerPartition", 6) + .put("partitionCount", 2) .put("storeType", "jdbc") .put("type", "STANDARD") .put("jdbcBigIntType", "mybigint") @@ -121,9 +124,10 @@ public void testUpgradeVirtualHostWithJDBCStoreAndHikariCPPool() "qpid.jdbcstore.varBinaryType", "myvarbinary", "qpid.jdbcstore.blobType", "myblob", "qpid.jdbcstore.useBytesForBlob", true, - "qpid.jdbcstore.hikaricp.maximumPoolSize", 7, - "qpid.jdbcstore.hikaricp.minimumIdle", 6); - final Map expectedAttributes = Map.of("connectionPoolType", "HIKARICP", + "qpid.jdbcstore.bonecp.maxConnectionsPerPartition", 7, + "qpid.jdbcstore.bonecp.minConnectionsPerPartition", 6, + "qpid.jdbcstore.bonecp.partitionCount", 2); + final Map expectedAttributes = Map.of("connectionPoolType", "BONECP", "connectionUrl", "jdbc:derby://localhost:1527/tmp/vh/test;create=true", "createdBy", VIRTUALHOST_CREATED_BY, "createdTime", VIRTUALHOST_CREATE_TIME, @@ -877,6 +881,71 @@ public void testContextVariableUpgradeForTLSCipherSuitesSetOnAuthenticationProvi assertEquals("Ssl.*", contextMap.get(CommonProperties.QPID_SECURITY_TLS_CIPHER_SUITE_DENY_LIST)); } + @ParameterizedTest + @CsvSource(value = + { + "4,20,5,80,20", "0,20,5,40,20", "null,20,5,40,20", "4,null,null,40,20", "null,null,null,40,20" + }, nullValues = { "null" }) + public void testContextVariableUpgradeFromBoneCPToHikariCPProvider(final String partitionCount, + final String maxConnectionsPerPartition, + final String minConnectionsPerPartition, + final String maximumPoolSize, + final String minimumIdle) + { + _brokerRecord.getAttributes().put("modelVersion", "9.0"); + + final Map context = new HashMap<>(); + context.put("qpid.jdbcstore.bonecp.partitionCount", partitionCount); + context.put("qpid.jdbcstore.bonecp.maxConnectionsPerPartition", maxConnectionsPerPartition); + context.put("qpid.jdbcstore.bonecp.minConnectionsPerPartition", minConnectionsPerPartition); + final Map attributes = Map.of("name", getTestName(), + "type", "JDBC", + "connectionPoolType", "BONECP", + "context", context); + final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);; + when(virtualHostRecord.getId()).thenReturn(randomUUID()); + when(virtualHostRecord.getType()).thenReturn("VirtualHost"); + when(virtualHostRecord.getAttributes()).thenReturn(attributes); + + final DurableConfigurationStore dcs = new DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord); + final BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig); + final List records = upgrade(dcs, recoverer); + final Map contextMap = findCategoryRecordAndGetContext("VirtualHost", records); + + final ConfiguredObjectRecord upgradedVirtualHost = records.stream() + .filter(record -> record.getType().equals("VirtualHost")).findFirst() + .orElse(null); + + assertNotNull(upgradedVirtualHost); + assertEquals(maximumPoolSize, contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize")); + assertEquals(minimumIdle, contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle")); + assertEquals("HIKARICP", upgradedVirtualHost.getAttributes().get("connectionPoolType")); + } + + @Test + public void testContextVariableUpgradeFromDefaultCPToHikariCPProvider() + { + _brokerRecord.getAttributes().put("modelVersion", "9.0"); + + final Map attributes = Map.of("name", getTestName(), + "type", "JDBC", + "connectionPoolType", "NONE", + "context", new HashMap<>()); + final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);; + when(virtualHostRecord.getId()).thenReturn(randomUUID()); + when(virtualHostRecord.getType()).thenReturn("VirtualHost"); + when(virtualHostRecord.getAttributes()).thenReturn(attributes); + + final DurableConfigurationStore dcs = new DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord); + final BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig); + final List records = upgrade(dcs, recoverer); + final Map contextMap = findCategoryRecordAndGetContext("VirtualHost", records); + + assertNull(contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize")); + assertNull(contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle")); + assertEquals("NONE", virtualHostRecord.getAttributes().get("connectionPoolType")); + } + private ConfiguredObjectRecord createMockRecordForGivenCategoryTypeAndContext(final String category, final String type, final Map context) diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java index 78466e11b1..39970112b1 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -34,6 +35,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.apache.qpid.server.configuration.CommonProperties; import org.apache.qpid.server.model.Broker; @@ -302,6 +305,80 @@ public void testContextVariableUpgradeForTLSCipherSuitesSetOnVirtualHostAccessCo assertEquals("Ssl.*", newContext.get(CommonProperties.QPID_SECURITY_TLS_CIPHER_SUITE_DENY_LIST)); } + @ParameterizedTest + @CsvSource(value = + { + "4,20,5,80,20", "0,20,5,40,20", "null,20,5,40,20", "4,null,null,40,20", "null,null,null,40,20" + }, nullValues = { "null" }) + public void testContextVariableUpgradeFromBoneCPToHikariCPProvider(final String partitionCount, + final String maxConnectionsPerPartition, + final String minConnectionsPerPartition, + final String maximumPoolSize, + final String minimumIdle) + { + final Map rootAttributes = Map.of("modelVersion", "9.0", "name", "root"); + final ConfiguredObjectRecord rootRecord = new ConfiguredObjectRecordImpl(randomUUID(), "VirtualHost", rootAttributes); + + final Map context = new HashMap<>(); + context.put("qpid.jdbcstore.bonecp.partitionCount", partitionCount); + context.put("qpid.jdbcstore.bonecp.maxConnectionsPerPartition", maxConnectionsPerPartition); + context.put("qpid.jdbcstore.bonecp.minConnectionsPerPartition", minConnectionsPerPartition); + final Map attributes = Map.of("name", getTestName(), + "modelVersion", "9.0", + "type", "JDBC", + "connectionPoolType", "BONECP", + "context", context); + final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);; + when(virtualHostRecord.getId()).thenReturn(randomUUID()); + when(virtualHostRecord.getType()).thenReturn("VirtualHost"); + when(virtualHostRecord.getAttributes()).thenReturn(attributes); + + final List records = List.of(rootRecord, virtualHostRecord); + final List upgradedRecords = + _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion"); + + final ConfiguredObjectRecord upgradedVirtualHost = upgradedRecords.stream() + .filter(record -> record.getId().equals(virtualHostRecord.getId())).findFirst() + .orElse(null); + final Map contextMap = (Map) upgradedVirtualHost.getAttributes().get("context"); + + assertNotNull(upgradedVirtualHost); + assertEquals(maximumPoolSize, contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize")); + assertEquals(minimumIdle, contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle")); + assertEquals("HIKARICP", upgradedVirtualHost.getAttributes().get("connectionPoolType")); + } + + @Test + public void testContextVariableUpgradeFromDefaultCPToHikariCPProvider() + { + final Map rootAttributes = Map.of("modelVersion", "9.0", "name", "root"); + final ConfiguredObjectRecord rootRecord = new ConfiguredObjectRecordImpl(randomUUID(), "VirtualHost", rootAttributes); + + final Map attributes = Map.of("name", getTestName(), + "modelVersion", "9.0", + "type", "JDBC", + "connectionPoolType", "NONE", + "context", new HashMap<>()); + final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);; + when(virtualHostRecord.getId()).thenReturn(randomUUID()); + when(virtualHostRecord.getType()).thenReturn("VirtualHost"); + when(virtualHostRecord.getAttributes()).thenReturn(attributes); + + final List records = List.of(rootRecord, virtualHostRecord); + final List upgradedRecords = + _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion"); + + final ConfiguredObjectRecord upgradedVirtualHost = upgradedRecords.stream() + .filter(record -> record.getId().equals(virtualHostRecord.getId())).findFirst() + .orElse(null); + final Map contextMap = (Map) upgradedVirtualHost.getAttributes().get("context"); + + assertNotNull(upgradedVirtualHost); + assertNull(contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize")); + assertNull(contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle")); + assertEquals("NONE", virtualHostRecord.getAttributes().get("connectionPoolType")); + } + private ConfiguredObjectRecord findRecordById(final UUID id, final List records) { return records.stream().filter(record -> record.getId().equals(id)).findFirst().orElse(null);