Skip to content

Commit

Permalink
QPID-8666: [Broker-J] Broker plugin jdbc-provider-bone replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
dakirily committed Feb 1, 2024
1 parent f21bf46 commit e1d481a
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ public final class BrokerModel extends Model
*
* 9.0
* Introduced PublishProducer as a child of Exchange and PointToPointProducer as child of Queue
*
* 9.1
* Replaced JDBC connection pool provider from BoneCP to HikariCP
*/
public static final int MODEL_MAJOR_VERSION = 9;
public static final int MODEL_MINOR_VERSION = 0;
public static final int MODEL_MINOR_VERSION = 1;
public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION;
private static final Model MODEL_INSTANCE = new BrokerModel();
private final Map<Class<? extends ConfiguredObject>, Class<? extends ConfiguredObject>> _parents = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@

abstract class AbstractConfigurationStoreUpgraderAndRecoverer
{
protected static final String BROKER = "Broker";
protected static final String VIRTUALHOST = "VirtualHost";
protected static final String JDBC_VIRTUALHOST_TYPE = "JDBC";
protected static final String CONTEXT = "context";

private final Map<String, StoreUpgraderPhase> _upgraders = new HashMap<>();
private final String _initialVersion;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public BrokerStoreUpgraderAndRecoverer(SystemConfig<?> systemConfig)
register(new Upgrader_7_0_to_7_1());
register(new Upgrader_7_1_to_8_0());
register(new Upgrader_8_0_to_9_0());
register(new Upgrader_9_0_to_9_1());
}

private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase
Expand Down Expand Up @@ -761,6 +762,47 @@ public void complete()
}
}

private static class Upgrader_9_0_to_9_1 extends StoreUpgraderPhase
{
public Upgrader_9_0_to_9_1()
{
super("modelVersion", "9.0", "9.1");
}

@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
if (BROKER.equals(record.getType()))
{
upgradeRootRecord(record);
}

final Map<String, Object> attributes = record.getAttributes();

if (attributes == null)
{
return;
}

if (!(VIRTUALHOST.equals(record.getType()) && JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type"))))
{
return;
}

if (attributes.containsKey(CONTEXT))
{
final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record);
getUpdateMap().put(updatedRecord.getId(), updatedRecord);
}
}

@Override
public void complete()
{

}
}

private static class VirtualHostEntryUpgrader
{
Map<String, AttributesTransformer> _messageStoreToNodeTransformers = Map.of("DERBY", new AttributesTransformer().
Expand Down Expand Up @@ -805,10 +847,11 @@ private static class VirtualHostEntryUpgrader
addAttributeTransformer("jdbcBytesForBlob", addContextVar("qpid.jdbcstore.useBytesForBlob")).
addAttributeTransformer("jdbcBlobType", addContextVar("qpid.jdbcstore.blobType")).
addAttributeTransformer("jdbcVarbinaryType", addContextVar("qpid.jdbcstore.varBinaryType")).
addAttributeTransformer("maximumPoolSize",
addContextVar("qpid.jdbcstore.hikaricp.maximumPoolSize")).
addAttributeTransformer("minimumIdle",
addContextVar("qpid.jdbcstore.hikaricp.minimumIdle")),
addAttributeTransformer("partitionCount", addContextVar("qpid.jdbcstore.bonecp.partitionCount")).
addAttributeTransformer("maxConnectionsPerPartition",
addContextVar("qpid.jdbcstore.bonecp.maxConnectionsPerPartition")).
addAttributeTransformer("minConnectionsPerPartition",
addContextVar("qpid.jdbcstore.bonecp.minConnectionsPerPartition")),
"BDB_HA", new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@

public class UpgraderHelper
{
static final String CONTEXT = "context";

static final String CP_TYPE = "connectionPoolType";
static final String BONECP = "BONECP";
static final String HIKARICP = "HIKARICP";

static final String PARTITION_COUNT_PARAM = "qpid.jdbcstore.bonecp.partitionCount";
static final String MAX_POOL_SIZE_OLD_PARAM = "qpid.jdbcstore.bonecp.maxConnectionsPerPartition";
static final String MIN_IDLE_OLD_PARAM = "qpid.jdbcstore.bonecp.minConnectionsPerPartition";

static final String MAX_POOL_SIZE_PARAM = "qpid.jdbcstore.hikaricp.maximumPoolSize";
static final String MIN_IDLE_PARAM = "qpid.jdbcstore.hikaricp.minimumIdle";

static final Map<String, String> RENAME_MAPPING = Map.of(MAX_POOL_SIZE_OLD_PARAM, MAX_POOL_SIZE_PARAM,
MIN_IDLE_OLD_PARAM, MIN_IDLE_PARAM);

public static final Map<String, String> MODEL9_MAPPING_FOR_RENAME_TO_ALLOW_DENY_CONTEXT_VARIABLES = new HashMap<>();
static
{
Expand Down Expand Up @@ -57,4 +73,38 @@ public static Map<String, String> reverse(Map<String, String> map)
{
return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
}

/** Upgrades connection pool from BoneCP to HikariCP (model version 9.0 to 9.1) */
public static ConfiguredObjectRecord upgradeConnectionPool(final ConfiguredObjectRecord record)
{
final Map<String, Object> attributes = record.getAttributes();

final Object contextObject = attributes.get(CONTEXT);

if (contextObject instanceof Map)
{
final Map <String, String> context = (Map<String, String>) contextObject;
final Map<String, String> newContext = UpgraderHelper.renameContextVariables(context, RENAME_MAPPING);

if (BONECP.equals(attributes.get(CP_TYPE)))
{
final int partitionCount = newContext.get(PARTITION_COUNT_PARAM) != null
? Integer.parseInt(newContext.remove(PARTITION_COUNT_PARAM)) : 0;
final int maximumPoolSize = newContext.get(MAX_POOL_SIZE_PARAM) != null && partitionCount != 0
? Integer.parseInt(newContext.get(MAX_POOL_SIZE_PARAM)) * partitionCount : 40;
final int minIdle = newContext.get(MIN_IDLE_PARAM) != null && partitionCount != 0
? Integer.parseInt(newContext.get(MIN_IDLE_PARAM)) * partitionCount : 20;
newContext.put(MAX_POOL_SIZE_PARAM, String.valueOf(maximumPoolSize));
newContext.put(MIN_IDLE_PARAM, String.valueOf(minIdle));
}
final Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
if (BONECP.equals(attributes.get(CP_TYPE)))
{
updatedAttributes.put(CP_TYPE, HIKARICP);
}
updatedAttributes.put(CONTEXT, newContext);
return new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents());
}
return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public VirtualHostStoreUpgraderAndRecoverer(VirtualHostNode<?> virtualHostNode)
register(new Upgrader_7_0_to_7_1());
register(new Upgrader_7_1_to_8_0());
register(new Upgrader_8_0_to_9_0());
register(new Upgrader_9_0_to_9_1());

Map<String, UUID> defaultExchangeIds = new HashMap<>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
Expand Down Expand Up @@ -1109,6 +1110,42 @@ public void complete()
}
}

private static class Upgrader_9_0_to_9_1 extends StoreUpgraderPhase
{
public Upgrader_9_0_to_9_1()
{
super("modelVersion", "9.0", "9.1");
}

@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
final Map<String, Object> attributes = record.getAttributes();

if (attributes == null)
{
return;
}

if (!(VIRTUALHOST.equals(record.getType()) && JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type"))))
{
return;
}

if (attributes.containsKey(CONTEXT))
{
final ConfiguredObjectRecord updatedRecord = UpgraderHelper.upgradeConnectionPool(record);
getUpdateMap().put(updatedRecord.getId(), updatedRecord);
}
}

@Override
public void complete()
{

}
}

public boolean upgradeAndRecover(final DurableConfigurationStore durableConfigurationStore,
final ConfiguredObjectRecord... initialRecords)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
Expand Down Expand Up @@ -90,17 +92,18 @@ public void setUp() throws Exception
}

@Test
public void testUpgradeVirtualHostWithJDBCStoreAndHikariCPPool()
public void testUpgradeVirtualHostWithJDBCStoreAndBoneCPPool()
{
final Map<String, Object> hostAttributes = ImmutableMap.<String, Object>builder()
.put("name", VIRTUALHOST_NAME)
.put("modelVersion", "0.4")
.put("connectionPool", "HIKARICP")
.put("connectionPool", "BONECP")
.put("connectionURL", "jdbc:derby://localhost:1527/tmp/vh/test;create=true")
.put("createdBy", VIRTUALHOST_CREATED_BY)
.put("createdTime", VIRTUALHOST_CREATE_TIME)
.put("maximumPoolSize", 7)
.put("minimumIdle", 6)
.put("maxConnectionsPerPartition", 7)
.put("minConnectionsPerPartition", 6)
.put("partitionCount", 2)
.put("storeType", "jdbc")
.put("type", "STANDARD")
.put("jdbcBigIntType", "mybigint")
Expand All @@ -121,9 +124,10 @@ public void testUpgradeVirtualHostWithJDBCStoreAndHikariCPPool()
"qpid.jdbcstore.varBinaryType", "myvarbinary",
"qpid.jdbcstore.blobType", "myblob",
"qpid.jdbcstore.useBytesForBlob", true,
"qpid.jdbcstore.hikaricp.maximumPoolSize", 7,
"qpid.jdbcstore.hikaricp.minimumIdle", 6);
final Map<String,Object> expectedAttributes = Map.of("connectionPoolType", "HIKARICP",
"qpid.jdbcstore.bonecp.maxConnectionsPerPartition", 7,
"qpid.jdbcstore.bonecp.minConnectionsPerPartition", 6,
"qpid.jdbcstore.bonecp.partitionCount", 2);
final Map<String,Object> expectedAttributes = Map.of("connectionPoolType", "BONECP",
"connectionUrl", "jdbc:derby://localhost:1527/tmp/vh/test;create=true",
"createdBy", VIRTUALHOST_CREATED_BY,
"createdTime", VIRTUALHOST_CREATE_TIME,
Expand Down Expand Up @@ -877,6 +881,71 @@ public void testContextVariableUpgradeForTLSCipherSuitesSetOnAuthenticationProvi
assertEquals("Ssl.*", contextMap.get(CommonProperties.QPID_SECURITY_TLS_CIPHER_SUITE_DENY_LIST));
}

@ParameterizedTest
@CsvSource(value =
{
"4,20,5,80,20", "0,20,5,40,20", "null,20,5,40,20", "4,null,null,40,20", "null,null,null,40,20"
}, nullValues = { "null" })
public void testContextVariableUpgradeFromBoneCPToHikariCPProvider(final String partitionCount,
final String maxConnectionsPerPartition,
final String minConnectionsPerPartition,
final String maximumPoolSize,
final String minimumIdle)
{
_brokerRecord.getAttributes().put("modelVersion", "9.0");

final Map<String, String> context = new HashMap<>();
context.put("qpid.jdbcstore.bonecp.partitionCount", partitionCount);
context.put("qpid.jdbcstore.bonecp.maxConnectionsPerPartition", maxConnectionsPerPartition);
context.put("qpid.jdbcstore.bonecp.minConnectionsPerPartition", minConnectionsPerPartition);
final Map<String, Object> attributes = Map.of("name", getTestName(),
"type", "JDBC",
"connectionPoolType", "BONECP",
"context", context);
final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);;
when(virtualHostRecord.getId()).thenReturn(randomUUID());
when(virtualHostRecord.getType()).thenReturn("VirtualHost");
when(virtualHostRecord.getAttributes()).thenReturn(attributes);

final DurableConfigurationStore dcs = new DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord);
final BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig);
final List<ConfiguredObjectRecord> records = upgrade(dcs, recoverer);
final Map<String, String> contextMap = findCategoryRecordAndGetContext("VirtualHost", records);

final ConfiguredObjectRecord upgradedVirtualHost = records.stream()
.filter(record -> record.getType().equals("VirtualHost")).findFirst()
.orElse(null);

assertNotNull(upgradedVirtualHost);
assertEquals(maximumPoolSize, contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
assertEquals(minimumIdle, contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
assertEquals("HIKARICP", upgradedVirtualHost.getAttributes().get("connectionPoolType"));
}

@Test
public void testContextVariableUpgradeFromDefaultCPToHikariCPProvider()
{
_brokerRecord.getAttributes().put("modelVersion", "9.0");

final Map<String, Object> attributes = Map.of("name", getTestName(),
"type", "JDBC",
"connectionPoolType", "NONE",
"context", new HashMap<>());
final ConfiguredObjectRecord virtualHostRecord = mock(ConfiguredObjectRecord.class);;
when(virtualHostRecord.getId()).thenReturn(randomUUID());
when(virtualHostRecord.getType()).thenReturn("VirtualHost");
when(virtualHostRecord.getAttributes()).thenReturn(attributes);

final DurableConfigurationStore dcs = new DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord);
final BrokerStoreUpgraderAndRecoverer recoverer = new BrokerStoreUpgraderAndRecoverer(_systemConfig);
final List<ConfiguredObjectRecord> records = upgrade(dcs, recoverer);
final Map<String, String> contextMap = findCategoryRecordAndGetContext("VirtualHost", records);

assertNull(contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
assertNull(contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
assertEquals("NONE", virtualHostRecord.getAttributes().get("connectionPoolType"));
}

private ConfiguredObjectRecord createMockRecordForGivenCategoryTypeAndContext(final String category,
final String type,
final Map<String, String> context)
Expand Down
Loading

0 comments on commit e1d481a

Please sign in to comment.