ZooKeeper 实战(四) Curator Watch事件监听

2024-01-13 18:36

本文主要是介绍ZooKeeper 实战(四) Curator Watch事件监听,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • ZooKeeper 实战(四) Curator Watch事件监听
    • 0.前言
    • 1.Watch 事件监听概念
    • 2.NodeCache
      • 2.1.全参构造器参数
      • 2.2.代码DEMO
      • 2.3.日志输出
    • 3.PathChildrenCache
      • 3.1.全参构造器参数
      • 3.2.子节点监听时间类型
      • 3.2.代码DEMO
    • 4.TreeCache
      • 4.1.构造器参数
      • 4.2.代码DEMO
      • 4.3.日志输出

ZooKeeper 实战(四) Curator Watch事件监听

0.前言

上一篇博客只介绍了有关Curator中对ZNode的CRUD操作,从本篇起开始逐步介绍更加高级的API操作。

1.Watch 事件监听概念

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。

而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。

Curator 中提供了三种 Cache(Watcher)来监听不同节点变化类型:

  • NodeCache:监听指定的节点。
  • PathChildrenCache:监听指定节点的子节点。
  • TreeCache:监听指定节点及其子孙节点。

2.NodeCache

监听指定的节点,增删改都会监听。

2.1.全参构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

2.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 创建NodeCache对象NodeCache nodeCache = new NodeCache(client,path);// 添加监听器nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData currentData = nodeCache.getCurrentData();if (currentData != null){String s = new String(currentData.getData(),StandardCharsets.UTF_8);log.info("监听{}节点发生变化,数据内容:{}",path,s);}else {log.info("监听{}节点被删除了",path);}}});// 开启监听nodeCache.start();TimeUnit.SECONDS.sleep(2);// 创建节点client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 更新节点client.setData().forPath(path,"数据修改了".getBytes(StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(2);// 删除节点client.delete().deletingChildrenIfNeeded().forPath(path);}

2.3.日志输出

在这里插入图片描述

3.PathChildrenCache

监听指定节点的子节点。当一个子节点增删改时, PathChildrenCache会包含最新的子节点的数据和状态。

3.1.全参构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: cacheData 是否缓存节点内容(包含节点状态)* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

3.2.子节点监听时间类型

public enum Type
{// 子节点添加CHILD_ADDED,// 子节点的数据变更CHILD_UPDATED,// 子节点被删除CHILD_REMOVED,// 以下三个事件类型表示:当连接断开时,PathChildrenCache将继续保持其断开连接之前的状态,并且在连接恢复后,PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。// 当连接状态处于ConnectionState.SUSPENDED。CONNECTION_SUSPENDED,// 当连接状态处于ConnectionState.RECONNECTEDCONNECTION_RECONNECTED,// 当连接状态处于ConnectionState.LOSTCONNECTION_LOST,// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时,该事件表示PathChildrenCache初始化完成This event signals that the initial cache has been populated.INITIALIZED
}

3.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher";TimeUnit.SECONDS.sleep(3);// 创建PathChildrenCache对象// 此处的cacheData参数一定要设置为true,不然Curator不会缓存数据当本地,// 那么后续pathChildrenCache.getCurrentData()得到的数据都为nullPathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);// 添加监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){log.info("PathChildrenCache初始化完,事件类型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());}}});// 开启监听pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);// 创建子节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path+"/c1");client.create().creatingParentsIfNeeded().forPath(path+"/c2");client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");}

3.3.日志输出

可以看出,PathChildrenCache只会监听直属子节点的变化,其非直属子节点的后代节点如/c3/age,没有发布通知。

在这里插入图片描述

4.TreeCache

监听指定节点及其子孙节点。

4.1.构造器参数

/*** @param: client 注册监听的客户端* @param: path 节点路径*/
public TreeCache(CuratorFramework client, String path)/*** @param: client 注册监听的客户端* @param: path 节点路径* @param: cacheData 是否缓存节点内容(包含节点状态)* @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false* @param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数* @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果* @param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点(容器节点)* @param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件,选择适合的缓存树来创建和维护TreeCache*/
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)

4.2.代码DEMO

    @Overridepublic void run(ApplicationArguments args) throws Exception {log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");String path = "/ahao/watcher/tree";TimeUnit.SECONDS.sleep(3);// 创建TreeCache对象,也可通过TreeCache.newBuilder()创建TreeCache treeCache = new TreeCache(client,path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {if (event.getType() == TreeCacheEvent.Type.INITIALIZED){log.info("TreeCache初始化完,事件类型:{}", event.getType());}else {ChildData currentData = event.getData();log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());}}});// 开启监听treeCache.start();// 创建节点TimeUnit.SECONDS.sleep(2);client.create().creatingParentsIfNeeded().forPath(path);client.create().creatingParentsIfNeeded().forPath(path +"/t1");client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");// 修改子节点TimeUnit.SECONDS.sleep(2);client.setData().forPath(path,"根节点更新了".getBytes(StandardCharsets.UTF_8));client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));// 删除子节点TimeUnit.SECONDS.sleep(2);client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");}

4.3.日志输出

可以看出TreeCache会监听当前节点和后代节点的变化。

在这里插入图片描述

这篇关于ZooKeeper 实战(四) Curator Watch事件监听的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/602407

相关文章

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

Python爬虫HTTPS使用requests,httpx,aiohttp实战中的证书异步等问题

《Python爬虫HTTPS使用requests,httpx,aiohttp实战中的证书异步等问题》在爬虫工程里,“HTTPS”是绕不开的话题,HTTPS为传输加密提供保护,同时也给爬虫带来证书校验、... 目录一、核心问题与优先级检查(先问三件事)二、基础示例:requests 与证书处理三、高并发选型:

vue监听属性watch的用法及使用场景详解

《vue监听属性watch的用法及使用场景详解》watch是vue中常用的监听器,它主要用于侦听数据的变化,在数据发生变化的时候执行一些操作,:本文主要介绍vue监听属性watch的用法及使用场景... 目录1. 监听属性 watch2. 常规用法3. 监听对象和route变化4. 使用场景附Watch 的

Java中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例解析

《Java中的分布式系统开发基于Zookeeper与Dubbo的应用案例解析》本文将通过实际案例,带你走进基于Zookeeper与Dubbo的分布式系统开发,本文通过实例代码给大家介绍的非常详... 目录Java 中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例一、分布式系统中的挑战二

Oracle Scheduler任务故障诊断方法实战指南

《OracleScheduler任务故障诊断方法实战指南》Oracle数据库作为企业级应用中最常用的关系型数据库管理系统之一,偶尔会遇到各种故障和问题,:本文主要介绍OracleSchedul... 目录前言一、故障场景:当定时任务突然“消失”二、基础环境诊断:搭建“全局视角”1. 数据库实例与PDB状态2

Git进行版本控制的实战指南

《Git进行版本控制的实战指南》Git是一种分布式版本控制系统,广泛应用于软件开发中,它可以记录和管理项目的历史修改,并支持多人协作开发,通过Git,开发者可以轻松地跟踪代码变更、合并分支、回退版本等... 目录一、Git核心概念解析二、环境搭建与配置1. 安装Git(Windows示例)2. 基础配置(必

MyBatis分页查询实战案例完整流程

《MyBatis分页查询实战案例完整流程》MyBatis是一个强大的Java持久层框架,支持自定义SQL和高级映射,本案例以员工工资信息管理为例,详细讲解如何在IDEA中使用MyBatis结合Page... 目录1. MyBATis框架简介2. 分页查询原理与应用场景2.1 分页查询的基本原理2.1.1 分

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

Three.js构建一个 3D 商品展示空间完整实战项目

《Three.js构建一个3D商品展示空间完整实战项目》Three.js是一个强大的JavaScript库,专用于在Web浏览器中创建3D图形,:本文主要介绍Three.js构建一个3D商品展... 目录引言项目核心技术1. 项目架构与资源组织2. 多模型切换、交互热点绑定3. 移动端适配与帧率优化4. 可