From 3a9c47e59eeb53755c76208a7550bc7b8745c974 Mon Sep 17 00:00:00 2001 From: "eko.zhan" Date: Wed, 18 Dec 2024 17:57:08 +0800 Subject: [PATCH] feat(sse): sse samples --- kbase-stack-sse/pom.xml | 30 ++++- .../heap/sse/controller/HelloController.java | 72 ++++++++++-- .../heap/sse/model/vo/req/MessageReq.java | 18 +++ .../heap/sse/model/vo/resp/MessageResp.java | 18 +++ kbase-stack-sse/src/main/resources/data.json | 111 ++++++++++++++++++ .../src/main/resources/static/index.html | 65 ++++++++++ .../ibothub/heap/sse/HelloServiceTests.java | 102 ++++++++++++++++ pom.xml | 5 +- 8 files changed, 403 insertions(+), 18 deletions(-) create mode 100644 kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/req/MessageReq.java create mode 100644 kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/resp/MessageResp.java create mode 100644 kbase-stack-sse/src/main/resources/data.json create mode 100644 kbase-stack-sse/src/main/resources/static/index.html create mode 100644 kbase-stack-sse/src/test/java/com/ibothub/heap/sse/HelloServiceTests.java diff --git a/kbase-stack-sse/pom.xml b/kbase-stack-sse/pom.xml index e194079..b82d467 100644 --- a/kbase-stack-sse/pom.xml +++ b/kbase-stack-sse/pom.xml @@ -11,18 +11,36 @@ kbase-stack-sse - - 8 - 8 - UTF-8 - - + + com.ibothub.heap + kbase-stack-base + ${revision} + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + + org.springframework.boot spring-boot-starter-webflux + + + + commons-io + commons-io + ${commons-io.version} + \ No newline at end of file diff --git a/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/controller/HelloController.java b/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/controller/HelloController.java index 7df69f1..c1a519a 100644 --- a/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/controller/HelloController.java +++ b/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/controller/HelloController.java @@ -3,31 +3,83 @@ */ package com.ibothub.heap.sse.controller; -import java.time.Duration; -import java.time.LocalDateTime; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.google.common.collect.Lists; +import com.ibothub.heap.base.model.vo.ResponseEntity; +import com.ibothub.heap.sse.model.vo.req.MessageReq; +import com.ibothub.heap.sse.model.vo.resp.MessageResp; +import jakarta.annotation.PostConstruct; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.io.FileUtils; +import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; +import org.springframework.util.ResourceUtils; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; /** * @author eko.zhan * @version v1.0 - * @since 2024/12/11 14:08 + * @since 2024/12/17 14:52 */ @RestController @RequestMapping("/api/v1/sse") public class HelloController { - @GetMapping("/events") - public Flux> getEvents() { - return Flux.interval(Duration.ofSeconds(1)) - .map(sequence -> ServerSentEvent.builder() - .id(String.valueOf(sequence)) - .event("message") - .data("Event #" + sequence + " at " + LocalDateTime.now()) + List famousList = Lists.newArrayList(); + + private final ConcurrentHashMap> sinks = new ConcurrentHashMap<>(); + + @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux> stream(@RequestParam("token") String token) { + Sinks.Many sink = Sinks.many().multicast().directBestEffort(); + sinks.put(token, sink); + + return sink.asFlux().map(data -> + ServerSentEvent.builder() + .data(data) .build()); } + @PostMapping("/send") + public ResponseEntity send(@RequestBody MessageReq req, @RequestHeader("Authorization") String token) { + Sinks.Many sink = sinks.get(token); + if (sink != null) { + MessageResp messageResp = new MessageResp(); + messageResp.setMessage(req.getMessage() + "," + getFamous()); + sink.tryEmitNext(messageResp); + return ResponseEntity.ok(""); + } else { + return ResponseEntity.failure(""); + } + } + + @PostConstruct + public void init() throws IOException { + String txt = FileUtils.readFileToString(ResourceUtils.getFile("classpath:data.json"), + StandardCharsets.UTF_8); + JSONObject jsonObject = JSON.parseObject(txt); + JSONArray famousArr = jsonObject.getJSONArray("famous"); + famousArr.forEach(famous -> famousList.add(famous.toString())); + } + + private String getFamous(){ + Random random = new Random(); + int num = random.nextInt(famousList.size()); + return famousList.get(num); + } + } diff --git a/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/req/MessageReq.java b/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/req/MessageReq.java new file mode 100644 index 0000000..d1e7394 --- /dev/null +++ b/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/req/MessageReq.java @@ -0,0 +1,18 @@ +/* + * powered by https://ekozhan.com + */ +package com.ibothub.heap.sse.model.vo.req; + +import java.io.Serializable; +import lombok.Data; + +/** + * @author eko.zhan + * @version v1.0 + * @since 2024/12/18 15:12 + */ +@Data +public class MessageReq implements Serializable { + + private String message; +} diff --git a/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/resp/MessageResp.java b/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/resp/MessageResp.java new file mode 100644 index 0000000..9dcab00 --- /dev/null +++ b/kbase-stack-sse/src/main/java/com/ibothub/heap/sse/model/vo/resp/MessageResp.java @@ -0,0 +1,18 @@ +/* + * powered by https://ekozhan.com + */ +package com.ibothub.heap.sse.model.vo.resp; + +import java.io.Serializable; +import lombok.Data; + +/** + * @author eko.zhan + * @version v1.0 + * @since 2024/12/18 15:12 + */ +@Data +public class MessageResp implements Serializable { + + private String message; +} diff --git a/kbase-stack-sse/src/main/resources/data.json b/kbase-stack-sse/src/main/resources/data.json new file mode 100644 index 0000000..f3157ec --- /dev/null +++ b/kbase-stack-sse/src/main/resources/data.json @@ -0,0 +1,111 @@ +{ + "famous":[ + "爱迪生说过,天才是百分之一的勤奋加百分之九十九的汗水。", + "查尔斯·史说过,一个人几乎可以在任何他怀有无限热忱的事情上成功。", + "培根说过,深窥自己的心,而后发觉一切的奇迹在你自己。", + "歌德曾经说过,流水在碰到底处时才会释放活力。", + "莎士比亚说过,那脑袋里的智慧,就像打火石里的火花一样,不去打它是不肯出来的。", + "戴尔·卡耐基说过,多数人都拥有自己不了解的能力和机会,都有可能做到未曾梦想的事情。", + "白哲特说过,坚强的信念能赢得强者的心,并使他们变得更坚强。", + "伏尔泰说过,不经巨大的困难,不会有伟大的事业。", + "富勒曾经说过,苦难磨炼一些人,也毁灭另一些人。", + "文森特·皮尔说过,改变你的想法,你就改变了自己的世界。", + "拿破仑·希尔说过,不要等待,时机永远不会恰到好处。", + "塞涅卡说过,生命如同寓言,其价值不在与长短,而在与内容。", + "奥普拉·温弗瑞说过,你相信什么,你就成为什么样的人。", + "吕凯特说过,生命不可能有两次,但许多人连一次也不善于度过。", + "莎士比亚说过,人的一生是短的,但如果卑劣地过这一生,就太长了。", + "笛卡儿说过,我的努力求学没有得到别的好处,只不过是愈来愈发觉自己的无知。", + "左拉说过,生活的道路一旦选定,就要勇敢地走到底,决不回头。", + "米歇潘说过,生命是一条艰险的峡谷,只有勇敢的人才能通过。", + "吉姆·罗恩说过,要么你主宰生活,要么你被生活主宰。", + "日本谚语说过,不幸可能成为通向幸福的桥梁。", + "海贝尔说过,人生就是学校。在那里,与其说好的教师是幸福,不如说好的教师是不幸。", + "杰纳勒尔·乔治·S·巴顿说过,接受挑战,就可以享受胜利的喜悦。", + "德谟克利特说过,节制使快乐增加并使享受加强。", + "裴斯泰洛齐说过,今天应做的事没有做,明天再早也是耽误了。", + "歌德说过,决定一个人的一生,以及整个命运的,只是一瞬之间。", + "卡耐基说过,一个不注意小事情的人,永远不会成就大事业。", + "卢梭说过,浪费时间是一桩大罪过。", + "康德说过,既然我已经踏上这条道路,那么,任何东西都不应妨碍我沿着这条路走下去。", + "克劳斯·莫瑟爵士说过,教育需要花费钱,而无知也是一样。", + "伏尔泰说过,坚持意志伟大的事业需要始终不渝的精神。", + "亚伯拉罕·林肯说过,你活了多少岁不算什么,重要的是你是如何度过这些岁月的。", + "韩非说过,内外相应,言行相称。", + "富兰克林说过,你热爱生命吗?那么别浪费时间,因为时间是组成生命的材料。", + "马尔顿说过,坚强的信心,能使平凡的人做出惊人的事业。", + "笛卡儿说过,读一切好书,就是和许多高尚的人谈话。", + "塞涅卡说过,真正的人生,只有在经过艰难卓绝的斗争之后才能实现。", + "易卜生说过,伟大的事业,需要决心,能力,组织和责任感。", + "歌德说过,没有人事先了解自己到底有多大的力量,直到他试过以后才知道。", + "达尔文说过,敢于浪费哪怕一个钟头时间的人,说明他还不懂得珍惜生命的全部价值。", + "佚名说过,感激每一个新的挑战,因为它会锻造你的意志和品格。", + "奥斯特洛夫斯基说过,共同的事业,共同的斗争,可以使人们产生忍受一切的力量。 b", + "苏轼说过,古之立大事者,不惟有超世之才,亦必有坚忍不拔之志。", + "王阳明说过,故立志者,为学之心也;为学者,立志之事也。", + "歌德说过,读一本好书,就如同和一个高尚的人在交谈。", + "乌申斯基说过,学习是劳动,是充满思想的劳动。", + "别林斯基说过,好的书籍是最贵重的珍宝。", + "富兰克林说过,读书是易事,思索是难事,但两者缺一,便全无用处。", + "鲁巴金说过,读书是在别人思想的帮助下,建立起自己的思想。", + "培根说过,合理安排时间,就等于节约时间。", + "屠格涅夫说过,你想成为幸福的人吗?但愿你首先学会吃得起苦。", + "莎士比亚说过,抛弃时间的人,时间也抛弃他。", + "叔本华说过,普通人只想到如何度过时间,有才能的人设法利用时间。", + "博说过,一次失败,只是证明我们成功的决心还够坚强。 维b", + "拉罗什夫科说过,取得成就时坚持不懈,要比遭到失败时顽强不屈更重要。", + "莎士比亚说过,人的一生是短的,但如果卑劣地过这一生,就太长了。", + "俾斯麦说过,失败是坚忍的最后考验。", + "池田大作说过,不要回避苦恼和困难,挺起身来向它挑战,进而克服它。", + "莎士比亚说过,那脑袋里的智慧,就像打火石里的火花一样,不去打它是不肯出来的。", + "希腊说过,最困难的事情就是认识自己。", + "黑塞说过,有勇气承担命运这才是英雄好汉。", + "非洲说过,最灵繁的人也看不见自己的背脊。", + "培根说过,阅读使人充实,会谈使人敏捷,写作使人精确。", + "斯宾诺莎说过,最大的骄傲于最大的自卑都表示心灵的最软弱无力。", + "西班牙说过,自知之明是最难得的知识。", + "塞内加说过,勇气通往天堂,怯懦通往地狱。", + "赫尔普斯说过,有时候读书是一种巧妙地避开思考的方法。", + "笛卡儿说过,阅读一切好书如同和过去最杰出的人谈话。", + "邓拓说过,越是没有本领的就越加自命不凡。", + "爱尔兰说过,越是无能的人,越喜欢挑剔别人的错儿。", + "老子说过,知人者智,自知者明。胜人者有力,自胜者强。", + "歌德说过,意志坚强的人能把世界放在手中像泥块一样任意揉捏。", + "迈克尔·F·斯特利说过,最具挑战性的挑战莫过于提升自我。", + "爱迪生说过,失败也是我需要的,它和成功对我一样有价值。", + "罗素·贝克说过,一个人即使已登上顶峰,也仍要自强不息。", + "马云说过,最大的挑战和突破在于用人,而用人最大的突破在于信任人。", + "雷锋说过,自己活着,就是为了使别人过得更美好。", + "布尔沃说过,要掌握书,莫被书掌握;要为生而读,莫为读而生。", + "培根说过,要知道对好事的称颂过于夸大,也会招来人们的反感轻蔑和嫉妒。", + "莫扎特说过,谁和我一样用功,谁就会和我一样成功。", + "马克思说过,一切节省,归根到底都归结为时间的节省。", + "莎士比亚说过,意志命运往往背道而驰,决心到最后会全部推倒。", + "卡莱尔说过,过去一切时代的精华尽在书中。", + "培根说过,深窥自己的心,而后发觉一切的奇迹在你自己。", + "罗曼·罗兰说过,只有把抱怨环境的心情,化为上进的力量,才是成功的保证。", + "孔子说过,知之者不如好之者,好之者不如乐之者。", + "达·芬奇说过,大胆和坚定的决心能够抵得上武器的精良。", + "叔本华说过,意志是一个强壮的盲人,倚靠在明眼的跛子肩上。", + "黑格尔说过,只有永远躺在泥坑里的人,才不会再掉进坑里。", + "普列姆昌德说过,希望的灯一旦熄灭,生活刹那间变成了一片黑暗。", + "维龙说过,要成功不需要什么特别的才能,只要把你能做的小事做得好就行了。", + "郭沫若说过,形成天才的决定因素应该是勤奋。", + "洛克说过,学到很多东西的诀窍,就是一下子不要学很多。", + "西班牙说过,自己的鞋子,自己知道紧在哪里。", + "拉罗什福科说过,我们唯一不会改正的缺点是软弱。", + "亚伯拉罕·林肯说过,我这个人走得很慢,但是我从不后退。", + "美华纳说过,勿问成功的秘诀为何,且尽全力做你应该做的事吧。", + "俾斯麦说过,对于不屈不挠的人来说,没有失败这回事。", + "阿卜·日·法拉兹说过,学问是异常珍贵的东西,从任何源泉吸收都不可耻。", + "白哲特说过,坚强的信念能赢得强者的心,并使他们变得更坚强。 b", + "查尔斯·史考伯说过,一个人几乎可以在任何他怀有无限热忱的事情上成功。 b", + "贝多芬说过,卓越的人一大优点是:在不利与艰难的遭遇里百折不饶。", + "莎士比亚说过,本来无望的事,大胆尝试,往往能成功。", + "卡耐基说过,我们若已接受最坏的,就再没有什么损失。", + "德国说过,只有在人群中间,才能认识自己。", + "史美尔斯说过,书籍把我们引入最美好的社会,使我们认识各个时代的伟大智者。", + "冯学峰说过,当一个人用工作去迎接光明,光明很快就会来照耀着他。", + "吉格·金克拉说过,如果你能做梦,你就能实现它。" + ] +} \ No newline at end of file diff --git a/kbase-stack-sse/src/main/resources/static/index.html b/kbase-stack-sse/src/main/resources/static/index.html new file mode 100644 index 0000000..10b8853 --- /dev/null +++ b/kbase-stack-sse/src/main/resources/static/index.html @@ -0,0 +1,65 @@ + + + + + SSE Samples + + + +

Server-Sent Events (SSE) Client

+
+ + + + + + + \ No newline at end of file diff --git a/kbase-stack-sse/src/test/java/com/ibothub/heap/sse/HelloServiceTests.java b/kbase-stack-sse/src/test/java/com/ibothub/heap/sse/HelloServiceTests.java new file mode 100644 index 0000000..efa0780 --- /dev/null +++ b/kbase-stack-sse/src/test/java/com/ibothub/heap/sse/HelloServiceTests.java @@ -0,0 +1,102 @@ +/* + * powered by https://ekozhan.com + */ +package com.ibothub.heap.sse; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import javax.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.util.ResourceUtils; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.context.Context; + +/** + * @author eko.zhan + * @version v1.0 + * @since 2024/12/13 10:27 + */ +@Slf4j +public class HelloServiceTests { + + static WebClient webClient = WebClient.create("http://localhost:8080"); + + public static void main(String[] args) { + func3(); + } + + private static void func3(){ + webClient + .post() + .uri("/api/v1/sse/events") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(String.class) + .transformDeferredContextual((stream, ctx) -> { + return stream.handle((event, sink) -> { + System.out.println("111" + event); + sink.next("222" + event); + }); + }).subscribe(System.out::println); + + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void func1(){ + webClient + .post() + .uri("/api/v1/sse/events") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(String.class) + .subscribe(System.out::println); + +// try { +// Thread.sleep(60 * 1000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } + + log.info("end"); + } + + private static void func2(){ + Mono> listMono = webClient + .post() + .uri("/api/v1/sse/events") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(String.class) + .collectList(); + + System.out.println("end"); + listMono.block().forEach(System.out::println); + } + + @Test + public void readFile() throws IOException { + String txt = FileUtils.readFileToString(ResourceUtils.getFile("classpath:data.json"), + StandardCharsets.UTF_8); + JSONObject jsonObject = JSON.parseObject(txt); + JSONArray famousArr = jsonObject.getJSONArray("famous"); + famousArr.forEach(System.out::println); + } +} diff --git a/pom.xml b/pom.xml index 300c834..c9ca3c9 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ 2.10.0 3.10 1.9.4 + 2.18.0 1.6.3 1.6.3 3.4.0 @@ -152,8 +153,8 @@ - com.alibaba - fastjson + com.alibaba.fastjson2 + fastjson2 ${fastjson.version}