go与kafka(github.com/Shopify/sarama)

2023-12-21 00:08
文章标签 go com kafka github shopify sarama

本文主要是介绍go与kafka(github.com/Shopify/sarama),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

启动kafka

分别命令行启动zookeeper和kafka

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

 producer

import ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll  //赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner  //写到随机分区中,默认设置8个分区config.Producer.Return.Successes = truemsg := &sarama.ProducerMessage{}msg.Topic = `nginx_log`msg.Value = sarama.StringEncoder("this is a good test")client,err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"},config)if err != nil{fmt.Println("producer close err, ",err)return}defer client.Close()pid, offset, err := client.SendMessage(msg)if err != nil{fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

consumer

func main() {var wg sync.WaitGroupconsumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)if err != nil{fmt.Println("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("nginx_log")  //获得该topic所有的分区if err != nil{fmt.Println("Failed to get the list of partition:, ",err)return}fmt.Println(partitionList)for partition := range partitionList{pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)if err != nil{fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}wg.Add(1)go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值for msg := range pc.Messages(){  //阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key),string(msg.Value))}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close()
}

 

这篇关于go与kafka(github.com/Shopify/sarama)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/518022

相关文章

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到

Go语言并发之通知退出机制的实现

《Go语言并发之通知退出机制的实现》本文主要介绍了Go语言并发之通知退出机制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1、通知退出机制1.1 进程/main函数退出1.2 通过channel退出1.3 通过cont

Go语言编译环境设置教程

《Go语言编译环境设置教程》Go语言支持高并发(goroutine)、自动垃圾回收,编译为跨平台二进制文件,云原生兼容且社区活跃,开发便捷,内置测试与vet工具辅助检测错误,依赖模块化管理,提升开发效... 目录Go语言优势下载 Go  配置编译环境配置 GOPROXYIDE 设置(VS Code)一些基本

使用Go实现文件复制的完整流程

《使用Go实现文件复制的完整流程》本案例将实现一个实用的文件操作工具:将一个文件的内容完整复制到另一个文件中,这是文件处理中的常见任务,比如配置文件备份、日志迁移、用户上传文件转存等,文中通过代码示例... 目录案例说明涉及China编程知识点示例代码代码解析示例运行练习扩展小结案例说明我们将通过标准库 os

深入理解Go语言中二维切片的使用

《深入理解Go语言中二维切片的使用》本文深入讲解了Go语言中二维切片的概念与应用,用于表示矩阵、表格等二维数据结构,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧... 目录引言二维切片的基本概念定义创建二维切片二维切片的操作访问元素修改元素遍历二维切片二维切片的动态调整追加行动态

go中的时间处理过程

《go中的时间处理过程》:本文主要介绍go中的时间处理过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 获取当前时间2 获取当前时间戳3 获取当前时间的字符串格式4 相互转化4.1 时间戳转时间字符串 (int64 > string)4.2 时间字符串转时间

Go语言中make和new的区别及说明

《Go语言中make和new的区别及说明》:本文主要介绍Go语言中make和new的区别及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 概述2 new 函数2.1 功能2.2 语法2.3 初始化案例3 make 函数3.1 功能3.2 语法3.3 初始化

Go语言中nil判断的注意事项(最新推荐)

《Go语言中nil判断的注意事项(最新推荐)》本文给大家介绍Go语言中nil判断的注意事项,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.接口变量的特殊行为2.nil的合法类型3.nil值的实用行为4.自定义类型与nil5.反射判断nil6.函数返回的

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

Go语言代码格式化的技巧分享

《Go语言代码格式化的技巧分享》在Go语言的开发过程中,代码格式化是一个看似细微却至关重要的环节,良好的代码格式化不仅能提升代码的可读性,还能促进团队协作,减少因代码风格差异引发的问题,Go在代码格式... 目录一、Go 语言代码格式化的重要性二、Go 语言代码格式化工具:gofmt 与 go fmt(一)