Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)

2024-02-15 19:20

本文主要是介绍Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

源码版本:2.6.1

前言

上一篇Apache 神禹(shenyu)源码阅读(一)——Admin向Gateway的数据同步(Admin端)写了Admin 端在接收到程序员对 Divide 插件的选择器 Selector 作出新增操作时,Admin 端是如何将要同步的数据发布给 Gateway 端的。

本篇介绍 Gateway 端是如何接收 Admin 端发布的数据的。

本文介绍的数据同步(sync data)在 Shenyu 架构图中的位置

在这里插入图片描述

正文

1. Gateway 端通过网络接收 Admin 端要同步的数据

  • ShenyuWebsocketClient.onMessage()
    由 Admin 端的 WebsocketCollector.send() 通过网络发送数据后(上一篇的内容),Gateway 端的 ShenyuWebsocketClient.onMessage() 收到数据,onMessage() 是 Spring 框架抽象类 WebSocketClient 的一个方法,在 ShenyuWebsocketClient 中实现了这个方法。
public final class ShenyuWebsocketClient extends WebSocketClient {// ...@Overridepublic void onMessage(final String result) {handleResult(result);}private void handleResult(final String result) {// 1. 打印日志LOG.info("handleResult({})", result);// 2. 调用 Gson 包,将 Json 字符串转换为 WebsocketDataWebsocketData<?> websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);// 3. 因为我们是新增的 Selector,所以这里 groupEnum 为 ConfigGroupEnum.SELECTORConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());// 4. 事件是 UPDATEString eventType = websocketData.getEventType();// 5. 再转成 Json 字符串String json = GsonUtils.getInstance().toJson(websocketData.getData());// 6. 交给 WebsocketDataHandler 处理数据websocketDataHandler.executor(groupEnum, json, eventType);}
}
  • ShenyuWebsocketClient.handleResult()

    如上面那段代码,

    1. 打印日志
    2. 调用 Gson 包,将 Json 字符串转换为 WebsocketData
    3. 因为我们是新增的 Selector,所以这里 groupEnumConfigGroupEnum.SELECTOR
    4. 事件是 UPDATE
    5. 再转成 Json 字符串
    6. 交给 WebsocketDataHandler 处理数据
  • WebsocketDataHandler.executor()

    WebsocketDataHandler 的一个 EnumMap 类型的成员变量存储了 ConfigGroupEnum -> DataHandler 的映射。在 executor 方法里拿到 ConfigGroupEnum 对应的 DataHandler 去处理数据

public class WebsocketDataHandler {// ...private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {ENUM_MAP.get(type).handle(json, eventType);}
}

2 交由 SelectorDataHandler 处理数据

  • DataHandler.handle

    DataHandler 是个接口:

public interface DataHandler {/*** Handle.** @param json  the data for json* @param eventType the event type*/void handle(String json, String eventType);
}

其继承关系如下图:
在这里插入图片描述

  • AbstractDataHandler.handle()

    这里 handle() 用到了一个设计模式——模板方法,里面用到的方法都是交由子类根据自己的逻辑去实现
    事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理

public abstract class AbstractDataHandler<T> implements DataHandler {/*** Convert list.** @param json the json* @return the list*/protected abstract List<T> convert(String json);/*** Do refresh.** @param dataList the data list*/protected abstract void doRefresh(List<T> dataList);/*** Do update.** @param dataList the data list*/protected abstract void doUpdate(List<T> dataList);/*** Do delete.** @param dataList the data list*/protected abstract void doDelete(List<T> dataList);@Overridepublic void handle(final String json, final String eventType) {List<T> dataList = convert(json);if (CollectionUtils.isEmpty(dataList)) {return;}DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);switch (eventTypeEnum) {case REFRESH:case MYSELF:doRefresh(dataList);break;case UPDATE:case CREATE:// 事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理doUpdate(dataList);break;case DELETE:doDelete(dataList);break;default:break;}}
}
  • SelectorDataHandler.doUpdate()

    由插件数据订阅者 pluginDataSubscriber 去完成 Selector 数据的订阅和处理

 public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {// ...private final PluginDataSubscriber pluginDataSubscriber;@Overrideprotected void doUpdate(final List<SelectorData> dataList) {dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}
  • CommonPluginDataSubscriber.onSelectorSubscribe()

    CommonPluginDataSubscriber 是 PluginDataSubscriber 的唯一一个实现类:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {// ...@Overridepublic void onSelectorSubscribe(final SelectorData selectorData) {LOG.info("subscribe select data for selector: [id: {}, pluginName: {}, name: {}]", selectorData.getId(), selectorData.getPluginName(), selectorData.getName());subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);}private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {if (dataType == DataEventTypeEnum.UPDATE) {Optional.ofNullable(classData)// 如果要更新的数据不为空,则更新缓存数据.ifPresent(data -> updateCacheData(classData));} else if (dataType == DataEventTypeEnum.DELETE) {Optional.ofNullable(classData).ifPresent(data -> removeCacheData(classData));}}private <T> void updateCacheData(@NonNull final T data) {if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;final PluginData oldPluginData = BaseDataCache.getInstance().obtainPluginData(pluginData.getName());BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));// update enabled pluginsPluginHandlerEventEnum state = Boolean.TRUE.equals(pluginData.getEnabled())? PluginHandlerEventEnum.ENABLED : PluginHandlerEventEnum.DISABLED;eventPublisher.publishEvent(new PluginHandlerEvent(state, pluginData));// sorted pluginsortPluginIfOrderChange(oldPluginData, pluginData);final String pluginName = pluginData.getName();// if update plugin, remove selector and rule match cache/trie cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(pluginName);}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(pluginName);}} else if (data instanceof SelectorData) {SelectorData selectorData = (SelectorData) data;// BaseDataCache 缓存BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));// remove match cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptySelectorData(selectorData.getPluginName());}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleDataBySelector(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptyRuleData(selectorData.getPluginName());}updateSelectorTrieCache(selectorData);} else if (data instanceof RuleData) {RuleData ruleData = (RuleData) data;BaseDataCache.getInstance().cacheRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(ruleData.getPluginName(), ruleData.getId());MatchDataCache.getInstance().removeEmptyRuleData(ruleData.getPluginName());}updateRuleTrieCache(ruleData);}}
}

3. BaseDataCache 根据数据更新缓存

网关的 SELECTOR_MAP 等缓存是由 ConcurrentMap 实现的。

  1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中
  2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据
  3. 然后将更新后的 selectorData 集合排序
  4. 更新 SELECTOR_MAP
public final class BaseDataCache {// ...private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();// 我觉得这个方法名可能是敲错了,应该是 cacheSelectorData 才对public void cacheSelectData(final SelectorData selectorData) {Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);}private void selectorAccept(final SelectorData data) {String key = data.getPluginName();synchronized (SELECTOR_MAP) {if (SELECTOR_MAP.containsKey(key)) {// 存在 key,说明为更新操作List<SelectorData> existList = SELECTOR_MAP.get(key);// 1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());// 2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据resultList.add(data);// 3. 然后将更新后的 selectorData 集合排序final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());// 4. 更新 SELECTOR_MAPSELECTOR_MAP.put(key, collect);} else {// 不存在 key,说明为新增操作SELECTOR_MAP.put(key, Lists.newArrayList(data));}}}
}

一张图总结

在这里插入图片描述

这篇关于Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/712307

相关文章

使用Python开发一个Ditto剪贴板数据导出工具

《使用Python开发一个Ditto剪贴板数据导出工具》在日常工作中,我们经常需要处理大量的剪贴板数据,下面将介绍如何使用Python的wxPython库开发一个图形化工具,实现从Ditto数据库中读... 目录前言运行结果项目需求分析技术选型核心功能实现1. Ditto数据库结构分析2. 数据库自动定位3

pandas数据的合并concat()和merge()方式

《pandas数据的合并concat()和merge()方式》Pandas中concat沿轴合并数据框(行或列),merge基于键连接(内/外/左/右),concat用于纵向或横向拼接,merge用于... 目录concat() 轴向连接合并(1) join='outer',axis=0(2)join='o

Linux线程同步/互斥过程详解

《Linux线程同步/互斥过程详解》文章讲解多线程并发访问导致竞态条件,需通过互斥锁、原子操作和条件变量实现线程安全与同步,分析死锁条件及避免方法,并介绍RAII封装技术提升资源管理效率... 目录01. 资源共享问题1.1 多线程并发访问1.2 临界区与临界资源1.3 锁的引入02. 多线程案例2.1 为

批量导入txt数据到的redis过程

《批量导入txt数据到的redis过程》用户通过将Redis命令逐行写入txt文件,利用管道模式运行客户端,成功执行批量删除以Product*匹配的Key操作,提高了数据清理效率... 目录批量导入txt数据到Redisjs把redis命令按一条 一行写到txt中管道命令运行redis客户端成功了批量删除k

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Apache Ignite 与 Spring Boot 集成详细指南

《ApacheIgnite与SpringBoot集成详细指南》ApacheIgnite官方指南详解如何通过SpringBootStarter扩展实现自动配置,支持厚/轻客户端模式,简化Ign... 目录 一、背景:为什么需要这个集成? 二、两种集成方式(对应两种客户端模型) 三、方式一:自动配置 Thick

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

Apache Ignite缓存基本操作实例详解

《ApacheIgnite缓存基本操作实例详解》文章介绍了ApacheIgnite中IgniteCache的基本操作,涵盖缓存获取、动态创建、销毁、原子及条件更新、异步执行,强调线程池注意事项,避免... 目录一、获取缓存实例(Getting an Instance of a Cache)示例代码:二、动态