pyflink读取kafka数据写入mysql实例

2023-10-11 18:45

本文主要是介绍pyflink读取kafka数据写入mysql实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

依赖包下载

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/

版本

flink:1.16.0

kafka:2.13-3.2.0

实例

import logging
import sysfrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchemadef write_to_kafka(env):type_info = Types.ROW([Types.INT(), Types.STRING()])ds = env.from_collection([(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],type_info=type_info)serialization_schema = JsonRowSerializationSchema.Builder() \.with_type_info(type_info) \.build()kafka_producer = FlinkKafkaProducer(topic='test_json_topic',serialization_schema=serialization_schema,producer_config={'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'PLAIN', 'bootstrap.servers': '192.168.1.110:9092', 'group.id': 'test-consumer-group', 'sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"aaaaaaaaa\" password=\"bbbbbbb\";'})# note that the output type of ds must be RowTypeInfods.add_sink(kafka_producer)env.execute()def read_from_kafka(env):deserialization_schema = JsonRowDeserializationSchema.Builder() \.type_info(Types.ROW([Types.INT(), Types.STRING()])) \.build()kafka_consumer = FlinkKafkaConsumer(topics='test_json_topic',deserialization_schema=deserialization_schema,properties={'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'PLAIN', 'bootstrap.servers': '192.168.1.110:9092', 'group.id': 'test-consumer-group', 'sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"aaaaa\" password=\"bbbbbb\";'})kafka_consumer.set_start_from_earliest()env.add_source(kafka_consumer).print()env.execute()def wirte_data_todb(env, data):type_info = Types.ROW([Types.INT(), Types.STRING()])env.from_collection([(101, "Stream Processing with Apache Flink"),(102, "Streaming Systems"),(103, "Designing Data-Intensive Applications"),(104, "Kafka: The Definitive Guide")], type_info=type_info) \.add_sink(JdbcSink.sink("insert into flink (id, title) values (?, ?)",type_info,JdbcConnectionOptions.JdbcConnectionOptionsBuilder().with_url('jdbc:mysql://192.168.1.110:23006/test').with_driver_name('com.mysql.jdbc.Driver').with_user_name('sino').with_password('Caib@sgcc-56').build()))env.execute()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")env = StreamExecutionEnvironment.get_execution_environment()#env.add_jars("file:///opt/flink/flink-sql-connector-kafka-1.15.0.jar")#env.add_jars("file:///opt/flink/kafka-clients-2.8.1.jar")#env.add_jars("file:///opt/flink/flink-connector-jdbc-1.16.0.jar")#env.add_jars("file:///opt/flink/mysql-connector-java-8.0.29.jar")print("start reading data from kafka")read_from_kafka(env)#wirte_data_todb(env, "")

这篇关于pyflink读取kafka数据写入mysql实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/190090

相关文章

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Mysql中设计数据表的过程解析

《Mysql中设计数据表的过程解析》数据库约束通过NOTNULL、UNIQUE、DEFAULT、主键和外键等规则保障数据完整性,自动校验数据,减少人工错误,提升数据一致性和业务逻辑严谨性,本文介绍My... 目录1.引言2.NOT NULL——制定某列不可以存储NULL值2.UNIQUE——保证某一列的每一

解密SQL查询语句执行的过程

《解密SQL查询语句执行的过程》文章讲解了SQL语句的执行流程,涵盖解析、优化、执行三个核心阶段,并介绍执行计划查看方法EXPLAIN,同时提出性能优化技巧如合理使用索引、避免SELECT*、JOIN... 目录1. SQL语句的基本结构2. SQL语句的执行过程3. SQL语句的执行计划4. 常见的性能优

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

MySQL 强制使用特定索引的操作

《MySQL强制使用特定索引的操作》MySQL可通过FORCEINDEX、USEINDEX等语法强制查询使用特定索引,但优化器可能不采纳,需结合EXPLAIN分析执行计划,避免性能下降,注意版本差异... 目录1. 使用FORCE INDEX语法2. 使用USE INDEX语法3. 使用IGNORE IND