【ES实战】ES创建Transports客户端时间过长分析

2023-10-22 06:36

本文主要是介绍【ES实战】ES创建Transports客户端时间过长分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES创建Transports客户端时间过长分析

2023年10月19日

文章目录

  • ES创建Transports客户端时间过长分析
    • 问题描述
    • 问题重现
    • 问题分析
      • 是否可以配置链接超时时间
      • 节点建立连接超时逻辑
      • 为啥超时间会出现翻倍
    • 优化方案

在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。

问题描述

  1. 使用ES Transport 客户端创建与集群的链接。
  2. 连接地址里面有不存在的IP
  3. 在增加ES节点时,采用逐个增加的方式

整个建立链接的过程会非常耗时。

问题重现

采用jar依赖如下

        <dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId><version>5.6.16</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>x-pack-transport</artifactId><version>5.6.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>sniffer</artifactId><version>5.4.2</version></dependency>

创建连接代码如下

        final Settings settings = Settings.builder().put("cluster.name", "common-es").put("client.transport.sniff", true).build();final TransportClient transportClient = new PreBuiltXPackTransportClient(settings);long t1 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.37"), 9800));logger.info("第1个错误节点耗时:" + (System.currentTimeMillis() - t1) / 1000);long t2 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.38"), 9800));logger.info("第2个错误节点耗时:" + (System.currentTimeMillis() - t2) / 1000);long t3 = System.currentTimeMillis();transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.80.39"), 9800));logger.info("第3个错误节点耗时:" + (System.currentTimeMillis() - t3) / 1000);

输出结果

[2023-10-19 15:31:31,398] [main] [INFO ] xxx.Client - 第1个错误节点耗时:21
[2023-10-19 15:32:13,414] [main] [INFO ] xxx.Client - 第2个错误节点耗时:42
[2023-10-19 15:32:55,436] [main] [INFO ] xxx.Client - 第3个错误节点耗时:42

问题分析

是否可以配置链接超时时间

通过new PreBuiltXPackTransportClient()方法创建客户端,跟踪源码发现其会在TransportClient.buildTemplate进行建立网络模块服务,在继续debug,会发现会在TcpTransport中方法buildDefaultConnectionProfile构建链接的配置文件。发现其TCP_CONNECT_TIMEOUT默认的配置是30s,起对应的配置参数是transport.tcp.connect_timeout

    static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);ConnectionProfile.Builder builder = new ConnectionProfile.Builder();// 链接的超时时间builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);// if we are not master eligible we don't need a dedicated channel to publish the statebuilder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);// if we are not a data-node we don't need any dedicated channels for recoverybuilder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);return builder.build();}
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);

节点建立连接超时逻辑

TcpTransport.openConnection(DiscoveryNode node, ConnectionProfile connectionProfile)方法建立通信管道时,在通信之前重组连接的默认配置和自定义配置。在Netty4Transport.connectToChannels()方法内具体生效,future.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));

增加节点的方式

TransportClient类提供了数组方式增加节点和单个节点增加的方式,

    public TransportClient addTransportAddress(TransportAddress transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}public TransportClient addTransportAddresses(TransportAddress... transportAddress) {nodesService.addTransportAddresses(transportAddress);return this;}

不过根据代码,发现其都是调用的TransportClientNodesService类的addTransportAddresses(TransportAddress... transportAddresses)方法

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {// 竞争对象锁mutexsynchronized (mutex) {if (closed) {throw new IllegalStateException("transport client is closed, can't add an address");}List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);for (TransportAddress transportAddress : transportAddresses) {boolean found = false;for (DiscoveryNode otherNode : listedNodes) {// 方式连接地址值重复,会自动过滤if (otherNode.getAddress().equals(transportAddress)) {found = true;logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);break;}}if (!found) {filtered.add(transportAddress);}}if (filtered.isEmpty()) {return this;}List<DiscoveryNode> builder = new ArrayList<>(listedNodes);for (TransportAddress transportAddress : filtered) {DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);logger.debug("adding address [{}]", node);builder.add(node);}// listNodes里面存放的是配置的连接节点列表listedNodes = Collections.unmodifiableList(builder);// 调用不同的节点采集-里面也对mutex锁进行竞争nodesSampler.sample();}return this;}

NodeSampler.sample()

		public void sample() {synchronized (mutex) {if (closed) {return;}doSample();}}

NodesSampler有两个具体的继承实现类

  • SniffNodesSampler:开启嗅探属性的客户端
  • SimpleNodeSampler:简单客户端

这边对SniffNodesSamplersample()方法进行分析。

        @Overrideprotected void doSample() {Set<DiscoveryNode> nodesToPing = new HashSet<>();// 最新要进行连接的一组节点列表for (DiscoveryNode node : listedNodes) {nodesToPing.add(node);}// nodes代表已经连接上的节点列表for (DiscoveryNode node : nodes) {nodesToPing.add(node);}// 并发控制辅助类final CountDownLatch latch = new CountDownLatch(nodesToPing.size());final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();try {for (final DiscoveryNode nodeToPing : nodesToPing) {// 采用线程池的方式去连接节点threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {Transport.Connection connectionToClose = null;void onDone() {try {IOUtils.closeWhileHandlingException(connectionToClose);} finally {latch.countDown();}}@Overridepublic void onFailure(Exception e) {onDone();......}@Overrideprotected void doRun() throws Exception {Transport.Connection pingConnection = null;if (nodes.contains(nodeToPing)) {try {pingConnection = transportService.getConnection(nodeToPing);} catch (NodeNotConnectedException e) {// will use a temp connection}}if (pingConnection == null) {logger.trace("connecting to cluster node [{}]", nodeToPing);// 尝试去连接节点,超时会抛出异常connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);pingConnection = connectionToClose;}// 若有一个节点连接成功会进行集群状态查询,返回值里面包含了全部可用节点transportService.sendRequest(pingConnection, ClusterStateAction.NAME,Requests.clusterStateRequest().clear().nodes(true).local(true),TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),new TransportResponseHandler<ClusterStateResponse>() {@Overridepublic ClusterStateResponse newInstance() {return new ClusterStateResponse();}@Overridepublic String executor() {return ThreadPool.Names.SAME;}@Overridepublic void handleResponse(ClusterStateResponse response) {clusterStateResponses.put(nodeToPing, response);onDone();}@Overridepublic void handleException(TransportException e) {logger.info((Supplier<?>) () -> new ParameterizedMessage("failed to get local cluster state for {}, disconnecting...", nodeToPing), e);try {hostFailureListener.onNodeDisconnected(nodeToPing, e);} finally {onDone();}}});}});}latch.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}HashSet<DiscoveryNode> newNodes = new HashSet<>();HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {logger.warn("node {} not part of the cluster {}, ignoring...",entry.getValue().getState().nodes().getLocalNode(), clusterName);newFilteredNodes.add(entry.getKey());continue;}for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {newNodes.add(cursor.value);}}// 验证新节点是否可连接nodes = validateNewNodes(newNodes);filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));}

通过代码发现,其实用了线程池并发连接节点,但是也使用了CountDownLatch,这就导致了,如果有一个节点超时,那整个批次都需要等待这么长的时间。典型的长尾效应

为啥超时间会出现翻倍

建立TransportClientNodesService服务时,构造函数中增加了对NodeSampler的调度。

    TransportClientNodesService(Settings settings, TransportService transportService,ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {.........this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());}

ScheduledNodeSampler

当调度触发之后,也会去执行nodesSampler.sample();,也就对mutex锁有了竞争,当调用增加连接方法之后,就会有两次调用 nodesSampler.sample();也就会将超时时间翻倍。

class ScheduledNodeSampler implements Runnable {@Overridepublic void run() {try {nodesSampler.sample();if (!closed) {nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);}} catch (Exception e) {logger.warn("failed to sample", e);}}
}

优化方案

  1. Settings增加超时transport的tcp超时配置。

     final Settings settings = Settings.builder().put("cluster.name", "common-es").put("transport.tcp.connect_timeout", "5s").put("client.transport.sniff", true).build();
    

    注意此配置的参数名不同版本之间存在差异。

  2. 使用数组方式增加连接节点,减少反复调用TransportClientNodesService addTransportAddresses次数,就是在减少分批次的产生阻塞耗时

这篇关于【ES实战】ES创建Transports客户端时间过长分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259722

相关文章

Python并行处理实战之如何使用ProcessPoolExecutor加速计算

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu... 目录简介完整代码示例代码解释1. 导入必要的模块2. 定义处理函数3. 主函数4. 生成数字列表5.

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

MYSQL查询结果实现发送给客户端

《MYSQL查询结果实现发送给客户端》:本文主要介绍MYSQL查询结果实现发送给客户端方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql取数据和发数据的流程(边读边发)Sending to clientSending DataLRU(Least Rec

Java中实现线程的创建和启动的方法

《Java中实现线程的创建和启动的方法》在Java中,实现线程的创建和启动是两个不同但紧密相关的概念,理解为什么要启动线程(调用start()方法)而非直接调用run()方法,是掌握多线程编程的关键,... 目录1. 线程的生命周期2. start() vs run() 的本质区别3. 为什么必须通过 st

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

Python FastMCP构建MCP服务端与客户端的详细步骤

《PythonFastMCP构建MCP服务端与客户端的详细步骤》MCP(Multi-ClientProtocol)是一种用于构建可扩展服务的通信协议框架,本文将使用FastMCP搭建一个支持St... 目录简介环境准备服务端实现(server.py)客户端实现(client.py)运行效果扩展方向常见问题结