From 1f7f3c2ad4f94fdb0c759fbf44aa29be74b8d143 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Tue, 17 Oct 2023 16:29:41 +0800 Subject: [PATCH] fix: debug and fix cluster issues (#326) Signed-off-by: Li Zhanhui --- broker/src/main/resources/log4j2.xml | 2 +- .../common/config/ControllerConfig.java | 2 +- .../controller/ControllerServiceImpl.java | 2 +- .../controller/metadata/BrokerNode.java | 6 ++++ .../controller/metadata/ControllerClient.java | 3 +- .../metadata/GrpcControllerClient.java | 18 ++++++++---- .../controller/metadata/MetadataStore.java | 13 +++++++++ .../database/DefaultMetadataStore.java | 29 +++++++++++++++++-- .../database/HikariCPDataSourceFactory.java | 2 -- ...{KeepAliveTask.java => HeartbeatTask.java} | 7 ++--- .../controller/tasks/ScanNodeTask.java | 10 +++++-- distribution/conf/broker-2.yaml | 26 +++++++++++++++++ 12 files changed, 99 insertions(+), 21 deletions(-) rename controller/src/main/java/com/automq/rocketmq/controller/tasks/{KeepAliveTask.java => HeartbeatTask.java} (82%) create mode 100644 distribution/conf/broker-2.yaml diff --git a/broker/src/main/resources/log4j2.xml b/broker/src/main/resources/log4j2.xml index 27ba58ae4..a1be06c6f 100644 --- a/broker/src/main/resources/log4j2.xml +++ b/broker/src/main/resources/log4j2.xml @@ -30,7 +30,7 @@ - + diff --git a/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java b/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java index 6200b11ab..c89733b66 100644 --- a/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java +++ b/common/src/main/java/com/automq/rocketmq/common/config/ControllerConfig.java @@ -36,7 +36,7 @@ public interface ControllerConfig { void setEpoch(long epoch); default int scanIntervalInSecs() { - return 3; + return 30; } default int leaseLifeSpanInSecs() { diff --git a/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java index 1c8246ca3..1c77f01b9 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java @@ -129,7 +129,7 @@ public void unregisterNode(NodeUnregistrationRequest request, @Override public void heartbeat(HeartbeatRequest request, StreamObserver responseObserver) { - LOGGER.trace("Received HeartbeatRequest {}", TextFormat.shortDebugString(request)); + LOGGER.debug("Received HeartbeatRequest {}", TextFormat.shortDebugString(request)); metadataStore.keepAlive(request.getId(), request.getEpoch(), request.getGoingAway()); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/BrokerNode.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/BrokerNode.java index 097c437fe..9eea01110 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/BrokerNode.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/BrokerNode.java @@ -20,11 +20,16 @@ import com.automq.rocketmq.common.config.ControllerConfig; import com.automq.rocketmq.controller.metadata.database.dao.Node; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Node with runtime information. */ public class BrokerNode { + + private static final Logger LOGGER = LoggerFactory.getLogger(BrokerNode.class); + private final Node node; private long lastKeepAlive; @@ -38,6 +43,7 @@ public BrokerNode(Node node) { public void keepAlive(long epoch, boolean goingAway) { if (epoch < node.getEpoch()) { + LOGGER.warn("Heartbeat epoch={} is deprecated, known epoch={}", epoch, node.getEpoch()); return; } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/ControllerClient.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/ControllerClient.java index 3db38ce97..015dae50e 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/ControllerClient.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/ControllerClient.java @@ -52,8 +52,7 @@ public interface ControllerClient extends Closeable { CompletableFuture describeTopic(String target, Long topicId, String topicName); - CompletableFuture heartbeat(String target, int nodeId, long epoch, - boolean goingAway) throws ControllerException; + CompletableFuture heartbeat(String target, int nodeId, long epoch, boolean goingAway); CompletableFuture reassignMessageQueue(String target, long topicId, int queueId, int dstNodeId); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java index 54887a126..4ce69c83f 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/GrpcControllerClient.java @@ -265,10 +265,14 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture heartbeat(String target, int nodeId, long epoch, - boolean goingAway) throws ControllerException { - buildStubForTarget(target); - + public CompletableFuture heartbeat(String target, int nodeId, long epoch, boolean goingAway) { + + try { + buildStubForTarget(target); + } catch (ControllerException e) { + return CompletableFuture.failedFuture(e); + } + ControllerServiceGrpc.ControllerServiceFutureStub stub = stubs.get(target); HeartbeatRequest request = HeartbeatRequest .newBuilder() @@ -330,7 +334,8 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture notifyQueueClose(String target, long topicId, int queueId) throws ControllerException { + public CompletableFuture notifyQueueClose(String target, long topicId, + int queueId) throws ControllerException { CompletableFuture future = new CompletableFuture<>(); NotifyMessageQueuesAssignableRequest request = NotifyMessageQueuesAssignableRequest.newBuilder() @@ -542,7 +547,8 @@ public void onFailure(@Nonnull Throwable t) { } @Override - public CompletableFuture commitStreamObject(String target, CommitStreamObjectRequest request) { + public CompletableFuture commitStreamObject(String target, + CommitStreamObjectRequest request) { CompletableFuture future = new CompletableFuture<>(); try { Futures.addCallback(this.buildStubForTarget(target).commitStreamObject(request), new FutureCallback<>() { diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java index b9352dbdd..7b1b8f272 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java @@ -84,8 +84,21 @@ public interface MetadataStore extends Closeable { */ void registerCurrentNode(String name, String address, String instanceId) throws ControllerException; + /** + * If the node is a leader, it should keep the sending node alive once it receives heartbeat requests + * from it. + * + * @param nodeId Heartbeat sender node-id + * @param epoch Epoch of the node + * @param goingAway Flag if the node is going away shortly + */ void keepAlive(int nodeId, long epoch, boolean goingAway); + /** + * Send heartbeat request to leader to keep current node alive. + */ + void heartbeat(); + CompletableFuture createTopic(String topicName, int queueNum, List acceptMessageTypesList) throws ControllerException; diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java index c5bb3d0bf..2922ef375 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java @@ -76,7 +76,7 @@ import com.automq.rocketmq.controller.metadata.database.mapper.SequenceMapper; import com.automq.rocketmq.controller.metadata.database.mapper.StreamMapper; import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper; -import com.automq.rocketmq.controller.tasks.KeepAliveTask; +import com.automq.rocketmq.controller.tasks.HeartbeatTask; import com.automq.rocketmq.controller.tasks.LeaseTask; import com.automq.rocketmq.controller.tasks.ReclaimS3ObjectTask; import com.automq.rocketmq.controller.tasks.RecycleGroupTask; @@ -220,7 +220,7 @@ public void start() { config.scanIntervalInSecs(), TimeUnit.SECONDS); this.scheduledExecutorService.scheduleWithFixedDelay(new SchedulerTask(this), 1, config.scanIntervalInSecs(), TimeUnit.SECONDS); - this.scheduledExecutorService.scheduleAtFixedRate(new KeepAliveTask(this), 3, + this.scheduledExecutorService.scheduleAtFixedRate(new HeartbeatTask(this), 3, Math.max(config().nodeAliveIntervalInSecs() / 2, 10), TimeUnit.SECONDS); this.scheduledExecutorService.scheduleWithFixedDelay(new RecycleTopicTask(this), 1, config.deletedTopicLingersInSecs(), TimeUnit.SECONDS); @@ -314,12 +314,37 @@ public void registerCurrentNode(String name, String address, String instanceId) @Override public void keepAlive(int nodeId, long epoch, boolean goingAway) { + if (!isLeader()) { + LOGGER.warn("Non-leader node cannot keep the node[node-id={}, epoch={}] alive", nodeId, epoch); + return; + } BrokerNode brokerNode = nodes.get(nodeId); if (null != brokerNode) { brokerNode.keepAlive(epoch, goingAway); } } + @Override + public void heartbeat() { + if (isLeader()) { + LOGGER.debug("Node of leader does not need to send heartbeat request"); + } + + try { + String target = leaderAddress(); + controllerClient.heartbeat(target, config.nodeId(), config.epoch(), false) + .whenComplete((r, e) -> { + if (null != e) { + LOGGER.error("Failed to maintain heartbeat to {}", target); + return; + } + LOGGER.debug("Heartbeat to {} OK", target); + }); + } catch (ControllerException e) { + LOGGER.error("Failed to send heartbeat to leader node", e); + } + } + public boolean maintainLeadershipWithSharedLock(SqlSession session) { LeaseMapper leaseMapper = session.getMapper(LeaseMapper.class); Lease current = leaseMapper.currentWithShareLock(); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/HikariCPDataSourceFactory.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/HikariCPDataSourceFactory.java index 37186c07a..c57fca571 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/HikariCPDataSourceFactory.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/HikariCPDataSourceFactory.java @@ -32,8 +32,6 @@ public void setProperties(Properties properties) { if (null == dataSource) { HikariConfig config = new HikariConfig(properties); config.setMaximumPoolSize(10); - config.setLeakDetectionThreshold(20000); - config.setMaxLifetime(30000); dataSource = new HikariDataSource(config); } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/tasks/KeepAliveTask.java b/controller/src/main/java/com/automq/rocketmq/controller/tasks/HeartbeatTask.java similarity index 82% rename from controller/src/main/java/com/automq/rocketmq/controller/tasks/KeepAliveTask.java rename to controller/src/main/java/com/automq/rocketmq/controller/tasks/HeartbeatTask.java index 575426c25..5ae563228 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/tasks/KeepAliveTask.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/tasks/HeartbeatTask.java @@ -20,16 +20,15 @@ import com.automq.rocketmq.controller.exception.ControllerException; import com.automq.rocketmq.controller.metadata.MetadataStore; -public class KeepAliveTask extends ControllerTask { - public KeepAliveTask(MetadataStore metadataStore) { +public class HeartbeatTask extends ControllerTask { + public HeartbeatTask(MetadataStore metadataStore) { super(metadataStore); } @Override public void process() throws ControllerException { - int nodeId = metadataStore.config().nodeId(); if (!metadataStore.isLeader()) { - metadataStore.keepAlive(nodeId, metadataStore.config().epoch(), false); + metadataStore.heartbeat(); } } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/tasks/ScanNodeTask.java b/controller/src/main/java/com/automq/rocketmq/controller/tasks/ScanNodeTask.java index a8ab6a956..cb35320a9 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/tasks/ScanNodeTask.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/tasks/ScanNodeTask.java @@ -21,13 +21,17 @@ import com.automq.rocketmq.controller.metadata.MetadataStore; import com.automq.rocketmq.controller.metadata.database.dao.Node; import com.automq.rocketmq.controller.metadata.database.mapper.NodeMapper; +import com.google.gson.Gson; import java.util.List; import org.apache.ibatis.session.SqlSession; public class ScanNodeTask extends ScanTask { + private final Gson gson; + public ScanNodeTask(MetadataStore metadataStore) { super(metadataStore); + gson = new Gson(); } @Override @@ -35,8 +39,10 @@ public void process() throws ControllerException { try (SqlSession session = this.metadataStore.openSession()) { NodeMapper mapper = session.getMapper(NodeMapper.class); List nodes = mapper.list(this.lastScanTime); - if (!nodes.isEmpty()) { - LOGGER.debug("Found {} broker nodes", nodes.size()); + if (!nodes.isEmpty() && LOGGER.isDebugEnabled()) { + for (Node node : nodes) { + LOGGER.debug("Found broker node: {}", gson.toJson(node)); + } } updateBrokers(nodes); } diff --git a/distribution/conf/broker-2.yaml b/distribution/conf/broker-2.yaml new file mode 100644 index 000000000..190ef7268 --- /dev/null +++ b/distribution/conf/broker-2.yaml @@ -0,0 +1,26 @@ +name: DefaultBroker2 +instanceId: "ec2-test-2" +bindAddress: "0.0.0.0:8081" +s3Stream: + s3WALPath: "/tmp/s3rocketmq/wal" + s3Endpoint: "http://minio.hellocorp.test" + s3Bucket: "lzh" + s3Region: "us-east-1" + s3ForcePathStyle: true + s3AccessKey: "mcFoJPLvxNmRq497dhK0" + s3SecretKey: "UU7PE0VOo3fRtqYEBeIIQzOSGFT3GctuiLoZg2hY" +db: + url: "jdbc:mysql://10.1.0.110:3306/rocketmq" + userName: "root" + password: "password" +metrics: + exporterType: "PROM" + grpcExporterTarget: "" + grpcExporterHeader: "" + grpcExporterTimeOutInMills: 31000 + grpcExporterIntervalInMills: 60000 + promExporterPort: 5557 + promExporterHost: "localhost" + loggingExporterIntervalInMills: 10000 + labels: "" + exportInDelta: false \ No newline at end of file