Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)

2024-02-15 19:20

本文主要是介绍Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

源码版本:2.6.1

前言

上一篇Apache 神禹(shenyu)源码阅读(一)——Admin向Gateway的数据同步(Admin端)写了Admin 端在接收到程序员对 Divide 插件的选择器 Selector 作出新增操作时,Admin 端是如何将要同步的数据发布给 Gateway 端的。

本篇介绍 Gateway 端是如何接收 Admin 端发布的数据的。

本文介绍的数据同步(sync data)在 Shenyu 架构图中的位置

在这里插入图片描述

正文

1. Gateway 端通过网络接收 Admin 端要同步的数据

  • ShenyuWebsocketClient.onMessage()
    由 Admin 端的 WebsocketCollector.send() 通过网络发送数据后(上一篇的内容),Gateway 端的 ShenyuWebsocketClient.onMessage() 收到数据,onMessage() 是 Spring 框架抽象类 WebSocketClient 的一个方法,在 ShenyuWebsocketClient 中实现了这个方法。
public final class ShenyuWebsocketClient extends WebSocketClient {// ...@Overridepublic void onMessage(final String result) {handleResult(result);}private void handleResult(final String result) {// 1. 打印日志LOG.info("handleResult({})", result);// 2. 调用 Gson 包,将 Json 字符串转换为 WebsocketDataWebsocketData<?> websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);// 3. 因为我们是新增的 Selector,所以这里 groupEnum 为 ConfigGroupEnum.SELECTORConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());// 4. 事件是 UPDATEString eventType = websocketData.getEventType();// 5. 再转成 Json 字符串String json = GsonUtils.getInstance().toJson(websocketData.getData());// 6. 交给 WebsocketDataHandler 处理数据websocketDataHandler.executor(groupEnum, json, eventType);}
}
  • ShenyuWebsocketClient.handleResult()

    如上面那段代码,

    1. 打印日志
    2. 调用 Gson 包,将 Json 字符串转换为 WebsocketData
    3. 因为我们是新增的 Selector,所以这里 groupEnumConfigGroupEnum.SELECTOR
    4. 事件是 UPDATE
    5. 再转成 Json 字符串
    6. 交给 WebsocketDataHandler 处理数据
  • WebsocketDataHandler.executor()

    WebsocketDataHandler 的一个 EnumMap 类型的成员变量存储了 ConfigGroupEnum -> DataHandler 的映射。在 executor 方法里拿到 ConfigGroupEnum 对应的 DataHandler 去处理数据

public class WebsocketDataHandler {// ...private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {ENUM_MAP.get(type).handle(json, eventType);}
}

2 交由 SelectorDataHandler 处理数据

  • DataHandler.handle

    DataHandler 是个接口:

public interface DataHandler {/*** Handle.** @param json  the data for json* @param eventType the event type*/void handle(String json, String eventType);
}

其继承关系如下图:
在这里插入图片描述

  • AbstractDataHandler.handle()

    这里 handle() 用到了一个设计模式——模板方法,里面用到的方法都是交由子类根据自己的逻辑去实现
    事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理

public abstract class AbstractDataHandler<T> implements DataHandler {/*** Convert list.** @param json the json* @return the list*/protected abstract List<T> convert(String json);/*** Do refresh.** @param dataList the data list*/protected abstract void doRefresh(List<T> dataList);/*** Do update.** @param dataList the data list*/protected abstract void doUpdate(List<T> dataList);/*** Do delete.** @param dataList the data list*/protected abstract void doDelete(List<T> dataList);@Overridepublic void handle(final String json, final String eventType) {List<T> dataList = convert(json);if (CollectionUtils.isEmpty(dataList)) {return;}DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);switch (eventTypeEnum) {case REFRESH:case MYSELF:doRefresh(dataList);break;case UPDATE:case CREATE:// 事件类型为 UPDATE 和 CREATE 的事件都由 doUpdate 方法处理doUpdate(dataList);break;case DELETE:doDelete(dataList);break;default:break;}}
}
  • SelectorDataHandler.doUpdate()

    由插件数据订阅者 pluginDataSubscriber 去完成 Selector 数据的订阅和处理

 public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {// ...private final PluginDataSubscriber pluginDataSubscriber;@Overrideprotected void doUpdate(final List<SelectorData> dataList) {dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}
  • CommonPluginDataSubscriber.onSelectorSubscribe()

    CommonPluginDataSubscriber 是 PluginDataSubscriber 的唯一一个实现类:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {// ...@Overridepublic void onSelectorSubscribe(final SelectorData selectorData) {LOG.info("subscribe select data for selector: [id: {}, pluginName: {}, name: {}]", selectorData.getId(), selectorData.getPluginName(), selectorData.getName());subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);}private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {if (dataType == DataEventTypeEnum.UPDATE) {Optional.ofNullable(classData)// 如果要更新的数据不为空,则更新缓存数据.ifPresent(data -> updateCacheData(classData));} else if (dataType == DataEventTypeEnum.DELETE) {Optional.ofNullable(classData).ifPresent(data -> removeCacheData(classData));}}private <T> void updateCacheData(@NonNull final T data) {if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;final PluginData oldPluginData = BaseDataCache.getInstance().obtainPluginData(pluginData.getName());BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));// update enabled pluginsPluginHandlerEventEnum state = Boolean.TRUE.equals(pluginData.getEnabled())? PluginHandlerEventEnum.ENABLED : PluginHandlerEventEnum.DISABLED;eventPublisher.publishEvent(new PluginHandlerEvent(state, pluginData));// sorted pluginsortPluginIfOrderChange(oldPluginData, pluginData);final String pluginName = pluginData.getName();// if update plugin, remove selector and rule match cache/trie cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(pluginName);}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(pluginName);}} else if (data instanceof SelectorData) {SelectorData selectorData = (SelectorData) data;// BaseDataCache 缓存BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));// remove match cacheif (selectorMatchConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeSelectorData(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptySelectorData(selectorData.getPluginName());}if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleDataBySelector(selectorData.getPluginName(), selectorData.getId());MatchDataCache.getInstance().removeEmptyRuleData(selectorData.getPluginName());}updateSelectorTrieCache(selectorData);} else if (data instanceof RuleData) {RuleData ruleData = (RuleData) data;BaseDataCache.getInstance().cacheRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));if (ruleMatchCacheConfig.getCache().getEnabled()) {MatchDataCache.getInstance().removeRuleData(ruleData.getPluginName(), ruleData.getId());MatchDataCache.getInstance().removeEmptyRuleData(ruleData.getPluginName());}updateRuleTrieCache(ruleData);}}
}

3. BaseDataCache 根据数据更新缓存

网关的 SELECTOR_MAP 等缓存是由 ConcurrentMap 实现的。

  1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中
  2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据
  3. 然后将更新后的 selectorData 集合排序
  4. 更新 SELECTOR_MAP
public final class BaseDataCache {// ...private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();// 我觉得这个方法名可能是敲错了,应该是 cacheSelectorData 才对public void cacheSelectData(final SelectorData selectorData) {Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);}private void selectorAccept(final SelectorData data) {String key = data.getPluginName();synchronized (SELECTOR_MAP) {if (SELECTOR_MAP.containsKey(key)) {// 存在 key,说明为更新操作List<SelectorData> existList = SELECTOR_MAP.get(key);// 1. 筛选出不是这个 selectorId 的选择器数据,保存到 resultList 中final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());// 2. 向 resultList 加入要更新的数据。1、2 两步相当于先删除了原 selectorId 的数据,然后再添加进新的数据resultList.add(data);// 3. 然后将更新后的 selectorData 集合排序final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());// 4. 更新 SELECTOR_MAPSELECTOR_MAP.put(key, collect);} else {// 不存在 key,说明为新增操作SELECTOR_MAP.put(key, Lists.newArrayList(data));}}}
}

一张图总结

在这里插入图片描述

这篇关于Apache 神禹(shenyu)源码阅读(二)——Admin 向 Gateway 的数据同步(Gateway 端)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/712307

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

Linux实现线程同步的多种方式汇总

《Linux实现线程同步的多种方式汇总》本文详细介绍了Linux下线程同步的多种方法,包括互斥锁、自旋锁、信号量以及它们的使用示例,通过这些同步机制,可以解决线程安全问题,防止资源竞争导致的错误,示例... 目录什么是线程同步?一、互斥锁(单人洗手间规则)适用场景:特点:二、条件变量(咖啡厅取餐系统)工作流

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Linux中修改Apache HTTP Server(httpd)默认端口的完整指南

《Linux中修改ApacheHTTPServer(httpd)默认端口的完整指南》ApacheHTTPServer(简称httpd)是Linux系统中最常用的Web服务器之一,本文将详细介绍如何... 目录一、修改 httpd 默认端口的步骤1. 查找 httpd 配置文件路径2. 编辑配置文件3. 保存

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据