Skip to content

Commit

Permalink
fix: debug and fix cluster issues (#326)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 17, 2023
1 parent 438c6e6 commit 1f7f3c2
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 21 deletions.
2 changes: 1 addition & 1 deletion broker/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</Appenders>

<Loggers>
<Root level="info">
<Root level="debug">
<AppenderRef ref="rollingFile"/>
</Root>
</Loggers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface ControllerConfig {
void setEpoch(long epoch);

default int scanIntervalInSecs() {
return 3;
return 30;
}

default int leaseLifeSpanInSecs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void unregisterNode(NodeUnregistrationRequest request,
@Override
public void heartbeat(HeartbeatRequest request,
StreamObserver<HeartbeatReply> responseObserver) {
LOGGER.trace("Received HeartbeatRequest {}", TextFormat.shortDebugString(request));
LOGGER.debug("Received HeartbeatRequest {}", TextFormat.shortDebugString(request));

metadataStore.keepAlive(request.getId(), request.getEpoch(), request.getGoingAway());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.controller.metadata.database.dao.Node;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Node with runtime information.
*/
public class BrokerNode {

private static final Logger LOGGER = LoggerFactory.getLogger(BrokerNode.class);

private final Node node;

private long lastKeepAlive;
Expand All @@ -38,6 +43,7 @@ public BrokerNode(Node node) {

public void keepAlive(long epoch, boolean goingAway) {
if (epoch < node.getEpoch()) {
LOGGER.warn("Heartbeat epoch={} is deprecated, known epoch={}", epoch, node.getEpoch());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public interface ControllerClient extends Closeable {

CompletableFuture<Topic> describeTopic(String target, Long topicId, String topicName);

CompletableFuture<Void> heartbeat(String target, int nodeId, long epoch,
boolean goingAway) throws ControllerException;
CompletableFuture<Void> heartbeat(String target, int nodeId, long epoch, boolean goingAway);

CompletableFuture<Void> reassignMessageQueue(String target, long topicId, int queueId, int dstNodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,14 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<Void> heartbeat(String target, int nodeId, long epoch,
boolean goingAway) throws ControllerException {
buildStubForTarget(target);

public CompletableFuture<Void> heartbeat(String target, int nodeId, long epoch, boolean goingAway) {

try {
buildStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

ControllerServiceGrpc.ControllerServiceFutureStub stub = stubs.get(target);
HeartbeatRequest request = HeartbeatRequest
.newBuilder()
Expand Down Expand Up @@ -330,7 +334,8 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<Void> notifyQueueClose(String target, long topicId, int queueId) throws ControllerException {
public CompletableFuture<Void> notifyQueueClose(String target, long topicId,
int queueId) throws ControllerException {
CompletableFuture<Void> future = new CompletableFuture<>();

NotifyMessageQueuesAssignableRequest request = NotifyMessageQueuesAssignableRequest.newBuilder()
Expand Down Expand Up @@ -542,7 +547,8 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<CommitStreamObjectReply> commitStreamObject(String target, CommitStreamObjectRequest request) {
public CompletableFuture<CommitStreamObjectReply> commitStreamObject(String target,
CommitStreamObjectRequest request) {
CompletableFuture<CommitStreamObjectReply> future = new CompletableFuture<>();
try {
Futures.addCallback(this.buildStubForTarget(target).commitStreamObject(request), new FutureCallback<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,21 @@ public interface MetadataStore extends Closeable {
*/
void registerCurrentNode(String name, String address, String instanceId) throws ControllerException;

/**
* If the node is a leader, it should keep the sending node alive once it receives heartbeat requests
* from it.
*
* @param nodeId Heartbeat sender node-id
* @param epoch Epoch of the node
* @param goingAway Flag if the node is going away shortly
*/
void keepAlive(int nodeId, long epoch, boolean goingAway);

/**
* Send heartbeat request to leader to keep current node alive.
*/
void heartbeat();

CompletableFuture<Long> createTopic(String topicName, int queueNum,
List<MessageType> acceptMessageTypesList) throws ControllerException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import com.automq.rocketmq.controller.metadata.database.mapper.SequenceMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.StreamMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper;
import com.automq.rocketmq.controller.tasks.KeepAliveTask;
import com.automq.rocketmq.controller.tasks.HeartbeatTask;
import com.automq.rocketmq.controller.tasks.LeaseTask;
import com.automq.rocketmq.controller.tasks.ReclaimS3ObjectTask;
import com.automq.rocketmq.controller.tasks.RecycleGroupTask;
Expand Down Expand Up @@ -220,7 +220,7 @@ public void start() {
config.scanIntervalInSecs(), TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleWithFixedDelay(new SchedulerTask(this), 1,
config.scanIntervalInSecs(), TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new KeepAliveTask(this), 3,
this.scheduledExecutorService.scheduleAtFixedRate(new HeartbeatTask(this), 3,
Math.max(config().nodeAliveIntervalInSecs() / 2, 10), TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleWithFixedDelay(new RecycleTopicTask(this), 1,
config.deletedTopicLingersInSecs(), TimeUnit.SECONDS);
Expand Down Expand Up @@ -314,12 +314,37 @@ public void registerCurrentNode(String name, String address, String instanceId)

@Override
public void keepAlive(int nodeId, long epoch, boolean goingAway) {
if (!isLeader()) {
LOGGER.warn("Non-leader node cannot keep the node[node-id={}, epoch={}] alive", nodeId, epoch);
return;
}
BrokerNode brokerNode = nodes.get(nodeId);
if (null != brokerNode) {
brokerNode.keepAlive(epoch, goingAway);
}
}

@Override
public void heartbeat() {
if (isLeader()) {
LOGGER.debug("Node of leader does not need to send heartbeat request");
}

try {
String target = leaderAddress();
controllerClient.heartbeat(target, config.nodeId(), config.epoch(), false)
.whenComplete((r, e) -> {
if (null != e) {
LOGGER.error("Failed to maintain heartbeat to {}", target);
return;
}
LOGGER.debug("Heartbeat to {} OK", target);
});
} catch (ControllerException e) {
LOGGER.error("Failed to send heartbeat to leader node", e);
}
}

public boolean maintainLeadershipWithSharedLock(SqlSession session) {
LeaseMapper leaseMapper = session.getMapper(LeaseMapper.class);
Lease current = leaseMapper.currentWithShareLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public void setProperties(Properties properties) {
if (null == dataSource) {
HikariConfig config = new HikariConfig(properties);
config.setMaximumPoolSize(10);
config.setLeakDetectionThreshold(20000);
config.setMaxLifetime(30000);
dataSource = new HikariDataSource(config);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.MetadataStore;

public class KeepAliveTask extends ControllerTask {
public KeepAliveTask(MetadataStore metadataStore) {
public class HeartbeatTask extends ControllerTask {
public HeartbeatTask(MetadataStore metadataStore) {
super(metadataStore);
}

@Override
public void process() throws ControllerException {
int nodeId = metadataStore.config().nodeId();
if (!metadataStore.isLeader()) {
metadataStore.keepAlive(nodeId, metadataStore.config().epoch(), false);
metadataStore.heartbeat();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,28 @@
import com.automq.rocketmq.controller.metadata.MetadataStore;
import com.automq.rocketmq.controller.metadata.database.dao.Node;
import com.automq.rocketmq.controller.metadata.database.mapper.NodeMapper;
import com.google.gson.Gson;
import java.util.List;
import org.apache.ibatis.session.SqlSession;

public class ScanNodeTask extends ScanTask {

private final Gson gson;

public ScanNodeTask(MetadataStore metadataStore) {
super(metadataStore);
gson = new Gson();
}

@Override
public void process() throws ControllerException {
try (SqlSession session = this.metadataStore.openSession()) {
NodeMapper mapper = session.getMapper(NodeMapper.class);
List<Node> nodes = mapper.list(this.lastScanTime);
if (!nodes.isEmpty()) {
LOGGER.debug("Found {} broker nodes", nodes.size());
if (!nodes.isEmpty() && LOGGER.isDebugEnabled()) {
for (Node node : nodes) {
LOGGER.debug("Found broker node: {}", gson.toJson(node));
}
}
updateBrokers(nodes);
}
Expand Down
26 changes: 26 additions & 0 deletions distribution/conf/broker-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: DefaultBroker2
instanceId: "ec2-test-2"
bindAddress: "0.0.0.0:8081"
s3Stream:
s3WALPath: "/tmp/s3rocketmq/wal"
s3Endpoint: "http://minio.hellocorp.test"
s3Bucket: "lzh"
s3Region: "us-east-1"
s3ForcePathStyle: true
s3AccessKey: "mcFoJPLvxNmRq497dhK0"
s3SecretKey: "UU7PE0VOo3fRtqYEBeIIQzOSGFT3GctuiLoZg2hY"
db:
url: "jdbc:mysql://10.1.0.110:3306/rocketmq"
userName: "root"
password: "password"
metrics:
exporterType: "PROM"
grpcExporterTarget: ""
grpcExporterHeader: ""
grpcExporterTimeOutInMills: 31000
grpcExporterIntervalInMills: 60000
promExporterPort: 5557
promExporterHost: "localhost"
loggingExporterIntervalInMills: 10000
labels: ""
exportInDelta: false

0 comments on commit 1f7f3c2

Please sign in to comment.