Skip to content

Commit

Permalink
QPID-8666: [Broker-J] Broker plugin jdbc-provider-bone replacement (#239
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dakirily authored Feb 6, 2024
1 parent 484d4b7 commit 86978bd
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
abstract class AbstractConfigurationStoreUpgraderAndRecoverer
{
protected static final String VIRTUALHOST = "VirtualHost";
protected static final String JDBC_VIRTUALHOST_TYPE = "JDBC";
protected static final String JDBC = "JDBC";
protected static final String CONTEXT = "context";

private final Map<String, StoreUpgraderPhase> _upgraders = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,11 +772,13 @@ public Upgrader_9_0_to_9_1()
}

@Override
public void configuredObject(final ConfiguredObjectRecord record)
public void configuredObject(ConfiguredObjectRecord record)
{
if (BROKER.equals(record.getType()))
{
upgradeRootRecord(record);
record = upgradeRootRecord(UpgraderHelper.upgradeConnectionPool(record));
getUpdateMap().put(record.getId(), record);
return;
}

final Map<String, Object> attributes = record.getAttributes();
Expand All @@ -786,16 +788,13 @@ public void configuredObject(final ConfiguredObjectRecord record)
return;
}

if (!(VIRTUALHOST.equals(record.getType()) && JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type"))))
if (!JDBC.equals(attributes.get("type")))
{
return;
}

if (attributes.containsKey(CONTEXT))
{
final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record);
getUpdateMap().put(updatedRecord.getId(), updatedRecord);
}
final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record);
getUpdateMap().put(updatedRecord.getId(), updatedRecord);
}

@Override
Expand Down Expand Up @@ -829,7 +828,7 @@ private static class VirtualHostEntryUpgrader
addAttributeTransformer("storeUnderfullSize", copyAttribute()).
addAttributeTransformer("storeOverfullSize", copyAttribute()).
addAttributeTransformer("bdbEnvironmentConfig", mutateAttributeName(CONTEXT)),
JDBC_VIRTUALHOST_TYPE, new AttributesTransformer().
JDBC, new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("name", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public static Map<String, String> renameContextVariables(final Map<String, Strin
oldToNewNameMapping.forEach((oldName, newName) -> {
if (newContext.containsKey(oldName))
{
final String value = newContext.remove(oldName);
final Object object = newContext.remove(oldName);
final String value = object == null ? null : String.valueOf(object);
newContext.put(newName, value);
}
});
Expand All @@ -79,32 +80,40 @@ public static ConfiguredObjectRecord upgradeConnectionPool(final ConfiguredObjec
{
final Map<String, Object> attributes = record.getAttributes();

final Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
if (BONECP.equals(attributes.get(CP_TYPE)))
{
updatedAttributes.put(CP_TYPE, HIKARICP);
}

final Object contextObject = attributes.get(CONTEXT);

if (contextObject instanceof Map)
{
final Map <String, String> context = (Map<String, String>) contextObject;
final Map<String, String> newContext = UpgraderHelper.renameContextVariables(context, RENAME_MAPPING);

if (BONECP.equals(attributes.get(CP_TYPE)))
{
final int partitionCount = newContext.get(PARTITION_COUNT_PARAM) != null
? Integer.parseInt(newContext.remove(PARTITION_COUNT_PARAM)) : 0;
final int maximumPoolSize = newContext.get(MAX_POOL_SIZE_PARAM) != null && partitionCount != 0
? Integer.parseInt(newContext.get(MAX_POOL_SIZE_PARAM)) * partitionCount : 40;
final int minIdle = newContext.get(MIN_IDLE_PARAM) != null && partitionCount != 0
? Integer.parseInt(newContext.get(MIN_IDLE_PARAM)) * partitionCount : 20;
newContext.put(MAX_POOL_SIZE_PARAM, String.valueOf(maximumPoolSize));
newContext.put(MIN_IDLE_PARAM, String.valueOf(minIdle));
}
final Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
if (BONECP.equals(attributes.get(CP_TYPE)))
final int partitionCount = newContext.get(PARTITION_COUNT_PARAM) != null
? Integer.parseInt(String.valueOf(newContext.remove(PARTITION_COUNT_PARAM))) : 0;
final int maximumPoolSize = newContext.get(MAX_POOL_SIZE_PARAM) != null && partitionCount != 0
? Integer.parseInt(String.valueOf(newContext.get(MAX_POOL_SIZE_PARAM))) * partitionCount : 40;
final int minIdle = newContext.get(MIN_IDLE_PARAM) != null && partitionCount != 0
? Integer.parseInt(String.valueOf(newContext.get(MIN_IDLE_PARAM))) * partitionCount : 20;

if (BONECP.equals(attributes.get(CP_TYPE)) || "Broker".equals(record.getType()))
{
updatedAttributes.put(CP_TYPE, HIKARICP);
if (newContext.containsKey(MAX_POOL_SIZE_PARAM))
{
newContext.put(MAX_POOL_SIZE_PARAM, String.valueOf(maximumPoolSize));
}
if (newContext.containsKey(MIN_IDLE_PARAM))
{
newContext.put(MIN_IDLE_PARAM, String.valueOf(minIdle));
}
}

updatedAttributes.put(CONTEXT, newContext);
return new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
}
return record;
return new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1127,16 +1127,13 @@ public void configuredObject(final ConfiguredObjectRecord record)
return;
}

if (!(VIRTUALHOST.equals(record.getType()) && JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type"))))
if (!JDBC.equals(attributes.get("type")))
{
return;
}

if (attributes.containsKey(CONTEXT))
{
final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record);
getUpdateMap().put(updatedRecord.getId(), updatedRecord);
}
final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record);
getUpdateMap().put(updatedRecord.getId(), updatedRecord);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;

import static org.apache.qpid.server.test.AttributesUtils.createAttributesMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -124,10 +125,9 @@ public void testUpgradeVirtualHostWithJDBCStoreAndBoneCPPool()
"qpid.jdbcstore.varBinaryType", "myvarbinary",
"qpid.jdbcstore.blobType", "myblob",
"qpid.jdbcstore.useBytesForBlob", true,
"qpid.jdbcstore.bonecp.maxConnectionsPerPartition", 7,
"qpid.jdbcstore.bonecp.minConnectionsPerPartition", 6,
"qpid.jdbcstore.bonecp.partitionCount", 2);
final Map<String,Object> expectedAttributes = Map.of("connectionPoolType", "BONECP",
"qpid.jdbcstore.hikaricp.maximumPoolSize", "14",
"qpid.jdbcstore.hikaricp.minimumIdle", "12");
final Map<String,Object> expectedAttributes = Map.of("connectionPoolType", "HIKARICP",
"connectionUrl", "jdbc:derby://localhost:1527/tmp/vh/test;create=true",
"createdBy", VIRTUALHOST_CREATED_BY,
"createdTime", VIRTUALHOST_CREATE_TIME,
Expand Down Expand Up @@ -898,27 +898,87 @@ public void testContextVariableUpgradeFromBoneCPToHikariCPProvider(final String
context.put("qpid.jdbcstore.bonecp.partitionCount", partitionCount);
context.put("qpid.jdbcstore.bonecp.maxConnectionsPerPartition", maxConnectionsPerPartition);
context.put("qpid.jdbcstore.bonecp.minConnectionsPerPartition", minConnectionsPerPartition);
context.put("qpid.jdbcstore.property1", "1");
context.put("qpid.jdbcstore.property2", "two");
context.put("qpid.jdbcstore.property3", "_3_");

final Map<String, Object> attributes = Map.of("name", getTestName(),
"type", "JDBC",
"connectionPoolType", "BONECP",
"context", context);
final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);;
"connectionPoolType", "BONECP");

_brokerRecord.getAttributes().put("context", new HashMap<>(context));

final ConfiguredObjectRecord systemConfigRecord = mock(ConfiguredObjectRecord.class);
when(systemConfigRecord.getId()).thenReturn(randomUUID());
when(systemConfigRecord.getType()).thenReturn("SystemConfig");
when(systemConfigRecord.getAttributes()).thenReturn(createAttributesMap(attributes, context));

final ConfiguredObjectRecord virtualHostNodeRecord = mock(ConfiguredObjectRecord.class);
when(virtualHostNodeRecord.getId()).thenReturn(randomUUID());
when(virtualHostNodeRecord.getType()).thenReturn("VirtualHostNode");
when(virtualHostNodeRecord.getAttributes()).thenReturn(createAttributesMap(attributes, context));

final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);
when(virtualHostRecord.getId()).thenReturn(randomUUID());
when(virtualHostRecord.getType()).thenReturn("VirtualHost");
when(virtualHostRecord.getAttributes()).thenReturn(attributes);
when(virtualHostRecord.getAttributes()).thenReturn(createAttributesMap(attributes, context));

final DurableConfigurationStore dcs = new DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord);
final ConfiguredObjectRecord jdbcBrokerLoggerRecord = mock(ConfiguredObjectRecord.class);
when(jdbcBrokerLoggerRecord.getId()).thenReturn(randomUUID());
when(jdbcBrokerLoggerRecord.getType()).thenReturn("BrokerLogger");
when(jdbcBrokerLoggerRecord.getAttributes()).thenReturn(createAttributesMap(attributes, context));

final ConfiguredObjectRecord jdbcVirtualHostLoggerRecord = mock(ConfiguredObjectRecord.class);
when(jdbcVirtualHostLoggerRecord.getId()).thenReturn(randomUUID());
when(jdbcVirtualHostLoggerRecord.getType()).thenReturn("VirtualHostLogger");
when(jdbcVirtualHostLoggerRecord.getAttributes()).thenReturn(createAttributesMap(attributes, context));

final DurableConfigurationStore dcs =
new DurableConfigurationStoreStub(jdbcVirtualHostLoggerRecord, jdbcBrokerLoggerRecord, virtualHostRecord,
virtualHostNodeRecord, systemConfigRecord, _brokerRecord);
final BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig);
final List<ConfiguredObjectRecord> records = upgrade(dcs, recoverer);
final Map<String, String> contextMap = findCategoryRecordAndGetContext("VirtualHost", records);

records.forEach(record ->
{
final Map<String, String> upgradedContext =
(Map<String, String>) record.getAttributes().get("context");

assertNull(upgradedContext.get("qpid.jdbcstore.bonecp.partitionCount"));
assertEquals(maximumPoolSize, upgradedContext.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
assertEquals(minimumIdle, upgradedContext.get("qpid.jdbcstore.hikaricp.minimumIdle"));
assertEquals("1", upgradedContext.get("qpid.jdbcstore.property1"));
assertEquals("two", upgradedContext.get("qpid.jdbcstore.property2"));
assertEquals("_3_", upgradedContext.get("qpid.jdbcstore.property3"));
if (!"Broker".equals(record.getType()))
{
assertEquals("HIKARICP", record.getAttributes().get("connectionPoolType"));
}
});
}

@Test
public void testUpgradeFromBoneCPToHikariCPProviderWithEmptyContext()
{
_brokerRecord.getAttributes().put("modelVersion", "9.0");

final Map<String, Object> attributes = Map.of("name", getTestName(),
"type", "JDBC",
"connectionPoolType", "BONECP");
final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);
when(virtualHostRecord.getId()).thenReturn(randomUUID());
when(virtualHostRecord.getType()).thenReturn("VirtualHost");
when(virtualHostRecord.getAttributes()).thenReturn(createAttributesMap(attributes, new HashMap<>()));

final DurableConfigurationStore dcs = new DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord);
final BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig);
final List<ConfiguredObjectRecord> records = upgrade(dcs, recoverer);
final ConfiguredObjectRecord upgradedVirtualHost = records.stream()
.filter(record -> record.getType().equals("VirtualHost")).findFirst()
.orElse(null);
.filter(record -> "VirtualHost".equals(record.getType())).findFirst().orElse(null);
final Map<String, String> contextMap = findCategoryRecordAndGetContext("VirtualHost", records);

assertNotNull(upgradedVirtualHost);
assertEquals(maximumPoolSize, contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
assertEquals(minimumIdle, contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
assertNull(contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
assertNull(contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
assertEquals("HIKARICP", upgradedVirtualHost.getAttributes().get("connectionPoolType"));
}

Expand All @@ -929,12 +989,11 @@ public void testContextVariableUpgradeFromDefaultCPToHikariCPProvider()

final Map<String, Object> attributes = Map.of("name", getTestName(),
"type", "JDBC",
"connectionPoolType", "NONE",
"context", new HashMap<>());
final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);;
"connectionPoolType", "NONE");
final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);
when(virtualHostRecord.getId()).thenReturn(randomUUID());
when(virtualHostRecord.getType()).thenReturn("VirtualHost");
when(virtualHostRecord.getAttributes()).thenReturn(attributes);
when(virtualHostRecord.getAttributes()).thenReturn(createAttributesMap(attributes, new HashMap<>()));

final DurableConfigurationStore dcs = new DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord);
final BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig);
Expand Down
Loading

0 comments on commit 86978bd

Please sign in to comment.