Java中ES5.3批量插入_bulk实现方案

2024-09-02 11:48

本文主要是介绍Java中ES5.3批量插入_bulk实现方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES5.5.3众所周知是十分尴尬的,用不了RestHighLevelClient, TransportClient可以用但是网上各种不建议使用,其实TransportClient在7.0.0才开始废弃,8.0.0之后才正式移除,现在的5.5.3版本完全是可以光明正大使用的,不过考虑到后面的升级,还是做出了妥协。

这里分享一下两种方案:

方案一:使用TransportClient:

1.pom文件

repository务必要指定,在mvnrepository是没有的,另外网上给出大多数都不用引x-pack-transport,直接使用transport,这对于自建的MySQL数据中是没有问题的,但是要同步阿里云的RDS就会报错

...

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>transport</artifactId>

    <version>5.5.3</version>

</dependency>

<dependency>

    <groupId>org.elasticsearch.plugin</groupId>

    <artifactId>transport-netty3-client</artifactId>

    <version>5.5.3</version>

</dependency>

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>x-pack-transport</artifactId>

    <version>5.5.3</version>

</dependency>

...

<repository>

    <id>elasticsearch-releases</id>

    <url>https://artifacts.elastic.co/maven</url>

    <releases>

        <enabled>true</enabled>

    </releases>

    <snapshots>

        <enabled>false</enabled>

    </snapshots>

</repository>

...

2. 配置

需要注意的是的(1)网上普遍的TransportClient都是通过PreBuiltTransportClient,这在RDS上面也是行不通的,需要使用PreBuiltXPackTransportClient;(2)阿里云的clusterName是es的实例id,并不是名称。

 

@Configuration

@Log4j2

public class ElasticSearchConfig {

 

    @Value("${icec.elasticsearch.host}")

    private String host;

 

    @Value("${icec.elasticsearch.tcpPort}")

    private int tcpPort;

 

    @Value("${icec.elasticsearch.clusterName}")

    private String clusterName;

 

    @Value("${icec.elasticsearch.username}")

    private String username;

 

    @Value("${icec.elasticsearch.password}")

    private String password;

 

    @Bean

    public TransportClient transportClient() {

 

        TransportClient transportClient = null;

        TransportClient preBuiltTransportClient = new PreBuiltXPackTransportClient(Settings.builder()

                .put("cluster.name", clusterName)

                .put("xpack.security.user", username + ":" + password)

                .put("client.transport.sniff"false)

                .build());

 

        try {

            transportClient = preBuiltTransportClient

                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), tcpPort));

        catch (UnknownHostException e) {

            log.warn(e);

        }

 

        return transportClient;

    }

}

3. 使用示例:

public void batchInsert(List<Map> datas) {

 

    if (CollectionUtils.isEmpty(datas)) {

        return;

    }

 

    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();

    datas.forEach(data -> bulkRequestBuilder.add(transportClient.prepareIndex(ES_INDEX, ES_TYPE, (String) data.get(OrderEsConstant.ORDER_ID)).setSource(data)));

 

    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();

 

    List<String> failuerMessages = Arrays.asList(bulkResponse.getItems())

            .stream().filter(p -> StringUtils.isNotBlank(p.getFailureMessage()))

            .map(BulkItemResponse::getFailureMessage).collect(Collectors.toList());

 

    if (CollectionUtils.isNotEmpty(failuerMessages)) {

        log.info("同步失败订单->{}", failuerMessages);

    }

 

    log.info("批处理完成 总条数:{}", datas.size());

 

}

 

方案二:手写批量操作,通过RestClient实现

public void batchInsert(List<Map> orders) {

 

    StringBuilder bulkRequestBody = new StringBuilder();

    int count = 1;

 

    for (Map order : orders) {

 

        String actionMetaData = String.format("{ \"index\" : {\"_id\" : \"%s\"} }%n", order.get(OrderEsConstant.ORDER_ID));

        String orderJson = JSON.toJSONString(order, SerializerFeature.WriteNullStringAsEmpty);

        bulkRequestBody.append(actionMetaData);

        bulkRequestBody.append(orderJson);

        bulkRequestBody.append("\n");

 

        if (count % 5000 == 0 || count == orders.size()) {

 

            Response response;

            Map result = Maps.newHashMap();

            String router = String.format(BASIC_FORMAT, ES_INDEX, ES_TYPE, "_bulk");

 

            try {

                response = restClient.performRequest(POST, router, Collections.emptyMap(), new StringEntity(bulkRequestBody.toString(), ContentType.APPLICATION_JSON));

                result = objectMapper.readValue(response.getEntity().getContent(), Map.class);

            catch (IOException e) {

                log.info("ES批量插入异常");

            }

 

            List<Map> itemResults = ((List<Map>) result.get("items")).stream().map(p -> (Map) p.get("index")).collect(Collectors.toList());

            List errorMsgs = itemResults.stream().filter(p -> 200 != (Integer) p.get("status")).map(this::getErrorMsg).collect(Collectors.toList());

            if (CollectionUtils.isNotEmpty(errorMsgs)) {

                log.warn("数据插入失败 -> {}", errorMsgs);

            }

 

            log.info("批处理完成,总条数: {}", itemResults.size());

        }

        count++;

    }

}

方案一可以一劳永逸,后续一些高级的操作都可以通过TransportClient去实现,但是如果ES升级的话,就会有问题,官方是在8.0.0才移除该功能,但是阿里云的数据订阅的话只支持到5.5.3;

方案二暂时不用考虑后续升级的问题,但是只要涉及到高级操作,都需要自己去实现,例如upsert操作。

这篇关于Java中ES5.3批量插入_bulk实现方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1129874

相关文章

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

防止Linux rm命令误操作的多场景防护方案与实践

《防止Linuxrm命令误操作的多场景防护方案与实践》在Linux系统中,rm命令是删除文件和目录的高效工具,但一旦误操作,如执行rm-rf/或rm-rf/*,极易导致系统数据灾难,本文针对不同场景... 目录引言理解 rm 命令及误操作风险rm 命令基础常见误操作案例防护方案使用 rm编程 别名及安全删除

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

JavaScript中的高级调试方法全攻略指南

《JavaScript中的高级调试方法全攻略指南》什么是高级JavaScript调试技巧,它比console.log有何优势,如何使用断点调试定位问题,通过本文,我们将深入解答这些问题,带您从理论到实... 目录观点与案例结合观点1观点2观点3观点4观点5高级调试技巧详解实战案例断点调试:定位变量错误性能分

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1