从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD

本文主要是介绍从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • 依赖
      • 初始化客户端
      • 发起请求
      • 请求参数
      • 请求头
      • 设置超时时间
      • 设置线程数
      • 设置用户名密码
      • 结果解析
      • 节点选择器
      • 配置嗅探器
      • 整体示例
      • 问题
      • 参考

OpenSearch开发环境安装Docker和Docker-Compose两种方式

依赖

<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.13.4</version>  <!-- 建议就是这个版本 -->
</dependency>
<!-- 或者 -->
<dependency><groupId>org.opensearch.client</groupId><artifactId>opensearch-java</artifactId><version>2.8.1</version>
</dependency>

初始化客户端

// 构建客户端
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"),new HttpHost("localhost", 9201, "http")).build();

发起请求

  • performRequest: 是同步请求方法: 将阻塞调用线程,并在请求成功时返回响应,或在请求失败时引发异常
  • performRequestAsync: 是异步方法:接收一个ResponseListener对象作为参数。如果请求成功,则该参数使用响应进行调用;如果请求失败,则使用异常进行调用
// 同步请求Request request = new Request("GET","/posts/_search");Response response = restClient.performRequest(request); // 执行同步请求response.toString();// 异步请求Request request = new Request("GET", "/posts/_search");restClient.performRequestAsync(request, new ResponseListener() {@Overridepublic void onSuccess(Response response) {log.info("异步请求成功!" + response.toString());}@Overridepublic void onFailure(Exception e) {log.error("异步请求失败!");e.printStackTrace();}});

请求参数

// 第一种
request.addParameter("pretty","true");
// 第二种
request.setEntity(new NStringEntity("{\"json\":\"text\"}",ContentType.APPLICATION_JSON));
// 第三种
request.setJsonEntity("{\"json\":\"text\"}");

请求头

RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();builder.addHeader("Authorization", "Bearer " + "my-token");builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));COMMON_OPTIONS = builder.build();
Request request = new Request("GET", "/");
request.setOptions(COMMON_OPTIONS);

设置超时时间

 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {return builder.setConnectTimeout(50000) // 连接超时默认1s .setSocketTimeout(10000); // 套接字超时默认30s.setConnectionRequestTimeout(10000);}});

设置线程数

Apache HTTP异常客户端默认启动一个调度程序线程,连接管理器使用多个工作线程。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {return httpAsyncClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadNumber).build());}});

设置用户名密码

// 创建凭证提供程序,设置用户名和密码
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));// 使用 RestClient 构建器连接到 OpenSearch
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {// 配置连接超时,连接建立后两个节点之间数据传输的套接字超时和连接请求超时// 连接超时:客户端和服务器建立连接的最长时间RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000)  // 连接超时为5秒.setSocketTimeout(10000) // 套接字超时为10秒.setConnectionRequestTimeout(10000); // 连接请求超时为10秒httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());// 设置凭证提供程序httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();

结果解析

            Response response = restClient.performRequest(new Request("GET", "/"));// 已执行请求的信息RequestLine requestLine = response.getRequestLine();// Host返回的信息HttpHost httpHost = response.getHost();// 响应状态行,从中解析状态代码int statusCode = response.getStatusLine().getStatusCode();// 响应头,可以通过getHeader(string)按名称获取Header[] headers = response.getHeaders();String responseBody = EntityUtils.toString(response.getEntity());

节点选择器

在默认情况下,客户端以轮询的方式将每个请求发送到配置的各个节点中
ES允许用户自由选择要连接的节点,通过初始化客户端来配置节点选择器,以便筛选节点。该功能在启用嗅探器时可以用来防止HTTP请求只命中专用的主节点。
配置后,对于每个请求,客户端都通过节点选择器来筛选备选节点。

        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost",9200,"http"));builder.setNodeSelector(new NodeSelector(){@Overridepublic void select(Iterable<Node> nodes){boolean foundOne = false;for(Node node : nodes){String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId)){foundOne = true;break;}}if(foundOne){Iterator<Node> nodesIt = nodes.iterator();while(nodesIt.hasNext()){Node node = nodesIt.next();String rackId = node.getAttributes().get("rack_id").get(0);if("targetId".equals(rackId) == false){nodesIt.remove();}}}}

配置嗅探器

嗅探器允许自动发现运行中ES集群中的节点,并将其设置为现有的RestClient实例
默认i情况下,嗅探器使用nodes info API检索属于集群的节点并采用jackson解析获得JSON响应

  <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client-sniffer</artifactId><version>${elasticsearch.version}</version></dependency>

创建RestClient实例就可以采用嗅探器与其互联。嗅探器利用RestClient提供的定期机制(默认定期时间为5min),从集群中获取当前节点的列表,通过调用RestClient类中的setNodes方法来更新。

整体示例

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;import java.io.IOException;public class OpenSearchExample {public static void main(String[] args) throws IOException {// Connect to OpenSearchfinal CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));RestClient restClient = RestClient.builder(new HttpHost("10.12.23.1", 9200, "http")).setHttpClientConfigCallback(httpClientBuilder -> {RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(5000).setSocketTimeout(10000).setConnectionRequestTimeout(10000);httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);return httpClientBuilder;}).build();try {// Delete IndexdeleteIndex(restClient, "my_index");// Create IndexcreateIndex(restClient, "my_index");// Index DocumentindexDocument(restClient, "{\"index\":{\"_index\":\"my_index\",\"_id\":1}}\n{ \"field\": \"value\" }\n");// Get DocumentgetDocument(restClient, "my_index");// Delete DocumentdeleteDocument(restClient, "my_index", "1");// Delete IndexdeleteIndex(restClient, "my_index");} catch (ResponseException e) {e.printStackTrace();// Handle response exceptionSystem.err.println("Error: " + e.getResponse().getStatusLine().getReasonPhrase());} finally {// Close the clientrestClient.close();}}private static void createIndex(RestClient restClient, String index) throws IOException {// Create Index requestRequest request = new Request("PUT", "/" + index);// Execute the requestrestClient.performRequest(request);}private static void indexDocument(RestClient restClient, String s ) throws IOException {// Index Document requestRequest request = new Request("POST", "/_bulk" );request.setJsonEntity(s);// Execute the requestrestClient.performRequest(request);}private static void getDocument(RestClient restClient, String index) throws IOException {// Get Document requestRequest request = new Request("GET", "/" + index +  "/_search");// Execute the requestResponse response = restClient.performRequest(request);// Handle the responseSystem.out.println("Document found: " + EntityUtils.toString(response.getEntity()));}private static void deleteDocument(RestClient restClient, String index,  String id) throws IOException {// Delete Document requestRequest request = new Request("DELETE", "/" + index  + "/_doc/" + id);// Execute the requestrestClient.performRequest(request);}private static void deleteIndex(RestClient restClient, String index) throws IOException {// Delete Index requestRequest request = new Request("DELETE", "/" + index);// Execute the requestrestClient.performRequest(request);}
}

问题

异常如下

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)at java.base/sun.security.validator.Validator.validate(Validator.java:264)at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested targetat java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:146)at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:127)

解决方案

因为证书问题,我们用的是测试环境,就不要费劲的去下载私有证书再安装了,直接配置opensearch支持http即可。

opensearch.yml

plugins.security.ssl.http.enabled: false

或者直接禁用安全插件。

参考

  • https://www.cnblogs.com/openmind-ink/p/13951767.html

这篇关于从零开发短视频电商 Low Level Client(推荐)连接OpenSearch进行CRUD的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476597

相关文章

python web 开发之Flask中间件与请求处理钩子的最佳实践

《pythonweb开发之Flask中间件与请求处理钩子的最佳实践》Flask作为轻量级Web框架,提供了灵活的请求处理机制,中间件和请求钩子允许开发者在请求处理的不同阶段插入自定义逻辑,实现诸如... 目录Flask中间件与请求处理钩子完全指南1. 引言2. 请求处理生命周期概述3. 请求钩子详解3.1

如何基于Python开发一个微信自动化工具

《如何基于Python开发一个微信自动化工具》在当今数字化办公场景中,自动化工具已成为提升工作效率的利器,本文将深入剖析一个基于Python的微信自动化工具开发全过程,有需要的小伙伴可以了解下... 目录概述功能全景1. 核心功能模块2. 特色功能效果展示1. 主界面概览2. 定时任务配置3. 操作日志演示

电脑蓝牙连不上怎么办? 5 招教你轻松修复Mac蓝牙连接问题的技巧

《电脑蓝牙连不上怎么办?5招教你轻松修复Mac蓝牙连接问题的技巧》蓝牙连接问题是一些Mac用户经常遇到的常见问题之一,在本文章中,我们将提供一些有用的提示和技巧,帮助您解决可能出现的蓝牙连接问... 蓝牙作为一种流行的无线技术,已经成为我们连接各种设备的重要工具。在 MAC 上,你可以根据自己的需求,轻松地

Go语言中使用JWT进行身份验证的几种方式

《Go语言中使用JWT进行身份验证的几种方式》本文主要介绍了Go语言中使用JWT进行身份验证的几种方式,包括dgrijalva/jwt-go、golang-jwt/jwt、lestrrat-go/jw... 目录简介1. github.com/dgrijalva/jwt-go安装:使用示例:解释:2. gi

SpringBoot如何对密码等敏感信息进行脱敏处理

《SpringBoot如何对密码等敏感信息进行脱敏处理》这篇文章主要为大家详细介绍了SpringBoot对密码等敏感信息进行脱敏处理的几个常用方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录​1. 配置文件敏感信息脱敏​​2. 日志脱敏​​3. API响应脱敏​​4. 其他注意事项​​总结

JavaScript实战:智能密码生成器开发指南

本文通过JavaScript实战开发智能密码生成器,详解如何运用crypto.getRandomValues实现加密级随机密码生成,包含多字符组合、安全强度可视化、易混淆字符排除等企业级功能。学习密码强度检测算法与信息熵计算原理,获取可直接嵌入项目的完整代码,提升Web应用的安全开发能力 目录

C++ HTTP框架推荐(特点及优势)

《C++HTTP框架推荐(特点及优势)》:本文主要介绍C++HTTP框架推荐的相关资料,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Crow2. Drogon3. Pistache4. cpp-httplib5. Beast (Boos

python进行while遍历的常见错误解析

《python进行while遍历的常见错误解析》在Python中选择合适的遍历方式需要综合考虑可读性、性能和具体需求,本文就来和大家讲解一下python中while遍历常见错误以及所有遍历方法的优缺点... 目录一、超出数组范围问题分析错误复现解决方法关键区别二、continue使用问题分析正确写法关键点三

宝塔安装的MySQL无法连接的情况及解决方案

《宝塔安装的MySQL无法连接的情况及解决方案》宝塔面板是一款流行的服务器管理工具,其中集成的MySQL数据库有时会出现连接问题,本文详细介绍两种最常见的MySQL连接错误:“1130-Hostisn... 目录一、错误 1130:Host ‘xxx.xxx.xxx.xxx’ is not allowed

Python多进程、多线程、协程典型示例解析(最新推荐)

《Python多进程、多线程、协程典型示例解析(最新推荐)》:本文主要介绍Python多进程、多线程、协程典型示例解析(最新推荐),本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定... 目录一、multiprocessing(多进程)1. 模块简介2. 案例详解:并行计算平方和3. 实现逻