如何用logstash处理列式存储的文件

2023-10-23 14:59
文章标签 处理 存储 logstash 列式

本文主要是介绍如何用logstash处理列式存储的文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

最近遇到一个问题,朋友需要使用es去处理一些基因数据,其特点和其他的数据不一样,对象的个数很少,但每个对象下面有很多field。并且field的值是动态添加的,用列式存储数据是最为方便的。

方便起见,画了个示意图,file1是行式存储,即我们常见的csv,第一行是标题,后面每一行就是一条记录。

而file2,则是列式存储,第一列式header,后面每一列都是一条记录
在这里插入图片描述
要使用logstash或者其他任何工具处理这个文本都会带来不小的麻烦。因为,对于文件的处理,我们是按行写入的,通过\n等换行符进行行的区分(计算机语言里面没有换列符的说法);同理,在读取的时候,我们顺序从文件开头读取,也是每检测到一个换行符认为是一行。

我们比较读取第一行和第一列的区别,如果我们要读取文件的第一行,只需要遇到第一个换行符就可以结束了,而要读取第一列,则非得读完所有的行才行,基本上是读完整个文件。

但这样也有一个好处,就是为每条记录增加一个属性时,只需要增加一行即可,而行式存储则无法做到。

需求

现在,假设我们遇到这样的一个csv文件:
在这里插入图片描述
它有几个特点:

  • 标签在第一列,标签的值在第2列~第N列
  • 有些标签只有一个值,有些标签有N个值
  • ,作为分隔符

我们希望logstash将该csv解释为如下数据,并存储到ES中:
在这里插入图片描述
即:

  • 第一列作为field
  • 每一列的标签值作为一个doc
  • 只有一列的标签值,复制到每一列当中

解决思路

在上文已经提到了,如果我们要按列来生成记录(doc)存储到elasitcsearch里面,必须一次性读取整个文件。这样会带来一个问题,即文件有新增的时候,即为每条记录增加一个属性时,我们需要update之前生成的所有doc,这个问题可以解决,但我们先不在这里讨论。总之,要处理列式数据,我们不可能一行一行的读数据,因为logstash是流式处理,来一条数据会马上开始处理,处理之后会直接放到es,然后开始下一个数据的处理,而不会等所有数据来了之后再合并处理。而且根据worker数量的设置,该流程是并发的,并没有时序保证。因此,必须一次读完整个文件。我们可以使用filebeat,或者直接使用file plugin:

input{file {path => "/tmp/test.csv"start_position => "beginning"sincedb_path => "/dev/null"ignore_older => 0close_older => 0codec => multiline {pattern => "^\r\n"negate => "false"what => "previous"}}
}

注意,每个版本的logstash的参数不一样,而且最后一行需要有一个空行

当我们读完整个文件,该文件在logstash里面就是一个完整的event,此时,我们首先要提取第一列来作为field。这个可以采用kv插件。
文件读进来,在内存中是如下模型:

#Platform,V40_BGISEQXXX\r\n
#DateTime,2019-06-15 14:21:44\r\n
fovname,C003R003,C003R004,C003R005,C003R006,C003R007,C003R008,C003R009\r\n
...

我们首先要把第一个,转为其他符号,比如=,来方便kv插件操作。mutate插件的gsub可以帮我们做到:

filter {mutate {gsub => ["message", "(^.*?),","\1="]}
}

然后使用kv:

  kv {field_split => "\r\n"remove_field => ["message"]}

注意,处理完之后我们就可以丢弃message了。此时logstash的event应该包含为:

{"#Platform": "V40_BGISEQXXX","#DateTime": "2019-06-15 14:21:44","fovname": “C003R003,C003R004,C003R005,C003R006,C003R007,C003R008,C003R009”,...
}

接下来,我们需要将这个event按列拆分成多个event,然后每个event输出为一个doc到elasticsearch。具体可以参考split插件的做法,但这里必须使用ruby插件自己实现逻辑,这里给出参考࿱

这篇关于如何用logstash处理列式存储的文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/268582

相关文章

SQL Server数据库死锁处理超详细攻略

《SQLServer数据库死锁处理超详细攻略》SQLServer作为主流数据库管理系统,在高并发场景下可能面临死锁问题,影响系统性能和稳定性,这篇文章主要给大家介绍了关于SQLServer数据库死... 目录一、引言二、查询 Sqlserver 中造成死锁的 SPID三、用内置函数查询执行信息1. sp_w

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

Golang 日志处理和正则处理的操作方法

《Golang日志处理和正则处理的操作方法》:本文主要介绍Golang日志处理和正则处理的操作方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录1、logx日志处理1.1、logx简介1.2、日志初始化与配置1.3、常用方法1.4、配合defer

springboot加载不到nacos配置中心的配置问题处理

《springboot加载不到nacos配置中心的配置问题处理》:本文主要介绍springboot加载不到nacos配置中心的配置问题处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录springboot加载不到nacos配置中心的配置两种可能Spring Boot 版本Nacos

MySQL 存储引擎 MyISAM详解(最新推荐)

《MySQL存储引擎MyISAM详解(最新推荐)》使用MyISAM存储引擎的表占用空间很小,但是由于使用表级锁定,所以限制了读/写操作的性能,通常用于中小型的Web应用和数据仓库配置中的只读或主要... 目录mysql 5.5 之前默认的存储引擎️‍一、MyISAM 存储引擎的特性️‍二、MyISAM 的主

Linux lvm实例之如何创建一个专用于MySQL数据存储的LVM卷组

《Linuxlvm实例之如何创建一个专用于MySQL数据存储的LVM卷组》:本文主要介绍使用Linux创建一个专用于MySQL数据存储的LVM卷组的实例,具有很好的参考价值,希望对大家有所帮助,... 目录在Centos 7上创建卷China编程组并配置mysql数据目录1. 检查现有磁盘2. 创建物理卷3. 创

python web 开发之Flask中间件与请求处理钩子的最佳实践

《pythonweb开发之Flask中间件与请求处理钩子的最佳实践》Flask作为轻量级Web框架,提供了灵活的请求处理机制,中间件和请求钩子允许开发者在请求处理的不同阶段插入自定义逻辑,实现诸如... 目录Flask中间件与请求处理钩子完全指南1. 引言2. 请求处理生命周期概述3. 请求钩子详解3.1

Python处理大量Excel文件的十个技巧分享

《Python处理大量Excel文件的十个技巧分享》每天被大量Excel文件折磨的你看过来!这是一份Python程序员整理的实用技巧,不说废话,直接上干货,文章通过代码示例讲解的非常详细,需要的朋友可... 目录一、批量读取多个Excel文件二、选择性读取工作表和列三、自动调整格式和样式四、智能数据清洗五、

SpringBoot如何对密码等敏感信息进行脱敏处理

《SpringBoot如何对密码等敏感信息进行脱敏处理》这篇文章主要为大家详细介绍了SpringBoot对密码等敏感信息进行脱敏处理的几个常用方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录​1. 配置文件敏感信息脱敏​​2. 日志脱敏​​3. API响应脱敏​​4. 其他注意事项​​总结