python玩玩kafka

2024-04-17 08:08
文章标签 python kafka 玩玩

本文主要是介绍python玩玩kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

kafka里面的一些概念:

  • producer:生产者。

  • consumer:消费者。

  • topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。

  • broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。


kafka有四个核心API:producer API,consumer  API,streams API,connector API


kafka有什么用?

可它以有效的获取系统和应用程序之间的数据,对数据流进行转换或者反应。


关于kafka的下载安装就不过多介绍了,下面主要介绍的是使用python操作kafka。


首先安装kafka的模块:

pip install kafka


安装完我们就可以尝试着去跑个例子:

首先看看producer是怎么跑起来的:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
producer.close()

调用KafkaProducer指定server地址即可


类似的来看看consumer例子:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         bootstrap_servers=['127.0.0.1:9092'])
                        
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))


对于consumer group(消费者群组),我们需要给一个群组id(用来区分单个消费者或是群组):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['127.0.0.1:9092'])
                        
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))


使用consumer订阅多个主题,需要使用subscribe方法,传入需要订阅的标题:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))


如果需要手动拉取信息,那我们需要加一个循环,在这个循环里监听,一直获取服务器信息:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg



如果想挂起consumer可以调用pause()方法,恢复调用resume()方法:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused()   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"




关于简单的操作就介绍到这里了,想了解更多:

https://pypi.org/project/kafka-python/




640?wx_fmt=gif

Pls follow It!!


这篇关于python玩玩kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/911248

相关文章

基于Python开发Windows屏幕控制工具

《基于Python开发Windows屏幕控制工具》在数字化办公时代,屏幕管理已成为提升工作效率和保护眼睛健康的重要环节,本文将分享一个基于Python和PySide6开发的Windows屏幕控制工具,... 目录概述功能亮点界面展示实现步骤详解1. 环境准备2. 亮度控制模块3. 息屏功能实现4. 息屏时间

Python如何去除图片干扰代码示例

《Python如何去除图片干扰代码示例》图片降噪是一个广泛应用于图像处理的技术,可以提高图像质量和相关应用的效果,:本文主要介绍Python如何去除图片干扰的相关资料,文中通过代码介绍的非常详细,... 目录一、噪声去除1. 高斯噪声(像素值正态分布扰动)2. 椒盐噪声(随机黑白像素点)3. 复杂噪声(如伪

Python中图片与PDF识别文本(OCR)的全面指南

《Python中图片与PDF识别文本(OCR)的全面指南》在数据爆炸时代,80%的企业数据以非结构化形式存在,其中PDF和图像是最主要的载体,本文将深入探索Python中OCR技术如何将这些数字纸张转... 目录一、OCR技术核心原理二、python图像识别四大工具库1. Pytesseract - 经典O

基于Linux的ffmpeg python的关键帧抽取

《基于Linux的ffmpegpython的关键帧抽取》本文主要介绍了基于Linux的ffmpegpython的关键帧抽取,实现以按帧或时间间隔抽取关键帧,文中通过示例代码介绍的非常详细,对大家的学... 目录1.FFmpeg的环境配置1) 创建一个虚拟环境envjavascript2) ffmpeg-py

python使用库爬取m3u8文件的示例

《python使用库爬取m3u8文件的示例》本文主要介绍了python使用库爬取m3u8文件的示例,可以使用requests、m3u8、ffmpeg等库,实现获取、解析、下载视频片段并合并等步骤,具有... 目录一、准备工作二、获取m3u8文件内容三、解析m3u8文件四、下载视频片段五、合并视频片段六、错误

Python中提取文件名扩展名的多种方法实现

《Python中提取文件名扩展名的多种方法实现》在Python编程中,经常会遇到需要从文件名中提取扩展名的场景,Python提供了多种方法来实现这一功能,不同方法适用于不同的场景和需求,包括os.pa... 目录技术背景实现步骤方法一:使用os.path.splitext方法二:使用pathlib模块方法三

Python打印对象所有属性和值的方法小结

《Python打印对象所有属性和值的方法小结》在Python开发过程中,调试代码时经常需要查看对象的当前状态,也就是对象的所有属性和对应的值,然而,Python并没有像PHP的print_r那样直接提... 目录python中打印对象所有属性和值的方法实现步骤1. 使用vars()和pprint()2. 使

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解

一文深入详解Python的secrets模块

《一文深入详解Python的secrets模块》在构建涉及用户身份认证、权限管理、加密通信等系统时,开发者最不能忽视的一个问题就是“安全性”,Python在3.6版本中引入了专门面向安全用途的secr... 目录引言一、背景与动机:为什么需要 secrets 模块?二、secrets 模块的核心功能1. 基

python常见环境管理工具超全解析

《python常见环境管理工具超全解析》在Python开发中,管理多个项目及其依赖项通常是一个挑战,下面:本文主要介绍python常见环境管理工具的相关资料,文中通过代码介绍的非常详细,需要的朋友... 目录1. conda2. pip3. uvuv 工具自动创建和管理环境的特点4. setup.py5.