Java 如何使用aws的kinesis实现消费端,消费流中数据

2024-05-24 01:12

本文主要是介绍Java 如何使用aws的kinesis实现消费端,消费流中数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.前言

AWS 官网给了两种方式实现:

java 1.x

java 2.x 

这两种方式,包是不一样的,1.x是com.amazonaws,2是software.amazon.kinesis

使用也是天差地别,而且国内对kinesis这个资料简直少的可怜,这也就增加了开发难度,

2.什么是kinesis

我说一下kinesis是啥吧,其实和咱们队列很像,服务端的数据,需要客户端监听消费,拿到数据解析之后怎么处理就是自己的事情啦,我主要的业务就是实现流中的数据,流中的数据都是url等相关信息,主要是点击链接就消费,所以可以实现点击量的处理等等.

maven包java2.x:Maven Central: software.amazon.kinesis:amazon-kinesis-client

kinesis怎么使用的介绍

地址:在 Java 中开发 Kinesis Client Library 消费端 - Amazon Kinesis Data Streams

3.开始前的准备

代码不难,难的是没有相关的资源资料去实现,所以我这次实现代码主要靠AI,它实现了代码其实也不准,但是确实是给了我灵感,一遍一遍让AI生成代码,一遍一遍试错,调试,最后终于成功!

在写代码之前我们需要一些配置:

1.应用名称,这个自己起个名字就行

2.流名,AWS关于kinesis控制台有,可以去拿

3.区域,AWS的区域

3.aws凭证密钥和key

4.代码

首先,我们需要启动监听,配置aws凭证,区域啊,workerid等,最后启动worker线程使其能够监听,

下面我在main方法中启动监听的演示代码,也可以多线程哦

public static void main(String[] args) throws UnknownHostException, ParseException {// 硬编码的AWS凭证String awsAccessKeyId = "xxx";String awsSecretAccessKey = "xxxxx";AWSCredentials credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);// 配置 KCL workerworkerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration("test","CloudFront-apk-download-log",credentialsProvider,workerId).withInitialPositionInStream(InitialPositionInStream.LATEST).withRegionName("ap-south-1");// 创建并启动 workerWorker worker = new Worker.Builder().recordProcessorFactory(new MyRecordProcessorFactory()).config(kinesisClientLibConfiguration).build();worker.run(); // 这将启动 worker 并开始从 Kinesis 流中读取数据
}

创建一个接口MyRecordProcessorFactory,实现IRecordProcessorFactory,返回实例化监听端处理的类.这样那边产生数据,这边开始进入监听类处理.

public class MyRecordProcessorFactory implements IRecordProcessorFactory {@Overridepublic IRecordProcessor createProcessor() {return new MyRecordProcessor();}
}

创建MyRecordProcessor类 , 实现IRecordProcessor, 然后就会实现三个接口,初始化,监听数据,关闭资源这三个接口,

初始化initialize(): 在启动程序时会进入到初始化方法,我们可以拿到分片id以及从哪个序列号取出数据.

监听数据方法processRecords(): 此方法就会服务端生成的信息,这边就能同步监听到,并把信息给到你,你可以从给的参数中取出数据,这个你服务监听什么就会给你返什么. 你就可以解析, 解析完放到实体或者什么自己自定义处理吧.

public class MyRecordProcessor implements IRecordProcessor {private static final Logger LOG = LoggerFactory.getLogger(KCLExample.class);@Overridepublic void initialize(InitializationInput initializationInput) {// 初始化LOG.info("初始化shardId:{}", initializationInput.getShardId());LOG.info("初始化序列号:{}", initializationInput.getExtendedSequenceNumber());LOG.info("初始化检查点序列号:{}", initializationInput.getPendingCheckpointSequenceNumber());}@Overridepublic synchronized void processRecords(ProcessRecordsInput processRecordsInput) {List<Record> records = processRecordsInput.getRecords();System.out.println("批次:" + records.size());for (Record record : records) {ByteBuffer byteBuffer = record.getData();// 接收数据转换成strString str = StandardCharsets.UTF_8.decode(byteBuffer).toString();byteBuffer.flip();LOG.info("数据:{}", str);}// 检查点,目的是为了知道此次读取到了哪里IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();try {checkpointer.checkpoint();} catch (InvalidStateException e) {throw new RuntimeException(e);} catch (ShutdownException e) {throw new RuntimeException(e);}} @Overridepublic void shutdown(ShutdownInput shutdownInput) {ShutdownReason reson = shutdownInput.getShutdownReason();// 关闭资源等清理工作LOG.info("关闭资源:{}", reson.toString());}
}

pom.xml 

   <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.11.0</version></dependency>

 启动就可以监听数据啦!

这篇关于Java 如何使用aws的kinesis实现消费端,消费流中数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/996951

相关文章

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解

Windows下C++使用SQLitede的操作过程

《Windows下C++使用SQLitede的操作过程》本文介绍了Windows下C++使用SQLite的安装配置、CppSQLite库封装优势、核心功能(如数据库连接、事务管理)、跨平台支持及性能优... 目录Windows下C++使用SQLite1、安装2、代码示例CppSQLite:C++轻松操作SQ

PostgreSQL中MVCC 机制的实现

《PostgreSQL中MVCC机制的实现》本文主要介绍了PostgreSQL中MVCC机制的实现,通过多版本数据存储、快照隔离和事务ID管理实现高并发读写,具有一定的参考价值,感兴趣的可以了解一下... 目录一 MVCC 基本原理python1.1 MVCC 核心概念1.2 与传统锁机制对比二 Postg

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri

Python常用命令提示符使用方法详解

《Python常用命令提示符使用方法详解》在学习python的过程中,我们需要用到命令提示符(CMD)进行环境的配置,:本文主要介绍Python常用命令提示符使用方法的相关资料,文中通过代码介绍的... 目录一、python环境基础命令【Windows】1、检查Python是否安装2、 查看Python的安

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM

OpenCV实现实时颜色检测的示例

《OpenCV实现实时颜色检测的示例》本文主要介绍了OpenCV实现实时颜色检测的示例,通过HSV色彩空间转换和色调范围判断实现红黄绿蓝颜色检测,包含视频捕捉、区域标记、颜色分析等功能,具有一定的参考... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间