ZooKeeper 实战(四) Curator Watch事件监听

2024-01-13 18:36

本文主要是介绍ZooKeeper 实战(四) Curator Watch事件监听,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • ZooKeeper 实战(四) Curator Watch事件监听
    • 0.前言
    • 1.Watch 事件监听概念
    • 2.NodeCache
      • 2.1.全参构造器参数
      • 2.2.代码DEMO
      • 2.3.日志输出
    • 3.PathChildrenCache
      • 3.1.全参构造器参数
      • 3.2.子节点监听时间类型
      • 3.2.代码DEMO
    • 4.TreeCache
      • 4.1.构造器参数
      • 4.2.代码DEMO
      • 4.3.日志输出

ZooKeeper 实战(四) Curator Watch事件监听

0.前言

上一篇博客只介绍了有关Curator中对ZNode的CRUD操作,从本篇起开始逐步介绍更加高级的API操作。

1.Watch 事件监听概念

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。

而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。

Curator 中提供了三种 Cache(Watcher)来监听不同节点变化类型:

  • NodeCache:监听指定的节点。
  • PathChildrenCache:监听指定节点的子节点。
  • TreeCache:监听指定节点及其子孙节点。

2.NodeCache

监听指定的节点,增删改都会监听。

2.1.全参构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

2.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 创建NodeCache对象NodeCache nodeCache = new NodeCache(client,path);// 添加监听器nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData currentData = nodeCache.getCurrentData();if (currentData != null){String s = new String(currentData.getData(),StandardCharsets.UTF_8);log.info("监听{}节点发生变化,数据内容:{}",path,s);}else {log.info("监听{}节点被删除了",path);}}});// 开启监听nodeCache.start();TimeUnit.SECONDS.sleep(2);// 创建节点client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 更新节点client.setData().forPath(path,"数据修改了".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 删除节点client.delete().deletingChildrenIfNeeded().forPath(path);}

2.3.日志输出

在这里插入图片描述

3.PathChildrenCache

监听指定节点的子节点。当一个子节点增删改时, PathChildrenCache会包含最新的子节点的数据和状态。

3.1.全参构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: cacheData 是否缓存节点内容(包含节点状态)* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

3.2.子节点监听时间类型

public enum Type
{// 子节点添加CHILD_ADDED,// 子节点的数据变更CHILD_UPDATED,// 子节点被删除CHILD_REMOVED,// 以下三个事件类型表示:当连接断开时,PathChildrenCache将继续保持其断开连接之前的状态,并且在连接恢复后,PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。// 当连接状态处于ConnectionState.SUSPENDED。CONNECTION_SUSPENDED,// 当连接状态处于ConnectionState.RECONNECTEDCONNECTION_RECONNECTED,// 当连接状态处于ConnectionState.LOSTCONNECTION_LOST,// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时,该事件表示PathChildrenCache初始化完成This event signals that the initial cache has been populated.INITIALIZED
}

3.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 创建PathChildrenCache对象// 此处的cacheData参数一定要设置为true,不然Curator不会缓存数据当本地,// 那么后续pathChildrenCache.getCurrentData()得到的数据都为nullPathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);// 添加监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){log.info("PathChildrenCache初始化完,事件类型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());}}});// 开启监听pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);// 创建子节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path+"/c1");client.create().creatingParentsIfNeeded().forPath(path+"/c2");client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");}

3.3.日志输出

可以看出,PathChildrenCache只会监听直属子节点的变化,其非直属子节点的后代节点如/c3/age,没有发布通知。

在这里插入图片描述

4.TreeCache

监听指定节点及其子孙节点。

4.1.构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径*/
public TreeCache(CuratorFramework client, String path)/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: cacheData 是否缓存节点内容(包含节点状态)* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* @param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数* @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果* @param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点(容器节点)* @param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件,选择适合的缓存树来创建和维护TreeCache*/
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)

4.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher/tree";TimeUnit.SECONDS.sleep(3);// 创建TreeCache对象,也可通过TreeCache.newBuilder()创建TreeCache treeCache = new TreeCache(client,path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {if (event.getType() == TreeCacheEvent.Type.INITIALIZED){log.info("TreeCache初始化完,事件类型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());}}});// 开启监听treeCache.start();// 创建节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path);client.create().creatingParentsIfNeeded().forPath(path +"/t1");client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path,"根节点更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");}

4.3.日志输出

可以看出TreeCache会监听当前节点和后代节点的变化。

在这里插入图片描述

这篇关于ZooKeeper 实战(四) Curator Watch事件监听的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/602407

相关文章

Spring Boot 整合 SSE(Server-Sent Events)实战案例(全网最全)

《SpringBoot整合SSE(Server-SentEvents)实战案例(全网最全)》本文通过实战案例讲解SpringBoot整合SSE技术,涵盖实现原理、代码配置、异常处理及前端交互,... 目录Spring Boot 整合 SSE(Server-Sent Events)1、简述SSE与其他技术的对

MyBatis-Plus 与 Spring Boot 集成原理实战示例

《MyBatis-Plus与SpringBoot集成原理实战示例》MyBatis-Plus通过自动配置与核心组件集成SpringBoot实现零配置,提供分页、逻辑删除等插件化功能,增强MyBa... 目录 一、MyBATis-Plus 简介 二、集成方式(Spring Boot)1. 引入依赖 三、核心机制

MySQL 数据库表操作完全指南:创建、读取、更新与删除实战

《MySQL数据库表操作完全指南:创建、读取、更新与删除实战》本文系统讲解MySQL表的增删查改(CURD)操作,涵盖创建、更新、查询、删除及插入查询结果,也是贯穿各类项目开发全流程的基础数据交互原... 目录mysql系列前言一、Create(创建)并插入数据1.1 单行数据 + 全列插入1.2 多行数据

MySQL 数据库表与查询操作实战案例

《MySQL数据库表与查询操作实战案例》本文将通过实际案例,详细介绍MySQL中数据库表的设计、数据插入以及常用的查询操作,帮助初学者快速上手,感兴趣的朋友跟随小编一起看看吧... 目录mysql 数据库表操作与查询实战案例项目一:产品相关数据库设计与创建一、数据库及表结构设计二、数据库与表的创建项目二:员

从基础到高阶详解Python多态实战应用指南

《从基础到高阶详解Python多态实战应用指南》这篇文章主要从基础到高阶为大家详细介绍Python中多态的相关应用与技巧,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、多态的本质:python的“鸭子类型”哲学二、多态的三大实战场景场景1:数据处理管道——统一处理不同数据格式

Java慢查询排查与性能调优完整实战指南

《Java慢查询排查与性能调优完整实战指南》Java调优是一个广泛的话题,它涵盖了代码优化、内存管理、并发处理等多个方面,:本文主要介绍Java慢查询排查与性能调优的相关资料,文中通过代码介绍的非... 目录1. 事故全景:从告警到定位1.1 事故时间线1.2 关键指标异常1.3 排查工具链2. 深度剖析:

Python实现Word转PDF全攻略(从入门到实战)

《Python实现Word转PDF全攻略(从入门到实战)》在数字化办公场景中,Word文档的跨平台兼容性始终是个难题,而PDF格式凭借所见即所得的特性,已成为文档分发和归档的标准格式,下面小编就来和大... 目录一、为什么需要python处理Word转PDF?二、主流转换方案对比三、五套实战方案详解方案1:

SpringBoot实现RSA+AES自动接口解密的实战指南

《SpringBoot实现RSA+AES自动接口解密的实战指南》在当今数据泄露频发的网络环境中,接口安全已成为开发者不可忽视的核心议题,RSA+AES混合加密方案因其安全性高、性能优越而被广泛采用,本... 目录一、项目依赖与环境准备1.1 Maven依赖配置1.2 密钥生成与配置二、加密工具类实现2.1

Nginx进行平滑升级的实战指南(不中断服务版本更新)

《Nginx进行平滑升级的实战指南(不中断服务版本更新)》Nginx的平滑升级(也称为热升级)是一种在不停止服务的情况下更新Nginx版本或添加模块的方法,这种升级方式确保了服务的高可用性,避免了因升... 目录一.下载并编译新版Nginx1.下载解压2.编译二.替换可执行文件,并平滑升级1.替换可执行文件

Go语言使用select监听多个channel的示例详解

《Go语言使用select监听多个channel的示例详解》本文将聚焦Go并发中的一个强力工具,select,这篇文章将通过实际案例学习如何优雅地监听多个Channel,实现多任务处理、超时控制和非阻... 目录一、前言:为什么要使用select二、实战目标三、案例代码:监听两个任务结果和超时四、运行示例五