Java中ES5.3批量插入_bulk实现方案

2024-09-02 11:48

本文主要是介绍Java中ES5.3批量插入_bulk实现方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES5.5.3众所周知是十分尴尬的,用不了RestHighLevelClient, TransportClient可以用但是网上各种不建议使用,其实TransportClient在7.0.0才开始废弃,8.0.0之后才正式移除,现在的5.5.3版本完全是可以光明正大使用的,不过考虑到后面的升级,还是做出了妥协。

这里分享一下两种方案:

方案一:使用TransportClient:

1.pom文件

repository务必要指定,在mvnrepository是没有的,另外网上给出大多数都不用引x-pack-transport,直接使用transport,这对于自建的MySQL数据中是没有问题的,但是要同步阿里云的RDS就会报错

...

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>transport</artifactId>

    <version>5.5.3</version>

</dependency>

<dependency>

    <groupId>org.elasticsearch.plugin</groupId>

    <artifactId>transport-netty3-client</artifactId>

    <version>5.5.3</version>

</dependency>

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>x-pack-transport</artifactId>

    <version>5.5.3</version>

</dependency>

...

<repository>

    <id>elasticsearch-releases</id>

    <url>https://artifacts.elastic.co/maven</url>

    <releases>

        <enabled>true</enabled>

    </releases>

    <snapshots>

        <enabled>false</enabled>

    </snapshots>

</repository>

...

2. 配置

需要注意的是的(1)网上普遍的TransportClient都是通过PreBuiltTransportClient,这在RDS上面也是行不通的,需要使用PreBuiltXPackTransportClient;(2)阿里云的clusterName是es的实例id,并不是名称。

 

@Configuration

@Log4j2

public class ElasticSearchConfig {

 

    @Value("${icec.elasticsearch.host}")

    private String host;

 

    @Value("${icec.elasticsearch.tcpPort}")

    private int tcpPort;

 

    @Value("${icec.elasticsearch.clusterName}")

    private String clusterName;

 

    @Value("${icec.elasticsearch.username}")

    private String username;

 

    @Value("${icec.elasticsearch.password}")

    private String password;

 

    @Bean

    public TransportClient transportClient() {

 

        TransportClient transportClient = null;

        TransportClient preBuiltTransportClient = new PreBuiltXPackTransportClient(Settings.builder()

                .put("cluster.name", clusterName)

                .put("xpack.security.user", username + ":" + password)

                .put("client.transport.sniff"false)

                .build());

 

        try {

            transportClient = preBuiltTransportClient

                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), tcpPort));

        catch (UnknownHostException e) {

            log.warn(e);

        }

 

        return transportClient;

    }

}

3. 使用示例:

public void batchInsert(List<Map> datas) {

 

    if (CollectionUtils.isEmpty(datas)) {

        return;

    }

 

    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();

    datas.forEach(data -> bulkRequestBuilder.add(transportClient.prepareIndex(ES_INDEX, ES_TYPE, (String) data.get(OrderEsConstant.ORDER_ID)).setSource(data)));

 

    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();

 

    List<String> failuerMessages = Arrays.asList(bulkResponse.getItems())

            .stream().filter(p -> StringUtils.isNotBlank(p.getFailureMessage()))

            .map(BulkItemResponse::getFailureMessage).collect(Collectors.toList());

 

    if (CollectionUtils.isNotEmpty(failuerMessages)) {

        log.info("同步失败订单->{}", failuerMessages);

    }

 

    log.info("批处理完成 总条数:{}", datas.size());

 

}

 

方案二:手写批量操作,通过RestClient实现

public void batchInsert(List<Map> orders) {

 

    StringBuilder bulkRequestBody = new StringBuilder();

    int count = 1;

 

    for (Map order : orders) {

 

        String actionMetaData = String.format("{ \"index\" : {\"_id\" : \"%s\"} }%n", order.get(OrderEsConstant.ORDER_ID));

        String orderJson = JSON.toJSONString(order, SerializerFeature.WriteNullStringAsEmpty);

        bulkRequestBody.append(actionMetaData);

        bulkRequestBody.append(orderJson);

        bulkRequestBody.append("\n");

 

        if (count % 5000 == 0 || count == orders.size()) {

 

            Response response;

            Map result = Maps.newHashMap();

            String router = String.format(BASIC_FORMAT, ES_INDEX, ES_TYPE, "_bulk");

 

            try {

                response = restClient.performRequest(POST, router, Collections.emptyMap(), new StringEntity(bulkRequestBody.toString(), ContentType.APPLICATION_JSON));

                result = objectMapper.readValue(response.getEntity().getContent(), Map.class);

            catch (IOException e) {

                log.info("ES批量插入异常");

            }

 

            List<Map> itemResults = ((List<Map>) result.get("items")).stream().map(p -> (Map) p.get("index")).collect(Collectors.toList());

            List errorMsgs = itemResults.stream().filter(p -> 200 != (Integer) p.get("status")).map(this::getErrorMsg).collect(Collectors.toList());

            if (CollectionUtils.isNotEmpty(errorMsgs)) {

                log.warn("数据插入失败 -> {}", errorMsgs);

            }

 

            log.info("批处理完成,总条数: {}", itemResults.size());

        }

        count++;

    }

}

方案一可以一劳永逸,后续一些高级的操作都可以通过TransportClient去实现,但是如果ES升级的话,就会有问题,官方是在8.0.0才移除该功能,但是阿里云的数据订阅的话只支持到5.5.3;

方案二暂时不用考虑后续升级的问题,但是只要涉及到高级操作,都需要自己去实现,例如upsert操作。

这篇关于Java中ES5.3批量插入_bulk实现方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1129874

相关文章

SpringBoot监控API请求耗时的6中解决解决方案

《SpringBoot监控API请求耗时的6中解决解决方案》本文介绍SpringBoot中记录API请求耗时的6种方案,包括手动埋点、AOP切面、拦截器、Filter、事件监听、Micrometer+... 目录1. 简介2.实战案例2.1 手动记录2.2 自定义AOP记录2.3 拦截器技术2.4 使用Fi

最新Spring Security的基于内存用户认证方式

《最新SpringSecurity的基于内存用户认证方式》本文讲解SpringSecurity内存认证配置,适用于开发、测试等场景,通过代码创建用户及权限管理,支持密码加密,虽简单但不持久化,生产环... 目录1. 前言2. 因何选择内存认证?3. 基础配置实战❶ 创建Spring Security配置文件

Python对接支付宝支付之使用AliPay实现的详细操作指南

《Python对接支付宝支付之使用AliPay实现的详细操作指南》支付宝没有提供PythonSDK,但是强大的github就有提供python-alipay-sdk,封装里很多复杂操作,使用这个我们就... 目录一、引言二、准备工作2.1 支付宝开放平台入驻与应用创建2.2 密钥生成与配置2.3 安装ali

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、

PyCharm中配置PyQt的实现步骤

《PyCharm中配置PyQt的实现步骤》PyCharm是JetBrains推出的一款强大的PythonIDE,结合PyQt可以进行pythion高效开发桌面GUI应用程序,本文就来介绍一下PyCha... 目录1. 安装China编程PyQt1.PyQt 核心组件2. 基础 PyQt 应用程序结构3. 使用 Q

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

Java Thread中join方法使用举例详解

《JavaThread中join方法使用举例详解》JavaThread中join()方法主要是让调用改方法的thread完成run方法里面的东西后,在执行join()方法后面的代码,这篇文章主要介绍... 目录前言1.join()方法的定义和作用2.join()方法的三个重载版本3.join()方法的工作原

Spring AI使用tool Calling和MCP的示例详解

《SpringAI使用toolCalling和MCP的示例详解》SpringAI1.0.0.M6引入ToolCalling与MCP协议,提升AI与工具交互的扩展性与标准化,支持信息检索、行动执行等... 目录深入探索 Spring AI聊天接口示例Function CallingMCPSTDIOSSE结束语

Java获取当前时间String类型和Date类型方式

《Java获取当前时间String类型和Date类型方式》:本文主要介绍Java获取当前时间String类型和Date类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录Java获取当前时间String和Date类型String类型和Date类型输出结果总结Java获取

Spring Boot Actuator应用监控与管理的详细步骤

《SpringBootActuator应用监控与管理的详细步骤》SpringBootActuator是SpringBoot的监控工具,提供健康检查、性能指标、日志管理等核心功能,支持自定义和扩展端... 目录一、 Spring Boot Actuator 概述二、 集成 Spring Boot Actuat