Pulsar Schema使用原理介绍

2024-03-09 19:04

本文主要是介绍Pulsar Schema使用原理介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、引言

关于Pulsar Schema,咱们要想想以下几个问题

  • Pulsar 中的 Schema 是什么?
  • Pulsar Schema Registry的作用是什么?
  • 怎么使用?
  • 原理是什么?

二、Schema 是什么

Schema是定义结构化数据和二进制字节数组之间转换的逻辑,Pulsar的消息是以非结构化的二进制数组进行存储的,Schema只有在读写时才会被应用于数据上,因此生产者和消费者需要对Schema达成一致。Pulsar通过Schema Registry作为一个中央仓库存储Schema信息,它可以协调生产者和消费者保证相同的Schema,它可以存储多个版本的Schema,支持不同的兼容性配置以及根据兼容性的要求进行Schema的演进。

Pulsar将Schema存储在Bookie上,Schema的写入、读取都通过Broker和Bookie交互,这个逻辑跟消息的读写操作是一只的,因此不需要额外考虑Schema的可用性和可靠性问题,因此整体看Pulsar实现Schema Registry的方式非常优雅

类型安全在所有数据应用中都非常重要,生产者和消费者需要某种机制协调数据类型来避免各种潜在的问题,比如序列化和反序列化方式不一致。数据安全通常有两种处理方式client-side和service-side,本质上就是客户端用时决定和服务端提前保证

client-side:将一切交给用户,客户端自行负责消息的序列化和反序列化并且保证生产消费时消息的类型安全,这种方式的最大问题就是类型是通过约定的,一旦生产者写入非约定的数据,下游的消费者将没有办法解析数据

server-side:数据安全由服务端保证,生产者和消费者都需要跟服务端提前确定数据类型。这种方式真正意义上保证了数据的类型安全,避免了生产者写入非法数据的问题

两种差异如下图
在这里插入图片描述

三、怎么使用

1. client-side

生产者代码逻辑

    //schema在第一次写入的时候就已经决定好了,后续用其他的schema消息类型会写入失败public static void customSchemaProducer()  {try {String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://sherlock-api-tenant-1/sherlock-namespace-1/topic_8").create();User user = new User();user.setName("老六");user.setAge(21);user.setAddress("海南");//由用户自行做序列化逻辑producer.send(JSON.toJSONString(user).getBytes());producer.close();pulsarClient.close();} catch (Exception e) {}}

消费者逻辑代码

  public static void customSchemaConsumer() {try {String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sherlock-api-tenant-1/sherlock-namespace-1/topic_8").subscriptionName("sub_03").subscribe();while(true) {Message<byte[]> message = consumer.receive();//由用户自行做序列化逻辑byte[] user = message.getValue();System.out.println("消息数据为:"+JSON.parseObject(user, User.class).toString());consumer.acknowledge(message);//consumer.negativeAcknowledge(message);}} catch (Exception e) {}}

执行效果如下

消息数据为:User{name='老六', age=21, address='海南'}

2. server-side

生产者代码逻辑

   //schema在第一次写入的时候就已经决定好了,后续用其他的schema消息类型会写入失败public static void customSchemaProducer()  {try {String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();//由Pulsar做序列化逻辑Producer<User> producer = pulsarClient.newProducer(AvroSchema.of(User.class)).topic("persistent://sherlock-api-tenant-1/sherlock-namespace-1/topic_7").create();User user = new User();user.setName("王武");user.setAge(36);user.setAddress("海南");producer.send(user);producer.close();pulsarClient.close();} catch (Exception e) {}}

消费者逻辑代码

   public static void customSchemaConsumer() {try {String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();//由Pulsar做序列化逻辑Consumer<User> consumer = pulsarClient.newConsumer(AvroSchema.of(User.class)).topic("persistent://sherlock-api-tenant-1/sherlock-namespace-1/topic_7").subscriptionName("sub_03").subscribe();while(true) {Message<User> message = consumer.receive();User user = message.getValue();System.out.println("消息数据为:"+user);consumer.acknowledge(message);//consumer.negativeAcknowledge(message);}} catch (Exception e) {}}

执行效果如下

消息数据为:User{name='王武', age=36, address='海南'}

3. 小结

分别查看两个Topic的Schema信息如下图

通过查询client-side的Schema信息,会发现Pulsar服务端其实并没有进行存储,相当于不指定Schema的话Pulsar默认都用byte数组
在这里插入图片描述

再来看看server-side的Schema信息,可以看到打印如下,namespace是pojo类的包路径,name是pojo类名,然后fields就是pojo类的各个字段的属性(像不像mysql里面的表结构,不少场景Topic就是当作表来用的),然后type是AVRO是由于咱们是用的avro进行序列化的。
在这里插入图片描述

除了在读写数据时指定Schema,Pulsar还支持通过admin管理流提前指定好,具体指令在这里。如果是用Pulsar来作为实时数仓场景,强烈建议提前通过admin管理流进行指定好,配置isSchemaValidationEnforced可以考虑开启。如果条件允许可以考虑做成服务化,例如通过Web页面提供新建Schema、修改Schema操作并接入公司内部的审批流等

pulsar-admin schemas upload --filename
POST /admin/v2/schemas/:tenant/:namespace/:topic/schema
pulsar-admin schemas get sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3

四、原理解析

Schema相关的流程咱们需要关注以下几个

  • 注册Schema流程
    • 生产者端侧
    • 消费者端侧
    • 指定服务器
  • Schema生效流程
  • 更新Schema流程

1. 注册Schema流程

生产者端侧
在这里插入图片描述

  1. 生产者实例会在内部构造schema实例,生产者会通过它对数据进行转换
  2. 生产者会请求连接Broker,并传递schema信息 SchemaInfo
  3. Broker会在schema registry中查找这个schema是否被注册,如果已经注册了就将注册的schema版本返回给生产者
  4. Broker检查是否支持自动更新schema,如果配置不允许自动更新,则这个schema不能被注册并且拒绝生产者
  5. Broker进行schema兼容性检查,如果通过检查则将此schema存储在schema registry并返回版本给生产者,生产者所有消息以这个schema格式进行发送;若是检查没通过则拒绝生产者

消费者端
在这里插入图片描述

  1. 消费者实例会在内部构造schema实例
  2. 消费者请求连接Broker,并传递schema信息 SchemaInfo
  3. Broker检查这个Topic是否已经在使用,有的话跳到第五步,否则跳到第四步
  4. Broker检查是否支持自动更新schema,如果支持则注册这个schema,否则拒绝客户端
  5. Broker进行schema兼容性检查,通过则连接否则拒绝客户端

五、参考文献

  1. Pulsar:Schema Registry介绍
  2. 官方文档
  3. 深度解读 Pulsar Schema

这篇关于Pulsar Schema使用原理介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/791658

相关文章

Go语言使用slices包轻松实现排序功能

《Go语言使用slices包轻松实现排序功能》在Go语言开发中,对数据进行排序是常见的需求,Go1.18版本引入的slices包提供了简洁高效的排序解决方案,支持内置类型和用户自定义类型的排序操作,本... 目录一、内置类型排序:字符串与整数的应用1. 字符串切片排序2. 整数切片排序二、检查切片排序状态:

使用Java将实体类转换为JSON并输出到控制台的完整过程

《使用Java将实体类转换为JSON并输出到控制台的完整过程》在软件开发的过程中,Java是一种广泛使用的编程语言,而在众多应用中,数据的传输和存储经常需要使用JSON格式,用Java将实体类转换为J... 在软件开发的过程中,Java是一种广泛使用的编程语言,而在众多应用中,数据的传输和存储经常需要使用j

Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例

《Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例》本文介绍Nginx+Keepalived实现Web集群高可用负载均衡的部署与测试,涵盖架构设计、环境配置、健康检查、... 目录前言一、架构设计二、环境准备三、案例部署配置 前端 Keepalived配置 前端 Nginx

Python logging模块使用示例详解

《Pythonlogging模块使用示例详解》Python的logging模块是一个灵活且强大的日志记录工具,广泛应用于应用程序的调试、运行监控和问题排查,下面给大家介绍Pythonlogging模... 目录一、为什么使用 logging 模块?二、核心组件三、日志级别四、基本使用步骤五、快速配置(bas

使用animation.css库快速实现CSS3旋转动画效果

《使用animation.css库快速实现CSS3旋转动画效果》随着Web技术的不断发展,动画效果已经成为了网页设计中不可或缺的一部分,本文将深入探讨animation.css的工作原理,如何使用以及... 目录1. css3动画技术简介2. animation.css库介绍2.1 animation.cs

使用雪花算法产生id导致前端精度缺失问题解决方案

《使用雪花算法产生id导致前端精度缺失问题解决方案》雪花算法由Twitter提出,设计目的是生成唯一的、递增的ID,下面:本文主要介绍使用雪花算法产生id导致前端精度缺失问题的解决方案,文中通过代... 目录一、问题根源二、解决方案1. 全局配置Jackson序列化规则2. 实体类必须使用Long封装类3.

Python文件操作与IO流的使用方式

《Python文件操作与IO流的使用方式》:本文主要介绍Python文件操作与IO流的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、python文件操作基础1. 打开文件2. 关闭文件二、文件读写操作1.www.chinasem.cn 读取文件2. 写

PyQt6中QMainWindow组件的使用详解

《PyQt6中QMainWindow组件的使用详解》QMainWindow是PyQt6中用于构建桌面应用程序的基础组件,本文主要介绍了PyQt6中QMainWindow组件的使用,具有一定的参考价值,... 目录1. QMainWindow 组php件概述2. 使用 QMainWindow3. QMainW

使用Python自动化生成PPT并结合LLM生成内容的代码解析

《使用Python自动化生成PPT并结合LLM生成内容的代码解析》PowerPoint是常用的文档工具,但手动设计和排版耗时耗力,本文将展示如何通过Python自动化提取PPT样式并生成新PPT,同时... 目录核心代码解析1. 提取 PPT 样式到 jsON关键步骤:代码片段:2. 应用 JSON 样式到

java变量内存中存储的使用方式

《java变量内存中存储的使用方式》:本文主要介绍java变量内存中存储的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍2、变量的定义3、 变量的类型4、 变量的作用域5、 内存中的存储方式总结1、介绍在 Java 中,变量是用于存储程序中数据