Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)

2024-02-15 19:20

本文主要是介绍Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

源码版本:2.6.1

前言

上一篇Apache 神禹(shenyu)源码阅读(一)——Admin向Gateway的数据同步(Admin端)写了Admin 端在接收到程序员对 Divide 插件的选择器 Selector 作出新增操作时,Admin 端是如何将要同步的数据发布给 Gateway 端的。

本篇介绍 Gateway 端是如何接收 Admin 端发布的数据的。

本文介绍的数据同步(sync data)在 Shenyu 架构图中的位置

在这里插入图片描述

正文

1. Gateway 端通过网络接收 Admin 端要同步的数据

  • ShenyuWebsocketClient.onMessage()
    由 Admin 端的 WebsocketCollector.send() 通过网络发送数据后(上一篇的内容),Gateway 端的 ShenyuWebsocketClient.onMessage() 收到数据,onMessage() 是 Spring 框架抽象类 WebSocketClient 的一个方法,在 ShenyuWebsocketClient 中实现了这个方法。
public final class ShenyuWebsocketClient extends WebSocketClient {// ...@Overridepublic void onMessage(final String result) {handleResult(result);}private void handleResult(final String result) {// 1. 打印日志LOG.info("handleResult({})", result);// 2. 调用 Gson 包,将 Json 字符串转换为 WebsocketDataWebsocketData<?> websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);// 3. 因为我们是新增的 Selector,所以这里 groupEnum 为 ConfigGroupEnum.SELECTORConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());// 4. 事件是 UPDATEString eventType = websocketData.getEventType();// 5. 再转成 Json 字符串String json = GsonUtils.getInstance().toJson(websocketData.getData());// 6. 交给 WebsocketDataHandler 处理数据websocketDataHandler.executor(groupEnum, json, eventType);}
}
  • ShenyuWebsocketClient.handleResult()

    如上面那段代码,

    1. 打印日志
    2. 调用 Gson 包,将 Json 字符串转换为 WebsocketData
    3. 因为我们是新增的 Selector,所以这里 groupEnumConfigGroupEnum.SELECTOR
    4. 事件是 UPDATE
    5. 再转成 Json 字符串
    6. 交给 WebsocketDataHandler 处理数据
  • WebsocketDataHandler.executor()

    WebsocketDataHandler 的一个 EnumMap 类型的成员变量存储了 ConfigGroupEnum -> DataHandler 的映射。在 executor 方法里拿到 ConfigGroupEnum 对应的 DataHandler 去处理数据

public class WebsocketDataHandler {// ...private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {ENUM_MAP.get(type).handle(json, eventType);}
}

2 交由 SelectorDataHandler 处理数据

  • DataHandler.handle

    DataHandler 是个接口:

public interface DataHandler {/*** Handle.** @param json  the data for json* @param eventType the event type*/void handle(String json, String eventType);
}

其继承关系如下图:
在这里插入图片描述

  • AbstractDataHandler.handle()

    这里 handle() 用到了一个设计模式——模板方法,里面用到的方法都是交由子类根据自己的逻辑去实现
    事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理

public abstract class AbstractDataHandler<T> implements DataHandler {/*** Convert list.** @param json the json* @return the list*/protected abstract List<T> convert(String json);/*** Do refresh.** @param dataList the data list*/protected abstract void doRefresh(List<T> dataList);/*** Do update.** @param dataList the data list*/protected abstract void doUpdate(List<T> dataList);/*** Do delete.** @param dataList the data list*/protected abstract void doDelete(List<T> dataList);@Overridepublic void handle(final String json, final String eventType) {List<T> dataList = convert(json);if (CollectionUtils.isEmpty(dataList)) {return;}DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);switch (eventTypeEnum) {case REFRESH:case MYSELF:doRefresh(dataList);break;case UPDATE:case CREATE:// 事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理doUpdate(dataList);break;case DELETE:doDelete(dataList);break;default:break;}}
}
  • SelectorDataHandler.doUpdate()

    由插件数据订阅者 pluginDataSubscriber 去完成 Selector 数据的订阅和处理

 public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {// ...private final PluginDataSubscriber pluginDataSubscriber;@Overrideprotected void doUpdate(final List<SelectorData> dataList) {dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}
  • CommonPluginDataSubscriber.onSelectorSubscribe()

    CommonPluginDataSubscriber 是 PluginDataSubscriber 的唯一一个实现类:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {// ...@Overridepublic void onSelectorSubscribe(final SelectorData selectorData) {LOG.info("subscribe select data for selector: [id: {}, pluginName: {}, name: {}]", selectorData.getId(), selectorData.getPluginName(), selectorData.getName());subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);}private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {if (dataType == DataEventTypeEnum.UPDATE) {Optional.ofNullable(classData)// 如果要更新的数据不为空,则更新缓存数据.ifPresent(data -> updateCacheData(classData));} else if (dataType == DataEventTypeEnum.DELETE) {Optional.ofNullable(classData).ifPresent(data -> removeCacheData(classData));}}private <T> void updateCacheData(@NonNull final T data) {if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;final PluginData oldPluginData = BaseDataCache.getInstance().obtainPluginData(pluginData.getName());BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));// update enabled pluginsPluginHandlerEventEnum state = Boolean.TRUE.equals(pluginData.getEnabled())? PluginHandlerEventEnum.ENABLED : PluginHandlerEventEnum.DISABLED;eventPublisher.publishEvent(new PluginHandlerEvent(state, pluginData));// sorted pluginsortPluginIfOrderChange(oldPluginData, pluginData);final String pluginName = pluginData.getName();// if update plugin, remove selector and rule match cache/trie cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(pluginName);}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(pluginName);}} else if (data instanceof SelectorData) {SelectorData selectorData = (SelectorData) data;// BaseDataCache 缓存BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));// remove match cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptySelectorData(selectorData.getPluginName());}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleDataBySelector(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptyRuleData(selectorData.getPluginName());}updateSelectorTrieCache(selectorData);} else if (data instanceof RuleData) {RuleData ruleData = (RuleData) data;BaseDataCache.getInstance().cacheRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(ruleData.getPluginName(), ruleData.getId());MatchDataCache.getInstance().removeEmptyRuleData(ruleData.getPluginName());}updateRuleTrieCache(ruleData);}}
}

3. BaseDataCache 根据数据更新缓存

网关的 SELECTOR_MAP 等缓存是由 ConcurrentMap 实现的。

  1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中
  2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据
  3. 然后将更新后的 selectorData 集合排序
  4. 更新 SELECTOR_MAP
public final class BaseDataCache {// ...private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();// 我觉得这个方法名可能是敲错了,应该是 cacheSelectorData 才对public void cacheSelectData(final SelectorData selectorData) {Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);}private void selectorAccept(final SelectorData data) {String key = data.getPluginName();synchronized (SELECTOR_MAP) {if (SELECTOR_MAP.containsKey(key)) {// 存在 key,说明为更新操作List<SelectorData> existList = SELECTOR_MAP.get(key);// 1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());// 2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据resultList.add(data);// 3. 然后将更新后的 selectorData 集合排序final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());// 4. 更新 SELECTOR_MAPSELECTOR_MAP.put(key, collect);} else {// 不存在 key,说明为新增操作SELECTOR_MAP.put(key, Lists.newArrayList(data));}}}
}

一张图总结

在这里插入图片描述

这篇关于Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/712307

相关文章

MySQL主从同步延迟问题的全面解决方案

《MySQL主从同步延迟问题的全面解决方案》MySQL主从同步延迟是分布式数据库系统中的常见问题,会导致从库读取到过期数据,影响业务一致性,下面我将深入分析延迟原因并提供多层次的解决方案,需要的朋友可... 目录一、同步延迟原因深度分析1.1 主从复制原理回顾1.2 延迟产生的关键环节二、实时监控与诊断方案

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

Java注解之超越Javadoc的元数据利器详解

《Java注解之超越Javadoc的元数据利器详解》本文将深入探讨Java注解的定义、类型、内置注解、自定义注解、保留策略、实际应用场景及最佳实践,无论是初学者还是资深开发者,都能通过本文了解如何利用... 目录什么是注解?注解的类型内置注编程解自定义注解注解的保留策略实际用例最佳实践总结在 Java 编程

一文教你Python如何快速精准抓取网页数据

《一文教你Python如何快速精准抓取网页数据》这篇文章主要为大家详细介绍了如何利用Python实现快速精准抓取网页数据,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录1. 准备工作2. 基础爬虫实现3. 高级功能扩展3.1 抓取文章详情3.2 保存数据到文件4. 完整示例

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读