filebeat 数据采集流程

2024-02-29 19:48
文章标签 数据 流程 采集 filebeat

本文主要是介绍filebeat 数据采集流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

filebeat启动流程 讲解了filebeat的启动流程,filebeat在构建完crawler对象,开始采集流程。


Crawlerstart方法内,会启动Inputs

func (c *Crawler) Start(pipeline beat.Pipeline,r *registrar.Registrar,configInputs *common.Config,configModules *common.Config,pipelineLoaderFactory fileset.PipelineLoaderFactory,overwritePipelines bool,
) error {...for _, inputConfig := range c.inputConfigs {err := c.startInput(pipeline, inputConfig, r.GetStates())if err != nil {return err}}...
}

c.startInput(pipeline, inputConfig, r.GetStates())方法初始化Input

  1. 首先构建Input对象
  2. 运行Input
func (c *Crawler) startInput(pipeline beat.Pipeline,config *common.Config,states []file.State,
) error {if !config.Enabled() {return nil}connector := channel.ConnectTo(pipeline, c.out)p, err := input.New(config, connector, c.beatDone, states, nil)...// 开始收集p.Start()return nil
}

p.Start()方法内启动Input,他在一个单独的协程里运行。

这里的p是对Input的封装,他的Run方法是对某个接口的实现,因为我们用来收集日志,所以我们只需要关心filebeat/input/log/input.go文件内的Run方法。Run方法内部调用了Inputscan方法,开始采集数据。

// Run runs the input
func (p *Input) Run() {...p.scan()...
}

scan方法内首先获取所有的文件。其次获取文件状态,根据状态来判定收集最新数据,还是从历史文件收集。文件收集会构建Harvester对象。

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {var sortInfos []FileSortInfovar files []string// 获取应该获取到的所有文件paths := p.getFiles()var err error...for i := 0; i < len(paths); i++ {var path stringvar info os.FileInfoif sortInfos == nil {path = files[i]info = paths[path]} else {path = sortInfos[i].pathinfo = sortInfos[i].info}select {case <-p.done:logp.Info("Scan aborted because input stopped.")returndefault:}newState, err := getFileState(path, info, p)if err != nil {logp.Err("Skipping file %s due to error %s", path, err)}// Load last statelastState := p.states.FindPrevious(newState)...// Decides if previous state existsif lastState.IsEmpty() {logp.Debug("input", "Start harvester for new file: %s", newState.Source)// 准备构建 harvester 了err := p.startHarvester(newState, 0)if err == errHarvesterLimit {logp.Debug("input", harvesterErrMsg, newState.Source, err)continue}if err != nil {logp.Err(harvesterErrMsg, newState.Source, err)}} else {// 从历史文件开始处理p.harvestExistingFile(newState, lastState)}}
}

p.startHarvester(newState, 0)内构建harvester。(harvester是另一个filebeat官网描述的核心组件之一)

func (p *Input) startHarvester(state file.State, offset int64) error {if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {p.numHarvesters.Dec()harvesterSkipped.Add(1)return errHarvesterLimit}// Set state to "not" finished to indicate that a harvester is runningstate.Finished = falsestate.Offset = offset// Create harvester with state// 这部分构建了 harvesterh, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })if err != nil {p.numHarvesters.Dec()return err}// 配置 harvestererr = h.Setup()if err != nil {p.numHarvesters.Dec()return fmt.Errorf("error setting up harvester: %s", err)}// Update state before staring harvester// This makes sure the states is set to Finished: false// This is synchronous state update as part of the scanh.SendStateUpdate()// 启动 harvesterif err = p.harvesters.Start(h); err != nil {p.numHarvesters.Dec()}return err
}
  1. p.createHarvester构建harvester
  2. p.Setup配置harvesterSetup方法内会初始化文件相关的内容,以及构建文件reader
  3. p.harvesters.Start(h)运行harvester

主要还是要看harvesters.Start方法,会在单独的协程内运行harvester

func (r *Registry) Start(h Harvester) error {// Make sure stop is not called during starting a harvesterr.Lock()defer r.Unlock()...go func() {defer func() {r.remove(h)r.wg.Done()}()// 异步运行err := h.Run()if err != nil {logp.Err("Error running input: %v", err)}}()return nil
}

harvester.Run方法真是长。。

func (h *Harvester) Run() error {// 这坨简直了for {...// 读取文件内容message, err := h.reader.Next()// 糟糕的异常处理。。。if err != nil {switch err {...}return nil}state := h.getState()startingOffset := state.Offsetstate.Offset += int64(message.Bytes)...// 读取到的文件内容text := string(message.Content)...// 数据内容都包装在 data 内,harvester 发送 data,其实就是 forwarder 转发的if !h.sendEvent(data, forwarder) {return nil}// Update state of harvester as successfully senth.state = state}
}

h.sendEvent(data, forwarder)这段代码将采集的数据发送到下游,内部其实就是用forwarder转发了数据。

到这里数据的采集流程应该就差不多了,剩下的是数据的发送流程。

这篇关于filebeat 数据采集流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/759876

相关文章

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my