技术大停滞_检测和测试停滞的流– RxJava常见问题解答

2023-11-11 22:20

本文主要是介绍技术大停滞_检测和测试停滞的流– RxJava常见问题解答,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

技术大停滞

技术大停滞

假设您有一个流以不可预测的频率发布事件。 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件。 如果流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题。 静默时间过长(停顿)可以解释为网络问题。 因此,我们经常不时发送人造事件( ping ),以确保:

  • 客户还活着
  • 让客户知道我们还活着

举一个更具体的例子,假设我们有一个Flowable<String>流,它会产生一些事件。 如果没有事件超过一秒钟,我们应该发送一个占位符"PING"消息。 当寂静时间更长时,应该每秒发出一个"PING"消息。 我们如何在RxJava中实现这样的要求? 最明显但不正确的解决方案是将原始流与ping合并:

Flowable<String> events = //...
Flowable<String> pings = Flowable.interval(1, SECONDS).map(x -> "PING");Flowable<String> eventsWithPings = events.mergeWith(pings);

mergeWith()运算符至关重要:它接受真正的events ,并将它们与恒定的ping流合并。 当然,当不存在真正的事件时,将显示"PING"消息。 不幸的是,它们与原始流完全无关。 这意味着即使有很多正常事件,我们也会继续发送ping命令。 而且,当静默开始时,我们不会在一秒钟后精确发送"PING" 。 如果您对这种机制感到满意,则可以在此处停止阅读。

一种更复杂的方法需要发现持续超过1秒的静音。 我们可以使用timeout()运算符。 不幸的是,它会产生TimeoutException并从上游退订-行为过于激进。 我们只想收到某种通知。 事实证明,可以使用debounce()运算符。 通常,此操作员会推迟新事件的发出,以防万一有新事件出现,而又覆盖了旧事件。 所以,如果我说:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);

这意味着delayed流仅在1秒内跟随其他事件的情况下才会发出事件。 如果events流使产生事件的速度足够快,则从技术上讲, delayed可能永远不会发出任何东西。 我们将使用delayed流通过以下方式发现沉默:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

请记住, mergeWith()与它的static merge()对应项之间没有区别。 所以我们到了某个地方。 如果流繁忙,则delayed流将永远不会收到任何事件,因此不会发送"PING"消息。 但是,当原始流不发送任何事件超过1秒时, delayed接收到最后一次看到的事件,将其忽略并转换为"PING" 。 聪明,但坏了。 此实现仅在发现停顿之后才发送一个"PING" ,而不是每秒发送一次定期ping。 很容易修复! 除了将最后一次看到的事件转换为单个"PING"我们还可以将其转换为周期性的ping序列:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING"));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

您能看到缺陷在哪里吗? 每当原始流中出现一点沉默时,我们就会每秒发出一次ping 。 但是,一旦出现真正的事件,我们应该停止这样做。 我们没有。 上游的每个停顿都会导致新的无限ping流出现在最终合并的流中。 我们必须以某种方式告诉pings流,因为原始流发出了真正的事件,所以它应该停止发出ping 。 猜猜是什么,有takeUntil()运算符可以做到这一点!

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

花一点时间完全掌握上面的代码片段。 每当原始流上超过1秒没有任何React时, delayed流就会发出一个事件。 pings流发射的序列"PING"每秒从发射每个事件的事件delayed 。 但是,一旦事件出现在events流上,便会终止pings流。 您甚至可以将所有这些定义为单个表达式:

Flowable<String> events = //...
Flowable<String> eventsWithPings = events.mergeWith(events.debounce(1, SECONDS).flatMap(x1 -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)));

可测性

好的,我们已经编写了所有这些内容,但是我们应该如何测试事件驱动代码的这个三层嵌套的Blob? 我们如何确保ping在正确的时间出现并在静音结束后停止? 如何模拟各种与时间相关的场景? RxJava具有许多杀手级功能,但是测试时间流逝可能是最大的功能。 首先,让我们的ping代码更具可测试性和通用性:

<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {return events.mergeWith(events.debounce(1, SECONDS, clock).flatMap(x1 -> Flowable.interval(0, 1, SECONDS, clock).map(e -> ping).takeUntil(events)));}

此实用程序方法采用任意的T流并添加ping ,以防该流在较长时间内不产生任何事件。 我们在测试中像这样使用它:

PublishProcessor<String> events = PublishProcessor.create();
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");

哦,男孩, PublishProcessorTestSchedulerPublishProcessor是一个有趣的类,它是一个亚型Flowable (所以我们可以使用它作为一个普通的流)。 另一方面,我们可以使用其onNext()方法强制发出事件:

events.onNext("A");

如果有人收听events流,他将立即收到"A"事件。 这clock是怎么回事? RxJava中以任何方式处理时间的每个运算符(例如debounce debounce()interval()timeout()window() )都可以采用可选的Scheduler参数。 它充当时间的外部来源。 特殊的TestScheduler是我们完全控制的人为时间来源。 也就是说,只要我们不显式调用advanceTimeBy()时间就保持静止:

clock.advanceTimeBy(999, MILLISECONDS);

999毫秒不是巧合。 Ping在1秒钟后开始精确显示,因此在999毫秒后将不可见。 现在是时候揭示完整的测试用例了:

@Test
public void shouldAddPings() throws Exception {PublishProcessor<String> events = PublishProcessor.create();final TestScheduler clock = new TestScheduler();final Flowable<String> eventsWithPings = withPings(events, clock, "PING");final TestSubscriber<String> test = eventsWithPings.test();events.onNext("A");test.assertValues("A");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("B");test.assertValues("A", "B");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING");events.onNext("C");test.assertValues("A", "B", "PING", "C");clock.advanceTimeBy(1000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");events.onNext("D");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("E");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");clock.advanceTimeBy(3_000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}

看起来像一堵墙,但这实际上是我们逻辑的完整测试方案。 它可以确保在1000毫秒后准确地执行ping操作;如果寂静时间很长,则会重复进行ping操作;当出现真正的事件时, ping操作会变得很慢。 但最重要的部分是:该测试是100%可预测的并且非常快。 没有Awaitility ,忙等待,轮询,间歇性测试失败和缓慢。 我们完全控制的人工时钟可确保所有这些组合流均按预期工作。

翻译自: https://www.javacodegeeks.com/2017/09/detecting-testing-stalled-streams-rxjava-faq.html

技术大停滞

这篇关于技术大停滞_检测和测试停滞的流– RxJava常见问题解答的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/393213

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Java中的.close()举例详解

《Java中的.close()举例详解》.close()方法只适用于通过window.open()打开的弹出窗口,对于浏览器的主窗口,如果没有得到用户允许是不能关闭的,:本文主要介绍Java中的.... 目录当你遇到以下三种情况时,一定要记得使用 .close():用法作用举例如何判断代码中的 input

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S