A Redis binder that sends and receives messages according to the Spring Cloud Stream specification. Release versions track the corresponding Spring Cloud Stream versions.
https://guoshiqiufeng.github.io/spring-cloud-stream-redis/
- Spring Cloud Stream 4
- Spring Boot 3
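- PUBLISH SUBSCRIBE messages
- QUEUE messages (BLPOP, BRPOP, LPUSH, RPUSH)
Note 1: The two modes cannot be mixed. A message sent in PUBLISH SUBSCRIBE mode cannot be received in QUEUE mode, and vice versa.
Note 2: In PUBLISH SUBSCRIBE mode, a message that no subscriber receives is lost. In QUEUE mode it is not.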
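The difference described in Note 2 can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `DeliveryModes`, `PubSubChannel`, and `ListQueue` are hypothetical names, nothing here connects to Redis or uses the binder, and `rpop` is a non-blocking stand-in for BRPOP.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// In-memory model only: these classes are hypothetical and do not talk to Redis.
class DeliveryModes {

    /** Models PUBLISH/SUBSCRIBE: a message reaches only the subscribers present right now. */
    static class PubSubChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        /** Returns how many subscribers received the message, as Redis PUBLISH does. */
        int publish(String message) {
            subscribers.forEach(s -> s.accept(message));
            return subscribers.size();
        }
    }

    /** Models a Redis list used as a queue: a pushed message is stored until it is popped. */
    static class ListQueue {
        private final Deque<String> items = new ArrayDeque<>();

        void lpush(String message) {
            items.addFirst(message);
        }

        /** Non-blocking stand-in for BRPOP: returns null when the list is empty. */
        String rpop() {
            return items.pollLast();
        }
    }

    public static void main(String[] args) {
        PubSubChannel channel = new PubSubChannel();
        int reached = channel.publish("early");       // nobody is subscribed yet, so it is dropped
        List<String> received = new ArrayList<>();
        channel.subscribe(received::add);
        channel.publish("late");
        System.out.println(reached + " " + received); // prints "0 [late]"

        ListQueue queue = new ListQueue();
        queue.lpush("early");                         // no consumer yet, but the message is stored
        System.out.println(queue.rpop());             // prints "early"
    }
}
```

In this model the message published before any subscription never arrives, while the queued message waits for the first consumer that pops it.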
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.github.guoshiqiufeng.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>0.5.2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependency>
    <groupId>io.github.guoshiqiufeng.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-redis</artifactId>
</dependency>
spring:
  cloud:
    function:
      definition: send;recall
    stream:
      default-binder: redis
      binders:
        redis:
          type: redis
      redis:
        binder:
          configuration:
            host: 127.0.0.1
            port: 6379
            password: 123456
            database: 7
          support-type: queue_channel
        # bindings:
        #   send-in-0:
        #     consumer:
        #       destination-is-pattern: true
      bindings:
        out-0:
          destination: test-topic
          content-type: text/plain
          group: push-producer-group
        send-in-0:
          destination: test-topic
          content-type: text/plain
          group: test-send-group
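The binding name `send-in-0` in the configuration follows the Spring Cloud Stream functional naming convention: the function name, then `-in-`, then the input index. A minimal sketch of that mapping is below. `BindingNames` and `inputBindings` are hypothetical helpers written for illustration, not part of Spring Cloud Stream or this binder, and the sketch ignores composed functions (`|`) and functions with several inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; Spring Cloud Stream derives these names internally.
class BindingNames {

    /** Splits a definition such as "send;recall" and returns the first input binding of each function. */
    static List<String> inputBindings(String definition) {
        List<String> names = new ArrayList<>();
        for (String function : definition.split(";")) {
            String trimmed = function.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed + "-in-0");
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(inputBindings("send;recall")); // prints "[send-in-0, recall-in-0]"
    }
}
```

This is why the consumer bean named `send` is configured under `send-in-0`, while `out-0` is simply the name passed to `StreamBridge` when sending.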
// StreamBridge sends messages to a binding by name.
@Autowired
private StreamBridge streamBridge;

@GetMapping("/send")
public String send() {
    MessageVO messageVO = new MessageVO();
    messageVO.setKey(UUID.randomUUID().toString());
    messageVO.setMsg("hello ");
    messageVO.setIds(Set.of("1", "2"));
    messageVO.setCreateTime(LocalDateTime.now());
    // Serialize to JSON and publish to the "out-0" binding (destination: test-topic).
    streamBridge.send("out-0", JSON.toJSONString(messageVO, JSONWriter.Feature.WriteClassName));
    return "success";
}
@Slf4j
@Component("send")
public class MessageHandler implements Consumer<Message<String>> {

    /**
     * Logs the payload of each message received on the send-in-0 binding.
     *
     * @param messageVOMessage the received message
     */
    @Override
    public void accept(Message<String> messageVOMessage) {
        log.info("send Receive New Messages: {}", messageVOMessage.getPayload());
    }
}
For more usage details, see the documentation.