Soul网关源码分析-19期

2024-02-09 15:59
文章标签 分析 源码 网关 19 soul

本文主要是介绍Soul网关源码分析-19期,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 集群下数据同步探究
    • Websocket 表现
    • Websocket 增量更新实现
    • Http 长轮询表现
    • Http 长轮询更新实现
    • Zookeeper 表现
    • Nacos 表现
    • 总结


集群下数据同步探究


昨天配置集群时有个问题我一直惦记着, 集群间同步网关的数据会不会 相互覆盖 ?


在我看来, 后台集群间没有数据交互, 它们的桥梁仅仅是同一个数据库.


所以当后台为集群 A、B , 此时 A 做了信息变更, B 也做了信息变更, 两者的变更数据位置不同, 且后台如果使用缓存来更新数据并传导出去, 存在相互覆盖的可能 ?




Websocket 表现


首先我们来看看最常用的 websocket 模式.

要想验证这个问题, 我需要测试 A、B 集群变动不同数据时, 网关端接收到的信息情况. 开始第一步, 在网关监听处打印数据信息用于验证:

public final class SoulWebsocketClient extends WebSocketClient {@Overridepublic void onMessage(final String result) {log.info("websocket 路径: {}", uri.toString());log.info("传输数据: {}", result);handleResult(result);}
}

紧接着我们分别测试 A、B 改动不同地方的数据
在这里插入图片描述

看看这块网关端的表现, 首先是 A 的修改在网关端触发的日志打印:

2021-02-01 20:48:16.266  INFO 8463 --- [ctReadThread-31] o.d.s.p.s.d.w.c.SoulWebsocketClient      : websocket 路径: ws://localhost:9095/websocket
2021-02-01 20:48:16.267  INFO 8463 --- [ctReadThread-31] o.d.s.p.s.d.w.c.SoulWebsocketClient      : 传输数据: {"groupType":"RULE","eventType":"UPDATE","data":[{"id":"1355090604988162048","name":"/http/test/**","pluginName":"divide","selectorId":"1355090604493234176","matchMode":0,"sort":1,"enabled":false,"loged":true,"handle":"{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList":[{"paramType":"uri","operator":"match","paramName":"/","paramValue":"/http/test/**"}]}]}

可以看到 "/http/test/**"enabled 属性变为 false .


接着 B 的修改触发的日志打印:

2021-02-01 20:48:21.765  INFO 8463 --- [ctReadThread-36] o.d.s.p.s.d.w.c.SoulWebsocketClient      : websocket 路径: ws://localhost:9096/websocket
2021-02-01 20:48:21.766  INFO 8463 --- [ctReadThread-36] o.d.s.p.s.d.w.c.SoulWebsocketClient      : 传输数据: {"groupType":"RULE","eventType":"UPDATE","data":[{"id":"1355090605491478528","name":"/http/order/save","pluginName":"divide","selectorId":"1355090604493234176","matchMode":0,"sort":1,"enabled":false,"loged":true,"handle":"{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"random\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList":[{"paramType":"uri","operator":"\u003d","paramName":"/","paramValue":"/http/order/save"}]}]}

B 的改动仅传给了网关它所改动的数据, 这是 增量更新 , 如果仅仅增量更新, 就能有力证明 websocket 不会导致集群数据相互覆盖了.




Websocket 增量更新实现


再探究下后台如何通过 websocket 增量发送同步数据, 通过断点 admin 端的 DataChangedEventDispatcher 事件分发器, 我们追溯到后台的 Controller 层:

public class RuleController {@PutMapping("/{id}")public SoulAdminResult updateRule(@PathVariable("id") final String id, @RequestBody final RuleDTO ruleDTO) {Objects.requireNonNull(ruleDTO);ruleDTO.setId(id);Integer updateCount = ruleService.createOrUpdate(ruleDTO);return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS, updateCount);}
}

这是个根据 ID 进行特定数据修改的接口, 网页端的修改能精确到某一个数据, 增量同步的基础是接口 ID 隔离方式的更新.


之后的流程便是将特定更新数据传导到 webscoket 管理类 WebscoketController, 由它通知所持有的 session 会话进行增量数据更新.




Http 长轮询表现


我们再来测测长轮询的表现, 是否存在数据覆盖可能.


回顾之前我们对长轮询分析的文章( 后台与网关数据同步(Http长轮询篇 - 网关) ), 找到网关处关键接收信息的方法 HttpLongPollingDataChangedListener#doLongPolling


新增些日志信息便于观测:

public class HttpSyncDataService implements SyncDataService, AutoCloseable {private void doLongPolling(final String server) {// ...if (groupJson != null) {ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);if (ArrayUtils.isNotEmpty(changedGroups)) {log.info("Group config changed: {}", Arrays.toString(changedGroups));this.doFetchGroupConfig(server, changedGroups);}}}
}

根据之前的实验方式, 当后台 A 配置变动的日志信息:

2021-02-01 21:37:27.299  INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService    : http 路径: http://localhost:90952021-02-01 21:37:27.301  INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService    : 传输数据: [RULE]2021-02-01 21:37:27.301  INFO 9080 --- [-long-polling-1] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9095/configs/fetch?groupKeys=RULE]2021-02-01 21:37:27.325  INFO 9080 --- [-long-polling-1] o.d.s.s.d.h.refresh.AbstractDataRefresh  : update RULE config: {... }

最后面的信息太多, 我将重点数据转换为 json 格式

{"md5": "ab4cbb5760006e4653f4025c7356ccff","lastModifyTime": 1612186647296,"data": [{"id": "1355090604988162048","name": "/http/test/**","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "match","paramName": "/","paramValue": "/http/test/**"}]}]
}

从信息可以看到 "/http/test/**"enabled 属性变为 false .

再看看 B 打印的日志内容

2021-02-01 21:37:37.423  INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService    : http 路径: http://localhost:9096
2021-02-01 21:37:37.424  INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService    : 传输数据: [RULE]
2021-02-01 21:37:37.424  INFO 9080 --- [-long-polling-2] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9096/configs/fetch?groupKeys=RULE]
2021-02-01 21:37:37.467  INFO 9080 --- [-long-polling-2] o.d.s.s.d.h.refresh.AbstractDataRefresh  : update RULE config: {... }

同样将数据变动信息中的重点转换成 json 格式:

{"md5": "29173b55dff25770db3b23d634e88a29","lastModifyTime": 1612186657412,"data": [{"id": "1355090604988162048","name": "/http/test/**","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"roundRobin\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "match","paramName": "/","paramValue": "/http/test/**"}]},{"id": "1355090605491478528","name": "/http/order/save","pluginName": "divide","selectorId": "1355090604493234176","matchMode": 0,"sort": 1,"enabled": false,"loged": true,"handle": "{\"requestVolumeThreshold\":\"0\",\"errorThresholdPercentage\":\"0\",\"maxConcurrentRequests\":\"0\",\"sleepWindowInMilliseconds\":\"0\",\"loadBalance\":\"random\",\"timeout\":3000,\"retry\":\"0\"}","conditionDataList": [{"paramType": "uri","operator": "=","paramName": "/","paramValue": "/http/order/save"}]}]
}

可以看到 "/http/test/**" 的状态 与 "/http/order/save" 的状态都是 false . 也就是说后台 B 的数据更新并没有导致 A 的更新在网关端被覆盖.


至此可以证明使用 Http 长轮询不会导致数据相互覆盖.




Http 长轮询更新实现


为什么 Http 长轮询的集群数据更新不会导致数据覆盖呢? 这还要从 Http 长轮询同步机制说起.

  • 长轮询方式中, 后台的数据变动仅会传递给网关监听方法少量数据, 这个数据就是 变动的元数据类型
  • 网关端接收到变动通知后, 请求后台的 /config/fetch?[数据类型] 接口, 主动拉取特定类型数据

那么现在问题就变成: 后台接收到请求后, 如何返回网关最新的数据?


仅仅集群下某个节点的缓存数据肯定不是最新的, 所以肯定是要拉取数据库中信息的. 我们找到后台这边/config/fetch? 对应类探究一二.


首先找到 ConfigController 类, 其中包括拉取数据的方法

public class ConfigController {@GetMapping("/fetch")public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {Map<String, ConfigData<?>> result = Maps.newHashMap();// 根据不同 groupKey 查找数据并返回for (String groupKey : groupKeys) {ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));result.put(groupKey, data);}return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);}
}
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {// 仅仅取了缓存数据?ConfigDataCache config = CACHE.get(groupKey.name());switch (groupKey) {// ...case RULE:List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {}.getType());return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);// ...}}
}

到这我有些傻眼了, 不是按我想的在同步时返回数据库中信息.


不过接着向上断点探索, 终于在 网页端触发后台数据更新 这块发现问题.


HTTP 轮询中的通知是沿用的 AbstractDataChangedListener#onRuleChanged 等方法, 而这些方法会重新刷新缓存.

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {if (CollectionUtils.isEmpty(changed)) {return;}// 刷新缓存this.updateRuleCache();this.afterRuleChanged(changed, eventType);}protected void updateRuleCache() {// 获取数据库中所有 rule 的信息并放入 CACHE 缓存中this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());}protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {String json = GsonUtils.getInstance().toJson(data);ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);}
}

梳理下后台 B 同步流程图:

时序1
后台A页面配置变动
数据库更新
省略...
时序2
后台B页面配置变动
查询数据库
更新后台B缓存
通知网关数据变动
时序3
网关接收数据变动
请求后台B
后台B返回缓存信息

整个流程步骤是按照 时序1 -> 时序2 -> 时序3 , 正是由于时序2 中后台B配置变动时会重新刷新相关变动元数据类型的所有数据, 保证缓存中数据是最新的. 在时序3中网关请求时才能返回最新的数据.




Zookeeper 表现


来测测 Zookeeper 下网关同步的表现, 是否存在数据覆盖可能.


由于网关端仅是与 Zookeeper 有数据交互, 所以引起覆盖可能性的地方, 仅可能是后台传输数据到 Zookeeper.


找到后台的事件分发器 DataChangedEventDispatcher, 这块 Zookeeper 的监听类为 ZookeeperDataChangedListener.


由于后台的修改是区分为事件类型进行通知的, 我们找到 Rule 事件对应的方法看看实现代码:

public class ZookeeperDataChangedListener implements DataChangedListener {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {// ...for (RuleData data : changed) {String ruleRealPath = ZkPathConstants.buildRulePath(data.getPluginName(), data.getSelectorId(), data.getId());if (eventType == DataEventTypeEnum.DELETE) {deleteZkPath(ruleRealPath);continue;}String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getPluginName());createZkNode(ruleParentPath);// 写入数据 datainsertZkNode(ruleRealPath, data);}}
}

我们将 divide 的 /http/order/save 路径关闭, 查看 data 中的值

RuleData(id=1355090605491478528, name=/http/order/save, pluginName=divide, selectorId=1355090604493234176, matchMode=0, sort=1, enabled=false, loged=true, handle={"requestVolumeThreshold":"0","errorThresholdPercentage":"0","maxConcurrentRequests":"0","sleepWindowInMilliseconds":"0","loadBalance":"random","timeout":3000,"retry":"0"}, conditionDataList=[ConditionData(paramType=uri, operator==, paramName=/, paramValue=/http/order/save)])

可以发现, 后台对于 Zookeeper 的更新是增量的, 所以多个后台组成的集群环境下, 更新数据不会在 Zookeeper 端产生覆盖现象.


网关端是面向 Zookeeper 同步数据, 并没有直接关联后台集群, 所以这种模式下网关也不会有覆盖现象.




Nacos 表现


和 Zookeeper 一样, 使用 Nacos 同步时网关也不会直接面对后台集群, 所以只需保证后台集群对 Nacos 的更新没有覆盖问题即可.


找到关键监听类 NacosDataChangedListener

public class NacosDataChangedListener implements DataChangedListener {@Overridepublic void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {// 从 Nacos 拉取最新数据并更新到缓存updateRuleMap(getConfig(NacosPathConstants.RULE_DATA_ID));switch (eventType) {// ...default:changed.forEach(rule -> {// MAP 集合中剔除变动的数据, 其余数据保留List<RuleData> ls = RULE_MAP.getOrDefault(rule.getSelectorId(), new ArrayList<>()).stream().filter(s -> !s.getId().equals(rule.getId())).collect(Collectors.toList());// 加入变动的数据, 构成一个最新缓存ls.add(rule);ls.sort(RULE_DATA_COMPARATOR);// 全量 RULE 缓存重置RULE_MAP.put(rule.getSelectorId(), ls);});break;}// 推送 RULE 类型的全量数据到 NacospublishConfig(NacosPathConstants.RULE_DATA_ID, RULE_MAP);}@SneakyThrowsprivate String getConfig(final String dataId) {// 从 Nacos 中获取数据String config = configService.getConfig(dataId, NacosPathConstants.GROUP, NacosPathConstants.DEFAULT_TIME_OUT);return StringUtils.hasLength(config) ? config : NacosPathConstants.EMPTY_CONFIG_DEFAULT_VALUE;}private void updateRuleMap(final String configInfo) {JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class);Set<String> set = new HashSet<>(RULE_MAP.keySet());for (Entry<String, JsonElement> e : jo.entrySet()) {set.remove(e.getKey());List<RuleData> ls = new ArrayList<>();e.getValue().getAsJsonArray().forEach(je -> ls.add(GsonUtils.getInstance().fromJson(je, RuleData.class)));// 将最新的数据放入缓存RULE_MAP.put(e.getKey(), ls);}RULE_MAP.keySet().removeAll(set);}
}

根据我们的分析, Nacos 虽然是将某一类型的数据 (比如 RULE) 从缓存中全部发布到 Nacos 中. 但在推送前, 会从 Nacos 中获取到最新数据, 这时就获取到其他集群节点变动的数据 , 以此保证不会出现覆盖其他节点数据的问题.


附上一个流程图说明 后台更新数据 的情况:

网页端更新
发送通知Nacos监听器
从Nacos拉取最新数据更新缓存
将变动增量写入缓存
缓存全部推送Nacos

Nacos 同步时, 后台通过推送前从 Nacos 获取数据, 达到节点间不覆盖的目的. 而网关端仅针对 Nacos 更新, 保证了此种方式下的数据正确性.




总结


集群下各种同步方式均可以保证 节点间数据变动同步给网关时不会相互影响, 造成数据覆盖, 它们的实现方式各不相同.

  • Websocket 模式下, 通过精准的 增量更新 , 保证集群间同步给网关不相同数据时, 不会带上它们的过时数据
  • Http 长轮询模式下, 每个节点在接收网页端变动信息时, 不仅变更自身相应数据的缓存, 也会 查询数据库, 更新相应类型缓存的所有数据. 以此保证通知网关时不会传出过时数据
  • Zookeeper 模式下, 后台对于 Zookeeper 的数据更新也是 增量更新 的, 这点和 Websocket 很像.
  • Nacos 模式下, 后台在相应类型缓存全量推送 Nacos 前, 会先 查询Nacos配置 并更新缓存, 依次保证不会给 Nacos 传入过时数据.

这篇关于Soul网关源码分析-19期的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:https://blog.csdn.net/zm469568595/article/details/113574299
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/694665

相关文章

8种快速易用的Python Matplotlib数据可视化方法汇总(附源码)

《8种快速易用的PythonMatplotlib数据可视化方法汇总(附源码)》你是否曾经面对一堆复杂的数据,却不知道如何让它们变得直观易懂?别慌,Python的Matplotlib库是你数据可视化的... 目录引言1. 折线图(Line Plot)——趋势分析2. 柱状图(Bar Chart)——对比分析3

Dubbo之SPI机制的实现原理和优势分析

《Dubbo之SPI机制的实现原理和优势分析》:本文主要介绍Dubbo之SPI机制的实现原理和优势,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Dubbo中SPI机制的实现原理和优势JDK 中的 SPI 机制解析Dubbo 中的 SPI 机制解析总结Dubbo中

C#继承之里氏替换原则分析

《C#继承之里氏替换原则分析》:本文主要介绍C#继承之里氏替换原则,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录C#里氏替换原则一.概念二.语法表现三.类型检查与转换总结C#里氏替换原则一.概念里氏替换原则是面向对象设计的基本原则之一:核心思想:所有引py

基于Go语言实现Base62编码的三种方式以及对比分析

《基于Go语言实现Base62编码的三种方式以及对比分析》Base62编码是一种在字符编码中使用62个字符的编码方式,在计算机科学中,,Go语言是一种静态类型、编译型语言,它由Google开发并开源,... 目录一、标准库现状与解决方案1. 标准库对比表2. 解决方案完整实现代码(含边界处理)二、关键实现细

PostgreSQL 序列(Sequence) 与 Oracle 序列对比差异分析

《PostgreSQL序列(Sequence)与Oracle序列对比差异分析》PostgreSQL和Oracle都提供了序列(Sequence)功能,但在实现细节和使用方式上存在一些重要差异,... 目录PostgreSQL 序列(Sequence) 与 oracle 序列对比一 基本语法对比1.1 创建序

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

慢sql提前分析预警和动态sql替换-Mybatis-SQL

《慢sql提前分析预警和动态sql替换-Mybatis-SQL》为防止慢SQL问题而开发的MyBatis组件,该组件能够在开发、测试阶段自动分析SQL语句,并在出现慢SQL问题时通过Ducc配置实现动... 目录背景解决思路开源方案调研设计方案详细设计使用方法1、引入依赖jar包2、配置组件XML3、核心配

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑