大货车燃油盗窃案件管控应用详述

2023-12-21 05:58

本文主要是介绍大货车燃油盗窃案件管控应用详述,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、项目业务介绍

盗窃大货车燃油基本是深夜凌晨在高速服务区、出入口和路边等地作案。 由于夜间监控模糊,监控被大货车遮挡,同时嫌疑人故意绕开有监控的路 段,导致作案车辆监控抓拍线索非常少。因此,在侦查大货车燃油盗窃案 件时,常常会遇到作案车辆发现难,行车轨迹研判难这到两个难题。 


为此我们开发了几种找车的方法: 

1、 其次是模型找盗油车,我们根据民警积累的经验开发了一套计算模型,是一些车辆、人员的时空、外观特性信息,每天kafka监听后台大数据过车数据,通过过车数据和业务模型的条件匹配过滤出嫌疑车辆。再以 车辆线索和案件匹配,给用户提供了一种新的、高效的研判思路。

2、 接下来是人脸找相关车,结合用户经验,案件中有大量前科人员作案,这 里主要利用 rabbitMQ监听盗油前科等嫌疑人员人脸名单库布控报警,找出这些人 员出现的地点还有驾乘的车辆信息,为案件提供线索。

4、 最后是规律找生活车,盗油人员的生活规律一般是驾驶生活车辆到盗油车 存放地并驾驶盗油车作案,作案回到盗油车存放点后又驾驶生活车辆回到 居住地。利用该盗油车及生活交替出现的规律,找出盗油人员的生活车辆。通过http接口调用查询车辆轨迹,然后再利 用盗油车和生活车出行轨迹的规律进行分析研判,对明确嫌疑人的身份和 落脚区域起到很大帮助。 

2、项目架构


开发使用的技术栈:

spring boot 开发框架、consul注册中心,feign服务调用与项目其他组件通信,ribbon后端负载均衡,postgresql 数据库、kafka 和 rabbit MQ 消息监听,mybatis 数据接入层框架,redis 缓存,xxl-job定时任务,sharding-jdbc分库分表。 

部署架构:

Java Web做集群部署,最直接需要注意的应该就是缓存类的共享以及定时任务等的冲突。缓存共享比如session共享、项目中用到的内存缓存等,需要在所有集群内都可以共享到。然后就是一些只需要一个节点执行成功,其他节点不需要重复执行的业务,如定时任务等。

本应用方案为:nginx代理+前端负载均衡;部署两台服务器;kafka,rmq,redis,数据库也都是集群部署;定时任务使用xxl-job,其調度中心集群部署保证高可用。

3、项目亮点 

(1)与用户深度沟通,得出细致的业务模型。

(2)创造性的提出利用生活车和盗油车轨迹规律进行案件分析研判的思路。两种车轨迹交点很有可能为落脚点。

4、项目遇到的问题

4.1 kafka 0.10.2.0 消息堆积、重复消费

前提:部署了两台服务器。

问题出现现象:

1.其他订阅的消费者组均能够正常消费,只有本应用出现堆积

2.日志中发现,本应用的两台服务器都在消费

3.日志中还发现,搜索同一车牌号,有很多重复消费日志。

4.查看应用消费日志发现,该类有大量异常日志,是一个数据库异常,因为车牌+过车时间是唯一索引,因此在添加相同车牌时间数据的时候报错了。

5、kafka消费端日志报commitException:

08-09 11:01:11 131 pool-7-thread-3 ERROR [] - 
commit failed 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?

查阅资料分析推测:

因为一个group只能有一台消费,两台出现消费是否因为出现连接出现问题,重新负载均衡了。

一条记录被循环消费,应该是因为本地提交消费位移失败,才回出现。

这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms,默认为300s,
该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

    public void start(KafkaConsumer consumer, String metadata) {try {while (true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));if (records != null && !records.isEmpty()) {log.info("---poll msg success from {} -----", metadata);process(records);consumer.commitAsync();} }} catch (Throwable e) {log.error("consumer exception", e);} finally {try {consumer.commitSync();} finally {consumer.close();}}}

kafka的偏移量(offset)是由消费者进行管理的,偏移量有两种,拉取偏移量(position)与提交偏移量(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka会使用拉取偏移量的值作为分区的提交偏移量发送给协调者。
如果没有提交偏移量,下一次消费者重新与broker连接后,会从当前消费者group已提交到broker的偏移量处开始消费。
所以,问题就在这里,当我们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。所以拉取偏移量没有提交到broker,分区又rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。

问题出现原因:

项目上线时max.poll.interval.ms的配置为300秒,max.poll.records是500

(1)这次问题出现的原因为随着项目推进和成熟,平台接入的抓拍机数量增多,项目初期抓拍机数量为220台,每天抓拍量200-250万之间,峰值抓拍数量为90条/s。条件判断加入库按0.2s计算,由于一批数据一般是500以内,耗时100秒,所以没出现问题。

(2)之后抓拍机数量扩充到5100台,每天抓拍量达到3500万,峰值抓拍数量为1400条/s。消费者线程处理能力赶不上消息的生产能力。消息就像滚雪球一样越来越多,出现消息堆积现象。因此修改max.poll.records为1500, 导致消息一次性poll()到的消息达到1500条,一批数据消费时间为300秒,从而可能会超出max.poll.interval.ms。导致消费者组重平衡,因此连接到另外一台消费服务器,然而另外一台服务器也出现超时,又进行Rebalance...如此循环,才出现了两台服务器都进行消费,并且一直重复消费。

解决办法:

(1)增大max.poll.interval.ms时间为600s,防范峰值流量。

(2) max.poll.records默认是500,减少该值也能解决这个问题,但是会造成消息堆积。

(3)批量入库,缩短单批次消息处理时间。

之前是每条记录走两次查询是否是被盗车和套牌车接口,入库一次

现在被盗车和套牌车信息放入redis缓存并定时更新,一批次消息统一入库一次。

(4)增加消费者线程数量

每台服务器开启2个消费者线程。一共两台服务器、4个消费者线程,同时增加两个分区使分区数量等于消费者数量。

(5)另外,发现平台的kafka broker和consumer client版本不一致也会导致性能下降:Producer、Consumer 和 Broker 的版本是相同的,它们之间的通信可以享受 Zero Copy 的快速通道;相反,一个低版本的 Consumer 程序想要与 Producer、Broker 交互的话,就只能依靠 JVM 堆中转一下,丢掉了快捷通道,就只能走慢速通道了。因此,在优化 Broker 这一层时,你只要保持服务器端和客户端版本的一致,就能获得很多性能收益了。

效果:

省去了绝大部分接口调用和数据库读写阻塞时间,处理一批记录耗时700-800ms,单个消费者平均每秒处理700条数据。3个消费者实例即能够抵御当前流量冲击。

流程:人脸/车辆抓拍数据获取流程、名单库布控流程

过车抓拍量统计:

(1)项目刚投入使用时:

(2)项目成熟时 

 

 

这篇关于大货车燃油盗窃案件管控应用详述的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/518857

相关文章

PHP应用中处理限流和API节流的最佳实践

《PHP应用中处理限流和API节流的最佳实践》限流和API节流对于确保Web应用程序的可靠性、安全性和可扩展性至关重要,本文将详细介绍PHP应用中处理限流和API节流的最佳实践,下面就来和小编一起学习... 目录限流的重要性在 php 中实施限流的最佳实践使用集中式存储进行状态管理(如 Redis)采用滑动

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

PostgreSQL简介及实战应用

《PostgreSQL简介及实战应用》PostgreSQL是一种功能强大的开源关系型数据库管理系统,以其稳定性、高性能、扩展性和复杂查询能力在众多项目中得到广泛应用,本文将从基础概念讲起,逐步深入到高... 目录前言1. PostgreSQL基础1.1 PostgreSQL简介1.2 基础语法1.3 数据库

Python中的filter() 函数的工作原理及应用技巧

《Python中的filter()函数的工作原理及应用技巧》Python的filter()函数用于筛选序列元素,返回迭代器,适合函数式编程,相比列表推导式,内存更优,尤其适用于大数据集,结合lamb... 目录前言一、基本概念基本语法二、使用方式1. 使用 lambda 函数2. 使用普通函数3. 使用 N

Python中yield的用法和实际应用示例

《Python中yield的用法和实际应用示例》在Python中,yield关键字主要用于生成器函数(generatorfunctions)中,其目的是使函数能够像迭代器一样工作,即可以被遍历,但不会... 目录python中yield的用法详解一、引言二、yield的基本用法1、yield与生成器2、yi

Python多线程应用中的卡死问题优化方案指南

《Python多线程应用中的卡死问题优化方案指南》在利用Python语言开发某查询软件时,遇到了点击搜索按钮后软件卡死的问题,本文将简单分析一下出现的原因以及对应的优化方案,希望对大家有所帮助... 目录问题描述优化方案1. 网络请求优化2. 多线程架构优化3. 全局异常处理4. 配置管理优化优化效果1.

从基础到高阶详解Python多态实战应用指南

《从基础到高阶详解Python多态实战应用指南》这篇文章主要从基础到高阶为大家详细介绍Python中多态的相关应用与技巧,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、多态的本质:python的“鸭子类型”哲学二、多态的三大实战场景场景1:数据处理管道——统一处理不同数据格式

Java Stream 的 Collectors.toMap高级应用与最佳实践

《JavaStream的Collectors.toMap高级应用与最佳实践》文章讲解JavaStreamAPI中Collectors.toMap的使用,涵盖基础语法、键冲突处理、自定义Map... 目录一、基础用法回顾二、处理键冲突三、自定义 Map 实现类型四、处理 null 值五、复杂值类型转换六、处理

分布式锁在Spring Boot应用中的实现过程

《分布式锁在SpringBoot应用中的实现过程》文章介绍在SpringBoot中通过自定义Lock注解、LockAspect切面和RedisLockUtils工具类实现分布式锁,确保多实例并发操作... 目录Lock注解LockASPect切面RedisLockUtils工具类总结在现代微服务架构中,分布

Python标准库之数据压缩和存档的应用详解

《Python标准库之数据压缩和存档的应用详解》在数据处理与存储领域,压缩和存档是提升效率的关键技术,Python标准库提供了一套完整的工具链,下面小编就来和大家简单介绍一下吧... 目录一、核心模块架构与设计哲学二、关键模块深度解析1.tarfile:专业级归档工具2.zipfile:跨平台归档首选3.