如何在Spark的Worker节点中给RocketMq发送消息

2024-06-02 15:38

本文主要是介绍如何在Spark的Worker节点中给RocketMq发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

1.背景

    之前使用spark进行数据计算,需要将计算结果发送到rocketmq上去,有两种做法:第一种是将计算结果collect到Driver端,然后统一发送。第二种是直接在各个计算结果的partition(即foreachPartition函数)分片中发送。第一种存在的问题是,如果计算结果的数据量非常庞大,如上千万,就需要很大的内存来支持,同时增加了网络传输开销。如果是第二种就不存在这种问题,直接在worker节点发送完毕,不存在数据堆积和网络开销。

    既然说是要发送数据到rocketMQ就要说到rocketmq客户端DefaultMQProducer类,该类是没有实现java的Serializable接口的,所以无法定义一个全局变量,让各个worker直接使用该变量来发送数据,所以需要用到另一种写法——静态类工具。

2.Java序列化基本规则

    上面说到需要使用静态类工具来实现在各个partition分别发送mq消息,其理论基础就是Java序列化规则。我们知道Java在默认情况下,不会对被static和transient关键词修饰的属性进行序列化和反序列化。这个可以验证,静态属性反序列化有还是默认值,利用这个原理封装rocketmq工具。

public class JavaBean {private String name;private int version;
}public class WrapperBean implements Serializable {private static JavaBean javaBean;//由于改对象没有实现Serializable接口,所以必须定义为静态属性,否则报错private static String staticName="默认静态变量值";
}###序列化
public class JdkSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";serializable(file);}private static void serializable(String file) {ObjectOutputStream oos = null;try{oos = new ObjectOutputStream(new FileOutputStream(file));Object object = getObject();System.out.println("序列化对象:"+object.toString());oos.writeObject(object);oos.flush();}catch (Exception e){e.printStackTrace();}finally {if(oos !=  null){try {oos.close();} catch (IOException e) {e.printStackTrace();}}}}private static Object getObject() {JavaBean javaBean = new JavaBean("Java设计原本", 44);WrapperBean wb = new WrapperBean(javaBean,"修改后的静态变量值");return wb;}
}
#####反序列化
public class JdkDeSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";deserializable(file);}private static void deserializable(String file) {ObjectInputStream ois = null;try{ois = new ObjectInputStream(new FileInputStream(file));Object o = ois.readObject();if(o != null){System.out.println("Class :"+o.getClass());WrapperBean jb = (WrapperBean)o;System.out.println("反序列化结果:"+jb.toString());}}catch (Exception e){e.printStackTrace();}finally {if(ois != null){try {ois.close();} catch (IOException e) {e.printStackTrace();}}}}
}结果:
序列化对象:WrapperBean{javaBean=JavaBean{name='Java设计原本', version=44}},staticName=修改后的静态变量值
反序列化结果:WrapperBean{javaBean=null},staticName=默认静态变量值

3.RocketMq工具

    该工具利用静态属性无法被序列化原理,在各个worker节点中调用getInstance()方法时,实际拿到的是该worker节点加载RocketMqUtils初始化静态代码块拿到的DefaultMQProducer实例,所以可以正常在foreachPartition()中调用发送rocketmq消息

 

public class RocketMqUtils implements Serializable {
 
    private static Logger log = LoggerFactory.getLogger(RocketMqUtils.class);
 
    private static DefaultMQProducer producer=null;
    private static  RocketMqUtils rocketMqUtils = null;
    static {
        ClassPathResource classPathResource = new ClassPathResource("/task-config.properties");
        Properties properties = null;
        try {
            properties = PropertiesLoaderUtils.loadProperties(classPathResource);
            String address = properties.getProperty("mq.namesrvAddr");
            String produceGroup = properties.getProperty("mq.producerGroup");
            log.info("初始化RocketMq,Address={},producerGroup={}",address,produceGroup);
            producer = new DefaultMQProducer(produceGroup);
            producer.setNamesrvAddr(address);
            producer.start();
        } catch (Exception e) {
            log.error("初始化RocketMq失败",e);
        }
    }
 
    public static synchronized RocketMqUtils getInstance(){
        if(rocketMqUtils ==null){
            rocketMqUtils = new RocketMqUtils();
        }
        return rocketMqUtils;
    }
 
    public static void main(String[] args) throws Exception {
        RocketMqUtils rm = new RocketMqUtils();
        Message msg = new Message();
        msg.setTopic("test_jcc");
        msg.setTags("jcc");
        msg.setKeys("kkk");
        msg.setBody("test msg".getBytes());
        rm.sendMsg(msg);
        rm.shutDownMq();
    }
 
    public  void sendMsg(Message msg) throws Exception {
        try {
            SendResult sendResult = producer.send(msg);
            log.info("sendMsg = " + sendResult.toString());
            System.out.println(sendResult.toString());
        } catch (Exception var3) {
            log.error("MQ send ERROR", var3);
            throw new Exception("操作MQ出错!");
        }
    }
 
    public void shutDownMq(){
        if (producer != null){
            producer.shutdown();
        }
    }
}

 

转载于:https://my.oschina.net/u/1159254/blog/2999520

这篇关于如何在Spark的Worker节点中给RocketMq发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024387

相关文章

Python使用smtplib库开发一个邮件自动发送工具

《Python使用smtplib库开发一个邮件自动发送工具》在现代软件开发中,自动化邮件发送是一个非常实用的功能,无论是系统通知、营销邮件、还是日常工作报告,Python的smtplib库都能帮助我们... 目录代码实现与知识点解析1. 导入必要的库2. 配置邮件服务器参数3. 创建邮件发送类4. 实现邮件

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

使用Python和SQLAlchemy实现高效的邮件发送系统

《使用Python和SQLAlchemy实现高效的邮件发送系统》在现代Web应用中,邮件通知是不可或缺的功能之一,无论是订单确认、文件处理结果通知,还是系统告警,邮件都是最常用的通信方式之一,本文将详... 目录引言1. 需求分析2. 数据库设计2.1 User 表(存储用户信息)2.2 CustomerO

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

如何使用C#串口通讯实现数据的发送和接收

《如何使用C#串口通讯实现数据的发送和接收》本文详细介绍了如何使用C#实现基于串口通讯的数据发送和接收,通过SerialPort类,我们可以轻松实现串口通讯,并结合事件机制实现数据的传递和处理,感兴趣... 目录1. 概述2. 关键技术点2.1 SerialPort类2.2 异步接收数据2.3 数据解析2.