filebeat 数据采集流程

2024-02-29 19:48
文章标签 数据 流程 采集 filebeat

本文主要是介绍filebeat 数据采集流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

filebeat启动流程 讲解了filebeat的启动流程,filebeat在构建完crawler对象,开始采集流程。


Crawlerstart方法内,会启动Inputs

func (c *Crawler) Start(pipeline beat.Pipeline,r *registrar.Registrar,configInputs *common.Config,configModules *common.Config,pipelineLoaderFactory fileset.PipelineLoaderFactory,overwritePipelines bool,
) error {...for _, inputConfig := range c.inputConfigs {err := c.startInput(pipeline, inputConfig, r.GetStates())if err != nil {return err}}...
}

c.startInput(pipeline, inputConfig, r.GetStates())方法初始化Input

  1. 首先构建Input对象
  2. 运行Input
func (c *Crawler) startInput(pipeline beat.Pipeline,config *common.Config,states []file.State,
) error {if !config.Enabled() {return nil}connector := channel.ConnectTo(pipeline, c.out)p, err := input.New(config, connector, c.beatDone, states, nil)...// 开始收集p.Start()return nil
}

p.Start()方法内启动Input,他在一个单独的协程里运行。

这里的p是对Input的封装,他的Run方法是对某个接口的实现,因为我们用来收集日志,所以我们只需要关心filebeat/input/log/input.go文件内的Run方法。Run方法内部调用了Inputscan方法,开始采集数据。

// Run runs the input
func (p *Input) Run() {...p.scan()...
}

scan方法内首先获取所有的文件。其次获取文件状态,根据状态来判定收集最新数据,还是从历史文件收集。文件收集会构建Harvester对象。

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {var sortInfos []FileSortInfovar files []string// 获取应该获取到的所有文件paths := p.getFiles()var err error...for i := 0; i < len(paths); i++ {var path stringvar info os.FileInfoif sortInfos == nil {path = files[i]info = paths[path]} else {path = sortInfos[i].pathinfo = sortInfos[i].info}select {case <-p.done:logp.Info("Scan aborted because input stopped.")returndefault:}newState, err := getFileState(path, info, p)if err != nil {logp.Err("Skipping file %s due to error %s", path, err)}// Load last statelastState := p.states.FindPrevious(newState)...// Decides if previous state existsif lastState.IsEmpty() {logp.Debug("input", "Start harvester for new file: %s", newState.Source)// 准备构建 harvester 了err := p.startHarvester(newState, 0)if err == errHarvesterLimit {logp.Debug("input", harvesterErrMsg, newState.Source, err)continue}if err != nil {logp.Err(harvesterErrMsg, newState.Source, err)}} else {// 从历史文件开始处理p.harvestExistingFile(newState, lastState)}}
}

p.startHarvester(newState, 0)内构建harvester。(harvester是另一个filebeat官网描述的核心组件之一)

func (p *Input) startHarvester(state file.State, offset int64) error {if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {p.numHarvesters.Dec()harvesterSkipped.Add(1)return errHarvesterLimit}// Set state to "not" finished to indicate that a harvester is runningstate.Finished = falsestate.Offset = offset// Create harvester with state// 这部分构建了 harvesterh, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })if err != nil {p.numHarvesters.Dec()return err}// 配置 harvestererr = h.Setup()if err != nil {p.numHarvesters.Dec()return fmt.Errorf("error setting up harvester: %s", err)}// Update state before staring harvester// This makes sure the states is set to Finished: false// This is synchronous state update as part of the scanh.SendStateUpdate()// 启动 harvesterif err = p.harvesters.Start(h); err != nil {p.numHarvesters.Dec()}return err
}
  1. p.createHarvester构建harvester
  2. p.Setup配置harvesterSetup方法内会初始化文件相关的内容,以及构建文件reader
  3. p.harvesters.Start(h)运行harvester

主要还是要看harvesters.Start方法,会在单独的协程内运行harvester

func (r *Registry) Start(h Harvester) error {// Make sure stop is not called during starting a harvesterr.Lock()defer r.Unlock()...go func() {defer func() {r.remove(h)r.wg.Done()}()// 异步运行err := h.Run()if err != nil {logp.Err("Error running input: %v", err)}}()return nil
}

harvester.Run方法真是长。。

func (h *Harvester) Run() error {// 这坨简直了for {...// 读取文件内容message, err := h.reader.Next()// 糟糕的异常处理。。。if err != nil {switch err {...}return nil}state := h.getState()startingOffset := state.Offsetstate.Offset += int64(message.Bytes)...// 读取到的文件内容text := string(message.Content)...// 数据内容都包装在 data 内,harvester 发送 data,其实就是 forwarder 转发的if !h.sendEvent(data, forwarder) {return nil}// Update state of harvester as successfully senth.state = state}
}

h.sendEvent(data, forwarder)这段代码将采集的数据发送到下游,内部其实就是用forwarder转发了数据。

到这里数据的采集流程应该就差不多了,剩下的是数据的发送流程。

这篇关于filebeat 数据采集流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/759876

相关文章

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键