springboot 优雅使用函数式编程处理 websocket @OnMessage 消息

本文主要是介绍springboot 优雅使用函数式编程处理 websocket @OnMessage 消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

现在大多业务功能使用 socket.io实现长连接,但是部分第三方设备对接 只支持基础的websocket。
spring中使用基础的websocket, @OnMessage 收到消息,对消息的处理,if else 将会繁琐,难以维护。

本文仅介绍了如何使用enum枚举、java.util.function jdk8 函数式接口,实现消息的处理。

websocket 定义JSON 数据交换格式

本文使用的 示例格式:

//连接成功
{"cmd":"connect","sn":"A7888","data":{...}}
//设置人员
{"cmd":"setUser","data":{"userId":"1"}}
//控制设备 --多层级 的格式,第二层里面解析 仍可按照同样的方式来处理
{"cmd":"to_client","data":{"type":"openDoor","value":"ON"}}

springboot 集成websocket

pom.xml 依赖
        <!-- spring websocket--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- spring websocket启动异常、排除spring-boot-starter-tomcat--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency>       
定义WebSocketConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
定义 @ServerEndpoint
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnOpen;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;/*** 定义websocket端点*/
@Slf4j
@ServerEndpoint(value = "/socket/device")
@Component
public class DeviceServerEndpoint {/*** 记录当前在线连接数*/private static AtomicInteger onlineCount = new AtomicInteger(0);/*** 连接的对象*/public static final Map<String, Session> clientMap = new ConcurrentHashMap<>();/*** 收到客户端消息** @param message 客户端发送过来的消息* @throws*/@OnMessagepublic void onMessage(String message, Session session) {try {DeviceMsg deviceMsg = JSON.parseObject(message, DeviceMsg.class);if (deviceMsg != null && deviceMsg.getCmd() != null) {// jdk8 函数式处理消息deviceMsg.getCmd().consumer.accept(session, message);} else {log.info("无法自动处理,客户端消息:{}", message);}} catch (Exception e) {log.error("消息处理失败", session.getId(), message);e.printStackTrace();}}/*** 连接建立成功*/@OnOpenpublic void onOpen(Session session) {onlineCount.incrementAndGet(); // 在线数加1log.info("有新连接加入:{},当前在线数为:{}", session.getId(), onlineCount.get());}/*** 连接关闭*/@OnClosepublic void onClose(Session session) {onlineCount.decrementAndGet(); // 在线数减1log.info("有一连接关闭:{},当前在线数为:{}", session.getId(), onlineCount.get());}@OnErrorpublic void onError(Session session, Throwable error) {onlineCount.decrementAndGet(); // 在线数减1error.printStackTrace();}/*** 测试 控制开锁*/public static void openDoor() {//所有客户端发送消息clientMap.forEach((id, session) -> {session.getBasicRemote().sendText("ON");}}
deviceMsg实体类
import lombok.Data;
import java.io.Serializable;@Data
public class DeviceMsg implements Serializable {/*** 指令*/Cmd cmd;/*** 数据块*/JSONObject data;
}
Cmd 核心消息处理 枚举类

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.websocket.Session;
import java.util.function.BiConsumer;@Getter
@Slf4j
@AllArgsConstructor
public enum Cmd {connect("设备连接成功", (Session session, String msg) -> {//设备端连接成功,发送设置端消息,将Session记录起来DeviceServerEndpoint.clientMap.put(session.getId(), session);}),ping("设备心跳", (Session session, String msg) -> {session.getBasicRemote().sendText("pong");}),setUser("配置用户", (Session session, String msg) -> {//拿到msg 转换对象或者其他操作session.getBasicRemote().sendText("ok");}),to_client("客户端消息", (Session session, String msg) -> {		try {String string = JSON.parseObject(msg).getJSONObject("data").getString("type");Client clientCmd = Client.valueOf(string);clientCmd.consumer.accept(session, msg);} catch (Exception e) {log.info("to_client客户端消息,无法自动处理:{}", msg);}});/*** 描述*/String desc;/*** 处理*/BiConsumer<Session, String> consumer;
}
Client 消息处理枚举类

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import javax.websocket.Session;
import java.util.function.BiConsumer;@Getter
@Slf4j
@AllArgsConstructor
public enum Client{openDoor("客户端控制", (Session session, String msg) -> {//测试 连接成功 直接 开门DeviceServerEndpoint.openDoor();}),otherCmd("其他指令", (Session session, String msg) -> {session.getBasicRemote().sendText("ok");});/*** 描述*/String desc;/*** 处理*/BiConsumer<Session, String> consumer;
}

JDK8常用函数式编程接口介绍:

  • Function<T, R>:接受一个类型为 T 的参数,返回类型为 R 的结果。常用方法包括 apply(T t)。
  • Predicate:接受一个类型为 T 的参数,返回一个布尔值。常用方法包括 test(T t)。
  • Consumer:接受一个类型为 T 的参数,没有返回值。常用方法包括 accept(T t)。
  • Supplier:不接受任何参数,返回一个类型为 T 的结果。常用方法包括 get()。
  • UnaryOperator:继承自 Function<T, R>,接受一个类型为 T 的参数,返回类型也为 T 的结果。常用方法包括 apply(T t)。
  • BinaryOperator:继承自 BiFunction<T, U, R>,接受两个类型为 T 的参数,返回类型也为 T 的结果。常用方法包括 apply(T t1, T t2)。
  • BiFunction<T, U, R>:接受两个参数,一个类型为 T,一个类型为 U,返回类型为 R 的结果。常用方法包括 apply(T t, U u)。
  • BiPredicate<T, U>:接受两个参数,一个类型为 T,一个类型为 U,返回一个布尔值。常用方法包括 test(T t, U u)。
  • BiConsumer<T, U> :用于接受两个参数,一个类型为 T,一个类型为 U,并且没有返回值。

函数式编程接口的引入,使得在 Java 中能够更方便地实现函数式编程的特性,如Lambda表达式和方法引用。它们可以用于各种场景,例如集合的处理、条件判断、函数的组合等。通过使用这些接口,可以编写更简洁、可读性更高的代码。

附:
WebSocket介绍

这篇关于springboot 优雅使用函数式编程处理 websocket @OnMessage 消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/649514

相关文章

Python 函数详解:从基础语法到高级使用技巧

《Python函数详解:从基础语法到高级使用技巧》本文基于实例代码,全面讲解Python函数的定义、参数传递、变量作用域及类型标注等知识点,帮助初学者快速掌握函数的使用技巧,感兴趣的朋友跟随小编一起... 目录一、函数的基本概念与作用二、函数的定义与调用1. 无参函数2. 带参函数3. 带返回值的函数4.

MySQL中DATE_FORMAT时间函数的使用小结

《MySQL中DATE_FORMAT时间函数的使用小结》本文主要介绍了MySQL中DATE_FORMAT时间函数的使用小结,用于格式化日期/时间字段,可提取年月、统计月份数据、精确到天,对大家的学习或... 目录前言DATE_FORMAT时间函数总结前言mysql可以使用DATE_FORMAT获取日期字段

在 Spring Boot 中连接 MySQL 数据库的详细步骤

《在SpringBoot中连接MySQL数据库的详细步骤》本文介绍了SpringBoot连接MySQL数据库的流程,添加依赖、配置连接信息、创建实体类与仓库接口,通过自动配置实现数据库操作,... 目录一、添加依赖二、配置数据库连接三、创建实体类四、创建仓库接口五、创建服务类六、创建控制器七、运行应用程序八

基于Spring Boot 的小区人脸识别与出入记录管理系统功能

《基于SpringBoot的小区人脸识别与出入记录管理系统功能》文章介绍基于SpringBoot框架与百度AI人脸识别API的小区出入管理系统,实现自动识别、记录及查询功能,涵盖技术选型、数据模型... 目录系统功能概述技术栈选择核心依赖配置数据模型设计出入记录实体类出入记录查询表单出入记录 VO 类(用于

深入解析Java NIO在高并发场景下的性能优化实践指南

《深入解析JavaNIO在高并发场景下的性能优化实践指南》随着互联网业务不断演进,对高并发、低延时网络服务的需求日益增长,本文将深入解析JavaNIO在高并发场景下的性能优化方法,希望对大家有所帮助... 目录简介一、技术背景与应用场景二、核心原理深入分析2.1 Selector多路复用2.2 Buffer

Java中数组与栈和堆之间的关系说明

《Java中数组与栈和堆之间的关系说明》文章讲解了Java数组的初始化方式、内存存储机制、引用传递特性及遍历、排序、拷贝技巧,强调引用数据类型方法调用时形参可能修改实参,但需注意引用指向单一对象的特性... 目录Java中数组与栈和堆的关系遍历数组接下来是一些编程小技巧总结Java中数组与栈和堆的关系关于

SpringBoot利用树形结构优化查询速度

《SpringBoot利用树形结构优化查询速度》这篇文章主要为大家详细介绍了SpringBoot利用树形结构优化查询速度,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一个真实的性能灾难传统方案为什么这么慢N+1查询灾难性能测试数据对比核心解决方案:一次查询 + O(n)算法解决

Go语言使用sync.Mutex实现资源加锁

《Go语言使用sync.Mutex实现资源加锁》数据共享是一把双刃剑,Go语言为我们提供了sync.Mutex,一种最基础也是最常用的加锁方式,用于保证在任意时刻只有一个goroutine能访问共享... 目录一、什么是 Mutex二、为什么需要加锁三、实战案例:并发安全的计数器1. 未加锁示例(存在竞态)

SpringBoot实现虚拟线程的方案

《SpringBoot实现虚拟线程的方案》Java19引入虚拟线程,本文就来介绍一下SpringBoot实现虚拟线程的方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录什么是虚拟线程虚拟线程和普通线程的区别SpringBoot使用虚拟线程配置@Async性能对比H

javaSE类和对象进阶用法举例详解

《javaSE类和对象进阶用法举例详解》JavaSE的面向对象编程是软件开发中的基石,它通过类和对象的概念,实现了代码的模块化、可复用性和灵活性,:本文主要介绍javaSE类和对象进阶用法的相关资... 目录前言一、封装1.访问限定符2.包2.1包的概念2.2导入包2.3自定义包2.4常见的包二、stati