如何用logstash处理列式存储的文件

2023-10-23 14:59
文章标签 处理 存储 logstash 列式

本文主要是介绍如何用logstash处理列式存储的文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

最近遇到一个问题,朋友需要使用es去处理一些基因数据,其特点和其他的数据不一样,对象的个数很少,但每个对象下面有很多field。并且field的值是动态添加的,用列式存储数据是最为方便的。

方便起见,画了个示意图,file1是行式存储,即我们常见的csv,第一行是标题,后面每一行就是一条记录。

而file2,则是列式存储,第一列式header,后面每一列都是一条记录
在这里插入图片描述
要使用logstash或者其他任何工具处理这个文本都会带来不小的麻烦。因为,对于文件的处理,我们是按行写入的,通过\n等换行符进行行的区分(计算机语言里面没有换列符的说法);同理,在读取的时候,我们顺序从文件开头读取,也是每检测到一个换行符认为是一行。

我们比较读取第一行和第一列的区别,如果我们要读取文件的第一行,只需要遇到第一个换行符就可以结束了,而要读取第一列,则非得读完所有的行才行,基本上是读完整个文件。

但这样也有一个好处,就是为每条记录增加一个属性时,只需要增加一行即可,而行式存储则无法做到。

需求

现在,假设我们遇到这样的一个csv文件:
在这里插入图片描述
它有几个特点:

  • 标签在第一列,标签的值在第2列~第N列
  • 有些标签只有一个值,有些标签有N个值
  • ,作为分隔符

我们希望logstash将该csv解释为如下数据,并存储到ES中:
在这里插入图片描述
即:

  • 第一列作为field
  • 每一列的标签值作为一个doc
  • 只有一列的标签值,复制到每一列当中

解决思路

在上文已经提到了,如果我们要按列来生成记录(doc)存储到elasitcsearch里面,必须一次性读取整个文件。这样会带来一个问题,即文件有新增的时候,即为每条记录增加一个属性时,我们需要update之前生成的所有doc,这个问题可以解决,但我们先不在这里讨论。总之,要处理列式数据,我们不可能一行一行的读数据,因为logstash是流式处理,来一条数据会马上开始处理,处理之后会直接放到es,然后开始下一个数据的处理,而不会等所有数据来了之后再合并处理。而且根据worker数量的设置,该流程是并发的,并没有时序保证。因此,必须一次读完整个文件。我们可以使用filebeat,或者直接使用file plugin:

input{file {path => "/tmp/test.csv"start_position => "beginning"sincedb_path => "/dev/null"ignore_older => 0close_older => 0codec => multiline {pattern => "^\r\n"negate => "false"what => "previous"}}
}

注意,每个版本的logstash的参数不一样,而且最后一行需要有一个空行

当我们读完整个文件,该文件在logstash里面就是一个完整的event,此时,我们首先要提取第一列来作为field。这个可以采用kv插件。
文件读进来,在内存中是如下模型:

#Platform,V40_BGISEQXXX\r\n
#DateTime,2019-06-15 14:21:44\r\n
fovname,C003R003,C003R004,C003R005,C003R006,C003R007,C003R008,C003R009\r\n
...

我们首先要把第一个,转为其他符号,比如=,来方便kv插件操作。mutate插件的gsub可以帮我们做到:

filter {mutate {gsub => ["message", "(^.*?),","\1="]}
}

然后使用kv:

  kv {field_split => "\r\n"remove_field => ["message"]}

注意,处理完之后我们就可以丢弃message了。此时logstash的event应该包含为:

{"#Platform": "V40_BGISEQXXX","#DateTime": "2019-06-15 14:21:44","fovname": “C003R003,C003R004,C003R005,C003R006,C003R007,C003R008,C003R009”,...
}

接下来,我们需要将这个event按列拆分成多个event,然后每个event输出为一个doc到elasticsearch。具体可以参考split插件的做法,但这里必须使用ruby插件自己实现逻辑,这里给出参考࿱

这篇关于如何用logstash处理列式存储的文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/268582

相关文章

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

Java堆转储文件之1.6G大文件处理完整指南

《Java堆转储文件之1.6G大文件处理完整指南》堆转储文件是优化、分析内存消耗的重要工具,:本文主要介绍Java堆转储文件之1.6G大文件处理的相关资料,文中通过代码介绍的非常详细,需要的朋友可... 目录前言文件为什么这么大?如何处理这个文件?分析文件内容(推荐)删除文件(如果不需要)查看错误来源如何避

使用Python构建一个高效的日志处理系统

《使用Python构建一个高效的日志处理系统》这篇文章主要为大家详细讲解了如何使用Python开发一个专业的日志分析工具,能够自动化处理、分析和可视化各类日志文件,大幅提升运维效率,需要的可以了解下... 目录环境准备工具功能概述完整代码实现代码深度解析1. 类设计与初始化2. 日志解析核心逻辑3. 文件处

Java docx4j高效处理Word文档的实战指南

《Javadocx4j高效处理Word文档的实战指南》对于需要在Java应用程序中生成、修改或处理Word文档的开发者来说,docx4j是一个强大而专业的选择,下面我们就来看看docx4j的具体使用... 目录引言一、环境准备与基础配置1.1 Maven依赖配置1.2 初始化测试类二、增强版文档操作示例2.

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

SpringBoot3.X 整合 MinIO 存储原生方案

《SpringBoot3.X整合MinIO存储原生方案》本文详细介绍了SpringBoot3.X整合MinIO的原生方案,从环境搭建到核心功能实现,涵盖了文件上传、下载、删除等常用操作,并补充了... 目录SpringBoot3.X整合MinIO存储原生方案:从环境搭建到实战开发一、前言:为什么选择MinI

SpringBoot结合Docker进行容器化处理指南

《SpringBoot结合Docker进行容器化处理指南》在当今快速发展的软件工程领域,SpringBoot和Docker已经成为现代Java开发者的必备工具,本文将深入讲解如何将一个SpringBo... 目录前言一、为什么选择 Spring Bootjavascript + docker1. 快速部署与

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2