Apache Pulsar源码解析之Lookup机制

2024-04-08 01:36

本文主要是介绍Apache Pulsar源码解析之Lookup机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 引言
  • Lookup是什么
  • 客户端实现原理
  • 服务端实现原理
  • 总结

引言

在学习Pulsar一段时间后,相信大家也或多或少听说Lookup这个词,今天就一起来深入剖析下Pulsar是怎么设计的它吧

Lookup是什么

在客户端跟服务端建立TCP连接前有些信息需要提前获取,这个获取方式就是Lookup机制。所获取的信息有以下几种

  • 应该跟哪台Broker建立连接
  • Topic的Schema信息
  • Topic的分区信息

其中第一个是最重要的,因此今天就针对第一点进行深入剖析,大致流程如下图
在这里插入图片描述

  1. 在创建生产者/消费者时会触发Lookup,一般是通过HTTP请求Broker来获取目标Topic所归属的Broker节点信息,这样才知道跟哪台机器建立TCP连接进行数据交互
  2. Broker接收到Lookup命令,此时会进行限流检查、身份/权限认证、校验集群等检测动作后,根据请求中携带的Namespace信息获取对应的Namespace对象进行处理,这里Namespace会对Topic进行哈希运算并判断它落在数组的哪一个节点,算出来后就根据数组的信息来从Bundle数组中获得对应的Bundle,这个过程其实就是一致性哈希算法寻址过程。
  3. 在获得Bundle后会尝试从本机Cache中查询该Bundle所归属的Broker信息。
  4. 如果在Cache中没有命中,则会去Zookeeper中进行读取,如果发现该Bundle还未归属Broker则触发归属Broker的流程
  5. 获取到该Topic所归属的Broker信息后返回给客户端,客户端解析结果并跟所归属的Broker建立TCP连接,用于后续生产者往Broker节点进行消息写入

补充说明确定Bundle的归属,如果Broker的loadManager使用的是中心化策略,则需要Broker Leader来当裁判决定,否则当前Broker就可当作裁判。虽然Broker是无状态的,但会通过Zookeeper选举出一个Leader用于监控负载、为Bundle分配Broker等事情,裁判Broker通过loadManager查找负载最低的Broker并把Bundle分配给它。

客户端实现原理

Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧

   public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,ProducerInterceptors interceptors, Optional<String> overrideProducerName) {....//这里进去就是创建跟Broker的网络连接grabCnx();}void grabCnx() {//实际上是调用ConnectionHandler进行的this.connectionHandler.grabCnx();}protected void grabCnx(Optional<URI> hostURI) {....//这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));....}public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {TopicName topicName = TopicName.get(topic);//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释return getLookup(url).getBroker(topicName).thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));}public LookupService getLookup(String serviceUrl) {return urlLookupMap.computeIfAbsent(serviceUrl, url -> {try {//忽略其他的,直接跟这里进去return createLookup(serviceUrl);} catch (PulsarClientException e) {log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());throw new IllegalStateException("Failed to update url " + url);}});}public LookupService createLookup(String url) throws PulsarClientException {//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询if (url.startsWith("http")) {return new HttpLookupService(conf, eventLoopGroup);} else {return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),externalExecutorProvider.getExecutor());}}public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)throws PulsarClientException {//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数this.httpClient = new HttpClient(conf, eventLoopGroup);this.useTls = conf.isUseTls();this.listenerName = conf.getListenerName();}protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {....//可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包httpClient = new DefaultAsyncHttpClient(config);....}

通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息

    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {TopicName topicName = TopicName.get(topic);return getLookup(url).getBroker(topicName).thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));}public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {//判断访问哪个版本的接口String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;String path = basePath + topicName.getLookupName();path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);//获取要访问的Broker地址return httpClient.get(path, LookupData.class).thenCompose(lookupData -> {URI uri = null;try {//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());//HTTP通过Lookup方式访问服务端绝对不会走代理return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,false /* HTTP lookups never use the proxy */));} catch (Exception e) {....}});}public class LookupTopicResult {//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口private final InetSocketAddress logicalAddress;private final InetSocketAddress physicalAddress;private final boolean isUseProxy;
}

客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节

服务端实现原理

服务端的入口在TopicLookup类的lookupTopicAsync方法,服务端大致步骤是这样的:1. 获取Topic所归属的Bundle 2. 查询Bundle所归属的Broker 3. 返回该Broker的url

    public void lookupTopicAsync(@Suspended AsyncResponse asyncResponse,@PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,@QueryParam("listenerName") String listenerName,@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {listenerName = listenerNameHeader;}//可以看得到这里是获取Lookup的,跟踪进去看看internalLookupTopicAsync(topicName, authoritative, listenerName).thenAccept(lookupData -> asyncResponse.resume(lookupData)).exceptionally(ex -> {....});}protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative, String listenerName) {
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()//获得目标Broker地址, 继续从这里进去.getBrokerServiceUrlAsync(topicName,LookupOptions.builder().advertisedListenerName(listenerName).authoritative(authoritative).loadTopicsInBundle(false).build());}public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {long startTime = System.nanoTime();// 获取这个Topic所归属的BundleCompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic).thenCompose(bundle -> {//根据获得的bundle信息查询归属的Brokerreturn findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {//如果findRedirectLookupResultAsync方式没查到则走这里进行查询return findBrokerServiceUrl(bundle, options); });});future.thenAccept(optResult -> {....}).exceptionally(ex -> {....});return future;}

先看看是怎么获取Topic所归属的Bundle的吧,就从getBundleAsync方法跟踪进去

    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {return bundleFactory.getBundlesAsync(topic.getNamespaceObject())//直接看findBundle,命名意思已经很清晰了.thenApply(bundles -> bundles.findBundle(topic));}public NamespaceBundle findBundle(TopicName topicName) {checkArgument(nsname.equals(topicName.getNamespaceObject()));//同理,继续跟踪进去return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this);}public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {//计算Topic名称的哈希值long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();//根据哈希值来获取所归属的bundle,一致性哈希的设计。跟进去看看是怎么计算的NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);if (topicName.getDomain().equals(TopicDomain.non_persistent)) {bundle.setHasNonPersistentTopic(true);}return bundle;}protected NamespaceBundle getBundle(long hash) {//通过数组的二分查找进行计算,数组的元素个数跟存储Bundle的bundles的集合大小是一样的,能获取对应的Bundle//思路其实就是一致性哈希的查找方式,计算出哈希值处于哈希环所处的位置并查找其下一个节点的信息int idx = Arrays.binarySearch(partitions, hash);int lowerIdx = idx < 0 ? -(idx + 2) : idx;return bundles.get(lowerIdx);}

知道Bundle之后,下一步就是根据这个Bundle来查询其所归属的Broker节点,也就是上面的NamespaceService类的findRedirectLookupResultAsync方法,这里一路跟下去就是查询缓存中获取映射信息的地方了,感兴趣的伙伴可以继续跟下去

    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {return CompletableFuture.completedFuture(Optional.empty());}return redirectManager.findRedirectLookupResultAsync();}

总结

以上就是Pulsar的Lookup机制的实现流程,在寻址的过程中,需要阅读的伙伴具备一致性哈希的知识,因为Pulsar的Topic归属就是引入了一致性哈希算法来实现的。

这篇关于Apache Pulsar源码解析之Lookup机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/884221

相关文章

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

Maven中生命周期深度解析与实战指南

《Maven中生命周期深度解析与实战指南》这篇文章主要为大家详细介绍了Maven生命周期实战指南,包含核心概念、阶段详解、SpringBoot特化场景及企业级实践建议,希望对大家有一定的帮助... 目录一、Maven 生命周期哲学二、default生命周期核心阶段详解(高频使用)三、clean生命周期核心阶

深入解析C++ 中std::map内存管理

《深入解析C++中std::map内存管理》文章详解C++std::map内存管理,指出clear()仅删除元素可能不释放底层内存,建议用swap()与空map交换以彻底释放,针对指针类型需手动de... 目录1️、基本清空std::map2️、使用 swap 彻底释放内存3️、map 中存储指针类型的对象

Java Scanner类解析与实战教程

《JavaScanner类解析与实战教程》JavaScanner类(java.util包)是文本输入解析工具,支持基本类型和字符串读取,基于Readable接口与正则分隔符实现,适用于控制台、文件输... 目录一、核心设计与工作原理1.底层依赖2.解析机制A.核心逻辑基于分隔符(delimiter)和模式匹

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

基于Redis自动过期的流处理暂停机制

《基于Redis自动过期的流处理暂停机制》基于Redis自动过期的流处理暂停机制是一种高效、可靠且易于实现的解决方案,防止延时过大的数据影响实时处理自动恢复处理,以避免积压的数据影响实时性,下面就来详... 目录核心思路代码实现1. 初始化Redis连接和键前缀2. 接收数据时检查暂停状态3. 检测到延时过

Redis中哨兵机制和集群的区别及说明

《Redis中哨兵机制和集群的区别及说明》Redis哨兵通过主从复制实现高可用,适用于中小规模数据;集群采用分布式分片,支持动态扩展,适合大规模数据,哨兵管理简单但扩展性弱,集群性能更强但架构复杂,根... 目录一、架构设计与节点角色1. 哨兵机制(Sentinel)2. 集群(Cluster)二、数据分片

深度解析Python yfinance的核心功能和高级用法

《深度解析Pythonyfinance的核心功能和高级用法》yfinance是一个功能强大且易于使用的Python库,用于从YahooFinance获取金融数据,本教程将深入探讨yfinance的核... 目录yfinance 深度解析教程 (python)1. 简介与安装1.1 什么是 yfinance?