Java技术栈 —— Spark入门(三)之实时视频流

2024-08-28 16:52

本文主要是介绍Java技术栈 —— Spark入门(三)之实时视频流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Java技术栈 —— Spark入门(三)之实时视频流转灰度图像

  • 一、将摄像头数据发送至kafka
  • 二、Kafka准备topic
  • 三、spark读取kafka图像数据并处理
  • 四、本地显示灰度图像(存在卡顿现象,待优化)

项目整体结构图如下

在这里插入图片描述

参考文章或视频链接
[1] Architecture-for-real-time-video-streaming-analytics

一、将摄像头数据发送至kafka

这个代码将运行在你有摄像头的机器上,缺依赖就装依赖

import cv2
import kafka
import numpy as np# 设置 Kafka Producer
# 注意修改你的kafka地址
producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')# 打开摄像头(0 为默认摄像头)
cap = cv2.VideoCapture(0)while True:# 从摄像头捕获帧ret, frame = cap.read()if not ret:break# 将图像编码为 JPEG 格式_, buffer = cv2.imencode('.jpg', frame)# 将图像作为字节数组发送到 Kafkaproducer.send('camera-images', buffer.tobytes())# 显示当前捕获的帧cv2.imshow('Video', frame)# 按 'q' 键退出if cv2.waitKey(1) & 0xFF == ord('q'):break# 释放资源
cap.release()
cv2.destroyAllWindows()
producer.close()

二、Kafka准备topic

在准备topic之前,要先配置kafka中的config/server.properties文件,否则其它机器无法联通kafka,配置好后重启kafka。

# 找到这两个选项并修改成如下内容
listeners=PLAINTEXT://0.0.0.0:9092
# 改成你的kafka所在服务器ip
advertised.listeners=PLAINTEXT://{your_ip}:9092

如果你之前创建过topic,那就清空这些topic中的数据


开始正式创建topic

# 创建输入图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic camera-images --partitions 1 --replication-factor 1
# 创建输出的gray灰度图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic result-gray-images --partitions 1 --replication-factor 1# 准备好后查看下topic list进行验证
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

三、spark读取kafka图像数据并处理

首先给你的spark脚本所运行的python环境(这个环境一般可以为conda等虚拟环境),安装必要的依赖库

pip install opencv-python-headless

准备脚本文件

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
import cv2
import numpy as npbootstrapServers = "localhost:9092"# 创建 SparkSession
spark = SparkSession.builder \.appName("Kafka-Spark-OpenCV") \.getOrCreate()# 初始化 Kafka Producer,用于发送处理后的图像
# 如果不这样做,会出现PicklingError,因为如果UDF中,包含了无法被序列化的对象,例如线程锁(_thread.RLock)或 Kafka 的 KafkaProducer 实例,序列化就会失败。
# 因此,在每个执行器内部,创建 KafkaProducer 实例
producer = None# 从 Kafka 读取数据流
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "camera-images") \.load()# UDF 用于将图像转换为灰度
def convert_to_gray(image_bytes):global producer# 创建 KafkaProducer 实例(在每个执行器上只初始化一次)if producer is None:producer = KafkaProducer(bootstrap_servers = bootstrapServers)# 将字节数组转换为 numpy 数组nparr = np.frombuffer(image_bytes, np.uint8)# 将 numpy 数组解码为图像img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)# 将图像转换为灰度gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 将灰度图像编码为 JPEG_, buffer = cv2.imencode('.jpg', gray)# 将处理后的图像发送到 Kafka 'result-gray-images' 主题producer.send('result-gray-images', buffer.tobytes())return buffer.tobytes()# 注册 UDF
convert_to_gray_udf = udf(convert_to_gray, BinaryType())# 应用 UDF 对数据进行灰度化处理
gray_df = df.withColumn("gray_image", convert_to_gray_udf("value"))# 将处理后的数据写入文件或其他输出
query = gray_df.writeStream \.outputMode("append") \.format("console") \.start()# query = gray_df\
#     .writeStream \
#     .format('kafka') \
#     .outputMode('update') \
#     .option("kafka.bootstrap.servers", bootstrapServers) \
#     .option('checkpointLocation', '/spark/job-checkpoint') \
#     .option("topic", "result-gray-images") \
#     .start()query.awaitTermination()

spark-submit提交脚本文件:

# 1.提高内存
# 2.调整 Kafka 批次大小,减少单个批次的数据量,从而降低内存使用(这个步骤存疑)
/opt/spark-3.5.2-bin-hadoop3/bin/spark-submit \
--executor-memory 4g \
--driver-memory 4g \
--conf "spark.kafka.maxOffsetsPerTrigger=1000" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka_to_spark.py

四、本地显示灰度图像(存在卡顿现象,待优化)

import cv2
import numpy as np
from kafka import KafkaConsumer# 设置 Kafka Consumer
consumer = KafkaConsumer('result-gray-images',bootstrap_servers='{your_kafka_ip}:9092',# auto_offset_reset='earliest',auto_offset_reset='latest',enable_auto_commit=True,# group_id='image-display-group'
)# 从 Kafka 主题读取灰度图像并显示
for message in consumer:# print("reading gray image.... ")# 将消息转换为 numpy 数组nparr = np.frombuffer(message.value, np.uint8)# 解码为图像gray_img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)# 显示灰度图像cv2.imshow('Gray Video', gray_img)if cv2.waitKey(1) & 0xFF == ord('q'):break# 释放资源
cv2.destroyAllWindows()
consumer.close()

这篇关于Java技术栈 —— Spark入门(三)之实时视频流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115343

相关文章

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

OpenCV实现实时颜色检测的示例

《OpenCV实现实时颜色检测的示例》本文主要介绍了OpenCV实现实时颜色检测的示例,通过HSV色彩空间转换和色调范围判断实现红黄绿蓝颜色检测,包含视频捕捉、区域标记、颜色分析等功能,具有一定的参考... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

SpringBoot排查和解决JSON解析错误(400 Bad Request)的方法

《SpringBoot排查和解决JSON解析错误(400BadRequest)的方法》在开发SpringBootRESTfulAPI时,客户端与服务端的数据交互通常使用JSON格式,然而,JSON... 目录问题背景1. 问题描述2. 错误分析解决方案1. 手动重新输入jsON2. 使用工具清理JSON3.

java中long的一些常见用法

《java中long的一些常见用法》在Java中,long是一种基本数据类型,用于表示长整型数值,接下来通过本文给大家介绍java中long的一些常见用法,感兴趣的朋友一起看看吧... 在Java中,long是一种基本数据类型,用于表示长整型数值。它的取值范围比int更大,从-922337203685477

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过