【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

2024-03-12 12:44

本文主要是介绍【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

file

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

在idea中打开项目

升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

  <seatunnel-framework.version>2.3.3</seatunnel-framework.version>改为<seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}
}
// 改为
public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}}

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}
}
// 改为public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}}SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {Common.setDeployMode(DeployMode.CLIENT);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();SeaTunnelClient seaTunnelClient = createSeaTunnelClient();ClientJobExecutionEnvironment jobExecutionEnv =seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));jobInstanceDao.update(jobInstance);CompletableFuture.runAsync(() -> {waitJobFinish(clientJobProxy,userId,jobInstanceId,Long.toString(clientJobProxy.getJobId()),seaTunnelClient);});} catch (ExecutionException | InterruptedException e) {ExceptionUtils.getMessage(e);throw new RuntimeException(e);}return jobInstanceId;}

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

else if (statusList.contains("CANCELLING")) {jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

TableFactoryContext context =new TableFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =new TableTransformFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
// 改为
public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) throws IOException {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();List<Factory> factories;if (path.toFile().exists()) {List<URL> files = FileUtils.searchJarFiles(path);factories =FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;
}
// 改为public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();pluginIdentifiers.addAll(SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);List<Factory> factories;if (!pluginJarPaths.isEmpty()) {factories =FactoryUtil.discoverFactories(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

Linux部署测试

这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

重要的配置项

1、seatunnel-web数据库相关配置(application.yml) 
用来web服务中的数据持久化2、SEATUNNEL_HOME(环境变量)
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义4、重要的配置文件:
connector-datasource-mapper.yaml 
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml 
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

本文由 白鲸开源科技 提供发布支持!

这篇关于【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/801285

相关文章

Python版本与package版本兼容性检查方法总结

《Python版本与package版本兼容性检查方法总结》:本文主要介绍Python版本与package版本兼容性检查方法的相关资料,文中提供四种检查方法,分别是pip查询、conda管理、PyP... 目录引言为什么会出现兼容性问题方法一:用 pip 官方命令查询可用版本方法二:conda 管理包环境方法

全网最全Tomcat完全卸载重装教程小结

《全网最全Tomcat完全卸载重装教程小结》windows系统卸载Tomcat重新通过ZIP方式安装Tomcat,优点是灵活可控,适合开发者自定义配置,手动配置环境变量后,可通过命令行快速启动和管理... 目录一、完全卸载Tomcat1. 停止Tomcat服务2. 通过控制面板卸载3. 手动删除残留文件4.

Python的pandas库基础知识超详细教程

《Python的pandas库基础知识超详细教程》Pandas是Python数据处理核心库,提供Series和DataFrame结构,支持CSV/Excel/SQL等数据源导入及清洗、合并、统计等功能... 目录一、配置环境二、序列和数据表2.1 初始化2.2  获取数值2.3 获取索引2.4 索引取内容2

python依赖管理工具UV的安装和使用教程

《python依赖管理工具UV的安装和使用教程》UV是一个用Rust编写的Python包安装和依赖管理工具,比传统工具(如pip)有着更快、更高效的体验,:本文主要介绍python依赖管理工具UV... 目录前言一、命令安装uv二、手动编译安装2.1在archlinux安装uv的依赖工具2.2从github

C#实现SHP文件读取与地图显示的完整教程

《C#实现SHP文件读取与地图显示的完整教程》在地理信息系统(GIS)开发中,SHP文件是一种常见的矢量数据格式,本文将详细介绍如何使用C#读取SHP文件并实现地图显示功能,包括坐标转换、图形渲染、平... 目录概述功能特点核心代码解析1. 文件读取与初始化2. 坐标转换3. 图形绘制4. 地图交互功能缩放

java 恺撒加密/解密实现原理(附带源码)

《java恺撒加密/解密实现原理(附带源码)》本文介绍Java实现恺撒加密与解密,通过固定位移量对字母进行循环替换,保留大小写及非字母字符,由于其实现简单、易于理解,恺撒加密常被用作学习加密算法的入... 目录Java 恺撒加密/解密实现1. 项目背景与介绍2. 相关知识2.1 恺撒加密算法原理2.2 Ja

Nginx屏蔽服务器名称与版本信息方式(源码级修改)

《Nginx屏蔽服务器名称与版本信息方式(源码级修改)》本文详解如何通过源码修改Nginx1.25.4,移除Server响应头中的服务类型和版本信息,以增强安全性,需重新配置、编译、安装,升级时需重复... 目录一、背景与目的二、适用版本三、操作步骤修改源码文件四、后续操作提示五、注意事项六、总结一、背景与

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4