Flinkx如何通过json文件定位读写插件

2023-11-11 21:58
文章标签 json 读写 定位 插件 flinkx

本文主要是介绍Flinkx如何通过json文件定位读写插件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

Flinkx作为数据同步工具,它通过json配置文件来确定多源到多源的数据同步和同步策略,这次就来看看Flinkx是如何通过json配置文件来定位reader和writer端的

整体结构

在这里插入图片描述

从Flinkx项目里可以看到,它对每个数据源都定义了core,reader输出端,writer输入端。当然也有意外,hive和redis只要输入。

Flinkx是如何通过json配置文件定位这个源呢

看源码

定位到D:\Projects\flinkx-1.8.5\flinkx-core\src\main\java\com\dtstack\flinkx\Main.java

1. 先看下如何定位reader端,在第113行

BaseDataReader dataReader = DataReaderFactory.getDataReader(config, env);
// 进入getDataReader方法:
public static BaseDataReader getDataReader(DataTransferConfig config, StreamExecutionEnvironment env) {try {// 通过配置文件获取插件名String pluginName = config.getJob().getContent().get(0).getReader().getName();// 通过插件名找到插件的类名String pluginClassName = PluginUtil.getPluginClassName(pluginName);// 在获取插件名的文件路径和公共插件文件路径Set<URL> urlList = PluginUtil.getJarFileDirPath(pluginName, config.getPluginRoot());// 通过文件路径加载类,得到构造函数,实例化插件return ClassLoaderManager.newInstance(urlList, cl -> {Class<?> clazz = cl.loadClass(pluginClassName);Constructor constructor = clazz.getConstructor(DataTransferConfig.class, StreamExecutionEnvironment.class);return (BaseDataReader)constructor.newInstance(config, env);});} catch (Exception e) {throw new RuntimeException(e);}}

在每个json配置文件中,都需要配置job.content.reader.name="xxxreader"参数,这个name就是插件名,通过该名字才可以找到相应的插件。实例化插件还需要导入公共的插件plugins/common/flinkx-rdb-**.jar

2. 在看看writer端,在122行

BaseDataWriter dataWriter = DataWriterFactory.getDataWriter(config);
// 进入getDataWriter方法
public static BaseDataWriter getDataWriter(DataTransferConfig config) {try {String pluginName = config.getJob().getContent().get(0).getWriter().getName();String pluginClassName = PluginUtil.getPluginClassName(pluginName);Set<URL> urlList = PluginUtil.getJarFileDirPath(pluginName, config.getPluginRoot());return ClassLoaderManager.newInstance(urlList, cl -> {Class<?> clazz = cl.loadClass(pluginClassName);Constructor constructor = clazz.getConstructor(DataTransferConfig.class);return (BaseDataWriter)constructor.newInstance(config);});} catch (Exception e) {throw new RuntimeException(e);}}

与reader端同理,通过name确定相应的writer端

这篇关于Flinkx如何通过json文件定位读写插件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/393067

相关文章

Python进行JSON和Excel文件转换处理指南

《Python进行JSON和Excel文件转换处理指南》在数据交换与系统集成中,JSON与Excel是两种极为常见的数据格式,本文将介绍如何使用Python实现将JSON转换为格式化的Excel文件,... 目录将 jsON 导入为格式化 Excel将 Excel 导出为结构化 JSON处理嵌套 JSON:

详解MySQL中JSON数据类型用法及与传统JSON字符串对比

《详解MySQL中JSON数据类型用法及与传统JSON字符串对比》MySQL从5.7版本开始引入了JSON数据类型,专门用于存储JSON格式的数据,本文将为大家简单介绍一下MySQL中JSON数据类型... 目录前言基本用法jsON数据类型 vs 传统JSON字符串1. 存储方式2. 查询方式对比3. 索引

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MySQL 8 中的一个强大功能 JSON_TABLE示例详解

《MySQL8中的一个强大功能JSON_TABLE示例详解》JSON_TABLE是MySQL8中引入的一个强大功能,它允许用户将JSON数据转换为关系表格式,从而可以更方便地在SQL查询中处理J... 目录基本语法示例示例查询解释应用场景不适用场景1. ‌jsON 数据结构过于复杂或动态变化‌2. ‌性能要

Spring的RedisTemplate的json反序列泛型丢失问题解决

《Spring的RedisTemplate的json反序列泛型丢失问题解决》本文主要介绍了SpringRedisTemplate中使用JSON序列化时泛型信息丢失的问题及其提出三种解决方案,可以根据性... 目录背景解决方案方案一方案二方案三总结背景在使用RedisTemplate操作redis时我们针对

Spring Boot Maven 插件如何构建可执行 JAR 的核心配置

《SpringBootMaven插件如何构建可执行JAR的核心配置》SpringBoot核心Maven插件,用于生成可执行JAR/WAR,内置服务器简化部署,支持热部署、多环境配置及依赖管理... 目录前言一、插件的核心功能与目标1.1 插件的定位1.2 插件的 Goals(目标)1.3 插件定位1.4 核

java使用protobuf-maven-plugin的插件编译proto文件详解

《java使用protobuf-maven-plugin的插件编译proto文件详解》:本文主要介绍java使用protobuf-maven-plugin的插件编译proto文件,具有很好的参考价... 目录protobuf文件作为数据传输和存储的协议主要介绍在Java使用maven编译proto文件的插件

C#读写文本文件的多种方式详解

《C#读写文本文件的多种方式详解》这篇文章主要为大家详细介绍了C#中各种常用的文件读写方式,包括文本文件,二进制文件、CSV文件、JSON文件等,有需要的小伙伴可以参考一下... 目录一、文本文件读写1. 使用 File 类的静态方法2. 使用 StreamReader 和 StreamWriter二、二进

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java进程异常故障定位及排查过程

《Java进程异常故障定位及排查过程》:本文主要介绍Java进程异常故障定位及排查过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、故障发现与初步判断1. 监控系统告警2. 日志初步分析二、核心排查工具与步骤1. 进程状态检查2. CPU 飙升问题3. 内存