Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9141] [RIP-75] Supports timer message on RocksDB #9142

Open
wants to merge 35 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d692c56
init: Initialization preparations
Jan 13, 2025
382d846
init: Initialization preparations
Jan 13, 2025
c5905e4
init: Initialization preparations
Jan 13, 2025
69305b1
init: prepare for metric
Jan 13, 2025
cbfdfdc
init: Initialization preparations
Jan 14, 2025
0642c1d
init: Initialization preparations service
Jan 14, 2025
2fdbd7e
init: prepare for metric
Jan 14, 2025
efd3071
init: Initialization preparations service
Jan 15, 2025
1e41aa4
feat: Improve the message flow service
Jan 15, 2025
b0d6044
feat: Adaptation logic
Jan 15, 2025
8bb252c
feat: Adaptation logic
Jan 15, 2025
fc4c7a5
feat: Adaptation logic
Jan 15, 2025
6bc279a
init: prepare for metric
Jan 15, 2025
37bb349
init: prepare for metric
Jan 15, 2025
43df4ea
feat: Adaptation logic
Jan 16, 2025
ecff650
feat: Adaptation logic
Jan 16, 2025
d49f8cf
feat: Adaptation logic
Jan 16, 2025
7a858d7
feat: Adaptation logic
Jan 16, 2025
9a37312
feat: Adaptation logic
Jan 16, 2025
4b08bf9
feat: Adaptation logic
Jan 16, 2025
f29a325
feat: Adaptation logic
Jan 16, 2025
5588a3b
feat: Adaptation logic
Jan 17, 2025
4779614
feat: Adaptation logic
Jan 17, 2025
2d066ea
feat: Adaptation logic
Jan 17, 2025
20ce52a
feat: Adaptation logic
Jan 17, 2025
57bd15d
feat: Adaptation logic
Jan 17, 2025
29341ef
Merge branch 'apache:develop' into rocksdb_timer
3424672656 Jan 17, 2025
29e78c3
fix test
Jan 17, 2025
b2a5edb
fix test
Jan 17, 2025
4700b83
fix test
Jan 17, 2025
f458c23
fix test
Jan 17, 2025
92f210a
fix test
Jan 17, 2025
8f6efef
fix test
Jan 17, 2025
4823690
fix test
Jan 17, 2025
c40e210
fix test
Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStorage;
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
import org.rocksdb.RocksDB;

public class BrokerController {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand Down Expand Up @@ -272,6 +275,7 @@ public class BrokerController {
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
private TimerMessageStore timerMessageStore;
private TimerMessageRocksDBStore timerMessageRocksDBStore;
private TimerCheckpoint timerCheckpoint;
protected BrokerFastFailure brokerFastFailure;
private Configuration configuration;
Expand Down Expand Up @@ -831,13 +835,24 @@ public boolean initializeMessageStore() {
messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));

TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
if (messageStoreConfig.isTimerWheelEnable()) {
this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
if (messageStoreConfig.getEnableTimerMessageOnRocksDB()) {
this.timerMessageRocksDBStore = new TimerMessageRocksDBStore(messageStore, messageStoreConfig, timerMetrics, brokerStatsManager);
if (this.messageStoreConfig.isTimerWheelEnable()) {
this.messageStoreConfig.setTimerStopEnqueue(true);
}
this.messageStore.setTimerMessageRocksDBStore(this.timerMessageRocksDBStore);
this.timerMessageRocksDBStore.createTimer(RocksDB.DEFAULT_COLUMN_FAMILY);
this.timerMessageRocksDBStore.createTimer(TimerMessageRocksDBStorage.POP_COLUMN_FAMILY);
this.timerMessageRocksDBStore.createTimer(TimerMessageRocksDBStorage.TRANSACTION_COLUMN_FAMILY);
}
} catch (Exception e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
Expand Down Expand Up @@ -874,6 +889,9 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
result = this.messageStore.load();
}

if (messageStoreConfig.getEnableTimerMessageOnRocksDB()) {
result = result && this.timerMessageRocksDBStore.load();
}
if (messageStoreConfig.isTimerWheelEnable()) {
result = result && this.timerMessageStore.load();
}
Expand Down Expand Up @@ -1454,6 +1472,9 @@ protected void shutdownBasicService() {
if (this.timerMessageStore != null) {
this.timerMessageStore.shutdown();
}
if (this.timerMessageRocksDBStore != null) {
this.timerMessageRocksDBStore.shutdown();
}
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
Expand Down Expand Up @@ -1650,6 +1671,10 @@ protected void startBasicService() throws Exception {
this.timerMessageStore.start();
}

if (this.timerMessageRocksDBStore != null) {
this.timerMessageRocksDBStore.start();
}

if (this.replicasManager != null) {
this.replicasManager.start();
}
Expand Down Expand Up @@ -2578,5 +2603,7 @@ public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}


public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return timerMessageRocksDBStore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2744,7 +2744,9 @@ private HashMap<String, String> prepareRuntimeInfo() throws RemotingCommandExcep
runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));

if (this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
if (this.brokerController.getMessageStoreConfig().getEnableTimerMessageOnRocksDB()) {
// TODO
} else if (this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
runtimeInfo.put("timerReadBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind()));
runtimeInfo.put("timerOffsetBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehindMessages()));
runtimeInfo.put("timerCongestNum", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getAllCongestNum()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,10 @@ public void attachRecallHandle(RemotingCommand request, MessageExt msg, SendMess
brokerController.getBrokerConfig().getBrokerName(), timestampStr, MessageClientIDSetter.getUniqID(msg));
responseHeader.setRecallHandle(recallHandle);
}

if (msg.getProperty(MessageConst.PROPERTY_TIMER_OUT_MS) != null) {
responseHeader.setDelayTime(Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_OUT_MS)));
}
}

private String diskUtil() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ private void syncTimerMetrics() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
if (null != brokerController.getMessageStore().getTimerMessageStore()) {
if (null != brokerController.getMessageStore().getTimerMessageRocksDBStore()) {
// TODO
} else if (null != brokerController.getMessageStore().getTimerMessageStore()) {
TimerMetrics.TimerMetricsSerializeWrapper metricsSerializeWrapper =
this.brokerController.getBrokerOuterAPI().getTimerMetrics(masterAddrBak);
if (!brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().getDataVersion().equals(metricsSerializeWrapper.getDataVersion())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public static PutMessageResult handleScheduleMessage(BrokerController brokerCont
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
if (!isRolledTimerMessage(msg)) {
if (checkIfTimerMessage(msg)) {
if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
if (!brokerController.getMessageStoreConfig().isTimerWheelEnable() && !brokerController.getMessageStoreConfig()
.getEnableTimerMessageOnRocksDB()) {
//wheel timer is not enabled, reject the message
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
}
Expand Down Expand Up @@ -204,7 +205,7 @@ private static PutMessageResult transformTimerMessage(BrokerController brokerCon
deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
}

if (brokerController.getTimerMessageStore().isReject(deliverMs)) {
if (brokerController.getTimerMessageRocksDBStore() == null && brokerController.getTimerMessageStore().isReject(deliverMs)) {
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null);
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,9 @@ protected SendResult processSendResponse(
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
sendResult.setRecallHandle(responseHeader.getRecallHandle());
if (responseHeader.getDelayTime() != null) {
sendResult.setDelayTime(responseHeader.getDelayTime());
}
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class SendResult {
private boolean traceOn = true;
private byte[] rawRespBody;
private String recallHandle;
private long delayTime;

public SendResult() {
}
Expand Down Expand Up @@ -135,10 +136,18 @@ public void setRecallHandle(String recallHandle) {
this.recallHandle = recallHandle;
}

public long getDelayTime() {
return delayTime;
}

public void setDelayTime(long delayTime) {
this.delayTime = delayTime;
}

@Override
public String toString() {
return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue
+ ", queueOffset=" + queueOffset + ", recallHandle=" + recallHandle + "]";
+ ", queueOffset=" + queueOffset + ", recallHandle=" + recallHandle + ", delayTime=" + delayTime + "]";
}

public void setRawRespBody(byte[] body) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class MessageConst {
public static final String PROPERTY_TIMER_ROLL_TIMES = "TIMER_ROLL_TIMES";
public static final String PROPERTY_TIMER_OUT_MS = "TIMER_OUT_MS";
public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY";
public static final String PROPERTY_TIMER_DEL_FLAG = "TIMER_DEL_FLAG";
public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
public static final String PROPERTY_CRC32 = "__CRC32#";
Expand Down Expand Up @@ -151,6 +152,7 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_TIMER_ROLL_TIMES);
STRING_HASH_SET.add(PROPERTY_TIMER_OUT_MS);
STRING_HASH_SET.add(PROPERTY_TIMER_DEL_UNIQKEY);
STRING_HASH_SET.add(PROPERTY_TIMER_DEL_FLAG);
STRING_HASH_SET.add(PROPERTY_TIMER_DELAY_LEVEL);
STRING_HASH_SET.add(PROPERTY_BORN_HOST);
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes
private String transactionId;
private String batchUniqId;
private String recallHandle;
private Long delayTime;

@Override
public void checkFields() throws RemotingCommandException {
Expand All @@ -50,6 +51,7 @@ public void encode(ByteBuf out) {
writeIfNotNull(out, "transactionId", transactionId);
writeIfNotNull(out, "batchUniqId", batchUniqId);
writeIfNotNull(out, "recallHandle", recallHandle);
writeIfNotNull(out, "delayTime", delayTime);
}

@Override
Expand Down Expand Up @@ -83,6 +85,11 @@ public void decode(HashMap<String, String> fields) throws RemotingCommandExcepti
if (str != null) {
this.recallHandle = str;
}

str = fields.get("delayTime");
if (str != null) {
this.delayTime = Long.parseLong(str);
}
}

public String getMsgId() {
Expand Down Expand Up @@ -132,4 +139,12 @@ public String getRecallHandle() {
public void setRecallHandle(String recallHandle) {
this.recallHandle = recallHandle;
}

public Long getDelayTime() {
return delayTime;
}

public void setDelayTime(Long delayTime) {
this.delayTime = delayTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
import org.apache.rocketmq.store.util.PerfCounter;
import org.rocksdb.RocksDBException;

Expand Down Expand Up @@ -166,6 +167,7 @@ public class DefaultMessageStore implements MessageStore {

protected StoreCheckpoint storeCheckpoint;
private TimerMessageStore timerMessageStore;
private TimerMessageRocksDBStore timerMessageRocksDBStore;

private final LinkedList<CommitLogDispatcher> dispatcherList;

Expand Down Expand Up @@ -1028,6 +1030,16 @@ public void setTimerMessageStore(TimerMessageStore timerMessageStore) {
this.timerMessageStore = timerMessageStore;
}

@Override
public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return this.timerMessageRocksDBStore;
}

@Override
public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore) {
this.timerMessageRocksDBStore = timerMessageRocksDBStore;
}

@Override
public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
Expand Down Expand Up @@ -1921,6 +1933,9 @@ private void recover(final boolean lastExitOK) throws RocksDBException {

@Override
public long getTimingMessageCount(String topic) {
if (timerMessageRocksDBStore != null) {
return timerMessageRocksDBStore.getTimerMetrics().getTimingCount(topic);
}
if (null == timerMessageStore) {
return 0L;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
import org.apache.rocketmq.store.util.PerfCounter;
import org.rocksdb.RocksDBException;
import io.opentelemetry.api.common.AttributesBuilder;
Expand Down Expand Up @@ -208,6 +209,10 @@ CompletableFuture<GetMessageResult> getMessageAsync(final String group, final St

void setTimerMessageStore(TimerMessageStore timerMessageStore);

TimerMessageRocksDBStore getTimerMessageRocksDBStore();

void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore);

/**
* Get the offset of the message in the commit log, which is also known as physical offset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,20 @@ public void setRocksdbCompressionType(String compressionType) {
**/
private boolean useABSLock = false;

/**
* Maximum number of messages to be read each time
* -1 : read all messages
*/
private int readCountTimerOnRocksDB = -1;

/**
* When enabled, the scheduled task is started.
* if time wheel is enabled, the time wheel only the correct ones are supported
* The message will be written to rocksdb.
* Close the time wheel when the file timing message is 0
*/
private boolean enableTimerMessageOnRocksDB = false;

public boolean isRocksdbCQDoubleWriteEnable() {
return rocksdbCQDoubleWriteEnable;
}
Expand Down Expand Up @@ -1950,4 +1964,20 @@ public void setUseABSLock(boolean useABSLock) {
public boolean getUseABSLock() {
return useABSLock;
}

public void setReadCountTimerOnRocksDB(int readCountTimerOnRocksDB) {
this.readCountTimerOnRocksDB = readCountTimerOnRocksDB;
}

public int getReadCountTimerOnRocksDB() {
return readCountTimerOnRocksDB;
}

public void setEnableTimerMessageOnRocksDB(boolean enableTimerMessageOnRocksDB) {
this.enableTimerMessageOnRocksDB = enableTimerMessageOnRocksDB;
}

public boolean getEnableTimerMessageOnRocksDB() {
return enableTimerMessageOnRocksDB;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
measurement.record(System.currentTimeMillis() - earliestMessageTime, newAttributesBuilder().build());
});

if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
if (messageStore.getMessageStoreConfig().getEnableTimerMessageOnRocksDB()) {
// TODO add timer metrics
} else if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) {
timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
.setDescription("Timer enqueue messages lag")
.ofLongs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
import org.apache.rocketmq.store.util.PerfCounter;
import org.rocksdb.RocksDBException;

Expand Down Expand Up @@ -628,6 +629,16 @@ public void setTimerMessageStore(TimerMessageStore timerMessageStore) {
next.setTimerMessageStore(timerMessageStore);
}

@Override
public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return next.getTimerMessageRocksDBStore();
}

@Override
public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore) {
next.setTimerMessageRocksDBStore(timerMessageRocksDBStore);
}

@Override
public long getTimingMessageCount(String topic) {
return next.getTimingMessageCount(topic);
Expand Down
Loading
Loading