go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化

本文主要是介绍go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

在我们实际生产中,我们常常因为新的项目或者新的功能进而要对配置文件进行修改,但是在生产环境下我们不是每次配置文件发生变化都重启一次系统,这无疑是不切实际的,所以我们需要对配置文件进行实时监控,而今天我们所要展示的也就是如何基于etcd来监控配置文件的变化。

etcd对配置项监控的流程

需求分析

首先我们来看我们日志收集服务的主要工作流程:

func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}

在上述主要工作逻辑的基础上,现在我们需要etcd来实现对配置文件的实时监控,而这就需要我们在后态去运行一个监控程序来实时监控查看需要见监控的配置文件是否变化。并且将变化发送到tailFile模块中

实现Watch监控

所以这里我们对main.go进行一点简单的修改,添加一个后台程序 go etcd.WatchConf(ConfigObj.Etcdaddress.Key):

package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)go etcd.WatchConf(ConfigObj.Etcdaddress.Key)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}

我们在来看这个函数的具体逻辑:

func WatchConf(key string) {rch := client.Watch(context.Background(), key)var newConf []common.CollectEntryfor wresp := range rch {logrus.Infof("get new conf fromn etcd")for _, ev := range wresp.Events {fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)err := json.Unmarshal(ev.Kv.Value, &newConf)if err != nil {logrus.Error("json unmarshal failed,err:%v", err)continue}tailFile.SendNewConf(newConf)}}
}

与之前有关etcd的文章中的操作例子不同,这里我们并没有定义上下文,主要是因为这里我们不确定什么时候终止这个程序,所以不使用上下文了。

发送新配置到tailFile中

在上面我们已经完成etcd的监控,现在我们需要把新的配置消息发送到tailFile,这里我们第一反应是写一个死循环一直独缺,但是这样其实不大方便,毕竟储蓄一直运行会占掉大量不必要消耗的资源,这里我们可以让双方使用管道来进行通信,平时管道处于阻塞状态,只有监测到新配置才会进行通信,这样会使资源得到最大化的利用,我们来看一看具体的代码实现:

  • 首先我们来定义一下用于通信的管道
var (confchan chan []common.CollectEntry
)
  • 然后我们要对管道进行初始化,并且读取管道中新的配置信息:
confchan = make(chan []common.CollectEntry)newConf := <-confchanlogrus.Infof("get newconf from etcd", newConf)

最后,由于我们这里管道只用于etcd模块与tailFile模块之间的通信,所以这里我们就不暴露管道,而是选择暴露函数:

func SendNewConf(newConf []common.CollectEntry) {confchan <- newConf
}

结语

最后附上上述变化模块的代码:

  • main.go
package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)go etcd.WatchConf(ConfigObj.Etcdaddress.Key)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}
  • etcd.go
package etcdimport ("encoding/json""fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"clientv3 "go.etcd.io/etcd/client/v3""golang.org/x/net/context""log-agent/common""log-agent/tailFile""time"
)var client *clientv3.Clientfunc Init(address []string) (err error) {client, err = clientv3.New(clientv3.Config{Endpoints:   address,DialTimeout: 5 * time.Second,})if err != nil {logrus.Error("etcd client connect failed,err:%v", err)return}return
}func GetConf(key string) (err error, collectEntryList []common.CollectEntry) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)response, err := client.Get(ctx, key)cancel()if err != nil {logrus.Error("get conf from etcd failed,err:%v", err)return}if len(response.Kvs) == 0 {logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)return}fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryListif err != nil {logrus.Error("json unmarshal failed,err:%v", err)return}return
}func WatchConf(key string) {rch := client.Watch(context.Background(), key)var newConf []common.CollectEntryfor wresp := range rch {logrus.Infof("get new conf fromn etcd")for _, ev := range wresp.Events {fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)err := json.Unmarshal(ev.Kv.Value, &newConf)if err != nil {logrus.Error("json unmarshal failed,err:%v", err)continue}tailFile.SendNewConf(newConf)}}
}
  • tailFile.go
package tailFileimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail""log-agent/Kafka""log-agent/common""strings""time"
)type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}var (confchan chan []common.CollectEntry
)func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}//初始化新配置的管道confchan = make(chan []common.CollectEntry)newConf := <-confchanlogrus.Infof("get newconf from etcd", newConf)return
}func (t *tailTask) run() {for {line, ok := <-t.TailObj.Linesif !ok {logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = t.topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}func SendNewConf(newConf []common.CollectEntry) {confchan <- newConf
}

这篇关于go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967123

相关文章

SpringBoot中四种AOP实战应用场景及代码实现

《SpringBoot中四种AOP实战应用场景及代码实现》面向切面编程(AOP)是Spring框架的核心功能之一,它通过预编译和运行期动态代理实现程序功能的统一维护,在SpringBoot应用中,AO... 目录引言场景一:日志记录与性能监控业务需求实现方案使用示例扩展:MDC实现请求跟踪场景二:权限控制与

Windows系统宽带限制如何解除?

《Windows系统宽带限制如何解除?》有不少用户反映电脑网速慢得情况,可能是宽带速度被限制的原因,只需解除限制即可,具体该如何操作呢?本文就跟大家一起来看看Windows系统解除网络限制的操作方法吧... 有不少用户反映电脑网速慢得情况,可能是宽带速度被限制的原因,只需解除限制即可,具体该如何操作呢?本文

CentOS和Ubuntu系统使用shell脚本创建用户和设置密码

《CentOS和Ubuntu系统使用shell脚本创建用户和设置密码》在Linux系统中,你可以使用useradd命令来创建新用户,使用echo和chpasswd命令来设置密码,本文写了一个shell... 在linux系统中,你可以使用useradd命令来创建新用户,使用echo和chpasswd命令来设

电脑找不到mfc90u.dll文件怎么办? 系统报错mfc90u.dll丢失修复的5种方案

《电脑找不到mfc90u.dll文件怎么办?系统报错mfc90u.dll丢失修复的5种方案》在我们日常使用电脑的过程中,可能会遇到一些软件或系统错误,其中之一就是mfc90u.dll丢失,那么,mf... 在大部分情况下出现我们运行或安装软件,游戏出现提示丢失某些DLL文件或OCX文件的原因可能是原始安装包

电脑显示mfc100u.dll丢失怎么办?系统报错mfc90u.dll丢失5种修复方案

《电脑显示mfc100u.dll丢失怎么办?系统报错mfc90u.dll丢失5种修复方案》最近有不少兄弟反映,电脑突然弹出“mfc100u.dll已加载,但找不到入口点”的错误提示,导致一些程序无法正... 在计算机使用过程中,我们经常会遇到一些错误提示,其中最常见的就是“找不到指定的模块”或“缺少某个DL

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

SpringBoot中配置文件的加载顺序解读

《SpringBoot中配置文件的加载顺序解读》:本文主要介绍SpringBoot中配置文件的加载顺序,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot配置文件的加载顺序1、命令⾏参数2、Java系统属性3、操作系统环境变量5、项目【外部】的ap

C语言中位操作的实际应用举例

《C语言中位操作的实际应用举例》:本文主要介绍C语言中位操作的实际应用,总结了位操作的使用场景,并指出了需要注意的问题,如可读性、平台依赖性和溢出风险,文中通过代码介绍的非常详细,需要的朋友可以参... 目录1. 嵌入式系统与硬件寄存器操作2. 网络协议解析3. 图像处理与颜色编码4. 高效处理布尔标志集合

Go语言开发实现查询IP信息的MCP服务器

《Go语言开发实现查询IP信息的MCP服务器》随着MCP的快速普及和广泛应用,MCP服务器也层出不穷,本文将详细介绍如何在Go语言中使用go-mcp库来开发一个查询IP信息的MCP... 目录前言mcp-ip-geo 服务器目录结构说明查询 IP 信息功能实现工具实现工具管理查询单个 IP 信息工具的实现服

Spring Boot读取配置文件的五种方式小结

《SpringBoot读取配置文件的五种方式小结》SpringBoot提供了灵活多样的方式来读取配置文件,这篇文章为大家介绍了5种常见的读取方式,文中的示例代码简洁易懂,大家可以根据自己的需要进... 目录1. 配置文件位置与加载顺序2. 读取配置文件的方式汇总方式一:使用 @Value 注解读取配置方式二