Java处理SSE请求的最佳方案——WebFlux
| Java
评论 0 | 点赞 0 | 浏览 41

2023年,我写了一篇博客(Server-Sent Events(SSE)——一种服务器向浏览器推送信息的方法,不了解SSE的推荐先去看看),文章介绍了SSE的基本概念以及使用示例,那时候我还在另外一家公司工作,当时ChatGPT比较火爆,我写那篇博客也是因为AI大模型在使用SSE做传输。谁也没想到,当年只是抱着学习的态度写下的那篇博客对我后来的工作产生了较大的影响。

2024年我跳槽去了一家新公司,24年底,公司的技术部门向着AI应用领域转型。在AI大模型发展起来之前,很少有人了解过SSE,实在是因为它不常用,当时团队里的人一度以为AI的流式输出是靠WebSocket实现的,于是,凭借对SSE的了解,我成为了团队开发的主力,直到今天,我又有了新的心得体会。

在工作的过程中,我逐渐接触到了一些优秀的开源框架,比如LangChain4j(从python的langchain框架移植过来)、LangGraph,LangChain4j这个框架是我研究的最多的,如果你往前翻阅我的博客,还能看到我发过的另外一篇文章(扩展LangChain4j,使其支持Deepseek-R1模型,能输出推理内容),我看到LangChain4j在调用大模型的时候,使用的是WebFlux框架,一番研究之后,觉得这东西牛逼,有必要写篇博客记录下来。

一、业务背景

我用一个具体的业务场景为示例,介绍WebFlux 

我们有一个python团队,在做agent智能体开发,LangGraph和Dify都有用到,它们的agent智能体也是通过SSE协议和Java为主的业务系统做交互,流式输出是AI的灵魂,业务系统需要读取流并根据流中的数据做一些业务操作。

二、简单介绍WebFlux

Spring WebFlux 是 Spring Framework 5 引入的一个全新的、非阻塞响应式的 Web 框架。

它主要有两个核心特点:

  1. 非阻塞 I/O:与传统的 Spring MVC 基于 Servlet 容器的阻塞式处理不同,WebFlux 使用事件驱动模型,一个线程可以处理多个请求,当请求需要等待(如数据库查询、网络调用)时,线程不会被阻塞,而是去处理其他请求,极大地提高了并发处理能力。
  2. 响应式编程:它基于 Reactive Streams 规范,并与 Project Reactor(提供 Flux 和 Mono 两种核心发布者类型)深度集成。开发者可以使用函数式、声明式的方式来处理数据流,非常适合处理高并发、低延迟的场景,如实时数据推送、微服务网关等。

主要优势:

  • 更高的吞吐量:在高并发场景下,能以更少的硬件资源处理更多的请求。
  • 更好的资源利用率:非阻塞特性减少了线程等待,降低了线程池压力。
  • 支持函数式编程风格:除了注解方式(类似 MVC),还支持纯函数式路由和处理器。

三、捞B写法(阻塞式)

引入依赖

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

WebFlux提供的WebClient可以用来调用SSE接口,调用后会返回Flux对象,直接在Controller中return Flux对象就能实现流式输出效果,代码如下:

@PostMapping(value = "/chat")
    public Flux<String> chat(HttpServletResponse response) {
        //构建webClient
        WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(
                HttpClient.create().responseTimeout(Duration.ofSeconds(600)) // 设置超时时间为600秒
        )).baseUrl("").build();
        
        JSONObject body = new JSONObject();
        body.put("query", "");
        Mono<JSONObject> requestBody = Mono.just(body);
        // 发出POST请求并接收SSE流式响应
        Flux<String> sseFlux = webClient.post()
                .header("Authorization", "") // 添加特定请求头
                .body(requestBody, JSONObject.class)
                .retrieve()
                .bodyToFlux(String.class);
        return sseFlux;
    }

但是这种情况只适用于直来直去的业务场景,大部分情况下,我们需要对流中的数据做复杂的操作(比如数据过滤,数据格式转换,数据拼接,数据保存等),那么自然第一反应就是,能不能阻塞住主线程,等待读取这个流,一边读取一边操作?于是就有了下面这样的代码:

 @PostMapping(value = "/chat")
    public void chat(HttpServletResponse response) {
        //构建webClient
        WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(
                HttpClient.create().responseTimeout(Duration.ofSeconds(600)) // 设置超时时间为600秒
        )).baseUrl("").build();

        JSONObject body = new JSONObject();
        body.put("query", "");
        Mono<JSONObject> requestBody = Mono.just(body);
        // 发出POST请求并接收SSE流式响应
        Flux<String> sseFlux = webClient.post()
                .header("Authorization", "") // 添加特定请求头
                .body(requestBody, JSONObject.class)
                .retrieve()
                .bodyToFlux(String.class);
        final ServletOutputStream out = response.getOutputStream();
        Mono<Void> mono = sseFlux
                .doOnNext(message -> {
                    //do something
                    out.write(("data:" + JSON.toJSONString(aiMessageResultDTO) + "\n\n").getBytes());
                    out.flush();
                })
                .doOnComplete(() -> {
                    //do something
                    out.write(("data:" + JSON.toJSONString(aiMessageResultDTO) + "\n\n").getBytes());
                    out.flush();
                })
                .doOnError(error -> {
                    log.error("Error during message processing", error);
                    try {
                        out.write(("data:" + "我在分析需求时遇到了一些问题,请重新提问。" + "\n\n").getBytes());
                        out.close();
                    } catch (Exception e) {
                        log.error("Failed to close output stream", e);
                    }
                })
                .then();
        //阻塞直到流全部读取完毕
        mono.block();
    }

代码最后的mono.block()是可以阻塞整个主线程,直到流读取完毕的,然后通过多次向response中写入数据块的方式实现流式输出,这种方式比较灵活,可以在随心所欲的控制输出的内容,doOnNext(),doOnComplete(),doOnError()等钩子函数的引入也方便处理业务。看起来是美好的,而且我最初也是用这样的方式做的。

四、非阻塞式方案

flux提供了map方法可以用来对数据做转换

@PostMapping(value = "/chat")
    public Flux<String> chat(HttpServletResponse response) {
        //构建webClient
        WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(
                HttpClient.create().responseTimeout(Duration.ofSeconds(600)) // 设置超时时间为600秒
        )).baseUrl("").build();
        JSONObject body = new JSONObject();
        body.put("query", "");
        Mono<JSONObject> requestBody = Mono.just(body);
        // 发出POST请求并接收SSE流式响应
        Flux<String> sseFlux = webClient.post()
                .header("Authorization", "") // 添加特定请求头
                .body(requestBody, JSONObject.class)
                .retrieve()
                .bodyToFlux(String.class);
        Flux<String> resultFlux = sseFlux.map(originalMessage -> {
                    JSONObject originalJson = JSONObject.parseObject(originalMessage);
                    return "";
                }).doOnComplete(() -> {
                    
                }).doOnCancel(() -> {
                   
                })
                .doOnError(error -> {
                   
                }).doFinally(signalType -> {
                   
                });
        return resultFlux;
    }

在map方法中,可以对数据做一些转换,将原始流数据转化为我们需要的格式。这种方法简单、非阻塞,但是不够灵活,至少有两个缺点:

  • map中只能对每个流数据做转化,无法做到过滤数据流,必须要有返回值
  • 无法在流的结尾拼接一些自己想要的流数据

更加灵活的方案:

@PostMapping(value = "/chat")
    public Flux<String> chat(HttpServletResponse response) {
        //构建webClient
        WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(
                HttpClient.create().responseTimeout(Duration.ofSeconds(600)) // 设置超时时间为600秒
        )).baseUrl("").build();
        JSONObject body = new JSONObject();
        body.put("query", "");
        Mono<JSONObject> requestBody = Mono.just(body);
        // 发出POST请求并接收SSE流式响应
        Flux<String> sseFlux = webClient.post()
                .header("Authorization", "") // 添加特定请求头
                .body(requestBody, JSONObject.class)
                .retrieve()
                .bodyToFlux(String.class);
        //引入信号机制
        ReplayProcessor<String> signalBridge = ReplayProcessor.create(0);        //主流
        Flux<String> mainStream = sseFlux
                .<String>handle((message, sink) -> { // handle 结合了 map 和 filter 的能力,可以有条件地发出元素
                    System.out.println(message);
                    sink.next("自定义数据");
                })
                // 可以在这里添加 doOnNext 用于日志等副作用,但主要转换在 handle 中
                .doOnComplete(() -> {
                    signalBridge.onComplete();
                })
                .doOnError(error -> {
                    signalBridge.onError(error);
                });


        // 创建追加流:在 mainStream 完成后执行
        Flux<String> trailerStream = signalBridge
                .thenMany(Flux.defer(() -> {
                    // 收集要发送的额外 SSE 消息
                    List<String> trailerEvents = new ArrayList<>();
                    // 返回包含所有追加事件的 Flux
                    return Flux.fromIterable(trailerEvents);
                }))
                // 处理 trailerStream 内部的错误,避免整个流中断
                .onErrorResume(error -> {
                    return Flux.just("出现异常");
                });

        // 合并主流和追加流
        return mainStream.concatWith(trailerStream);
        
    }

这里引入了信号桥接的办法,ReplayProcessor signalBridge = ReplayProcessor.create(0);  这行代码创建了一个信号桥接器,主流mainStream中使用handle处理流中的数据,他结合了map和filter的能力,可以选择性的将数据转化后通过sink.next(""); 发出,也可以什么都不执行从而过滤掉数据,相比于单纯的map更加灵活;在主流的执行完毕后,会通signalBridge发送完成信号,拼接流trailerStream在接收到完成信号后会执行,其中Flux.fromIterable()可以一次性返回多个数据块,非常适合在主流执行完毕后发送一些收尾数据。

可以用流程图简单描述整个过程:

客户端 → /chat (POST)
         ↓
     构造 WebClient → 调用远端 SSE 接口
         ↓
   得到 Flux<String> sseFlux(原始 SSE 流)
         ↓
   经过 handle 处理 → 输出“自定义数据”
         ↓
   主流完成 → signalBridge 发出 onComplete
         ↓
   trailerStream 被触发 → 发送追加内容(当前为空)
         ↓
   客户端收到:“自定义数据” × N + (可选追加)

五、为什么WebFlux是非阻塞的?

为什么WebFlux是非阻塞的?因为WebFlux底层使用Reactor线程模型,也就是Event Loop,这一节我只想简单讲一下,不做多讲,因为网上关于NIO模型的文章太多了,没必要重复写,我这里只用通俗易懂的语言描述一下,如果读者对各种NIO模型有所了解,那么可以跳过此节了。

在传统的BIO模型(阻塞IO模型)中,当一个网络请求被发起后,主线程会阻塞等待,直到有响应后再继续下一步的业务处理,这种模式浪费资源,大量的资源都浪费在IO等待上了。

Reactor线程模型你可以这样理解:当发起网络请求后,将后续需要进行的业务处理操作注册进去,主线程不需要等待IO,有专门的线程去处理所有的IO事件和事件的分发,又有专门的线程去处理所有的业务事件。

那么这样的模型反应在代码上就是回调函数,当发起一个请求后,你需要把请求结束后需要做的业务处理写到回调函数里,这些回调函数不会立即执行,而是等待IO有结果后才会执行。

反观我们上面写的代码,map(),handle(),doOnComplete(),doOnError()等等都是回调函数,我们传进去的lambda表达式只是作为业务事件注册了进去,并不会立即执行;我们在主线程中直接返回Flux对象后,主线程就是结束了,那些回调函数实际上在其他的线程中执行的。

本文作者:不是好驴
本文链接:https://www.baddonkey.cn/detail/57
版权声明:原创文章,允许转载,转载请注明出处

高谈阔论

留言列表