• WebFlux+SSE流式传输


    关注 码龄 粉丝数 原力等级 -- 被采纳 被点赞 采纳率 沈山南丶 2024-03-27 10:07 采纳率: 50% 浏览 19 首页/ 编程语言 / WebFlux+SSE流式传输 java网络协议 服务端webflux传输SSE流式数据,客户端接收到数据格式不符合SSE协议规范,导致无法解析data和event,id等字段 这是我服务端用webflux每1s返回一个单词,返回类型是流式。使用ServerSentEvent对象符合sse协议规范字段,其中有data,id,event字段 /** * @Desc: 短暂的周期性事件流 * @Author:zhh * @Date:2024/3/26 15:16 */ @RestController public class WebFluxController { private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" "); @GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @CrossOrigin public Flux> subscribe() { return Flux.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1))) .map(sequence -> ServerSentEvent.builder() .data(sequence.getT1()) .id(UUID.randomUUID().toString()) .event("MyCustomEvent") .build()); } } 这是我客户端接收到sse流式数据,将字节转化成字符串,因为sse传输的是纯文本格式。代码如下 public Flux initSse() { //使用webClient发送消息 return this.webClient.get() //请求uri .uri(API_URI) //设置流式响应 .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .acceptCharset(Charset.defaultCharset()) .retrieve() .bodyToFlux(DataBuffer.class) .flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); return Mono.just(new String(bytes, StandardCharsets.UTF_8)); }) .doOnNext(str-> System.out.println(str)); } 运行结果及详细报错内容 id:37505324-7732-40ed-b6c5-b8e30cabfa75 event:MyCustomEvent data: quick id:84627318-bf50-491b-b5ce-34c180cc8e11 event:MyCustomEvent data: brown id:90769bdd-798a-43af-b6e4-4f628f868e01 event:MyCustomEvent data: fox id:892273ec-ba22-4015-be46-11902e77472b event:MyCustomEvent data: jumps 我的解答思路和尝试过的方法 按照sse协议规范,正确输出应该是 id:37505324-7732-40ed-b6c5-b8e30cabfa75 event:MyCustomEvent data:quick id:84627318-bf50-491b-b5ce-34c180cc8e11 event:MyCustomEvent data:brown id:90769bdd-798a-43af-b6e4-4f628f868e01 event:MyCustomEvent data:fox id:892273ec-ba22-4015-be46-11902e77472b event:MyCustomEvent data:jumps 错误的输出导致无法有效解析每个事件的内容, 如果直接接收data数据可以 public Flux initSse2() { //使用webClient发送消息 return this.webClient.get() //请求uri .uri(API_URI) //设置流式响应 .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .acceptCharset(Charset.defaultCharset()) .retrieve() //直接将sse事件协议格式下的data提取转换成string .bodyToFlux(String.class) .flatMap(string -> { //如果是json字符串,需要处理json字符串转换成对象 ...... Object result = JSON.parseObject(string, Object.class); return Flux.just(result); }); } 问题就在于bodyToFlux只能提取data数据转成字符串,如果需要event,id等字段数据此时就不能生效了 此时想到bodyToFlux(ServerSentEvent.class)直接转成ServerSentEvent对象,结果报错了 根据报错日志,无法解析字段the id:37505324-7732-40ed-b6c5-b8e30cabfa75 event:MyCustomEvent data: the 通过打印知道传过来的数据是上面这样的,显然json解析失败了 public Flux initSse() { //使用webClient发送消息 return this.webClient.get() //请求uri .uri(API_URI) //设置流式响应 .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .acceptCharset(Charset.defaultCharset()) .retrieve() .bodyToFlux(ServerSentEvent.class) .doOnNext(string -> System.out.println(string)) .flatMap(this::parseSseEventData); } public Flux parseSseEventData(ServerSentEvent serverSentEvent) { return Flux.just(serverSentEvent.data().toString()); } 报错 om.fasterxml.jackson.core.JsonParseException: Unrecognized token 'The': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') at [Source: (org.springframework.core.io.buffer.DefaultDataBuffer$DefaultDataBufferInputStream); line: 1, column: 5] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.5.jar:2.13.5] at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745) ~[jackson-core-2.13.5.jar:2.13.5] at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635) ~[jackson-core-2.13.5.jar:2.13.5] at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734) ~[jackson-core-2.13.5.jar:2.13.5] at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902) ~[jackson-core-2.13.5.jar:2.13.5] at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.13.5.jar:2.13.5] at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:356) ~[jackson-databind-2.13.5.jar:2.13.5] at com.fasterxml.jackson.databind.ObjectReader._bin 到底是那一步出现了问题,导致接收到的sse流数据不符合sse定义规范,是服务端传输问题?还是客户端dataBuffer解析问题? 展开全部 收起 写回答 好问题 0 提建议 追加酬金 关注问题 微信扫一扫 点击复制链接 分享 邀请回答 编辑 收藏 删除 结题 收藏 举报 追加酬金 (90%的用户在追加酬金后获得了解决方案) 当前问题酬金 ¥ 0 (可追加 ¥500) ¥ 15¥ 20¥ 50¥ 100¥ 200 支付方式 扫码支付 二维码出错 点击刷新 支付金额 15 元 提供问题酬金的用户不参与问题酬金结算和分配 支付即为同意 《付费问题酬金结算规则》 结题 再想想 删除 再等等

    2条回答 默认 最新

    • 码农阿豪 Java领域优质创作者 2024-03-27 10:07
      关注
      让阿豪来帮你解答,本回答参考chatgpt3.5编写提供,如果还有疑问可以评论或留言
      问题描述:服务端使用WebFlux传输SSE流式数据,但客户端接收到数据格式不符合SSE协议规范,导致无法解析data和event,id等字段。 解决方法:根据SSE协议规范,每个事件都需要按照特定格式输出,包括id、event和data字段。因此,客户端需要按照协议规范解析接收到的数据。 以下是客户端接收到sse流式数据的代码: public Flux initSse() { return this.webClient.get() .uri(API_URI) .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .acceptCharset(Charset.defaultCharset()) .retrieve() .bodyToFlux(DataBuffer.class) .flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); return Mono.just(new String(bytes, StandardCharsets.UTF_8)); }) .doOnNext(str -> System.out.println(str)); } 在上面的代码中,客户端接收到的是DataBuffer类型的数据。可以通过将其转换成字节数组再转换成字符串的方式来处理。 但是,这种方法会将一个事件的多条信息输出在同一行,导致无法区分每个事件中的不同字段。因此,我们需要修改代码,按照SSE协议规范解析每个事件。 修改后的代码如下: public Flux> initSse() { return this.webClient.get() .uri(API_URI) .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .acceptCharset(Charset.defaultCharset()) .retrieve() .bodyToFlux(String.class) .map(text -> { String[] lines = text.split("\n"); String eventData = ""; String eventId = ""; String eventType = ""; for (String line : lines) { if (line.startsWith("data:")) { eventData = line.substring(5).trim(); } else if (line.startsWith("id:")) { eventId = line.substring(3).trim(); } else if (line.startsWith("event:")) { eventType = line.substring(6).trim(); } } return ServerSentEvent.builder(eventData) .id(eventId) .event(eventType) .build(); }); } 在上面的代码中,我们先将DataBuffer类型的数据转换成字符串,然后按照“\n”进行分割,逐个处理每个事件中的不同字段。最后将这些字段构造成一个ServerSentEvent对象,返回给客户端。 运行结果应该是: id:37505324-7732-40ed-b6c5-b8e30cabfa75 event:MyCustomEvent data:quick id:84627318-bf50-491b-b5ce-34c180cc8e11 event:MyCustomEvent data:brown id:90769bdd-798a-43af-b6e4-4f628f868e01 event:MyCustomEvent data:fox id:892273ec-ba22-4015-be46-11902e77472b event:MyCustomEvent data:jumps 这样就按照SSE协议规范解析了每个事件中的不同字段,方便客户端后续的处理。
      评论
      沈山南丶 2024-03-27 10:26

      不行, .bodyToFlux(String.class) 只能截取data数据,event和id等字段数据 webflux不会提取

      回复
    • 专家-皆知 2024-03-27 10:07
      关注

      引自免费微信小程序:皆我百晓生

      根据您的描述和提供的代码,我理解您的问题是在使用WebFlux和SSE(Server-Sent Event)进行流式传输时,遇到了JSON解析错误。您尝试使用Jackson库将接收到的数据从ServerSentEvent对象解析为JSON字符串,但遇到了无法解析的错误。

      首先,我建议您检查一下发送的消息是否符合SSE协议的格式。在您的代码中,您使用了MediaType.TEXT_EVENT_STREAM_VALUE,这表示您期望接收的是纯文本格式的流式数据。但是,从错误日志来看,接收到的数据可能包含了其他格式。

      为了更好地帮助您解决问题,您可以提供一下发送的消息的具体内容。另外,您也可以尝试将接收到的数据直接转换为ServerSentEvent对象,而不是尝试解析为JSON字符串,这样可以避免遇到解析错误。

      以下是一个将接收到的数据直接转换为ServerSentEvent对象的示例代码:

      public Flux> initSse() {
          //使用webClient发送消息
          return this.webClient.get()
                  //请求uri
                  .uri(API_URI)
                  //设置流式响应
                  .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
                  .acceptCharset(Charset.defaultCharset())
                  .retrieve()
                  .bodyToFlux(ServerSentEvent.class)
                  .doOnNext(serverSentEvent -> {
                      //直接将接收到的ServerSentEvent对象作为数据发送
                      serverSentEvent.data("the");
                  });
      }
      

      希望这些建议对您有所帮助。如果您能提供更多关于发送的消息的内容,我将更好地帮助您解决问题。

      评论 编辑记录
      沈山南丶 2024-03-27 10:25

      不行在 .bodyToFlux(ServerSentEvent.class) 就直接报错了

      回复
    编辑
    预览

    报告相同问题?

  • 相关阅读:
    线性回归介绍以及实现
    C++11delete与default
    Android : ListView + BaseAdapter-简单应用
    九月 Web3 游戏报告:数量增长,巨头入场,用户获取和留存仍存挑战
    可移动表空间
    数据结构-冒泡排序Java实现
    【第2章 Node.js基础】2.4 Node.js 全局对象(二) process 对象
    acwing算法提高之图论--拓扑排序
    scrapy的入门使用
    【图】按公因数计算最大组件大小 并查集
  • 原文地址:https://ask.csdn.net/questions/8079625