Java中ES5.3批量插入_bulk实现方案

2024-09-02 11:48

本文主要是介绍Java中ES5.3批量插入_bulk实现方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES5.5.3众所周知是十分尴尬的,用不了RestHighLevelClient, TransportClient可以用但是网上各种不建议使用,其实TransportClient在7.0.0才开始废弃,8.0.0之后才正式移除,现在的5.5.3版本完全是可以光明正大使用的,不过考虑到后面的升级,还是做出了妥协。

这里分享一下两种方案:

方案一:使用TransportClient:

1.pom文件

repository务必要指定,在mvnrepository是没有的,另外网上给出大多数都不用引x-pack-transport,直接使用transport,这对于自建的MySQL数据中是没有问题的,但是要同步阿里云的RDS就会报错

...

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>transport</artifactId>

    <version>5.5.3</version>

</dependency>

<dependency>

    <groupId>org.elasticsearch.plugin</groupId>

    <artifactId>transport-netty3-client</artifactId>

    <version>5.5.3</version>

</dependency>

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>x-pack-transport</artifactId>

    <version>5.5.3</version>

</dependency>

...

<repository>

    <id>elasticsearch-releases</id>

    <url>https://artifacts.elastic.co/maven</url>

    <releases>

        <enabled>true</enabled>

    </releases>

    <snapshots>

        <enabled>false</enabled>

    </snapshots>

</repository>

...

2. 配置

需要注意的是的(1)网上普遍的TransportClient都是通过PreBuiltTransportClient,这在RDS上面也是行不通的,需要使用PreBuiltXPackTransportClient;(2)阿里云的clusterName是es的实例id,并不是名称。

 

@Configuration

@Log4j2

public class ElasticSearchConfig {

 

    @Value("${icec.elasticsearch.host}")

    private String host;

 

    @Value("${icec.elasticsearch.tcpPort}")

    private int tcpPort;

 

    @Value("${icec.elasticsearch.clusterName}")

    private String clusterName;

 

    @Value("${icec.elasticsearch.username}")

    private String username;

 

    @Value("${icec.elasticsearch.password}")

    private String password;

 

    @Bean

    public TransportClient transportClient() {

 

        TransportClient transportClient = null;

        TransportClient preBuiltTransportClient = new PreBuiltXPackTransportClient(Settings.builder()

                .put("cluster.name", clusterName)

                .put("xpack.security.user", username + ":" + password)

                .put("client.transport.sniff"false)

                .build());

 

        try {

            transportClient = preBuiltTransportClient

                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), tcpPort));

        catch (UnknownHostException e) {

            log.warn(e);

        }

 

        return transportClient;

    }

}

3. 使用示例:

public void batchInsert(List<Map> datas) {

 

    if (CollectionUtils.isEmpty(datas)) {

        return;

    }

 

    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();

    datas.forEach(data -> bulkRequestBuilder.add(transportClient.prepareIndex(ES_INDEX, ES_TYPE, (String) data.get(OrderEsConstant.ORDER_ID)).setSource(data)));

 

    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();

 

    List<String> failuerMessages = Arrays.asList(bulkResponse.getItems())

            .stream().filter(p -> StringUtils.isNotBlank(p.getFailureMessage()))

            .map(BulkItemResponse::getFailureMessage).collect(Collectors.toList());

 

    if (CollectionUtils.isNotEmpty(failuerMessages)) {

        log.info("同步失败订单->{}", failuerMessages);

    }

 

    log.info("批处理完成 总条数:{}", datas.size());

 

}

 

方案二:手写批量操作,通过RestClient实现

public void batchInsert(List<Map> orders) {

 

    StringBuilder bulkRequestBody = new StringBuilder();

    int count = 1;

 

    for (Map order : orders) {

 

        String actionMetaData = String.format("{ \"index\" : {\"_id\" : \"%s\"} }%n", order.get(OrderEsConstant.ORDER_ID));

        String orderJson = JSON.toJSONString(order, SerializerFeature.WriteNullStringAsEmpty);

        bulkRequestBody.append(actionMetaData);

        bulkRequestBody.append(orderJson);

        bulkRequestBody.append("\n");

 

        if (count % 5000 == 0 || count == orders.size()) {

 

            Response response;

            Map result = Maps.newHashMap();

            String router = String.format(BASIC_FORMAT, ES_INDEX, ES_TYPE, "_bulk");

 

            try {

                response = restClient.performRequest(POST, router, Collections.emptyMap(), new StringEntity(bulkRequestBody.toString(), ContentType.APPLICATION_JSON));

                result = objectMapper.readValue(response.getEntity().getContent(), Map.class);

            catch (IOException e) {

                log.info("ES批量插入异常");

            }

 

            List<Map> itemResults = ((List<Map>) result.get("items")).stream().map(p -> (Map) p.get("index")).collect(Collectors.toList());

            List errorMsgs = itemResults.stream().filter(p -> 200 != (Integer) p.get("status")).map(this::getErrorMsg).collect(Collectors.toList());

            if (CollectionUtils.isNotEmpty(errorMsgs)) {

                log.warn("数据插入失败 -> {}", errorMsgs);

            }

 

            log.info("批处理完成,总条数: {}", itemResults.size());

        }

        count++;

    }

}

方案一可以一劳永逸,后续一些高级的操作都可以通过TransportClient去实现,但是如果ES升级的话,就会有问题,官方是在8.0.0才移除该功能,但是阿里云的数据订阅的话只支持到5.5.3;

方案二暂时不用考虑后续升级的问题,但是只要涉及到高级操作,都需要自己去实现,例如upsert操作。

这篇关于Java中ES5.3批量插入_bulk实现方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1129874

相关文章

深入解析 Java Future 类及代码示例

《深入解析JavaFuture类及代码示例》JavaFuture是java.util.concurrent包中用于表示异步计算结果的核心接口,下面给大家介绍JavaFuture类及实例代码,感兴... 目录一、Future 类概述二、核心工作机制代码示例执行流程2. 状态机模型3. 核心方法解析行为总结:三

Spring @RequestMapping 注解及使用技巧详解

《Spring@RequestMapping注解及使用技巧详解》@RequestMapping是SpringMVC中定义请求映射规则的核心注解,用于将HTTP请求映射到Controller处理方法... 目录一、核心作用二、关键参数说明三、快捷组合注解四、动态路径参数(@PathVariable)五、匹配请

Java -jar命令如何运行外部依赖JAR包

《Java-jar命令如何运行外部依赖JAR包》在Java应用部署中,java-jar命令是启动可执行JAR包的标准方式,但当应用需要依赖外部JAR文件时,直接使用java-jar会面临类加载困... 目录引言:外部依赖JAR的必要性一、问题本质:类加载机制的限制1. Java -jar的默认行为2. 类加

Java进程CPU使用率过高排查步骤详细讲解

《Java进程CPU使用率过高排查步骤详细讲解》:本文主要介绍Java进程CPU使用率过高排查的相关资料,针对Java进程CPU使用率高的问题,我们可以遵循以下步骤进行排查和优化,文中通过代码介绍... 目录前言一、初步定位问题1.1 确认进程状态1.2 确定Java进程ID1.3 快速生成线程堆栈二、分析

Swagger在java中的运用及常见问题解决

《Swagger在java中的运用及常见问题解决》Swagger插件是一款深受Java开发者喜爱的工具,它在前后端分离的开发模式下发挥着重要作用,:本文主要介绍Swagger在java中的运用及常... 目录前言1. Swagger 的主要功能1.1 交互式 API 文档1.2 客户端 SDK 生成1.3

Python实现自动化Word文档样式复制与内容生成

《Python实现自动化Word文档样式复制与内容生成》在办公自动化领域,高效处理Word文档的样式和内容复制是一个常见需求,本文将展示如何利用Python的python-docx库实现... 目录一、为什么需要自动化 Word 文档处理二、核心功能实现:样式与表格的深度复制1. 表格复制(含样式与内容)2

Java中的登录技术保姆级详细教程

《Java中的登录技术保姆级详细教程》:本文主要介绍Java中登录技术保姆级详细教程的相关资料,在Java中我们可以使用各种技术和框架来实现这些功能,文中通过代码介绍的非常详细,需要的朋友可以参考... 目录1.登录思路2.登录标记1.会话技术2.会话跟踪1.Cookie技术2.Session技术3.令牌技

Java 枚举的基本使用方法及实际使用场景

《Java枚举的基本使用方法及实际使用场景》枚举是Java中一种特殊的类,用于定义一组固定的常量,枚举类型提供了更好的类型安全性和可读性,适用于需要定义一组有限且固定的值的场景,本文给大家介绍Jav... 目录一、什么是枚举?二、枚举的基本使用方法定义枚举三、实际使用场景代替常量状态机四、更多用法1.实现接

python获取cmd环境变量值的实现代码

《python获取cmd环境变量值的实现代码》:本文主要介绍在Python中获取命令行(cmd)环境变量的值,可以使用标准库中的os模块,需要的朋友可以参考下... 前言全局说明在执行py过程中,总要使用到系统环境变量一、说明1.1 环境:Windows 11 家庭版 24H2 26100.4061

java String.join()方法实例详解

《javaString.join()方法实例详解》String.join()是Java提供的一个实用方法,用于将多个字符串按照指定的分隔符连接成一个字符串,这一方法是Java8中引入的,极大地简化了... 目录bVARxMJava String.join() 方法详解1. 方法定义2. 基本用法2.1 拼接