Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency)

本文主要是介绍Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.数据弹性(Data Resiliency)

2.持久队列(Persistent Queues)

2.1.持久队列的限制

2.2.持久队列如何工作

2.3.配置持久队列

2.4.处理背压(Handing Back Pressure)

2.5.控制耐久性(Controlling Durability)

2.6.磁盘垃圾收集

3.死信队列(Dead Letter Queues)

3.1.配置Logstash以使用死信队列

3.1.1.文件轮换(File Rotation)

3.2.处理死信队列

3.3.从时间戳读取

3.4.示例:处理具有映射错误的数据


1.数据弹性(Data Resiliency)

当数据流经事件处理管道时,Logstash可能会遇到阻止其向配置的输出传递事件的情况。例如,数据可能包含意外的数据类型,或者Logstash可能异常终止。

为防止数据丢失并确保事件不间断地流经管道,Logstash提供以下数据弹性功能。

  • 持久性队列通过将事件存储在磁盘上的内部队列中来防止数据丢失。
  • 死信队列为Logstash无法处理的事件提供磁盘存储。您可以使用dead_letter_queue输入插件轻松地重新处理死信队列中的事件。

默认情况下禁用这些弹性功能。要打开这些功能,您必须在Logstash 设置文件中明确启用它们。

 

2.持久队列(Persistent Queues)

默认情况下,在管道阶段(输入→管道工作者)之间Logstash使用内存中有界队列来缓冲事件。这些内存中队列的大小是固定的,不可配置。如果Logstash遇到临时计算机故障,则内存中队列的内容将丢失。临时机器故障是Logstash或其主机异常终止但能够重新启动的情况。

为了防止异常终止期间的数据丢失,Logstash具有持久队列功能,该功能将消息队列存储在磁盘上。持久队列提供Logstash中数据的持久性。

持久队列对于需要大缓冲区的Logstash部署也很有用。您可以启用持久队列来缓冲磁盘上的事件并删除消息代理,而不是部署和管理消息代理(如Redis,RabbitMQ或Apache Kafka)以促进缓冲的发布 - 订阅者模型。

总之,启用持久队列的好处如下:

  • 在不需要外部缓冲机制(如Redis或Apache Kafka)的情况下吸收突发事件。
  • 在正常关闭期间以及Logstash异常终止时提供至少一次传递保证以防止消息丢失。如果在事件发生时重新启动Logstash,Logstash将尝试传递存储在持久队列中的消息,直到传递成功至少一次。

您必须 queue.checkpoint.writes: 1 明确设置以保证所有输入事件的最大持久性。请参阅控制耐久性。

 

2.1.持久队列的限制

以下是持久队列功能无法解决的问题:

  • 不能保护不使用请求 - 响应协议的输入插件无法防止数据丢失。例如:tcp,udp,zeromq push + pull,以及许多其他输入没有确认收件人的机制。beats和http等具有确认功能的插件都受到这个队列的良好保护。
  • 它不处理永久性机器故障,例如磁盘损坏,磁盘故障和机器丢失。持久存储到磁盘的数据不会被复制。

 

2.2.持久队列如何工作

队列位于同一进程中的输入和过滤阶段之间:

input → queue → filter + output

输入→ 队列 → 过滤器 + 输出

  1. 当输入已准备好处理事件时,它会将它们写入队列。
  2. 当对队列的写入成功时,输入可以向其数据源发送确认。

处理队列中的事件时,只有在过滤器和输出完成后,Logstash才会在队列中确认(acknowledges)已完成的事件。队列记录管道已处理的事件。当且仅当事件已由Logstash管道完全处理时,事件被记录为已处理(在本文档中,称为“已确认(acknowledged)”或“已确认(ACKed)”)。

确认(acknowledges)什么意思?这意味着事件已由所有已配置的过滤器和输出处理。例如,如果您只有一个输出,Elasticsearch,则在Elasticsearch输出成功将此事件发送到Elasticsearch时会确认(ACKed )事件。

在正常关闭期间(CTRL + C或SIGTERM),Logstash将停止从队列中读取并将完成处理由过滤器和输出处理的正在进行的事件。重启后,Logstash将继续处理持久队列中的事件以及接受来自输入的新事件。

如果Logstash异常终止,则任何正在进行的事件都不会被确认,并且在重新启动Logstash时将由过滤器和输出进行重新处理。Logstash批量处理事件,因此对于任何给定的批次,可能已成功完成某些批次,但在发生异常终止时未记录为已确认。

有关队列写入和确认的特定行为的更多详细信息,请参阅 控制持久性。

 

2.3.配置持久队列

要配置持久队列,可以在 Logstash 设置文件中指定以下选项:

  • queue.type:指定persisted启用持久队列。默认情况下,禁用持久队列(默认值:queue.type: memory) 。
  • path.queue:将存储数据文件的目录路径。默认情况下,文件存储在 path.data/queue
  • queue.page_capacity:队列页面的最大大小(以字节为单位)。队列数据由称为“pages”的仅附加文件组成。默认大小为64mb。更改此值不太可能带来性能优势。
  • queue.drain:指定true,如果希望Logstash在关闭之前等待持久队列耗尽。耗尽队列所需的时间取决于队列中累积的事件数。因此,您应该避免使用此设置,除非队列(即使已满)相对较小并且可以快速耗尽。
  • queue.max_events:队列中允许的最大事件数。默认值为0(无限制)。
  • queue.max_bytes:队列的总容量,以字节数表示。默认值为1024mb(1gb)。确保磁盘驱动器的容量大于此处指定的值。

如果使用持久队列来防止数据丢失,但不需要太多缓冲,则可以设置queue.max_bytes较小的值(例如10mb)来生成较小的队列并提高队列性能。

如果同时指定了queue.max_events和 queue.max_bytes,则Logstash将使用最先达到的标准。请参阅处理背压(Handling Back Pressure)以了解达到这些队列限制时的行为。

您还可以通过设置queue.checkpoint.writes控制检查点文件何时更新。请参阅控制耐久性(Controlling Durability)。

配置示例:

queue.type: persisted
queue.max_bytes: 4gb

 

2.4.处理背压(Handing Back Pressure)

当队列已满时,Logstash会对输入施加压力,以阻止流入Logstash的数据。这种机制有助于Logstash控制输入阶段的数据流速率,而不会像Elasticsearch那样压倒性的输出。

使用queue.max_bytes设置配置磁盘上队列的总容量。以下示例将队列的总容量设置为8gb:

queue.type: persisted
queue.max_bytes: 8gb

通过指定这些设置,Logstash将缓冲磁盘上的事件,直到队列大小达到8gb。当队列中充满未记录事件且已达到大小限制时,Logstash将不再接受新事件。

每个输入独立处理背压。例如,当 beats输入遇到背压时,它不再接受新连接并等待持久队列有空间接受更多事件。在过滤器和输出阶段完成处理队列中的现有事件并确认它们之后,Logstash会自动开始接受新事件。

 

2.5.控制耐久性(Controlling Durability)

持久性是存储写入的属性,可确保数据在写入后可用。

启用持久队列功能后,Logstash会将事件存储在磁盘上。Logstash在名为checkpointing的机制中提交到磁盘。

为了讨论持久性,我们需要介绍一些有关如何实现持久队列的细节。

首先,队列本身是一组页面。有两种页面:头页和尾页。头页是写入新事件的地方。只有一个头页。当头页具有特定大小(请参考queue.page_capacity)时,它将成为尾页,并创建新的头页。尾页是不可变的,头页是仅附加的。其次,队列在称为检查点文件的单独文件中记录有关其自身的详细信息((pages, acknowledgements, etc)。

当记录检查点时,Logstash将会做以下事情:

  • 在头页上调用fsync。
  • 原子的(原子性、Atomically)写入磁盘队列的当前状态。

检查点(checkpointing)的过程是原子的(Atomically),这意味着如果成功,将保存对文件的任何更新。

如果Logstash终止,或者存在硬件级别故障,那么在持久队列中缓冲但尚未检查点的任何数据都将丢失。

通过设置queue.checkpoint.writes,您可以强制Logstash检查点更加频繁 。此设置指定在强制检查点之前可能写入磁盘的最大事件数。默认值为1024.要确保最大持久性并避免丢失持久队列中的数据,可以设置queue.checkpoint.writes: 1 在每个事件写入后强制检查点。请记住,磁盘写入会产生资源成本。将此值设置为1会严重影响性能。

 

2.6.磁盘垃圾收集

在磁盘上,队列存储为一组页(page),其中每个页是一个文件。每个页的最多有queue.page_capacity大小。在确认该页面中的所有事件之后删除页(垃圾回收)。如果旧页至少有一个尚未确认的事件,则整个页面将保留在磁盘上,直到该页面中的所有事件都成功处理完毕。包含未处理事件的每个页面将根据queue.max_bytes字节大小进行计数。

 

3.死信队列(Dead Letter Queues)


仅当elasticsearch输出支持死信队列功能 。此外,死信队列仅用于响应代码为400或404的情况,两者都表示无法重试的事件。在将来的Logstash插件版本中将提供对其他输出的支持。在配置Logstash以使用此功能之前,请参阅输出插件文档以验证插件是否支持死信队列功能。

 

默认情况下,当Logstash遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash管道会挂起或丢弃不成功的事件。为了防止在这种情况下丢失数据,您可以配置Logstash将不成功的事件写入死信队列而不是丢弃它们。

写入死信队列的每个事件都包括原始事件,描述无法处理事件的原因的元数据,有关编写事件的插件的信息以及事件进入死信队列的时间戳。

要处理死信队列中的事件,只需创建一个Logstash管道配置,该配置使用 dead_letter_queue输入插件从队列中读取。

有关详细信息,请参阅处理死信队列中的事件。

 

3.1.配置Logstash以使用死信队列

默认情况下禁用死信队列。要启用死信队列,请dead_letter_queue_enablelogstash.yml 设置文件中设置该选项:

dead_letter_queue.enable: true

死信队列作为文件存储在Logstash实例的本地目录中。默认情况下,死信队列文件存储在path.data/dead_letter_queue。每个管道都有一个单独的队列。例如,默认情况下,main管道的 死信队列存储在LOGSTASH_HOME/data/dead_letter_queue/main。队列文件按顺序编号:1.log2.log等等。

您可以path.dead_letter_queuelogstash.yml文件中设置为文件指定不同的路径:

path.dead_letter_queue: "path/to/data/dead_letter_queue"

对于两个不同的logstash实例,不能使用相同的死信队列路径。

 

3.1.1.文件轮换(File Rotation)

死信队列具有内置的文件轮换策略,用于管理队列的文件大小。当文件大小达到预配置阈值时,将自动创建新文件。

默认情况下,每个死信队列的最大大小设置为1024mb。要更改此设置,请使用该dead_letter_queue.max_bytes选项。如果条目超过此设置会增加死信队列的大小,则会删除条目。

 

3.2.处理死信队列

当您准备好处理死信队列中的事件时,您将创建一个使用dead_letter_queue输入插件从死信队列中读取的管道 。您使用的管道配置当然取决于您需要做什么。例如,如果死信队列包含由Elasticsearch中的映射错误导致的事件,则可以创建读取“死(dead)”事件的管道,删除导致映射问题的字段,并将干净事件重新索引到Elasticsearch中。

以下示例显示了一个简单的管道,它从死信队列中读取事件,并将事件(包括元数据)写入标准输出:

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue" commit_offsets => true pipeline_id => "main" }
}output {stdout {codec => rubydebug { metadata => true }}
}
  1. path:  包含死信队列的顶级目录的路径。此目录包含写入死信队列的每个管道的单独文件夹。要查找此目录的路径,请查看logstash.yml 设置文件。默认情况下,Logstash会在用于持久存储(path.data)的位置下创建dead_letter_queue 目录,例如LOGSTASH_HOME/data/dead_letter_queue。但是,如果path.dead_letter_queue设置,则使用该位置。
  2. commit_offsets: 设置true时,保存偏移量。当管道重新启动时,它将继续从它停止的位置读取,而不是重新处理队列中的所有项目。您可以设置commit_offsets为false在死信队列中探索事件并希望多次迭代事件。
  3. pipeline_id: 写入死信队列的管道的ID。默认是"main"

有关另一个示例,请参阅示例:处理具有映射错误的数据。

当管道完成处理死信队列中的所有事件时,它将继续运行并在流入队列时处理新事件。这意味着您无需停止生产系统来处理死信队列中的事件。

如果无法正确处理dead_letter_queue输入插件插件中发出的事件, 则不会将其重新提交到死信队列。

 

3.3.从时间戳读取

当您从死信队列中读取时,您可能不希望处理队列中的所有事件,尤其是在队列中存在大量旧事件的情况下。您可以使用start_timestamp选项开始在队列中的特定点处理事件 。此选项将管道配置为根据进入队列的时间戳开始处理事件:

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue"start_timestamp => "2017-06-06T23:40:37"pipeline_id => "main"}
}

对于此示例,管道开始读取在2017年6月6日23:40:37或之后传递到死信队列的所有事件。

 

3.4.示例:处理具有映射错误的数据

在此示例中,用户尝试索引包含geo_ip数据的文档,但无法处理数据,因为它包含映射错误:

{"geoip":{"location":"home"}}

索引失败,因为Logstash输出插件在location字段中需要一个geo_point对象,但该值是字符串。失败的事件将写入死信队列,以及有关导致失败的错误的元数据:

{"@metadata" => {"dead_letter_queue" => {"entry_time" => #<Java::OrgLogstash::Timestamp:0x5b5dacd5>,"plugin_id" => "fb80f1925088497215b8d037e622dec5819b503e-4","plugin_type" => "elasticsearch","reason" => "Could not index event to Elasticsearch. status: 400, action: [\"index\", {:_id=>nil, :_index=>\"logstash-2017.06.22\", :_type=>\"doc\", :_routing=>nil}, 2017-06-22T01:29:29.804Z My-MacBook-Pro-2.local {\"geoip\":{\"location\":\"home\"}}], response: {\"index\"=>{\"_index\"=>\"logstash-2017.06.22\", \"_type\"=>\"doc\", \"_id\"=>\"AVzNayPze1iR9yDdI2MD\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse\", \"caused_by\"=>{\"type\"=>\"illegal_argument_exception\", \"reason\"=>\"illegal latitude value [266.30859375] for geoip.location\"}}}}"}},"@timestamp" => 2017-06-22T01:29:29.804Z,"@version" => "1","geoip" => {"location" => "home"},"host" => "My-MacBook-Pro-2.local","message" => "{\"geoip\":{\"location\":\"home\"}}"
}

要处理失败的事件,请创建以下管道,从死信队列中读取并删除映射问题:

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue/" }
}
filter {mutate {remove_field => "[geoip][location]" }
}
output {elasticsearch{hosts => [ "localhost:9200" ] }
}
  1. path:  dead_letter_queue输入从死信队列中读取。
  2. remove_field:  mutate过滤器删除了名为location的问题字段。
  3. hosts:  clean事件被发送到Elasticsearch,可以将其被索引,因为映射问题已得到解决。

 

这篇关于Logstash【从无到有从有到无】【L10】数据弹性(Data Resiliency)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/288031

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock

使用C#删除Excel表格中的重复行数据的代码详解

《使用C#删除Excel表格中的重复行数据的代码详解》重复行是指在Excel表格中完全相同的多行数据,删除这些重复行至关重要,因为它们不仅会干扰数据分析,还可能导致错误的决策和结论,所以本文给大家介绍... 目录简介使用工具C# 删除Excel工作表中的重复行语法工作原理实现代码C# 删除指定Excel单元