filebeat 数据采集流程

2024-02-29 19:48
文章标签 数据 流程 采集 filebeat

本文主要是介绍filebeat 数据采集流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

filebeat启动流程 讲解了filebeat的启动流程,filebeat在构建完crawler对象,开始采集流程。


Crawlerstart方法内,会启动Inputs

func (c *Crawler) Start(pipeline beat.Pipeline,r *registrar.Registrar,configInputs *common.Config,configModules *common.Config,pipelineLoaderFactory fileset.PipelineLoaderFactory,overwritePipelines bool,
) error {...for _, inputConfig := range c.inputConfigs {err := c.startInput(pipeline, inputConfig, r.GetStates())if err != nil {return err}}...
}

c.startInput(pipeline, inputConfig, r.GetStates())方法初始化Input

  1. 首先构建Input对象
  2. 运行Input
func (c *Crawler) startInput(pipeline beat.Pipeline,config *common.Config,states []file.State,
) error {if !config.Enabled() {return nil}connector := channel.ConnectTo(pipeline, c.out)p, err := input.New(config, connector, c.beatDone, states, nil)...// 开始收集p.Start()return nil
}

p.Start()方法内启动Input,他在一个单独的协程里运行。

这里的p是对Input的封装,他的Run方法是对某个接口的实现,因为我们用来收集日志,所以我们只需要关心filebeat/input/log/input.go文件内的Run方法。Run方法内部调用了Inputscan方法,开始采集数据。

// Run runs the input
func (p *Input) Run() {...p.scan()...
}

scan方法内首先获取所有的文件。其次获取文件状态,根据状态来判定收集最新数据,还是从历史文件收集。文件收集会构建Harvester对象。

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {var sortInfos []FileSortInfovar files []string// 获取应该获取到的所有文件paths := p.getFiles()var err error...for i := 0; i < len(paths); i++ {var path stringvar info os.FileInfoif sortInfos == nil {path = files[i]info = paths[path]} else {path = sortInfos[i].pathinfo = sortInfos[i].info}select {case <-p.done:logp.Info("Scan aborted because input stopped.")returndefault:}newState, err := getFileState(path, info, p)if err != nil {logp.Err("Skipping file %s due to error %s", path, err)}// Load last statelastState := p.states.FindPrevious(newState)...// Decides if previous state existsif lastState.IsEmpty() {logp.Debug("input", "Start harvester for new file: %s", newState.Source)// 准备构建 harvester 了err := p.startHarvester(newState, 0)if err == errHarvesterLimit {logp.Debug("input", harvesterErrMsg, newState.Source, err)continue}if err != nil {logp.Err(harvesterErrMsg, newState.Source, err)}} else {// 从历史文件开始处理p.harvestExistingFile(newState, lastState)}}
}

p.startHarvester(newState, 0)内构建harvester。(harvester是另一个filebeat官网描述的核心组件之一)

func (p *Input) startHarvester(state file.State, offset int64) error {if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {p.numHarvesters.Dec()harvesterSkipped.Add(1)return errHarvesterLimit}// Set state to "not" finished to indicate that a harvester is runningstate.Finished = falsestate.Offset = offset// Create harvester with state// 这部分构建了 harvesterh, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })if err != nil {p.numHarvesters.Dec()return err}// 配置 harvestererr = h.Setup()if err != nil {p.numHarvesters.Dec()return fmt.Errorf("error setting up harvester: %s", err)}// Update state before staring harvester// This makes sure the states is set to Finished: false// This is synchronous state update as part of the scanh.SendStateUpdate()// 启动 harvesterif err = p.harvesters.Start(h); err != nil {p.numHarvesters.Dec()}return err
}
  1. p.createHarvester构建harvester
  2. p.Setup配置harvesterSetup方法内会初始化文件相关的内容,以及构建文件reader
  3. p.harvesters.Start(h)运行harvester

主要还是要看harvesters.Start方法,会在单独的协程内运行harvester

func (r *Registry) Start(h Harvester) error {// Make sure stop is not called during starting a harvesterr.Lock()defer r.Unlock()...go func() {defer func() {r.remove(h)r.wg.Done()}()// 异步运行err := h.Run()if err != nil {logp.Err("Error running input: %v", err)}}()return nil
}

harvester.Run方法真是长。。

func (h *Harvester) Run() error {// 这坨简直了for {...// 读取文件内容message, err := h.reader.Next()// 糟糕的异常处理。。。if err != nil {switch err {...}return nil}state := h.getState()startingOffset := state.Offsetstate.Offset += int64(message.Bytes)...// 读取到的文件内容text := string(message.Content)...// 数据内容都包装在 data 内,harvester 发送 data,其实就是 forwarder 转发的if !h.sendEvent(data, forwarder) {return nil}// Update state of harvester as successfully senth.state = state}
}

h.sendEvent(data, forwarder)这段代码将采集的数据发送到下游,内部其实就是用forwarder转发了数据。

到这里数据的采集流程应该就差不多了,剩下的是数据的发送流程。

这篇关于filebeat 数据采集流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/759876

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

Spring Boot从main方法到内嵌Tomcat的全过程(自动化流程)

《SpringBoot从main方法到内嵌Tomcat的全过程(自动化流程)》SpringBoot启动始于main方法,创建SpringApplication实例,初始化上下文,准备环境,刷新容器并... 目录1. 入口:main方法2. SpringApplication初始化2.1 构造阶段3. 运行阶

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

使用Go实现文件复制的完整流程

《使用Go实现文件复制的完整流程》本案例将实现一个实用的文件操作工具:将一个文件的内容完整复制到另一个文件中,这是文件处理中的常见任务,比如配置文件备份、日志迁移、用户上传文件转存等,文中通过代码示例... 目录案例说明涉及China编程知识点示例代码代码解析示例运行练习扩展小结案例说明我们将通过标准库 os

Ubuntu 24.04启用root图形登录的操作流程

《Ubuntu24.04启用root图形登录的操作流程》Ubuntu默认禁用root账户的图形与SSH登录,这是为了安全,但在某些场景你可能需要直接用root登录GNOME桌面,本文以Ubuntu2... 目录一、前言二、准备工作三、设置 root 密码四、启用图形界面 root 登录1. 修改 GDM 配