Java 如何使用aws的kinesis实现消费端,消费流中数据

2024-05-24 01:12

本文主要是介绍Java 如何使用aws的kinesis实现消费端,消费流中数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.前言

AWS 官网给了两种方式实现:

java 1.x

java 2.x 

这两种方式,包是不一样的,1.x是com.amazonaws,2是software.amazon.kinesis

使用也是天差地别,而且国内对kinesis这个资料简直少的可怜,这也就增加了开发难度,

2.什么是kinesis

我说一下kinesis是啥吧,其实和咱们队列很像,服务端的数据,需要客户端监听消费,拿到数据解析之后怎么处理就是自己的事情啦,我主要的业务就是实现流中的数据,流中的数据都是url等相关信息,主要是点击链接就消费,所以可以实现点击量的处理等等.

maven包java2.x:Maven Central: software.amazon.kinesis:amazon-kinesis-client

kinesis怎么使用的介绍

地址:在 Java 中开发 Kinesis Client Library 消费端 - Amazon Kinesis Data Streams

3.开始前的准备

代码不难,难的是没有相关的资源资料去实现,所以我这次实现代码主要靠AI,它实现了代码其实也不准,但是确实是给了我灵感,一遍一遍让AI生成代码,一遍一遍试错,调试,最后终于成功!

在写代码之前我们需要一些配置:

1.应用名称,这个自己起个名字就行

2.流名,AWS关于kinesis控制台有,可以去拿

3.区域,AWS的区域

3.aws凭证密钥和key

4.代码

首先,我们需要启动监听,配置aws凭证,区域啊,workerid等,最后启动worker线程使其能够监听,

下面我在main方法中启动监听的演示代码,也可以多线程哦

public static void main(String[] args) throws UnknownHostException, ParseException {// 硬编码的AWS凭证String awsAccessKeyId = "xxx";String awsSecretAccessKey = "xxxxx";AWSCredentials credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);// 配置 KCL workerworkerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration("test","CloudFront-apk-download-log",credentialsProvider,workerId).withInitialPositionInStream(InitialPositionInStream.LATEST).withRegionName("ap-south-1");// 创建并启动 workerWorker worker = new Worker.Builder().recordProcessorFactory(new MyRecordProcessorFactory()).config(kinesisClientLibConfiguration).build();worker.run(); // 这将启动 worker 并开始从 Kinesis 流中读取数据
}

创建一个接口MyRecordProcessorFactory,实现IRecordProcessorFactory,返回实例化监听端处理的类.这样那边产生数据,这边开始进入监听类处理.

public class MyRecordProcessorFactory implements IRecordProcessorFactory {@Overridepublic IRecordProcessor createProcessor() {return new MyRecordProcessor();}
}

创建MyRecordProcessor类 , 实现IRecordProcessor, 然后就会实现三个接口,初始化,监听数据,关闭资源这三个接口,

初始化initialize(): 在启动程序时会进入到初始化方法,我们可以拿到分片id以及从哪个序列号取出数据.

监听数据方法processRecords(): 此方法就会服务端生成的信息,这边就能同步监听到,并把信息给到你,你可以从给的参数中取出数据,这个你服务监听什么就会给你返什么. 你就可以解析, 解析完放到实体或者什么自己自定义处理吧.

public class MyRecordProcessor implements IRecordProcessor {private static final Logger LOG = LoggerFactory.getLogger(KCLExample.class);@Overridepublic void initialize(InitializationInput initializationInput) {// 初始化LOG.info("初始化shardId:{}", initializationInput.getShardId());LOG.info("初始化序列号:{}", initializationInput.getExtendedSequenceNumber());LOG.info("初始化检查点序列号:{}", initializationInput.getPendingCheckpointSequenceNumber());}@Overridepublic synchronized void processRecords(ProcessRecordsInput processRecordsInput) {List<Record> records = processRecordsInput.getRecords();System.out.println("批次:" + records.size());for (Record record : records) {ByteBuffer byteBuffer = record.getData();// 接收数据转换成strString str = StandardCharsets.UTF_8.decode(byteBuffer).toString();byteBuffer.flip();LOG.info("数据:{}", str);}// 检查点,目的是为了知道此次读取到了哪里IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();try {checkpointer.checkpoint();} catch (InvalidStateException e) {throw new RuntimeException(e);} catch (ShutdownException e) {throw new RuntimeException(e);}} @Overridepublic void shutdown(ShutdownInput shutdownInput) {ShutdownReason reson = shutdownInput.getShutdownReason();// 关闭资源等清理工作LOG.info("关闭资源:{}", reson.toString());}
}

pom.xml 

   <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.11.0</version></dependency>

 启动就可以监听数据啦!

这篇关于Java 如何使用aws的kinesis实现消费端,消费流中数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/996951

相关文章

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

Python使用Tenacity一行代码实现自动重试详解

《Python使用Tenacity一行代码实现自动重试详解》tenacity是一个专为Python设计的通用重试库,它的核心理念就是用简单、清晰的方式,为任何可能失败的操作添加重试能力,下面我们就来看... 目录一切始于一个简单的 API 调用Tenacity 入门:一行代码实现优雅重试精细控制:让重试按我

破茧 JDBC:MyBatis 在 Spring Boot 中的轻量实践指南

《破茧JDBC:MyBatis在SpringBoot中的轻量实践指南》MyBatis是持久层框架,简化JDBC开发,通过接口+XML/注解实现数据访问,动态代理生成实现类,支持增删改查及参数... 目录一、什么是 MyBATis二、 MyBatis 入门2.1、创建项目2.2、配置数据库连接字符串2.3、入

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

深度解析Spring Security 中的 SecurityFilterChain核心功能

《深度解析SpringSecurity中的SecurityFilterChain核心功能》SecurityFilterChain通过组件化配置、类型安全路径匹配、多链协同三大特性,重构了Spri... 目录Spring Security 中的SecurityFilterChain深度解析一、Security

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Apache Ignite 与 Spring Boot 集成详细指南

《ApacheIgnite与SpringBoot集成详细指南》ApacheIgnite官方指南详解如何通过SpringBootStarter扩展实现自动配置,支持厚/轻客户端模式,简化Ign... 目录 一、背景:为什么需要这个集成? 二、两种集成方式(对应两种客户端模型) 三、方式一:自动配置 Thick

Python实现网格交易策略的过程

《Python实现网格交易策略的过程》本文讲解Python网格交易策略,利用ccxt获取加密货币数据及backtrader回测,通过设定网格节点,低买高卖获利,适合震荡行情,下面跟我一起看看我们的第一... 网格交易是一种经典的量化交易策略,其核心思想是在价格上下预设多个“网格”,当价格触发特定网格时执行买