【ES实战】ES创建Transports客户端时间过长分析

2023-10-22 06:36

本文主要是介绍【ES实战】ES创建Transports客户端时间过长分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES创建Transports客户端时间过长分析

2023年10月19日

文章目录

  • ES创建Transports客户端时间过长分析
    • 问题描述
    • 问题重现
    • 问题分析
      • 是否可以配置链接超时时间
      • 节点建立连接超时逻辑
      • 为啥超时间会出现翻倍
    • 优化方案

在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。

问题描述

  1. 使用ES Transport 客户端创建与集群的链接。
  2. 连接地址里面有不存在的IP
  3. 在增加ES节点时,采用逐个增加的方式

整个建立链接的过程会非常耗时。

问题重现

采用jar依赖如下

        <dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId><version>5.6.16</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>x-pack-transport</artifactId><version>5.6.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>sniffer</artifactId><version>5.4.2</version></dependency>

创建连接代码如下

        final Settings settings = Settings.builder().put("cluster.name", "common-es").put("client.transport.sniff", true).build();final TransportClient transportClient = new PreBuiltXPackTransportClient(settings);long t1 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.37"), 9800));logger.info("第1个错误节点耗时:" + (System.currentTimeMillis() - t1) / 1000);long t2 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.38"), 9800));logger.info("第2个错误节点耗时:" + (System.currentTimeMillis() - t2) / 1000);long t3 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.39"), 9800));logger.info("第3个错误节点耗时:" + (System.currentTimeMillis() - t3) / 1000);

输出结果

[2023-10-19 15:31:31,398] [main] [INFO ] xxx.Client - 第1个错误节点耗时:21
[2023-10-19 15:32:13,414] [main] [INFO ] xxx.Client - 第2个错误节点耗时:42
[2023-10-19 15:32:55,436] [main] [INFO ] xxx.Client - 第3个错误节点耗时:42

问题分析

是否可以配置链接超时时间

通过new PreBuiltXPackTransportClient()方法创建客户端,跟踪源码发现其会在TransportClient.buildTemplate进行建立网络模块服务,在继续debug,会发现会在TcpTransport中方法buildDefaultConnectionProfile构建链接的配置文件。发现其TCP_CONNECT_TIMEOUT默认的配置是30s,起对应的配置参数是transport.tcp.connect_timeout

    static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);ConnectionProfile.Builder builder = new ConnectionProfile.Builder();// 链接的超时时间builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);// if we are not master eligible we don't need a dedicated channel to publish the statebuilder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);// if we are not a data-node we don't need any dedicated channels for recoverybuilder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);return builder.build();}
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);

节点建立连接超时逻辑

TcpTransport.openConnection(DiscoveryNode node, ConnectionProfile connectionProfile)方法建立通信管道时,在通信之前重组连接的默认配置和自定义配置。在Netty4Transport.connectToChannels()方法内具体生效,future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));

增加节点的方式

TransportClient类提供了数组方式增加节点和单个节点增加的方式,

    public TransportClient addTransportAddress(TransportAddress transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}public TransportClient addTransportAddresses(TransportAddress... transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}

不过根据代码,发现其都是调用的TransportClientNodesService类的addTransportAddresses(TransportAddress... transportAddresses)方法

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {// 竞争对象锁mutexsynchronized (mutex) {if (closed) {throw new IllegalStateException("transport client is closed, can't add an address");}List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);for (TransportAddress transportAddress : transportAddresses) {boolean found = false;for (DiscoveryNode otherNode : listedNodes) {// 方式连接地址值重复,会自动过滤if (otherNode.getAddress().equals(transportAddress)) {found = true;logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);break;}}if (!found) {filtered.add(transportAddress);}}if (filtered.isEmpty()) {return this;}List<DiscoveryNode> builder = new ArrayList<>(listedNodes);for (TransportAddress transportAddress : filtered) {DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);logger.debug("adding address [{}]", node);builder.add(node);}// listNodes里面存放的是配置的连接节点列表listedNodes = Collections.unmodifiableList(builder);// 调用不同的节点采集-里面也对mutex锁进行竞争nodesSampler.sample();}return this;}

NodeSampler.sample()

		public void sample() {synchronized (mutex) {if (closed) {return;}doSample();}}

NodesSampler有两个具体的继承实现类

  • SniffNodesSampler:开启嗅探属性的客户端
  • SimpleNodeSampler:简单客户端

这边对SniffNodesSamplersample()方法进行分析。

        @Overrideprotected void doSample() {Set<DiscoveryNode> nodesToPing = new HashSet<>();// 最新要进行连接的一组节点列表for (DiscoveryNode node : listedNodes) {nodesToPing.add(node);}// nodes代表已经连接上的节点列表for (DiscoveryNode node : nodes) {nodesToPing.add(node);}// 并发控制辅助类final CountDownLatch latch = new CountDownLatch(nodesToPing.size());final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();try {for (final DiscoveryNode nodeToPing : nodesToPing) {// 采用线程池的方式去连接节点threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {Transport.Connection connectionToClose = null;void onDone() {try {IOUtils.closeWhileHandlingException(connectionToClose);} finally {latch.countDown();}}@Overridepublic void onFailure(Exception e) {onDone();......}@Overrideprotected void doRun() throws Exception {Transport.Connection pingConnection = null;if (nodes.contains(nodeToPing)) {try {pingConnection = transportService.getConnection(nodeToPing);} catch (NodeNotConnectedException e) {// will use a temp connection}}if (pingConnection == null) {logger.trace("connecting to cluster node [{}]", nodeToPing);// 尝试去连接节点,超时会抛出异常connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);pingConnection = connectionToClose;}// 若有一个节点连接成功会进行集群状态查询,返回值里面包含了全部可用节点transportService.sendRequest(pingConnection, ClusterStateAction.NAME,Requests.clusterStateRequest().clear().nodes(true).local(true),TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),new TransportResponseHandler<ClusterStateResponse>() {@Overridepublic ClusterStateResponse newInstance() {return new ClusterStateResponse();}@Overridepublic String executor() {return ThreadPool.Names.SAME;}@Overridepublic void handleResponse(ClusterStateResponse response) {clusterStateResponses.put(nodeToPing, response);onDone();}@Overridepublic void handleException(TransportException e) {logger.info((Supplier<?>) () -> new ParameterizedMessage("failed to get local cluster state for {}, disconnecting...", nodeToPing), e);try {hostFailureListener.onNodeDisconnected(nodeToPing, e);} finally {onDone();}}});}});}latch.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}HashSet<DiscoveryNode> newNodes = new HashSet<>();HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {logger.warn("node {} not part of the cluster {}, ignoring...",entry.getValue().getState().nodes().getLocalNode(), clusterName);newFilteredNodes.add(entry.getKey());continue;}for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {newNodes.add(cursor.value);}}// 验证新节点是否可连接nodes = validateNewNodes(newNodes);filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));}

通过代码发现,其实用了线程池并发连接节点,但是也使用了CountDownLatch,这就导致了,如果有一个节点超时,那整个批次都需要等待这么长的时间。典型的长尾效应

为啥超时间会出现翻倍

建立TransportClientNodesService服务时,构造函数中增加了对NodeSampler的调度。

    TransportClientNodesService(Settings settings, TransportService transportService,ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {.........this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());}

ScheduledNodeSampler

当调度触发之后,也会去执行nodesSampler.sample();,也就对mutex锁有了竞争,当调用增加连接方法之后,就会有两次调用 nodesSampler.sample();也就会将超时时间翻倍。

class ScheduledNodeSampler implements Runnable {@Overridepublic void run() {try {nodesSampler.sample();if (!closed) {nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);}} catch (Exception e) {logger.warn("failed to sample", e);}}
}

优化方案

  1. Settings增加超时transport的tcp超时配置。

     final Settings settings = Settings.builder().put("cluster.name", "common-es").put("transport.tcp.connect_timeout", "5s").put("client.transport.sniff", true).build();
    

    注意此配置的参数名不同版本之间存在差异。

  2. 使用数组方式增加连接节点,减少反复调用TransportClientNodesService addTransportAddresses次数,就是在减少分批次的产生阻塞耗时

这篇关于【ES实战】ES创建Transports客户端时间过长分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259722

相关文章

精选20个好玩又实用的的Python实战项目(有图文代码)

《精选20个好玩又实用的的Python实战项目(有图文代码)》文章介绍了20个实用Python项目,涵盖游戏开发、工具应用、图像处理、机器学习等,使用Tkinter、PIL、OpenCV、Kivy等库... 目录① 猜字游戏② 闹钟③ 骰子模拟器④ 二维码⑤ 语言检测⑥ 加密和解密⑦ URL缩短⑧ 音乐播放

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)

《java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)》:本文主要介绍java中pdf模版填充表单踩坑的相关资料,OpenPDF、iText、PDFBox是三... 目录准备Pdf模版方法1:itextpdf7填充表单(1)加入依赖(2)代码(3)遇到的问题方法2:pd

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

Java获取当前时间String类型和Date类型方式

《Java获取当前时间String类型和Date类型方式》:本文主要介绍Java获取当前时间String类型和Date类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录Java获取当前时间String和Date类型String类型和Date类型输出结果总结Java获取

Python实现批量提取BLF文件时间戳

《Python实现批量提取BLF文件时间戳》BLF(BinaryLoggingFormat)作为Vector公司推出的CAN总线数据记录格式,被广泛用于存储车辆通信数据,本文将使用Python轻松提取... 目录一、为什么需要批量处理 BLF 文件二、核心代码解析:从文件遍历到数据导出1. 环境准备与依赖库