go与kafka(github.com/Shopify/sarama)

2023-12-21 00:08
文章标签 go com kafka github shopify sarama

本文主要是介绍go与kafka(github.com/Shopify/sarama),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

启动kafka

分别命令行启动zookeeper和kafka

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

 producer

import ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll  //赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner  //写到随机分区中,默认设置8个分区config.Producer.Return.Successes = truemsg := &sarama.ProducerMessage{}msg.Topic = `nginx_log`msg.Value = sarama.StringEncoder("this is a good test")client,err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"},config)if err != nil{fmt.Println("producer close err, ",err)return}defer client.Close()pid, offset, err := client.SendMessage(msg)if err != nil{fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

consumer

func main() {var wg sync.WaitGroupconsumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)if err != nil{fmt.Println("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("nginx_log")  //获得该topic所有的分区if err != nil{fmt.Println("Failed to get the list of partition:, ",err)return}fmt.Println(partitionList)for partition := range partitionList{pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)if err != nil{fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}wg.Add(1)go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值for msg := range pc.Messages(){  //阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key),string(msg.Value))}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close()
}

 

这篇关于go与kafka(github.com/Shopify/sarama)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/518022

相关文章

Go学习记录之runtime包深入解析

《Go学习记录之runtime包深入解析》Go语言runtime包管理运行时环境,涵盖goroutine调度、内存分配、垃圾回收、类型信息等核心功能,:本文主要介绍Go学习记录之runtime包的... 目录前言:一、runtime包内容学习1、作用:① Goroutine和并发控制:② 垃圾回收:③ 栈和

Go语言中泄漏缓冲区的问题解决

《Go语言中泄漏缓冲区的问题解决》缓冲区是一种常见的数据结构,常被用于在不同的并发单元之间传递数据,然而,若缓冲区使用不当,就可能引发泄漏缓冲区问题,本文就来介绍一下问题的解决,感兴趣的可以了解一下... 目录引言泄漏缓冲区的基本概念代码示例:泄漏缓冲区的产生项目场景:Web 服务器中的请求缓冲场景描述代码

Go语言如何判断两张图片的相似度

《Go语言如何判断两张图片的相似度》这篇文章主要为大家详细介绍了Go语言如何中实现判断两张图片的相似度的两种方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 在介绍技术细节前,我们先来看看图片对比在哪些场景下可以用得到:图片去重:自动删除重复图片,为存储空间"瘦身"。想象你是一个

Go语言中Recover机制的使用

《Go语言中Recover机制的使用》Go语言的recover机制通过defer函数捕获panic,实现异常恢复与程序稳定性,具有一定的参考价值,感兴趣的可以了解一下... 目录引言Recover 的基本概念基本代码示例简单的 Recover 示例嵌套函数中的 Recover项目场景中的应用Web 服务器中

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

Go语言中使用JWT进行身份验证的几种方式

《Go语言中使用JWT进行身份验证的几种方式》本文主要介绍了Go语言中使用JWT进行身份验证的几种方式,包括dgrijalva/jwt-go、golang-jwt/jwt、lestrrat-go/jw... 目录简介1. github.com/dgrijalva/jwt-go安装:使用示例:解释:2. gi

go rate 原生标准限速库的使用

《gorate原生标准限速库的使用》本文主要介绍了Go标准库golang.org/x/time/rate实现限流,采用令牌桶算法控制请求速率,提供Allow/Reserve/Wait方法,具有一定... 目录介绍安装API介绍rate.NewLimiter:创建限流器limiter.Allow():请求是否

Go 语言中的 Struct Tag 的用法详解

《Go语言中的StructTag的用法详解》在Go语言中,结构体字段标签(StructTag)是一种用于给字段添加元信息(metadata)的机制,常用于序列化(如JSON、XML)、ORM映... 目录一、结构体标签的基本语法二、json:"token"的具体含义三、常见的标签格式变体四、使用示例五、使用

Ubuntu上手动安装Go环境并解决“可执行文件格式错误”问题

《Ubuntu上手动安装Go环境并解决“可执行文件格式错误”问题》:本文主要介绍Ubuntu上手动安装Go环境并解决“可执行文件格式错误”问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未... 目录一、前言二、系统架构检测三、卸载旧版 Go四、下载并安装正确版本五、配置环境变量六、验证安装七、常见