如何在Spark的Worker节点中给RocketMq发送消息

2024-06-02 15:38

本文主要是介绍如何在Spark的Worker节点中给RocketMq发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

1.背景

    之前使用spark进行数据计算,需要将计算结果发送到rocketmq上去,有两种做法:第一种是将计算结果collect到Driver端,然后统一发送。第二种是直接在各个计算结果的partition(即foreachPartition函数)分片中发送。第一种存在的问题是,如果计算结果的数据量非常庞大,如上千万,就需要很大的内存来支持,同时增加了网络传输开销。如果是第二种就不存在这种问题,直接在worker节点发送完毕,不存在数据堆积和网络开销。

    既然说是要发送数据到rocketMQ就要说到rocketmq客户端DefaultMQProducer类,该类是没有实现java的Serializable接口的,所以无法定义一个全局变量,让各个worker直接使用该变量来发送数据,所以需要用到另一种写法——静态类工具。

2.Java序列化基本规则

    上面说到需要使用静态类工具来实现在各个partition分别发送mq消息,其理论基础就是Java序列化规则。我们知道Java在默认情况下,不会对被static和transient关键词修饰的属性进行序列化和反序列化。这个可以验证,静态属性反序列化有还是默认值,利用这个原理封装rocketmq工具。

public class JavaBean {private String name;private int version;
}public class WrapperBean implements Serializable {private static JavaBean javaBean;//由于改对象没有实现Serializable接口,所以必须定义为静态属性,否则报错private static String staticName="默认静态变量值";
}###序列化
public class JdkSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";serializable(file);}private static void serializable(String file) {ObjectOutputStream oos = null;try{oos = new ObjectOutputStream(new FileOutputStream(file));Object object = getObject();System.out.println("序列化对象:"+object.toString());oos.writeObject(object);oos.flush();}catch (Exception e){e.printStackTrace();}finally {if(oos !=  null){try {oos.close();} catch (IOException e) {e.printStackTrace();}}}}private static Object getObject() {JavaBean javaBean = new JavaBean("Java设计原本", 44);WrapperBean wb = new WrapperBean(javaBean,"修改后的静态变量值");return wb;}
}
#####反序列化
public class JdkDeSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";deserializable(file);}private static void deserializable(String file) {ObjectInputStream ois = null;try{ois = new ObjectInputStream(new FileInputStream(file));Object o = ois.readObject();if(o != null){System.out.println("Class :"+o.getClass());WrapperBean jb = (WrapperBean)o;System.out.println("反序列化结果:"+jb.toString());}}catch (Exception e){e.printStackTrace();}finally {if(ois != null){try {ois.close();} catch (IOException e) {e.printStackTrace();}}}}
}结果:
序列化对象:WrapperBean{javaBean=JavaBean{name='Java设计原本', version=44}},staticName=修改后的静态变量值
反序列化结果:WrapperBean{javaBean=null},staticName=默认静态变量值

3.RocketMq工具

    该工具利用静态属性无法被序列化原理,在各个worker节点中调用getInstance()方法时,实际拿到的是该worker节点加载RocketMqUtils初始化静态代码块拿到的DefaultMQProducer实例,所以可以正常在foreachPartition()中调用发送rocketmq消息

 

public class RocketMqUtils implements Serializable {
 
    private static Logger log = LoggerFactory.getLogger(RocketMqUtils.class);
 
    private static DefaultMQProducer producer=null;
    private static  RocketMqUtils rocketMqUtils = null;
    static {
        ClassPathResource classPathResource = new ClassPathResource("/task-config.properties");
        Properties properties = null;
        try {
            properties = PropertiesLoaderUtils.loadProperties(classPathResource);
            String address = properties.getProperty("mq.namesrvAddr");
            String produceGroup = properties.getProperty("mq.producerGroup");
            log.info("初始化RocketMq,Address={},producerGroup={}",address,produceGroup);
            producer = new DefaultMQProducer(produceGroup);
            producer.setNamesrvAddr(address);
            producer.start();
        } catch (Exception e) {
            log.error("初始化RocketMq失败",e);
        }
    }
 
    public static synchronized RocketMqUtils getInstance(){
        if(rocketMqUtils ==null){
            rocketMqUtils = new RocketMqUtils();
        }
        return rocketMqUtils;
    }
 
    public static void main(String[] args) throws Exception {
        RocketMqUtils rm = new RocketMqUtils();
        Message msg = new Message();
        msg.setTopic("test_jcc");
        msg.setTags("jcc");
        msg.setKeys("kkk");
        msg.setBody("test msg".getBytes());
        rm.sendMsg(msg);
        rm.shutDownMq();
    }
 
    public  void sendMsg(Message msg) throws Exception {
        try {
            SendResult sendResult = producer.send(msg);
            log.info("sendMsg = " + sendResult.toString());
            System.out.println(sendResult.toString());
        } catch (Exception var3) {
            log.error("MQ send ERROR", var3);
            throw new Exception("操作MQ出错!");
        }
    }
 
    public void shutDownMq(){
        if (producer != null){
            producer.shutdown();
        }
    }
}

 

转载于:https://my.oschina.net/u/1159254/blog/2999520

这篇关于如何在Spark的Worker节点中给RocketMq发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024387

相关文章

基于Python实现自动化邮件发送系统的完整指南

《基于Python实现自动化邮件发送系统的完整指南》在现代软件开发和自动化流程中,邮件通知是一个常见且实用的功能,无论是用于发送报告、告警信息还是用户提醒,通过Python实现自动化的邮件发送功能都能... 目录一、前言:二、项目概述三、配置文件 `.env` 解析四、代码结构解析1. 导入模块2. 加载环

使用Python的requests库来发送HTTP请求的操作指南

《使用Python的requests库来发送HTTP请求的操作指南》使用Python的requests库发送HTTP请求是非常简单和直观的,requests库提供了丰富的API,可以发送各种类型的HT... 目录前言1. 安装 requests 库2. 发送 GET 请求3. 发送 POST 请求4. 发送

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

基于Python编写自动化邮件发送程序(进阶版)

《基于Python编写自动化邮件发送程序(进阶版)》在数字化时代,自动化邮件发送功能已成为企业和个人提升工作效率的重要工具,本文将使用Python编写一个简单的自动化邮件发送程序,希望对大家有所帮助... 目录理解SMTP协议基础配置开发环境构建邮件发送函数核心逻辑实现完整发送流程添加附件支持功能实现htm

Django开发时如何避免频繁发送短信验证码(python图文代码)

《Django开发时如何避免频繁发送短信验证码(python图文代码)》Django开发时,为防止频繁发送验证码,后端需用Redis限制请求频率,结合管道技术提升效率,通过生产者消费者模式解耦业务逻辑... 目录避免频繁发送 验证码1. www.chinasem.cn避免频繁发送 验证码逻辑分析2. 避免频繁

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

python运用requests模拟浏览器发送请求过程

《python运用requests模拟浏览器发送请求过程》模拟浏览器请求可选用requests处理静态内容,selenium应对动态页面,playwright支持高级自动化,设置代理和超时参数,根据需... 目录使用requests库模拟浏览器请求使用selenium自动化浏览器操作使用playwright

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

Python办公自动化实战之打造智能邮件发送工具

《Python办公自动化实战之打造智能邮件发送工具》在数字化办公场景中,邮件自动化是提升工作效率的关键技能,本文将演示如何使用Python的smtplib和email库构建一个支持图文混排,多附件,多... 目录前言一、基础配置:搭建邮件发送框架1.1 邮箱服务准备1.2 核心库导入1.3 基础发送函数二、

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试