ES中摄取管道详解

2023-11-07 01:59
文章标签 es 详解 管道 摄取

本文主要是介绍ES中摄取管道详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、什么是摄取管道

摄取管道 Ingest pipelines

摄取管道主要用来在数据被索引之前对数据执行常见的转换。
例如,您可以使用管道来移除字段、从文本中提取值以及丰富数据。

管道由一系列称为处理器的可配置任务组成。每个处理器按顺序运行,对传入的文档进行特定的更改。在处理器运行之后,Elasticsearch 将转换后的文档添加到数据流或索引中。

管道的工作流程图如下:
在这里插入图片描述

二、摄取管道使用

1.创建管道

方式一:在kibana中创建
Stack Management > Ingest Pipelines
在这里插入图片描述
方式二:采用API创建
下面的 create pipeline API 请求创建一个包含两个 set 处理器和一个小写处理器的管道。处理器按指定的顺序顺序运行。

PUT _ingest/pipeline/my-pipeline
{"description": "My optional pipeline description","processors": [{"set": {"description": "My optional processor description","field": "my-long-field","value": 10}},{"set": {"description": "Set 'my-boolean-field' to true","field": "my-boolean-field","value": true}},{"lowercase": {"field": "my-keyword-field"}}]
}

2.测试管道

方式一:在kibana中测试
选择创建的管道,打开编辑页面,测试管道——》添加文档
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

方式二:采用_simulate的API测试
1、在请求URL中指定管道

POST _ingest/pipeline/my-pipeline/_simulate
{"docs": [{"_source": {"my-keyword-field": "FOO"}},{"_source": {"my-keyword-field": "BAR"}}]
}

2、在请求body中指定管道

POST _ingest/pipeline/_simulate
{"pipeline": {"processors": [{"lowercase": {"field": "my-keyword-field"}}]},"docs": [{"_source": {"my-keyword-field": "FOO"}},{"_source": {"my-keyword-field": "BAR"}}]
}

3.在索引请求中使用管道

说明:在向索引my-data-stream添加数据时,使用管道y-pipeline

POST my-data-stream/_doc?pipeline=my-pipeline
{"@timestamp": "2099-03-07T11:04:05.000Z","my-keyword-field": "foo"
}PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

在使用_update_by_query_reindex时使用管道:

POST my-data-stream/_update_by_query?pipeline=my-pipelinePOST _reindex
{"source": {"index": "my-data-stream"},"dest": {"index": "my-new-data-stream","op_type": "create","pipeline": "my-pipeline"}
}

4.给索引设置默认管道

通过index.default_pipeline属性,可以给索引设置默认的管道。

5.索引模板中设置默认管道

PUT _component_template/logs-my_app-settings
{"template": {"settings": {"index.default_pipeline": "logs-my_app-default","index.lifecycle.name": "logs"}}
}

6.管道异常处理

PUT _ingest/pipeline/my-pipeline
{"processors": [ ... ],"on_failure": [{"set": {"description": "Index document to 'failed-<index>'","field": "_index","value": "failed-{{{ _index }}}"}}]
}

三、管道功能演示

1、字段重命名

PUT _ingest/pipeline/my-pipeline
{"processors": [{"rename": {"description": "Rename 'provider' to 'cloud.provider'","field": "provider","target_field": "cloud.provider","ignore_failure": true}}]
}

2、删除特定记录

这里采用if配置管道处理函数的触发条件。

PUT _ingest/pipeline/my-pipeline
{"processors": [{"drop": {"description": "Drop documents with 'network.name' of 'Guest'","if": "ctx?.network?.name == 'Guest'"}}]
}

更复杂的条件可以采用scripts脚本:

PUT _ingest/pipeline/my-pipeline
{"processors": [{"drop": {"description": "Drop documents that don't contain 'prod' tag","if": """Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) {return false;}}}return true;"""}}]
}

注意⚠️:
尽量避免使用复杂或昂贵的条件脚本,昂贵的条件脚本会降低索引速度。

3、给字段赋值

PUT _ingest/pipeline/my-pipeline
{"processors": [{"set": {"field": "_source.my-long-field","value": 10}}]
}

采用元数据赋值

PUT _ingest/pipeline/my-pipeline
{"processors": [{"set": {"description": "Index the ingest timestamp as 'event.ingested'","field": "event.ingested","value": "{{{_ingest.timestamp}}}"}}]
}

总结

本文主要介绍了ES中摄取管道pipeline的使用。
摄取管道主要用来在数据被索引之前对数据执行常见的转换。
可以使用管道来移除字段、从文本中提取值以及丰富数据

这篇关于ES中摄取管道详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/360502

相关文章

Linux线程同步/互斥过程详解

《Linux线程同步/互斥过程详解》文章讲解多线程并发访问导致竞态条件,需通过互斥锁、原子操作和条件变量实现线程安全与同步,分析死锁条件及避免方法,并介绍RAII封装技术提升资源管理效率... 目录01. 资源共享问题1.1 多线程并发访问1.2 临界区与临界资源1.3 锁的引入02. 多线程案例2.1 为

Python使用Tenacity一行代码实现自动重试详解

《Python使用Tenacity一行代码实现自动重试详解》tenacity是一个专为Python设计的通用重试库,它的核心理念就是用简单、清晰的方式,为任何可能失败的操作添加重试能力,下面我们就来看... 目录一切始于一个简单的 API 调用Tenacity 入门:一行代码实现优雅重试精细控制:让重试按我

Python标准库之数据压缩和存档的应用详解

《Python标准库之数据压缩和存档的应用详解》在数据处理与存储领域,压缩和存档是提升效率的关键技术,Python标准库提供了一套完整的工具链,下面小编就来和大家简单介绍一下吧... 目录一、核心模块架构与设计哲学二、关键模块深度解析1.tarfile:专业级归档工具2.zipfile:跨平台归档首选3.

idea的终端(Terminal)cmd的命令换成linux的命令详解

《idea的终端(Terminal)cmd的命令换成linux的命令详解》本文介绍IDEA配置Git的步骤:安装Git、修改终端设置并重启IDEA,强调顺序,作为个人经验分享,希望提供参考并支持脚本之... 目录一编程、设置前二、前置条件三、android设置四、设置后总结一、php设置前二、前置条件

python中列表应用和扩展性实用详解

《python中列表应用和扩展性实用详解》文章介绍了Python列表的核心特性:有序数据集合,用[]定义,元素类型可不同,支持迭代、循环、切片,可执行增删改查、排序、推导式及嵌套操作,是常用的数据处理... 目录1、列表定义2、格式3、列表是可迭代对象4、列表的常见操作总结1、列表定义是处理一组有序项目的

python使用try函数详解

《python使用try函数详解》Pythontry语句用于异常处理,支持捕获特定/多种异常、else/final子句确保资源释放,结合with语句自动清理,可自定义异常及嵌套结构,灵活应对错误场景... 目录try 函数的基本语法捕获特定异常捕获多个异常使用 else 子句使用 finally 子句捕获所

C++11范围for初始化列表auto decltype详解

《C++11范围for初始化列表autodecltype详解》C++11引入auto类型推导、decltype类型推断、统一列表初始化、范围for循环及智能指针,提升代码简洁性、类型安全与资源管理效... 目录C++11新特性1. 自动类型推导auto1.1 基本语法2. decltype3. 列表初始化3

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

Java Thread中join方法使用举例详解

《JavaThread中join方法使用举例详解》JavaThread中join()方法主要是让调用改方法的thread完成run方法里面的东西后,在执行join()方法后面的代码,这篇文章主要介绍... 目录前言1.join()方法的定义和作用2.join()方法的三个重载版本3.join()方法的工作原