elasticsearch 使用 Logstash 做数据采集

2024-08-22 16:08

本文主要是介绍elasticsearch 使用 Logstash 做数据采集,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1,下载
下载地址(根据自己需要的版本下载):
https://www.elastic.co/cn/downloads/logstash

我这里是使用的6.2.1版本,直接下载就可以了

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.1.tar.gz

2,解压

tar -zxvf logstash-6.2.1.tar.gz

将解压后的目录移动到/usr/local/目录下

mv logstash-6.2.1 /usr/local/
cd /usr/local/logstash-6.2.1/

3,安装 logstash 所需依赖 ruby 和 rubygems(注意:需要 ruby 的版本在 1.8.7 以上)

yum install -y ruby rubygems

检查 ruby 版本

ruby -v

输出如下,表示安装成功
在这里插入图片描述
4,安装 logstash-input-jdbc

cd /usr/local/logstash-6.2.1/
./bin/logstash-plugin install --no-verify  logstash-input-jdbc

5,编写配置文件
我这里的配置文件主要是2个配置文件,mysql同步表文件(mysql.conf)和索引库映射文件(question_template.json),都放在 logstash 的 config 配置文件下
1,mysql.conf

input {stdin {}jdbc {jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/java_interview_dev?characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai"# the user we wish to excute our statement asjdbc_user => "root"jdbc_password => "123456"# the path to our downloaded jdbc driverjdbc_driver_library => "/usr/local/logstash-6.2.1/lib/mysql-connector-java-8.0.16.jar"# the name of the driver class for mysqljdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"#要执行的sql文件#statement_filepath => "/conf/course.sql"statement => "SELECT question_id, title, answer, type_ids, DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%S') AS create_time FROM question WHERE `timestamp` > DATE_ADD(:sql_last_value,INTERVAL 8 HOUR)"#定时配置schedule => "* * * * *"record_last_run => true#记录最后采集时间点,保存到logstash_metadata文件中last_run_metadata_path => "/usr/local/logstash-6.2.1/config/logstash_metadata"}
}output {elasticsearch {#ES的ip地址和端口hosts => "localhost:9200"#hosts => ["localhost:9200"]#ES索引库名称index => "question_dev"document_id => "%{question_id}"document_type => "doc"template =>"/usr/local/logstash-6.2.1/config/question_template.json"template_name =>"question_dev"template_overwrite =>"true"}stdout {#日志输出codec => json_lines}
}

2,question_template.json

{"mappings": {"doc": {"properties": {"question_id": {"type": "integer"},"title": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"answer": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"type_ids": {"type": "text"},"create_time": {"format": "yyyy-MM-dd HH:mm:ss","type": "date"}}}},"template": "question_dev"
}

6,运行

/usr/local/logstash-6.2.1/bin/logstash -f /usr/local/logstash-6.2.1/config/mysql.conf

使 logstash 一直保持在后台运行命令:

nohup /usr/local/logstash-6.2.1/bin/logstash -f /usr/local/logstash-6.2.1/config/mysql.conf 2>&1 &

运行前:
在这里插入图片描述
索引库里面没有一条数据,
运行后:
在这里插入图片描述
运行后我们发现,logstash 会根据 mysql.conf 里面的配置项 statement 执行的sql所查询到的数据全部录入到索引库,默认的 logstash 会每分钟执行一次,可以根据配置的 schedule 定时任务修改

到这里使用 logstash 做es数据采集的过程就已经全部完成了

备注:配置不太明白的可以看我附件上传的教学视频

https://download.csdn.net/download/u012946310/11827678

备注:如果需要同时对多个数据采集并且输出到不同的索引库,参考如下配置:

input {stdin {}#dev数据库问题索引jdbc {#采集类型,避免输出时混淆,使用此类型判断输出type => "dev_question"jdbc_connection_string => "jdbc:mysql://localhost:3306/cx_blockchain_dev?characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai"# the user we wish to excute our statement asjdbc_user => "root"jdbc_password => "cx123456789cx"# the path to our downloaded jdbc driverjdbc_driver_library => "/usr/local/logstash-6.2.1/lib/mysql-connector-java-8.0.16.jar"# the name of the driver class for mysqljdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"#要执行的sql文件#statement_filepath => "/conf/course.sql"statement => "SELECT question_id, title, `desc`, label_code, answer_count, create_user_id, DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%S') AS create_time FROM question WHERE `timestamp` > DATE_ADD(:sql_last_value,INTERVAL 8 HOUR)"#定时配置#schedule => "*/10 * * * *"schedule => "* * * * *"record_last_run => true#记录最后采集时间点,保存到dev_question_run_log文件中last_run_metadata_path => "/usr/local/logstash-6.2.1/config/es-conf/dev_question_run_log"}#test数据库问题索引jdbc {#采集类型,避免输出时混淆,使用此类型判断输出type => "test_question"jdbc_connection_string => "jdbc:mysql://localhost:3306/cx_blockchain_test?characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai"# the user we wish to excute our statement asjdbc_user => "root"jdbc_password => "cx123456789cx"# the path to our downloaded jdbc driverjdbc_driver_library => "/usr/local/logstash-6.2.1/lib/mysql-connector-java-8.0.16.jar"# the name of the driver class for mysqljdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"#要执行的sql文件#statement_filepath => "/conf/course.sql"statement => "SELECT question_id, title, `desc`, label_code, answer_count, create_user_id, DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%S') AS create_time FROM question WHERE `timestamp` > DATE_ADD(:sql_last_value,INTERVAL 8 HOUR)"#定时配置#schedule => "*/12 * * * *"schedule => "* * * * *"record_last_run => true#记录最后采集时间点,保存到test_question_run_log文件中last_run_metadata_path => "/usr/local/logstash-6.2.1/config/es-conf/test_question_run_log"}
}output {#dev_question索引输出if[type]=="dev_question"{elasticsearch {#ES的ip地址和端口hosts => "localhost:9200"#hosts => ["localhost:9200"]#ES索引库名称index => "dev_question"document_id => "%{question_id}"document_type => "doc"template =>"/usr/local/logstash-6.2.1/config/es-conf/question_template.json"template_name =>"question"template_overwrite =>"true"}stdout {#日志输出codec => json_lines}}#test_question索引输出if[type]=="test_question"{elasticsearch {#ES的ip地址和端口hosts => "localhost:9200"#hosts => ["localhost:9200"]#ES索引库名称index => "test_question"document_id => "%{question_id}"document_type => "doc"template =>"/usr/local/logstash-6.2.1/config/es-conf/question_template.json"template_name =>"question"template_overwrite =>"true"}stdout {#日志输出codec => json_lines}}
}

上面主要新增了一个 type 字段,并且在输出的时候判断 type 字段,以此来区分采集的数据输出到不同的索引库

这篇关于elasticsearch 使用 Logstash 做数据采集的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1096745

相关文章

Linux lvm实例之如何创建一个专用于MySQL数据存储的LVM卷组

《Linuxlvm实例之如何创建一个专用于MySQL数据存储的LVM卷组》:本文主要介绍使用Linux创建一个专用于MySQL数据存储的LVM卷组的实例,具有很好的参考价值,希望对大家有所帮助,... 目录在Centos 7上创建卷China编程组并配置mysql数据目录1. 检查现有磁盘2. 创建物理卷3. 创

MySQL 事务的概念及ACID属性和使用详解

《MySQL事务的概念及ACID属性和使用详解》MySQL通过多线程实现存储工作,因此在并发访问场景中,事务确保了数据操作的一致性和可靠性,下面通过本文给大家介绍MySQL事务的概念及ACID属性和... 目录一、什么是事务二、事务的属性及使用2.1 事务的 ACID 属性2.2 为什么存在事务2.3 事务

Nacos日志与Raft的数据清理指南

《Nacos日志与Raft的数据清理指南》随着运行时间的增长,Nacos的日志文件(logs/)和Raft持久化数据(data/protocol/raft/)可能会占用大量磁盘空间,影响系统稳定性,本... 目录引言1. Nacos 日志文件(logs/ 目录)清理1.1 日志文件的作用1.2 是否可以删除

使用Python实现网页表格转换为markdown

《使用Python实现网页表格转换为markdown》在日常工作中,我们经常需要从网页上复制表格数据,并将其转换成Markdown格式,本文将使用Python编写一个网页表格转Markdown工具,需... 在日常工作中,我们经常需要从网页上复制表格数据,并将其转换成Markdown格式,以便在文档、邮件或

Python使用pynput模拟实现键盘自动输入工具

《Python使用pynput模拟实现键盘自动输入工具》在日常办公和软件开发中,我们经常需要处理大量重复的文本输入工作,所以本文就来和大家介绍一款使用Python的PyQt5库结合pynput键盘控制... 目录概述:当自动化遇上可视化功能全景图核心功能矩阵技术栈深度效果展示使用教程四步操作指南核心代码解析

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1

SpringCloud使用Nacos 配置中心实现配置自动刷新功能使用

《SpringCloud使用Nacos配置中心实现配置自动刷新功能使用》SpringCloud项目中使用Nacos作为配置中心可以方便开发及运维人员随时查看配置信息,及配置共享,并且Nacos支持配... 目录前言一、Nacos中集中配置方式?二、使用步骤1.使用$Value 注解2.使用@Configur

Mac备忘录怎么导出/备份和云同步? Mac备忘录使用技巧

《Mac备忘录怎么导出/备份和云同步?Mac备忘录使用技巧》备忘录作为iOS里简单而又不可或缺的一个系统应用,上手容易,可以满足我们日常生活中各种记录的需求,今天我们就来看看Mac备忘录的导出、... 「备忘录」是 MAC 上的一款常用应用,它可以帮助我们捕捉灵感、记录待办事项或保存重要信息。为了便于在不同

如何Python使用设置word的页边距

《如何Python使用设置word的页边距》在编写或处理Word文档的过程中,页边距是一个不可忽视的排版要素,本文将介绍如何使用Python设置Word文档中各个节的页边距,需要的可以参考下... 目录操作步骤代码示例页边距单位说明应用场景与高级用China编程途小结在编写或处理Word文档的过程中,页边距是一个

SpringBoot项目Web拦截器使用的多种方式

《SpringBoot项目Web拦截器使用的多种方式》在SpringBoot应用中,Web拦截器(Interceptor)是一种用于在请求处理的不同阶段执行自定义逻辑的机制,下面给大家介绍Sprin... 目录一、实现 HandlerInterceptor 接口1、创建HandlerInterceptor实