使用 Meltano 将数据从 Snowflake 导入到 Elasticsearch:开发者之旅

本文主要是介绍使用 Meltano 将数据从 Snowflake 导入到 Elasticsearch:开发者之旅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

作者:来自 Elastic Dmitrii Burlutskii

在 Elastic 的搜索团队中,我们一直在探索不同的 ETL 工具以及如何利用它们将数据传输到 Elasticsearch,并在传输的数据上实现 AI 助力搜索。今天,我想与大家分享我们与 Meltano 生态系统以及 Meltano Elasticsearch 加载器的故事。

Meltano 是一个声明式的代码优先数据集成引擎,允许你在不同的存储之间同步数据。在 hub.meltano.com 上有许提取器 (extractors) 和加载器 (loaders) 可用。如果你的数据存储在 Snowflake 中,并且想要为你的客户构建一个开箱即用的搜索体验,你可能会考虑使用 Elasticsearch,在那里你可以基于你拥有的数据为客户构建语义搜索。今天,我们将重点介绍如何将数据从 Snowflake 同步到 Elasticsearch。

要求

Snowflake 账号。 你在注册后将收到以下所有账号信息,或者你可以从 Snowflake 面板中获取它们。

  1. 账户用户名
  2. 账户密码
  3. 账户标识符(查看此处的说明以获取它)

Snowflake 数据集

如果你创建了一个新的 Snowflake 账户,你将拥有用于实验的示例数据。

然而,我将使用一个公共空气质量数据集,其中包含二氧化氮(NO2)的测量数据。

Elastic 账号

访问 https://cloud.elastic.co 并注册账号。

点击 “Create deployment”。在弹出窗口中,你可以更改或保留默认设置。

一旦准备好部署,请点击 “Continue”(或点击 “Open Kibana”)。它将重定向你到 Kibana 仪表板。

转到 Stack Management -> Security -> API keys,并生成一个新的 API 密钥。

安装 Meltano

在我的示例中,我将使用 Meltano Python 包,但你也可以将其作为 Docker 容器安装。

pip install "meltano"

添加 Snowflake 提取器

meltano add extractor tap-snowflake --variant=meltanolabs

验证提取器

meltano invoke tap-snowflake --test

添加 Elasticsearch 加载器

meltano add loader target-elasticsearch

配置提取器和加载器:

有多种方法可以配置 Meltano 提取器和加载器:

  • 编辑 meltano.yml
  • 使用 CLI 命令,例如
meltano config {loader} set config_name config_value

使用 CLI 交互模式

meltano config {loader} set --interactive

我将使用交互模式。

要配置 Snowflake 提取器,请运行以下命令并至少提​​供帐户标识符、用户名、密码和数据库。

meltano config tap-snowflake set --interactive

你应该会看到以下屏幕,你可以在其中选择要配置的选项。

配置提取后,你可以测试连接。 只需运行以下命令:

配置 Elasticsearch 加载器并提供主机、端口、架构和 API 密钥,

meltano config target-elasticsearch set --interactive

如果你想更改索引名称,可以运行以下命令并更改它:

meltano config target-elasticsearch set index_format my-index-name
meltano config target-elasticsearch set index_format my-index-name

比如, 默认索引字符串定义为 ecs-{{ stream_name }}-{{ current_timestamp_daily}} ,结果为 ecs-animals-2022-12-25,其中流名称为 animals。

配置完所有内容后,我们就可以开始同步数据。

meltano run tap-snowflake target-elasticsearch

同步开始后,你可以转到 Kibana 并看到创建了一个新索引并且有一些索引文档。

你可以通过单击索引名称来查看文档。 你应该查看你的文件。

使用你的索引设置(或映射)

如果我们开始同步数据,加载器将自动创建一个具有动态映射的新索引,这意味着 Elasticsearch 将处理索引中的字段及其类型。 如果我们愿意,我们可以通过提前创建索引并应用我们需要的设置来更改此行为。 咱们试试吧。

导航到 Kibana -> DevTools 并运行以下命令:

创建新的摄入管道

PUT _ingest/pipeline/drop-values-10
{"processors": [{"drop": {"description": "Drop documents with the value < 10","if": "ctx.datavalue < 10"}}]
}

这将删除 datavalue < 10 的所有文档。

创建新索引

PUT my-snowflake-data

应用索引设置

PUT my-snowflake-data/_settings
{"index": {"default_pipeline": "_ingest/pipeline/drop-values-10"}
}

更改 Meltano 中的索引名称

meltano config target-elasticsearch set index_format my-snowflake-data

开始同步作业

meltano run tap-snowflake target-elasticsearch

工作完成后,你可以看到索引中的文档比我们之前创建的要少

结论

我们已经成功地将数据从 Snowflake 同步到 Elastic Cloud。我们让 Meltano 为我们创建了一个新索引,并负责索引映射,我们将数据同步到了一个具有预定义管道的现有索引中。

我想强调在我旅程中记下的一些关键点:

Elasticsearch 加载器(Meltano Hub 上的页面)

  • 它尚未准备好处理大量的数据。你需要调整默认的 Elasticsearch 配置,使其更加健壮。我已经提交了一个 Pull Request,以暴露 “request_timeout” 和 “retry_on_timeout” 选项,这将会有所帮助。
  • 它使用 Elasticsearch Python 客户端的 8.x 分支,因此你可以确保它支持最新的 Elasticsearch 功能。
  • 它同步发送数据(不使用 Python AsyncIO),因此当您需要传输大量数据时可能会相当慢。

Meltano CLI

  • 它非常棒。你不需要 UI,所以一切都可以在终端中配置,这为工程师提供了大量的自动化选项。
  • 你可以仅通过一个命令即可运行按需同步。不需要其他正在运行的服务。

复制/增量同步

  • 如果你的管道需要数据复制或增量同步,你可以访问这个页面信息。

另外,我想提一下 Meltano Hub 真的很棒。它易于导航并找到你需要的内容。此外,你可以通过查看有多少客户使用它们来轻松比较不同的加载器或抽取器。

如果你对构建基于 AI 的应用程序感兴趣,请在以下博客文章中查找更多信息:

  • 在你的数据集上实现全文和语义搜索能力。
  • 连接你的数据与 LLMs,构建问题 - 答案。
  • 构建一个使用检索增强生成(RAG)模式的聊天机器人。

准备将 RAG 构建到你的应用中了吗?想要尝试不同的 LLMs 与向量数据库吗? 查看我们在 Github 上关于 LangChain、Cohere 等的示例 notebooks,并加入即将开始的 Elasticsearch 工程师培训!

原文:Ingest Data from Snowflake to Elasticsearch using Meltano: A developer’s journey — Elastic Search Labs

这篇关于使用 Meltano 将数据从 Snowflake 导入到 Elasticsearch:开发者之旅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887856

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

Linux jq命令的使用解读

《Linuxjq命令的使用解读》jq是一个强大的命令行工具,用于处理JSON数据,它可以用来查看、过滤、修改、格式化JSON数据,通过使用各种选项和过滤器,可以实现复杂的JSON处理任务... 目录一. 简介二. 选项2.1.2.2-c2.3-r2.4-R三. 字段提取3.1 普通字段3.2 数组字段四.

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV

Redis 基本数据类型和使用详解

《Redis基本数据类型和使用详解》String是Redis最基本的数据类型,一个键对应一个值,它的功能十分强大,可以存储字符串、整数、浮点数等多种数据格式,本文给大家介绍Redis基本数据类型和... 目录一、Redis 入门介绍二、Redis 的五大基本数据类型2.1 String 类型2.2 Hash

Redis中Hash从使用过程到原理说明

《Redis中Hash从使用过程到原理说明》RedisHash结构用于存储字段-值对,适合对象数据,支持HSET、HGET等命令,采用ziplist或hashtable编码,通过渐进式rehash优化... 目录一、开篇:Hash就像超市的货架二、Hash的基本使用1. 常用命令示例2. Java操作示例三

Linux创建服务使用systemctl管理详解

《Linux创建服务使用systemctl管理详解》文章指导在Linux中创建systemd服务,设置文件权限为所有者读写、其他只读,重新加载配置,启动服务并检查状态,确保服务正常运行,关键步骤包括权... 目录创建服务 /usr/lib/systemd/system/设置服务文件权限:所有者读写js,其他