Common ElasticSearch CRUD Operations

2024-08-28 02:38


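Performing ElasticSearch create, read, update and delete (CRUD) operations from Java takes two steps:
1. Build the query string. The strings built here are ElasticSearch JSON query DSL, not SQL.
2. Execute the CRUD operation.
Some commonly used helper methods are provided below. They are written as members of a helper class that holds the client in an esClient field; calls such as setExtraSource and setRefresh(true) suggest the ElasticSearch 2.x transport client.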

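Building the query DSL

1. Basic query DSL

	/**
	 * Get query DSL.
	 *
	 * @param queryString query_string expression, e.g. field:value
	 * @return query DSL
	 */
	public String getQueryDSL(String queryString) {
		// queryString is concatenated as-is, so it must not contain unescaped quotes or backslashes.
		String dsl = "{"
				+ " \"query\": { "
				+ " \"query_string\": { "
				+ " \"query\":\"" + queryString + "\" "
				+ "}"
				+ " } "
				+ " } ";
		return dsl;
	}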
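Because the query string is concatenated straight into the JSON body, a value that contains a double quote or a backslash produces an invalid DSL. The following is a minimal sketch of an escaping step using only the JDK. DslEscaper, escapeJson and queryDsl are hypothetical names, not part of the original helper class, and the sketch only handles JSON escaping, not the query_string syntax itself.

```java
final class DslEscaper {
    private DslEscaper() {}

    /** Escapes characters that may not appear raw inside a JSON string literal. */
    static String escapeJson(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Same structure as getQueryDSL, with the query string escaped first (assumed variant). */
    static String queryDsl(String queryString) {
        return "{\"query\":{\"query_string\":{\"query\":\""
                + escapeJson(queryString) + "\"}}}";
    }
}
```

With this step a phrase query such as name:"a b" can be passed in without breaking the surrounding JSON.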

2. Query and sort by fields

	/**
	 * Get query DSL with sort.
	 *
	 * @param queryString query_string expression
	 * @param sortByASCOrDesc key: sort field, value: asc or desc
	 * @return query DSL with sort
	 */
	public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {
		String dsl = "{"
				+ " \"query\": { "
				+ " \"query_string\": { "
				+ " \"query\":\"" + queryString + "\" "
				+ "}"
				+ " } ";
		StringBuilder builder = new StringBuilder(dsl);
		builder.append(", \"sort\": [");
		int index = 0;
		for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
			// Separate sort clauses with a comma, starting from the second one.
			if (index > 0) {
				builder.append(",");
			}
			builder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
			index++;
		}
		builder.append("]");
		builder.append(" } ");
		return builder.toString();
	}
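The sort clauses are emitted in the map's iteration order, so with a plain HashMap there is no guarantee which field becomes the primary sort key. The sketch below isolates the sort-building loop and feeds it a LinkedHashMap to fix the order. SortClause and the field names createTime and name are hypothetical and only serve as illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class SortClause {
    private SortClause() {}

    /** Builds the "sort" array in the iteration order of the given map. */
    static String build(Map<String, String> sortByAscOrDesc) {
        StringJoiner joiner = new StringJoiner(",", "\"sort\": [", "]");
        for (Map.Entry<String, String> e : sortByAscOrDesc.entrySet()) {
            joiner.add(String.format("{\"%s\":\"%s\"}", e.getKey(), e.getValue()));
        }
        return joiner.toString();
    }

    /** Primary key createTime (desc), secondary key name (asc). */
    static String example() {
        Map<String, String> sort = new LinkedHashMap<>();
        sort.put("createTime", "desc");
        sort.put("name", "asc");
        return build(sort);
    }
}
```

Here example() yields "sort": [{"createTime":"desc"},{"name":"asc"}], with the clauses in insertion order.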

3. Range query with sort

	/**
	 * Get query DSL with range.
	 *
	 * @param queryString query_string expression
	 * @param rangeMap key = range field; value = map of operator (gte, gt, lte or lt) to bound value
	 * @param sortByASCOrDesc key: sort field, value: asc or desc; pass null if no sort is needed
	 * @return query DSL with range
	 */
	public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap,
			Map<String, String> sortByASCOrDesc) {
		StringBuilder strBuilder = new StringBuilder();
		strBuilder.append("{")
				.append("\"query\": {")
				.append("\"bool\": {")
				.append("\"must\": [")
				.append("{")
				.append("\"query_string\": {")
				.append(" \"query\":\"" + queryString + "\" ")
				.append("}")
				.append("}");
		// One range clause per field. Each clause brings its own leading comma,
		// so an empty rangeMap still produces valid JSON.
		for (Map.Entry<String, Map<String, String>> pair : rangeMap.entrySet()) {
			strBuilder.append(",{").append("\"range\": {").append(" \"" + pair.getKey() + "\": { ");
			int ii = 0;
			for (Map.Entry<String, String> bound : pair.getValue().entrySet()) {
				if (ii > 0) {
					strBuilder.append(" , ");
				}
				strBuilder.append(" \"" + bound.getKey() + "\": \"" + bound.getValue() + "\" ");
				ii++;
			}
			strBuilder.append("}").append("}").append("}");
		}
		strBuilder.append("]").append("}").append("}");
		if (sortByASCOrDesc != null) {
			strBuilder.append(", \"sort\": [");
			int jj = 0;
			for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
				if (jj > 0) {
					strBuilder.append(",");
				}
				strBuilder.append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
				jj++;
			}
			strBuilder.append("]");
		}
		strBuilder.append("}");
		return strBuilder.toString();
	}
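The nested rangeMap argument is easy to get wrong: the outer key is the field and the inner map holds operator and bound pairs. The sketch below builds a single range clause from such an inner map and shows how the outer map is assembled. RangeClause, the field name createTime and the date values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class RangeClause {
    private RangeClause() {}

    /** Builds one {"range": {field: {op: bound, ...}}} clause. */
    static String build(String field, Map<String, String> bounds) {
        StringJoiner body = new StringJoiner(",", "{\"range\":{\"" + field + "\":{", "}}}");
        for (Map.Entry<String, String> b : bounds.entrySet()) {
            body.add("\"" + b.getKey() + "\":\"" + b.getValue() + "\"");
        }
        return body.toString();
    }

    /** The shape expected by getRangeQueryDSL: field -> (operator -> bound). */
    static Map<String, Map<String, String>> exampleRangeMap() {
        Map<String, String> bounds = new LinkedHashMap<>();
        bounds.put("gte", "2024-01-01");
        bounds.put("lt", "2024-02-01");
        Map<String, Map<String, String>> rangeMap = new LinkedHashMap<>();
        rangeMap.put("createTime", bounds);
        return rangeMap;
    }

    static String example() {
        Map<String, String> bounds = exampleRangeMap().get("createTime");
        return build("createTime", bounds);
    }
}
```

For this input example() returns {"range":{"createTime":{"gte":"2024-01-01","lt":"2024-02-01"}}}, which is the clause that ends up inside the must array.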

4. Query all documents

	/**
	 * Get a DSL that matches every document.
	 *
	 * @return match_all query DSL
	 */
	public String getQueryAllDSL() {
		String dsl = "{ "
				+ " \"query\": { "
				+ " \"match_all\": {} "
				+ " } "
				+ " } ";
		return dsl;
	}

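5. Group by field

	/**
	 * Get query DSL with aggregations. "size": 0 means only the aggregation
	 * buckets are returned, not the matching documents.
	 *
	 * @param queryString query_string expression
	 * @param strAggs body of the "aggs" object
	 * @return query DSL with aggregations
	 */
	private String getAggsQueryDsl(String queryString, String strAggs) {
		return "{"
				+ "\"query\": {"
				+ "\"bool\": {"
				+ "\"must\": ["
				+ "{"
				+ "\"query_string\": {"
				+ "\"default_field\": \"_all\","
				+ "\"query\": \"" + queryString + "\""
				+ "}"
				+ "}"
				+ "]"
				+ "}"
				+ "},"
				+ "\"size\": 0,"
				+ "\"aggs\": {" + strAggs + "}"
				+ "}";
	}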
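The strAggs argument is the body of the aggs object, and its format is left to the caller. The sketch below shows one way to build it for a terms aggregation nested over several fields. TermsAggs, the by_ prefix for bucket names and the field names city and status are assumptions made for illustration.

```java
final class TermsAggs {
    private TermsAggs() {}

    /**
     * Builds nested terms aggregations. The first field becomes the outermost
     * bucket, each following field is nested one level deeper.
     */
    static String nested(String... fields) {
        String inner = "";
        // Work from the innermost field outwards.
        for (int i = fields.length - 1; i >= 0; i--) {
            String sub = inner.isEmpty() ? "" : ",\"aggs\":{" + inner + "}";
            inner = "\"by_" + fields[i] + "\":{\"terms\":{\"field\":\"" + fields[i] + "\"}"
                    + sub + "}";
        }
        return inner;
    }
}
```

TermsAggs.nested("city", "status") returns a string that can be passed as strAggs. Note that a terms aggregation returns only the top buckets by default (10 unless a size is set inside terms), so a complete list of distinct values needs an explicit size.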

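Notes:
1) The aggs aggregation can group documents by a field and return its distinct values. To group by several fields, nest one aggs inside another.
2) In real projects, use aggs sparingly because aggregations are relatively expensive. Where practical, query the data first and then deduplicate and group it in application code.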
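Grouping in application code, as the second note suggests, can be sketched with the JDK collections alone. The example assumes the rows have the List<Map<String, Object>> shape returned by getExtraMapByQueryDSL further down. InMemoryGrouping and the field name used in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class InMemoryGrouping {
    private InMemoryGrouping() {}

    /** Counts rows per distinct value of one field, keeping first-seen order. */
    static Map<String, Long> countBy(List<Map<String, Object>> rows, String field) {
        return rows.stream().collect(Collectors.groupingBy(
                row -> String.valueOf(row.get(field)),
                LinkedHashMap::new,
                Collectors.counting()));
    }

    /** Distinct values of one field, in first-seen order. */
    static List<String> distinctValues(List<Map<String, Object>> rows, String field) {
        LinkedHashSet<String> seen = new LinkedHashSet<>();
        for (Map<String, Object> row : rows) {
            seen.add(String.valueOf(row.get(field)));
        }
        return new ArrayList<>(seen);
    }
}
```

For rows whose city values are A, B, A, countBy(rows, "city") gives A=2 and B=1. This trades cluster CPU for network transfer and heap, so it is only a reasonable choice when the result set is small enough to load.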

Query

1. Run a query and map the results to an entity class

	/**
	 * Get documents by query DSL.
	 *
	 * @param index index name
	 * @param type type name
	 * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
	 * @param classOfT sample: xxxVO.class
	 * @return List<xxxVO>
	 */
	public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
		String scrollId = "";
		try {
			List<T> list = new ArrayList<T>();
			SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
					.setExtraSource(dsl).setFrom(0).setSize(2000)
					.setScroll(new TimeValue(60000)).execute().actionGet();
			Gson gson = new Gson();
			do {
				for (SearchHit searchHit : scrollResp.getHits().getHits()) {
					list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
				}
				// The scroll ID may change between pages, so always use the latest one.
				scrollId = scrollResp.getScrollId();
				scrollResp = this.esClient.prepareSearchScroll(scrollId)
						.setScroll(new TimeValue(60000)).execute().actionGet();
			} while (scrollResp.getHits().getHits().length > 0);
			return list;
		} finally {
			this.clearESScrollId(scrollId);
		}
	}

2. Run a query and return the results as a map

	/**
	 * Get document map by query DSL.
	 *
	 * @param index index name
	 * @param type type name
	 * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
	 * @param classOfT class of the document VO
	 * @return Map<String, T> key = document ID, value = document VO
	 */
	public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
		String scrollId = "";
		try {
			Map<String, T> retMap = new HashMap<String, T>();
			SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
					.setExtraSource(dsl).setFrom(0).setSize(2000)
					.setScroll(new TimeValue(60000)).execute().actionGet();
			Gson gson = new Gson();
			do {
				for (SearchHit searchHit : scrollResp.getHits().getHits()) {
					retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));
				}
				// The scroll ID may change between pages, so always use the latest one.
				scrollId = scrollResp.getScrollId();
				scrollResp = this.esClient.prepareSearchScroll(scrollId)
						.setScroll(new TimeValue(60000)).execute().actionGet();
			} while (scrollResp.getHits().getHits().length > 0);
			return retMap;
		} finally {
			this.clearESScrollId(scrollId);
		}
	}

3. Get by ID

	/**
	 * Get one document by document ID.
	 *
	 * @param index index name
	 * @param type type name
	 * @param docID sample: AAA_BBB_CCC
	 * @param classOfT class of the document VO
	 * @return xxxVO
	 */
	public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {
		GetResponse response = this.esClient.prepareGet(index, type, docID).get();
		Gson gson = new Gson();
		return gson.fromJson(response.getSourceAsString(), classOfT);
	}

4. Get by a list of IDs

	/**
	 * Get documents by their IDs.
	 *
	 * @param index index name
	 * @param type type name
	 * @param docIDs sample: List<AAA_BBB_CCC>
	 * @param classOfT class of the document VO
	 * @return list of document VOs
	 * @throws Exception if the search fails
	 */
	public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)
			throws Exception {
		String scrollId = "";
		try {
			List<T> list = new ArrayList<T>();
			SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
					.setQuery(QueryBuilders.idsQuery().ids(docIDs))
					.setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
			Gson gson = new Gson();
			do {
				for (SearchHit searchHit : scrollResp.getHits().getHits()) {
					list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
				}
				scrollId = scrollResp.getScrollId();
				scrollResp = this.esClient.prepareSearchScroll(scrollId)
						.setScroll(new TimeValue(60000)).execute().actionGet();
			} while (scrollResp.getHits().getHits().length > 0);
			return list;
		} finally {
			this.clearESScrollId(scrollId);
		}
	}

5. Get field values by ID and return them as a map

	/**
	 * Get selected _source fields of documents by document IDs.
	 *
	 * @param index index name
	 * @param type type name
	 * @param ids document IDs
	 * @param fields fields to fetch
	 * @return Map<String, Object> key = id_field, value = value of that field in that document.
	 *         sample: Map<AAA_BBB_CCC_field, value>
	 * @throws Exception if the search fails
	 */
	public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)
			throws Exception {
		String scrollId = "";
		try {
			SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
					.setQuery(QueryBuilders.idsQuery().ids(ids)).setFetchSource(fields, null)
					.setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
			Map<String, Object> retMap = Maps.newHashMap();
			while (true) {
				for (SearchHit hit : response.getHits().getHits()) {
					for (String field : fields) {
						String key = String.format("%s_%s", hit.getId(), field);
						retMap.put(key, hit.getSource().get(field));
					}
				}
				scrollId = response.getScrollId();
				response = this.esClient.prepareSearchScroll(scrollId)
						.setScroll(new TimeValue(60000)).execute().actionGet();
				// Break condition: no hits are returned
				if (response.getHits().getHits().length == 0) {
					break;
				}
			}
			return retMap;
		} finally {
			// Release the scroll context, as the other scroll-based methods do.
			this.clearESScrollId(scrollId);
		}
	}

6. Get field values by query DSL

	/**
	 * Get selected _source fields by query DSL.
	 *
	 * @param index index name
	 * @param type type name
	 * @param queryDSL query DSL
	 * @param fields fields to fetch
	 * @return List<Map<String, Object>>: one Map<String, Object> of field name to value per document
	 * @throws Exception if the search fails
	 */
	public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL,
			String... fields) throws Exception {
		List<Map<String, Object>> rtnLst = Lists.newArrayList();
		String scrollId = "";
		try {
			SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
					.setExtraSource(queryDSL).setFetchSource(fields, null)
					.setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
			scrollId = response.getScrollId();
			return this.getHitsResult(response, rtnLst, fields);
		} finally {
			this.clearESScrollId(scrollId);
		}
	}

	/**
	 * Scroll through all pages and collect the requested fields of every hit.
	 *
	 * @param response first page of the scroll search
	 * @param rtnLst list to append the results to
	 * @param fields fields to copy from each hit
	 * @return rtnLst with one map per document
	 */
	private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,
			String... fields) {
		while (true) {
			for (SearchHit hit : response.getHits().getHits()) {
				Map<String, Object> tempMap = Maps.newHashMap();
				for (String field : fields) {
					tempMap.put(field, hit.getSource().get(field));
				}
				rtnLst.add(tempMap);
			}
			response = this.esClient.prepareSearchScroll(response.getScrollId())
					.setScroll(new TimeValue(60000)).execute().actionGet();
			if (response.getHits().getHits().length == 0) {
				break;
			}
		}
		return rtnLst;
	}

7. Get the first document of a sorted query

	/**
	 * Get top 1 document by sorted DSL.
	 *
	 * @param index index name
	 * @param type type name
	 * @param dsl query DSL; it should include a sort
	 * @param classOfT class of the document VO
	 * @return the first hit, or null if nothing matches
	 */
	public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
		String scrollId = "";
		try {
			SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
					.setExtraSource(dsl).setFrom(0).setSize(1)
					.setScroll(new TimeValue(60000)).execute().actionGet();
			scrollId = scrollResp.getScrollId();
			Gson gson = new Gson();
			SearchHit[] hits = scrollResp.getHits().getHits();
			// Return the first hit, or null when nothing matches.
			return hits.length > 0 ? gson.fromJson(hits[0].getSourceAsString(), classOfT) : null;
		} finally {
			this.clearESScrollId(scrollId);
		}
	}

8. Check whether any document matches a query DSL

	/**
	 * Check whether a document exists by query DSL.
	 *
	 * @param esIndex index name
	 * @param esType type name
	 * @param queryDsl query DSL
	 * @return true if at least one document matches
	 * @throws Exception if the search fails
	 */
	public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {
		// One page is enough for an existence check, so no scroll context is opened.
		SearchRequestBuilder searchBuilder = this.esClient.prepareSearch(esIndex).setTypes(esType)
				.setFetchSource(new String[] { "_id" }, null).setExtraSource(queryDsl)
				.setFrom(0).setSize(1);
		SearchResponse resp = searchBuilder.execute().actionGet();
		return resp.getHits().getTotalHits() > 0;
	}

9. Count matching documents

	/**
	 * Return total count by DSL.
	 */
	protected long getTotalHitsByDsl(String index, String type, String dsl) {
		// Only the total is needed, so no hits are fetched and no scroll context is opened.
		SearchResponse resp = this.esClient.prepareSearch(index).setTypes(type)
				.setExtraSource(dsl).setFrom(0).setSize(0).execute().actionGet();
		return resp.getHits().getTotalHits();
	}

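Insert (Upsert)

Note: upsert means update the record if it exists and insert it if it does not. The methods below do this with an index request, which replaces the whole document stored under the given ID.

1. Upsert by ID

	/**
	 * Upsert data by VO + document ID.
	 *
	 * @param index index name
	 * @param type type name
	 * @param docID document ID
	 * @param vo value object, serialized to JSON with Gson
	 * @return index response
	 */
	public <T> IndexResponse upsert(String index, String type, String docID, T vo) {
		Gson gson = new Gson();
		String source = gson.toJson(vo);
		return this.esClient.prepareIndex(index, type, docID)
				.setRefresh(true).setSource(source).execute().actionGet();
	}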

2. Bulk upsert

	/**
	 * Bulk upsert a list of VOs.
	 *
	 * @param index index name
	 * @param type type name
	 * @param voList VOs to index
	 * @param getDocIDMethodInVo name of the no-argument method on the VO that returns the document ID.
	 *        sample: public String getDocumentID() { return String.format("%s_%s", AAA, BBB); }
	 * @param isAutoRefresh whether to refresh the index after the bulk request
	 * @throws Exception if the ID method cannot be invoked or any bulk item fails
	 */
	public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo,
			boolean isAutoRefresh) throws Exception {
		Gson gson = new Gson();
		BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
		bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
		for (T vo : voList) {
			// Look up the ID getter by name and call it reflectively.
			Method getIdMethod = vo.getClass().getMethod(getDocIDMethodInVo);
			Object docId = getIdMethod.invoke(vo);
			IndexRequestBuilder indexRequestBuilder = this.esClient
					.prepareIndex(index, type, docId.toString()).setSource(gson.toJson(vo));
			bulkRequest.add(indexRequestBuilder);
		}
		if (bulkRequest.numberOfActions() > 0) {
			BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
			if (resp.hasFailures()) {
				throw new Exception(
						String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));
			}
		}
	}
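Passing the ID getter as a method name means a typo only shows up at run time as a NoSuchMethodException. A possible alternative, sketched here with the JDK only, is to pass a Function that extracts the ID, so the compiler checks the reference. BulkIds, byDocId and OrderVO are hypothetical; the ID format follows the AAA_BBB sample in the Javadoc.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

final class BulkIds {
    private BulkIds() {}

    /** Hypothetical VO whose document ID joins two fields with an underscore. */
    static final class OrderVO {
        final String region;
        final String orderNo;

        OrderVO(String region, String orderNo) {
            this.region = region;
            this.orderNo = orderNo;
        }

        String getDocumentID() {
            return String.format("%s_%s", region, orderNo);
        }
    }

    /** Pairs each VO with its document ID, in list order. */
    static <T> Map<String, T> byDocId(List<T> voList, Function<? super T, String> idOf) {
        Map<String, T> byId = new LinkedHashMap<>();
        for (T vo : voList) {
            String id = Objects.requireNonNull(idOf.apply(vo), "document ID must not be null");
            byId.put(id, vo); // a later VO with the same ID replaces the earlier one
        }
        return byId;
    }
}
```

A caller would write BulkIds.byDocId(voList, OrderVO::getDocumentID) and then add one index request per map entry, in the same way the loop in bulkProcessUpsert does.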

Update

1. Update by ID

	/**
	 * Update selected fields by document ID.
	 *
	 * @param index index name
	 * @param type type name
	 * @param id document ID
	 * @param fieldValMap key = index field, value = new value
	 * @return update response
	 * @throws Exception if the update fails
	 */
	public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap)
			throws Exception {
		return this.esClient.update(new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap)).get();
	}

2. Bulk update

	/**
	 * Update documents in bulk. Map key is the document ID, map value is the field-to-value map to apply.
	 */
	protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates,
			boolean isAutoRefresh) throws Exception {
		BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
		bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
		for (Map.Entry<String, Map<String, Object>> update : mapUpdates.entrySet()) {
			// Build the partial document for this ID.
			XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
			for (Map.Entry<String, Object> fieldValue : update.getValue().entrySet()) {
				jsonBuilder.field(fieldValue.getKey(), fieldValue.getValue());
			}
			jsonBuilder.endObject();
			UpdateRequestBuilder updateRequestBuilder = this.esClient
					.prepareUpdate(index, type, update.getKey()).setDoc(jsonBuilder);
			bulkRequest.add(updateRequestBuilder);
		}
		if (bulkRequest.numberOfActions() > 0) {
			BulkResponse resp = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
			if (resp.hasFailures()) {
				throw new Exception(
						String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));
			}
		}
	}
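The bulk update method sends every item in a single bulk request, which can become very large when the map has many entries. One option is to split the document IDs into fixed-size batches and call the method once per batch. The sketch below shows only the splitting step; Batches, partition and the batch size are assumptions, and the right size depends on document size and cluster capacity.

```java
import java.util.ArrayList;
import java.util.List;

final class Batches {
    private Batches() {}

    /** Splits a list into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

For example, the key set of mapUpdates could be copied into a list, partitioned, and each batch turned into a smaller map that is passed to bulkProcessUpdate.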

Delete

1. Delete by ID

	/**
	 * Delete a document by ID.
	 *
	 * @return true if the document was found and deleted
	 */
	public boolean deleteById(String index, String type, String id) {
		return this.esClient.prepareDelete(index, type, id).setRefresh(true).execute().actionGet().isFound();
	}

2. Bulk delete

	/**
	 * Bulk delete by document IDs. Assumes the class defines a logger and a BULK_MAX_SIZE constant.
	 *
	 * @param index index name
	 * @param type type name
	 * @param lstId document IDs to delete
	 * @throws Exception if adding requests fails or the processor is interrupted while closing
	 */
	public void bulkProcessDelete(final String index, String type, List<String> lstId) throws Exception {
		final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {
			public void beforeBulk(long executionId, BulkRequest request) {
			}

			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
				// Item-level failures are reported here, not in the Throwable overload.
				if (response.hasFailures()) {
					logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
							response.buildFailureMessage());
				}
			}

			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
				logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
						failure.getMessage(), failure);
			}
		}).setBulkActions(BULK_MAX_SIZE).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
				.setFlushInterval(TimeValue.timeValueSeconds(5)).build();
		try {
			for (String docId : lstId) {
				bulkProcessor.add(new DeleteRequest(index, type, docId));
			}
		} catch (Exception ex) {
			logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
					ex.getMessage(), ex);
			throw ex;
		} finally {
			// Flush the remaining requests, then refresh so the deletes become visible.
			bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
			this.esClient.admin().indices().prepareRefresh(index).get();
		}
	}

3. Delete by DSL

	/**
	 * Delete by query: scroll through the matching documents and delete them in one bulk request.
	 *
	 * @param queryDsl query DSL
	 * @param index index name
	 * @param type type name
	 * @param isAutoRefresh whether to refresh the index after the bulk request
	 * @throws Exception if the search fails or any bulk item fails
	 */
	public void deleteByQueryDsl(String queryDsl, String index, String type, boolean isAutoRefresh)
			throws Exception {
		Client client = this.esClient;
		String scrollId = "";
		try {
			SearchResponse scrollResp = client.prepareSearch(index).setTypes(type).setExtraSource(queryDsl)
					.setFrom(0).setSize(1500).setScroll(new TimeValue(60000)).execute().actionGet();
			BulkRequestBuilder bulkRequest = client.prepareBulk();
			bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
			while (true) {
				for (SearchHit hit : scrollResp.getHits().getHits()) {
					bulkRequest.add(client.prepareDelete(index, type, hit.getId()));
				}
				scrollId = scrollResp.getScrollId();
				scrollResp = client.prepareSearchScroll(scrollId)
						.setScroll(new TimeValue(60000)).execute().actionGet();
				// Break condition: no hits are returned
				if (scrollResp.getHits().getHits().length == 0) {
					break;
				}
			}
			if (bulkRequest.numberOfActions() > 0) {
				BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
				if (response.hasFailures()) {
					throw new Exception("deleteByFilter failure:" + response.buildFailureMessage());
				}
			}
		} finally {
			// Release the scroll context, as the other scroll-based methods do.
			this.clearESScrollId(scrollId);
		}
	}

	/**
	 * Delete documents by query DSL: collect the matching IDs, then delegate to bulkProcessDelete.
	 *
	 * @param index index name
	 * @param type type name
	 * @param dsl query DSL
	 * @throws Exception if the search or the bulk delete fails
	 */
	public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {
		String scrollId = "";
		try {
			List<String> lstId = new ArrayList<String>();
			SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type)
					.setExtraSource(dsl).setFrom(0).setSize(2000)
					.setScroll(new TimeValue(60000)).execute().actionGet();
			do {
				for (SearchHit searchHit : scrollResp.getHits().getHits()) {
					lstId.add(searchHit.getId());
				}
				scrollId = scrollResp.getScrollId();
				scrollResp = this.esClient.prepareSearchScroll(scrollId)
						.setScroll(new TimeValue(60000)).execute().actionGet();
			} while (scrollResp.getHits().getHits().length > 0);
			this.bulkProcessDelete(index, type, lstId);
		} finally {
			this.clearESScrollId(scrollId);
		}
	}

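Supplement

When running ES operations we usually set size so that a single request does not return too much data, which is why the scroll API and its scrollId are used. The scroll context must then be cleared in the method's finally block.
For example:

	public void clearESScrollId(String scrollId) {
		// Nothing to clear if the search failed before a scroll ID was assigned.
		if (StringUtils.isNotEmpty(scrollId)) {
			ClearScrollRequest clearScrollerRequest = new ClearScrollRequest();
			clearScrollerRequest.addScrollId(scrollId);
			this.esClient.clearScroll(clearScrollerRequest).actionGet();
		}
	}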
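The read-all-pages-then-clear pattern used throughout the query methods can be sketched without a cluster. In the example below PageSource is a hypothetical stand-in for the scroll calls (it is not an ElasticSearch interface), and ListPages is an in-memory implementation used only to exercise the loop.

```java
import java.util.ArrayList;
import java.util.List;

final class ScrollDrain {
    private ScrollDrain() {}

    /** Hypothetical stand-in for a scroll: pages until empty, then a cleanup call. */
    interface PageSource<T> {
        List<T> nextPage(); // an empty list means the scroll is exhausted

        void close();       // stands in for clearESScrollId
    }

    /** Reads every page and always releases the source, even if a page fails. */
    static <T> List<T> drain(PageSource<T> source) {
        List<T> all = new ArrayList<>();
        try {
            List<T> page;
            while (!(page = source.nextPage()).isEmpty()) {
                all.addAll(page);
            }
            return all;
        } finally {
            source.close();
        }
    }

    /** In-memory page source for demonstration. */
    static final class ListPages<T> implements PageSource<T> {
        private final List<T> items;
        private final int pageSize;
        private int offset = 0;
        boolean closed = false;

        ListPages(List<T> items, int pageSize) {
            this.items = items;
            this.pageSize = pageSize;
        }

        public List<T> nextPage() {
            int end = Math.min(offset + pageSize, items.size());
            List<T> page = new ArrayList<>(items.subList(offset, end));
            offset = end;
            return page;
        }

        public void close() {
            closed = true;
        }
    }
}
```

The finally block is the important part: an open scroll holds resources on the cluster until its timeout expires, so it should be released on the error path as well as on success.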




http://www.chinasem.cn/article/1113490
