Nacos集群数据同步方式

2024-12-28 22:50

本文主要是介绍Nacos集群数据同步方式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Nacos集群数据同步方式》文章主要介绍了Nacos集群中服务注册信息的同步机制,涉及到负责节点和非负责节点之间的数据同步过程,以及DistroProtocol协议在同步中的应用...

引言

在Nacos属于集群时,当服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。

// DistroClientDataProcessor
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

同步时,会涉及到一个负责节点和非负责节点

负责节点(发起同步)

也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件

// DistroClientDataProcessor
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
    return;
}

DistroProtocol

Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议;

DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改,对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看

// DistroProtocol
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

在调用syncToTarget后,会触发任务DistroDelayTaskProcessor处理任务,这是Distro协议的一个默认延迟任务处理器,可以看到。 对于删除类型的任务,触发任务DistroSyncDeleteTask , 对于删除的任务:DistroSyncChangeTask

public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    @Override
    public boolean process(NacosTask task) {
        
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEpythonngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }
}

DistroSyncChangeTask

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    ...
    
    // 无回调
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    
    // 有回调
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //将得到的数据同步给其他服务节点
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    // 从DistroClientDataProcessor获取DistroData
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

获取同步数据getDistroData

这里获取同步数据其实是从DistroClientDataProcessor 中获取的,所以为Client的相关注册服务信息

// DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

可以看到generateSyncData 方法是关键获取服务的方法,该方法提供了同步数据,包含Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

// AbstractClient implements Client 
@Override
public ClientSyncData generateSyncData() {
    List<www.chinasem.cn;String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

执行同步数据syncData

这里的同步实际是由DistroClientTransportAgent来负责的,将数据分装成DistroDataRequest 然后查询到对于的服务节点Member然后调用asyncRequest异步方法执行同步,后面的方法我就不跟了, 这时我们主要关注非负责节点收到同步请求后如何处理。

// DistroClientTransportAgent
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

非负责节点(接收请求)

当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。通过DistroController收到数据后, 然后最终会DistroClientDataProcessor.processData方法来进行处理

// DistroController.Java
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
    ...
    DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
    distroProtocol.onReceive(distroHttpData);
    ...
}
// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

可以看出,这里分别对ADD/CHANGE和DELETE进行了处理,这里我主要关注ADD/CHANGE,所以主要关注handlerClientSyncData方法。

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    // 同步客户端连接,此时如果客户端不存在,则会注册一个非负责节点client,后面就会获取到该客户端操作
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    // 获取Client(此时注册到的是ConnectionBasedClient)
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 更新Client数据
    upgradeClient(client, clientSyncData);
}

**注意:**这里要注意下此时的Client实现类ConnectionBasedClient,它的isNative属性为false,这是非负责节点和负责节点的主要区别。

其实判断当前nacos节点是否为负责节点的依据就是这个**isNative属性**,如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient,它的isNative属性为true;如果是由Distro协议,同步到这个nacos节点上的ConnectionBasedClient,它的isNative属性为false。

那其实我们都知道2.x的版本以后使用了长连接,所以**通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求**。

DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)fHuiAw) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

Distro协议负责集群数据统一

Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。

在1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点,非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变js,需要反查责任节点。

在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。

责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。

// DistroVerifyTimedTask.java
@Override
public void run() {
    // 所有其他节点
    List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
    for (String eawww.chinasem.cnch : distroComponentHolder.getDataStorageTypes()) {
        // 遍历这些节点发送Client.isNative=true的DistroData,type = VERIFY
        verifyForDataStorage(each, targetServer);
    }
}

非责任节点每5s扫描isNative=false的client,如果client 30s内没有被VERIFY的DistroData更新过续租时间,会删除这个同步过来的Client数据。

//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
} 

// ConnectionBasedClient.java
@Override
public boolean isExpire(long currentTime) {
    // 判断30s内没有续租 认为过期
    return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持China编程(www.chinasem.cn)。

这篇关于Nacos集群数据同步方式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1152821

相关文章

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

Spring创建Bean的八种主要方式详解

《Spring创建Bean的八种主要方式详解》Spring(尤其是SpringBoot)提供了多种方式来让容器创建和管理Bean,@Component、@Configuration+@Bean、@En... 目录引言一、Spring 创建 Bean 的 8 种主要方式1. @Component 及其衍生注解

Redis中哨兵机制和集群的区别及说明

《Redis中哨兵机制和集群的区别及说明》Redis哨兵通过主从复制实现高可用,适用于中小规模数据;集群采用分布式分片,支持动态扩展,适合大规模数据,哨兵管理简单但扩展性弱,集群性能更强但架构复杂,根... 目录一、架构设计与节点角色1. 哨兵机制(Sentinel)2. 集群(Cluster)二、数据分片

python中的显式声明类型参数使用方式

《python中的显式声明类型参数使用方式》文章探讨了Python3.10+版本中类型注解的使用,指出FastAPI官方示例强调显式声明参数类型,通过|操作符替代Union/Optional,可提升代... 目录背景python函数显式声明的类型汇总基本类型集合类型Optional and Union(py

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

Linux系统管理与进程任务管理方式

《Linux系统管理与进程任务管理方式》本文系统讲解Linux管理核心技能,涵盖引导流程、服务控制(Systemd与GRUB2)、进程管理(前台/后台运行、工具使用)、计划任务(at/cron)及常用... 目录引言一、linux系统引导过程与服务控制1.1 系统引导的五个关键阶段1.2 GRUB2的进化优

使用SpringBoot+InfluxDB实现高效数据存储与查询

《使用SpringBoot+InfluxDB实现高效数据存储与查询》InfluxDB是一个开源的时间序列数据库,特别适合处理带有时间戳的监控数据、指标数据等,下面详细介绍如何在SpringBoot项目... 目录1、项目介绍2、 InfluxDB 介绍3、Spring Boot 配置 InfluxDB4、I

Java整合Protocol Buffers实现高效数据序列化实践

《Java整合ProtocolBuffers实现高效数据序列化实践》ProtocolBuffers是Google开发的一种语言中立、平台中立、可扩展的结构化数据序列化机制,类似于XML但更小、更快... 目录一、Protocol Buffers简介1.1 什么是Protocol Buffers1.2 Pro

IDEA与MyEclipse代码量统计方式

《IDEA与MyEclipse代码量统计方式》文章介绍在项目中不安装第三方工具统计代码行数的方法,分别说明MyEclipse通过正则搜索(排除空行和注释)及IDEA使用Statistic插件或调整搜索... 目录项目场景MyEclipse代码量统计IDEA代码量统计总结项目场景在项目中,有时候我们需要统计

C#和Unity中的中介者模式使用方式

《C#和Unity中的中介者模式使用方式》中介者模式通过中介者封装对象交互,降低耦合度,集中控制逻辑,适用于复杂系统组件交互场景,C#中可用事件、委托或MediatR实现,提升可维护性与灵活性... 目录C#中的中介者模式详解一、中介者模式的基本概念1. 定义2. 组成要素3. 模式结构二、中介者模式的特点