自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现

2024-04-03 18:48

本文主要是介绍自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、zk的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-zk</artifactId><packaging>jar</packaging><name>rpc-zk Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- ZooKeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></dependency></dependencies><build><finalName>rpc-zk</finalName></build>
</project>

cn.tianjun.zk.Constant

package cn.tianjun.zk;/*** 常量*/
public class Constant {public static final int ZK_SESSION_TIMEOUT = 5000;//zk超时时间public static final String ZK_REGISTRY_PATH = "/registry";//注册节点public static final String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";//节点
}

cn.tianjun.zk.ServiceDiscovery

package cn.tianjun.zk;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 本类用于client发现server节点的变化 ,实现负载均衡**/
public class ServiceDiscovery {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);private CountDownLatch latch = new CountDownLatch(1);private volatile List<String> dataList = new ArrayList<String>();private String registryAddress;/*** zk链接** @param registryAddress*/public ServiceDiscovery(String registryAddress) {this.registryAddress = registryAddress;ZooKeeper zk = connectServer();if (zk != null) {watchNode(zk);}}/*** 发现新节点** @return*/public String discover() {String data = null;int size = dataList.size();// 存在新节点,使用即可if (size > 0) {if (size == 1) {data = dataList.get(0);LOGGER.debug("using only data: {}", data);} else {data = dataList.get(ThreadLocalRandom.current().nextInt(size));LOGGER.debug("using random data: {}", data);}}return data;}/*** 链接** @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 监听** @param zk*/private void watchNode(final ZooKeeper zk) {try {// 获取所有子节点List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH,new Watcher() {public void process(WatchedEvent event) {// 节点改变if (event.getType() == Event.EventType.NodeChildrenChanged) {watchNode(zk);}}});List<String> dataList = new ArrayList<String>();// 循环子节点for (String node : nodeList) {// 获取节点中的服务器地址byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/"+ node, false, null);// 存储到list中dataList.add(new String(bytes));}LOGGER.debug("node data: {}", dataList);// 将节点信息记录在成员变量this.dataList = dataList;} catch (Exception e) {LOGGER.error("", e);}}
}

cn.tianjun.zk.ServiceRegistry

package cn.tianjun.zk;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 服务注册 ,ZK 在该架构中扮演了“服务注册表”的角色,用于注册所有服务器的地�?与端口,并对客户端提供服务发现的功能* */
public class ServiceRegistry {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);private CountDownLatch latch = new CountDownLatch(1);private String registryAddress;public ServiceRegistry(String registryAddress) {//zookeeper的地�?this.registryAddress = registryAddress;}/*** 创建zookeeper链接* * @param data*/public void register(String data) {if (data != null) {ZooKeeper zk = connectServer();if (zk != null) {createNode(zk, data);}}}/*** 创建zookeeper链接,监�?* * @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 创建节点* * @param zk* @param data*/private void createNode(ZooKeeper zk, String data) {try {byte[] bytes = data.getBytes();if (zk.exists(Constant.ZK_REGISTRY_PATH, null) == null) {zk.create(Constant.ZK_REGISTRY_PATH, null, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}String path = zk.create(Constant.ZK_DATA_PATH, bytes,Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOGGER.debug("create zookeeper node ({} => {})", path, data);} catch (Exception e) {LOGGER.error("", e);}}
}

2、utils的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-utils</artifactId><packaging>jar</packaging><name>rpc-utils Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><!-- Protostuff --><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-core</artifactId></dependency><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-runtime</artifactId></dependency><!-- Objenesis --><dependency><groupId>org.objenesis</groupId><artifactId>objenesis</artifactId></dependency></dependencies><build><finalName>rpc-utils</finalName></build>
</project>

cn.tianjun.rpc.utils.RpcDecoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;/*** RPC 解码器**/
public class RpcDecoder extends ByteToMessageDecoder {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcDecoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int dataLength = in.readInt();if (dataLength < 0) {ctx.close();}if (in.readableBytes() < dataLength) {in.resetReaderIndex();}//将ByteBuf转换为byte[]byte[] data = new byte[dataLength];in.readBytes(data);//将data转换成objectObject obj = SerializationUtil.deserialize(data, genericClass);out.add(obj);}
}

cn.tianjun.rpc.utils.RpcEncoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;/*** RPC 编码器**/
public class RpcEncoder extends MessageToByteEncoder<Object> {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcEncoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic void encode(ChannelHandlerContext ctx, Object inob, ByteBuf out)throws Exception {//序列化if (genericClass.isInstance(inob)) {byte[] data = SerializationUtil.serialize(inob);out.writeInt(data.length);out.writeBytes(data);}}
}

cn.tianjun.rpc.utils.RpcRequest

package cn.tianjun.rpc.utils;/*** 封装 RPC 请求*  封装发送的object的反射属性*/
public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getParameters() {return parameters;}public void setParameters(Object[] parameters) {this.parameters = parameters;}
}

cn.tianjun.rpc.utils.RpcResponse

package cn.tianjun.rpc.utils;/*** 封装 RPC 响应* 封装相应object*/
public class RpcResponse {private String requestId;private Throwable error;private Object result;public boolean isError() {return error != null;}public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Throwable getError() {return error;}public void setError(Throwable error) {this.error = error;}public Object getResult() {return result;}public void setResult(Object result) {this.result = result;}
}

cn.tianjun.rpc.utils.SerializationUtil

package cn.tianjun.rpc.utils;import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;/*** 序列化工具类(基于 Protostuff 实现)**/
public class SerializationUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static Objenesis objenesis = new ObjenesisStd(true);private SerializationUtil() {}/*** 获取类的schema* @param cls* @return*/@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> cls) {Schema<T> schema = (Schema<T>) cachedSchema.get(cls);if (schema == null) {schema = RuntimeSchema.createFrom(cls);if (schema != null) {cachedSchema.put(cls, schema);}}return schema;}/*** 序列化(对象 -> 字节数组)*/@SuppressWarnings("unchecked")public static <T> byte[] serialize(T obj) {Class<T> cls = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(cls);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列�?} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化(字节数组 -> 对象)*/public static <T> T deserialize(byte[] data, Class<T> cls) {try {/** 如果一个类没有参数为空的构造方法时候,那么你直接调用newInstance方法试图得到一个实例对象的时候是会抛出异常的* 通过ObjenesisStd可以完美的避开这个问题* */T message = (T) objenesis.newInstance(cls);//实例化Schema<T> schema = getSchema(cls);//获取类的schemaProtostuffIOUtil.mergeFrom(data, message, schema);return message;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}

3、protocol的实现

这个只是接口的定义,pom.xml里面为空。

cn.tianjun.rpc.protocol.HelloService

package cn.tianjun.rpc.protocol;public interface HelloService {String hello(String name);String hello(Person person);
}

cn.tianjun.rpc.protocol.Person

package cn.tianjun.rpc.protocol;public class Person {private String firstName;private String lastName;public Person() {}public Person(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}public String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}
}

这篇关于自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/873702

相关文章

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

Nginx部署HTTP/3的实现步骤

《Nginx部署HTTP/3的实现步骤》本文介绍了在Nginx中部署HTTP/3的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录前提条件第一步:安装必要的依赖库第二步:获取并构建 BoringSSL第三步:获取 Nginx

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、