Kettle-ActiveMQ Product插件开发笔记

2024-04-29 08:32

本文主要是介绍Kettle-ActiveMQ Product插件开发笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ActiveMQ Product插件开发笔记

概览

前提

该插件基于kettle 8.1.0.0-365 开发

如果是其他版本,不保证可用。(由于继承的BaseStreamingDialog等父类会随版本而变化)

本插件模仿官方Kafka插件源码编写:

https://github.com/pentaho/big-data-plugin/tree/master/kettle-plugins/kafka

暂不支持topic,需要的可自行修改源码(工程量应该不大)。

必备模板

相对ActiveMQ Consumer插件,Product插件相对来说简单多了。

由于Product不需要阻塞,所以就当作普通插件来开发。集成官方推荐的父类即可。

  • ActiveMQProduct extends BaseStep implements StepInterface
  • ActiveMQProductData extends BaseStepData implements StepDataInterface
  • ActiveMQProductDialog extends BaseStepDialog implements StepDialogInterface
  • ActiveMQProductMeta extends BaseStepMeta implements StepMetaInterface

ActiveMQProductMeta

关键属性的话就下面4个,是获取ActiveMQ 连接和消费数据必备的属性

/*** 连接地址*/
@Injection(name = "BROKER_URL")
private String brokerUrl;
/*** 队列名称*/
@Injection(name = "QUEUE")
private String queue;
/*** 发送的字段*/
@Injection(name = "MSG")
private String msgField;
/*** 存放xml 中的advancedConfig option*/
private Map<String, String> config = new LinkedHashMap<>();
  • msgField: 这个字段是从前一个步骤获取的。这个字段对应的值就是我们发送到AMQ的值,所以很重要。

然后就是模板方法,也是必备的:

  • getXML()
  • loadXML()
  • saveRep()
  • readRep()

ActiveMQProductDialog

继承自普通的BaseStepDialog

注意构造方法:将Object强转成BaseStepMeta 和 ActiveMQProductMeta

public ActiveMQProductDialog(Shell parent, Object in, TransMeta transMeta, String stepname) {super(parent, (BaseStepMeta) in, transMeta, stepname);this.meta = (ActiveMQProductMeta) in;
}

唯一需要实现的是open()方法。open()很多代码可以直接copy过来。

需要自己实现SetupOptions标签

其中Setup中的Message是需要从前一个步骤获取的,代码如下:

wMsgField = new ComboVar(transMeta, wSetupComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
props.setLook(wMsgField);
wMsgField.addModifyListener(lsMod);
FormData fdMsgField = new FormData();
fdMsgField.left = new FormAttachment(0, 0);
fdMsgField.top = new FormAttachment(wlMsgField, 5);
fdMsgField.right = new FormAttachment(0, INPUT_WIDTH);
wMsgField.setLayoutData(fdMsgField);
Listener lsMsgFocus = event -> {String current = wMsgField.getText();wMsgField.getCComboWidget().removeAll();wMsgField.setText(current);//重要的地方:从前个步骤获取字段try {RowMetaInterface rmi = transMeta.getPrevStepFields(stepname);//上一步骤的所有列-添加到下拉框中final List<ValueMetaInterface> ls = rmi.getValueMetaList();for (int i = 0; i < ls.size(); i++) {final ValueMetaBase vmb = (ValueMetaBase) ls.get(i);wMsgField.add(vmb.getName());}} catch (KettleStepException e) {e.printStackTrace();}
};
wMsgField.getCComboWidget().addListener(SWT.FocusIn, lsMsgFocus);

剩下的就没什么好说的了。

ActiveMQProduct

继承自 BaseStep,所以需要实现 init()processRow()

init

固定的格式啊

@Override
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {super.init(smi, sdi);meta = (ActiveMQProductMeta) smi;data = (ActiveMQProductData) sdi;return true;
}

processRow

if (first) {//找出我们选择的Message列在上一步骤中排第几列,存储到ActiveMQProductData.msgFieldIndexdata.msgFieldIndex = getInputRowMeta().indexOfValue(environmentSubstitute(meta.getMsgField()));try {//还要创建AMQ连接,因为连接只需在刚开始时创建就行了,不要重复创建data.conn = ActiveMQFactory.getConn(meta.getActiveMQEntity());} catch (JMSException e) {//如果创建失败,就直接退出log.logError(e.getMessage(), e);setOutputDone();return false;}first = false;
}
//r表示上一步骤传递过来的数据,在初始化的时候我们已经知道要去哪一列拿目标数据了
//所以这里的content就是我们要发送的数据
String content = (String) r[data.msgFieldIndex];
TextMessage msg = session.createTextMessage(content);
producer.send(msg);
//记得提交给AMQ
session.commit();
//提交后记录+1
incrementLinesOutput();
//表示在此步骤后还可以接上另一个步骤(原封不动地把上一步骤的数据转发到下一步骤)
putRow(getInputRowMeta(), r);

这篇关于Kettle-ActiveMQ Product插件开发笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/945594

相关文章

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

使用docker搭建嵌入式Linux开发环境

《使用docker搭建嵌入式Linux开发环境》本文主要介绍了使用docker搭建嵌入式Linux开发环境,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录1、前言2、安装docker3、编写容器管理脚本4、创建容器1、前言在日常开发全志、rk等不同

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

Python实战之SEO优化自动化工具开发指南

《Python实战之SEO优化自动化工具开发指南》在数字化营销时代,搜索引擎优化(SEO)已成为网站获取流量的重要手段,本文将带您使用Python开发一套完整的SEO自动化工具,需要的可以了解下... 目录前言项目概述技术栈选择核心模块实现1. 关键词研究模块2. 网站技术seo检测模块3. 内容优化分析模

基于Java开发一个极简版敏感词检测工具

《基于Java开发一个极简版敏感词检测工具》这篇文章主要为大家详细介绍了如何基于Java开发一个极简版敏感词检测工具,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录你是否还在为敏感词检测头疼一、极简版Java敏感词检测工具的3大核心优势1.1 优势1:DFA算法驱动,效率提升10

Python学习笔记之getattr和hasattr用法示例详解

《Python学习笔记之getattr和hasattr用法示例详解》在Python中,hasattr()、getattr()和setattr()是一组内置函数,用于对对象的属性进行操作和查询,这篇文章... 目录1.getattr用法详解1.1 基本作用1.2 示例1.3 原理2.hasattr用法详解2.

Python开发简易网络服务器的示例详解(新手入门)

《Python开发简易网络服务器的示例详解(新手入门)》网络服务器是互联网基础设施的核心组件,它本质上是一个持续运行的程序,负责监听特定端口,本文将使用Python开发一个简单的网络服务器,感兴趣的小... 目录网络服务器基础概念python内置服务器模块1. HTTP服务器模块2. Socket服务器模块

Java 与 LibreOffice 集成开发指南(环境搭建及代码示例)

《Java与LibreOffice集成开发指南(环境搭建及代码示例)》本文介绍Java与LibreOffice的集成方法,涵盖环境配置、API调用、文档转换、UNO桥接及REST接口等技术,提供... 目录1. 引言2. 环境搭建2.1 安装 LibreOffice2.2 配置 Java 开发环境2.3 配

Python38个游戏开发库整理汇总

《Python38个游戏开发库整理汇总》文章介绍了多种Python游戏开发库,涵盖2D/3D游戏开发、多人游戏框架及视觉小说引擎,适合不同需求的开发者入门,强调跨平台支持与易用性,并鼓励读者交流反馈以... 目录PyGameCocos2dPySoyPyOgrepygletPanda3DBlenderFife

使用Python开发一个Ditto剪贴板数据导出工具

《使用Python开发一个Ditto剪贴板数据导出工具》在日常工作中,我们经常需要处理大量的剪贴板数据,下面将介绍如何使用Python的wxPython库开发一个图形化工具,实现从Ditto数据库中读... 目录前言运行结果项目需求分析技术选型核心功能实现1. Ditto数据库结构分析2. 数据库自动定位3