【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

2024-03-12 12:44

本文主要是介绍【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

file

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

在idea中打开项目

升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

  <seatunnel-framework.version>2.3.3</seatunnel-framework.version>改为<seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}
}
// 改为
public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}}

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}
}
// 改为public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}}SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {Common.setDeployMode(DeployMode.CLIENT);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();SeaTunnelClient seaTunnelClient = createSeaTunnelClient();ClientJobExecutionEnvironment jobExecutionEnv =seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));jobInstanceDao.update(jobInstance);CompletableFuture.runAsync(() -> {waitJobFinish(clientJobProxy,userId,jobInstanceId,Long.toString(clientJobProxy.getJobId()),seaTunnelClient);});} catch (ExecutionException | InterruptedException e) {ExceptionUtils.getMessage(e);throw new RuntimeException(e);}return jobInstanceId;}

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

else if (statusList.contains("CANCELLING")) {jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

TableFactoryContext context =new TableFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =new TableTransformFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
// 改为
public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) throws IOException {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();List<Factory> factories;if (path.toFile().exists()) {List<URL> files = FileUtils.searchJarFiles(path);factories =FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;
}
// 改为public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();pluginIdentifiers.addAll(SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);List<Factory> factories;if (!pluginJarPaths.isEmpty()) {factories =FactoryUtil.discoverFactories(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

Linux部署测试

这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

重要的配置项

1、seatunnel-web数据库相关配置(application.yml) 
用来web服务中的数据持久化2、SEATUNNEL_HOME(环境变量)
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义4、重要的配置文件:
connector-datasource-mapper.yaml 
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml 
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

本文由 白鲸开源科技 提供发布支持!

这篇关于【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/801285

相关文章

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

Ubuntu如何升级Python版本

《Ubuntu如何升级Python版本》Ubuntu22.04Docker中,安装Python3.11后,使用update-alternatives设置为默认版本,最后用python3-V验证... 目China编程录问题描述前提环境解决方法总结问题描述Ubuntu22.04系统自带python3.10,想升级

基于C#实现PDF转图片的详细教程

《基于C#实现PDF转图片的详细教程》在数字化办公场景中,PDF文件的可视化处理需求日益增长,本文将围绕Spire.PDFfor.NET这一工具,详解如何通过C#将PDF转换为JPG、PNG等主流图片... 目录引言一、组件部署二、快速入门:PDF 转图片的核心 C# 代码三、分辨率设置 - 清晰度的决定因

Java Scanner类解析与实战教程

《JavaScanner类解析与实战教程》JavaScanner类(java.util包)是文本输入解析工具,支持基本类型和字符串读取,基于Readable接口与正则分隔符实现,适用于控制台、文件输... 目录一、核心设计与工作原理1.底层依赖2.解析机制A.核心逻辑基于分隔符(delimiter)和模式匹

SpringBoot通过main方法启动web项目实践

《SpringBoot通过main方法启动web项目实践》SpringBoot通过SpringApplication.run()启动Web项目,自动推断应用类型,加载初始化器与监听器,配置Spring... 目录1. 启动入口:SpringApplication.run()2. SpringApplicat

spring AMQP代码生成rabbitmq的exchange and queue教程

《springAMQP代码生成rabbitmq的exchangeandqueue教程》使用SpringAMQP代码直接创建RabbitMQexchange和queue,并确保绑定关系自动成立,简... 目录spring AMQP代码生成rabbitmq的exchange and 编程queue执行结果总结s

MyBatis的xml中字符串类型判空与非字符串类型判空处理方式(最新整理)

《MyBatis的xml中字符串类型判空与非字符串类型判空处理方式(最新整理)》本文给大家介绍MyBatis的xml中字符串类型判空与非字符串类型判空处理方式,本文给大家介绍的非常详细,对大家的学习或... 目录完整 Hutool 写法版本对比优化为什么status变成Long?为什么 price 没事?怎

更改linux系统的默认Python版本方式

《更改linux系统的默认Python版本方式》通过删除原Python软链接并创建指向python3.6的新链接,可切换系统默认Python版本,需注意版本冲突、环境混乱及维护问题,建议使用pyenv... 目录更改系统的默认python版本软链接软链接的特点创建软链接的命令使用场景注意事项总结更改系统的默

Linux升级或者切换python版本实现方式

《Linux升级或者切换python版本实现方式》本文介绍在Ubuntu/Debian系统升级Python至3.11或更高版本的方法,通过查看版本列表并选择新版本进行全局修改,需注意自动与手动模式的选... 目录升级系统python版本 (适用于全局修改)对于Ubuntu/Debian系统安装后,验证Pyt

MySQL 升级到8.4版本的完整流程及操作方法

《MySQL升级到8.4版本的完整流程及操作方法》本文详细说明了MySQL升级至8.4的完整流程,涵盖升级前准备(备份、兼容性检查)、支持路径(原地、逻辑导出、复制)、关键变更(空间索引、保留关键字... 目录一、升级前准备 (3.1 Before You Begin)二、升级路径 (3.2 Upgrade