【Reactor】优雅实现后端向前端的消息主动推送

java / Reactor / 2024-06-12
0 581

背景:有个需求,后端在一个时间点要向前端发送websocket消息,前端点击确认作出应答。为防止用户错误点击或页面刷新消息丢失,需要后端在5分钟内判断应答状态,如果5分钟内没有应答,则每隔10秒向前端推送一次,5分钟内应答了或者超过了5分钟,后端结束推送。

这是一个很简单的需求,起个线程,在5分钟之内循环,判断应答状态,如果没有应答则sleep10秒,重新推送一次websocket消息。这是我首先想到的方案。
伪代码:

public static void main(String[] args) {
        new Thread(() -> {
            long start = System.currentTimeMillis();
            do {
                LogResp logResp = logService.findById(logId);
                if (Status.ON.getStatus().equals(logResp.getStatus())) {
                    break;
                }else {
                    System.out.println("重新推送消息");
                    Thread.sleep(10000);
                }
            } while ((System.currentTimeMillis() - start) / ( 60 * 1000) < 5); // 小于5分钟

        }).start();
    }

恰巧那段时间在看Reactor的东西,就在想用Reactor流式函数的方式能不能实现。然后就试了试!

Reactor官网

需求的关键点在于:5分钟超时结束、5分钟内条件判断结束这两个。

5分钟超时结束可以用Flux.takeWhile满足,而5分钟条件判断结束可以用Flux.takeUntil满足。
所以最后的伪代码:

public static void main(String[] args) {
        Integer retentionPeriod = 5; // 分钟
        AtomicBoolean last = new AtomicBoolean(false);
        Flux.interval(Duration.ofSeconds(10))
                .takeWhile(index -> index < retentionPeriod * 60 / 5)
                .doOnNext(index -> {
                    Long logId = 11L;
//                    LogResp logResp = logService.findById(logId);
//                    if (Status.ON.getStatus().equals(logResp.getStatus())
//                            || retentionPeriod * 60 - (index * 10) <= 0) {
//                        last.set(true);
//                    }else {
//                        long retention = retentionPeriod * 60 - (index * 10);
//                        System.out.println("重新推送消息");
//                    }
                })
                .takeUntil(index -> last.get())
                .doOnError((err) -> {
                    System.out.println("推送异常");
                })
                .onErrorComplete()
                .subscribeOn(Schedulers.parallel())
                .subscribe();
    }