自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现

2024-04-03 18:48

本文主要是介绍自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、zk的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-zk</artifactId><packaging>jar</packaging><name>rpc-zk Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- ZooKeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></dependency></dependencies><build><finalName>rpc-zk</finalName></build>
</project>

cn.tianjun.zk.Constant

package cn.tianjun.zk;/*** 常量*/
public class Constant {public static final int ZK_SESSION_TIMEOUT = 5000;//zk超时时间public static final String ZK_REGISTRY_PATH = "/registry";//注册节点public static final String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";//节点
}

cn.tianjun.zk.ServiceDiscovery

package cn.tianjun.zk;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 本类用于client发现server节点的变化 ,实现负载均衡**/
public class ServiceDiscovery {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);private CountDownLatch latch = new CountDownLatch(1);private volatile List<String> dataList = new ArrayList<String>();private String registryAddress;/*** zk链接** @param registryAddress*/public ServiceDiscovery(String registryAddress) {this.registryAddress = registryAddress;ZooKeeper zk = connectServer();if (zk != null) {watchNode(zk);}}/*** 发现新节点** @return*/public String discover() {String data = null;int size = dataList.size();// 存在新节点,使用即可if (size > 0) {if (size == 1) {data = dataList.get(0);LOGGER.debug("using only data: {}", data);} else {data = dataList.get(ThreadLocalRandom.current().nextInt(size));LOGGER.debug("using random data: {}", data);}}return data;}/*** 链接** @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 监听** @param zk*/private void watchNode(final ZooKeeper zk) {try {// 获取所有子节点List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH,new Watcher() {public void process(WatchedEvent event) {// 节点改变if (event.getType() == Event.EventType.NodeChildrenChanged) {watchNode(zk);}}});List<String> dataList = new ArrayList<String>();// 循环子节点for (String node : nodeList) {// 获取节点中的服务器地址byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/"+ node, false, null);// 存储到list中dataList.add(new String(bytes));}LOGGER.debug("node data: {}", dataList);// 将节点信息记录在成员变量this.dataList = dataList;} catch (Exception e) {LOGGER.error("", e);}}
}

cn.tianjun.zk.ServiceRegistry

package cn.tianjun.zk;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 服务注册 ,ZK 在该架构中扮演了“服务注册表”的角色,用于注册所有服务器的地�?与端口,并对客户端提供服务发现的功能* */
public class ServiceRegistry {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);private CountDownLatch latch = new CountDownLatch(1);private String registryAddress;public ServiceRegistry(String registryAddress) {//zookeeper的地�?this.registryAddress = registryAddress;}/*** 创建zookeeper链接* * @param data*/public void register(String data) {if (data != null) {ZooKeeper zk = connectServer();if (zk != null) {createNode(zk, data);}}}/*** 创建zookeeper链接,监�?* * @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 创建节点* * @param zk* @param data*/private void createNode(ZooKeeper zk, String data) {try {byte[] bytes = data.getBytes();if (zk.exists(Constant.ZK_REGISTRY_PATH, null) == null) {zk.create(Constant.ZK_REGISTRY_PATH, null, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}String path = zk.create(Constant.ZK_DATA_PATH, bytes,Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOGGER.debug("create zookeeper node ({} => {})", path, data);} catch (Exception e) {LOGGER.error("", e);}}
}

2、utils的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-utils</artifactId><packaging>jar</packaging><name>rpc-utils Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><!-- Protostuff --><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-core</artifactId></dependency><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-runtime</artifactId></dependency><!-- Objenesis --><dependency><groupId>org.objenesis</groupId><artifactId>objenesis</artifactId></dependency></dependencies><build><finalName>rpc-utils</finalName></build>
</project>

cn.tianjun.rpc.utils.RpcDecoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;/*** RPC 解码器**/
public class RpcDecoder extends ByteToMessageDecoder {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcDecoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int dataLength = in.readInt();if (dataLength < 0) {ctx.close();}if (in.readableBytes() < dataLength) {in.resetReaderIndex();}//将ByteBuf转换为byte[]byte[] data = new byte[dataLength];in.readBytes(data);//将data转换成objectObject obj = SerializationUtil.deserialize(data, genericClass);out.add(obj);}
}

cn.tianjun.rpc.utils.RpcEncoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;/*** RPC 编码器**/
public class RpcEncoder extends MessageToByteEncoder<Object> {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcEncoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic void encode(ChannelHandlerContext ctx, Object inob, ByteBuf out)throws Exception {//序列化if (genericClass.isInstance(inob)) {byte[] data = SerializationUtil.serialize(inob);out.writeInt(data.length);out.writeBytes(data);}}
}

cn.tianjun.rpc.utils.RpcRequest

package cn.tianjun.rpc.utils;/*** 封装 RPC 请求*  封装发送的object的反射属性*/
public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getParameters() {return parameters;}public void setParameters(Object[] parameters) {this.parameters = parameters;}
}

cn.tianjun.rpc.utils.RpcResponse

package cn.tianjun.rpc.utils;/*** 封装 RPC 响应* 封装相应object*/
public class RpcResponse {private String requestId;private Throwable error;private Object result;public boolean isError() {return error != null;}public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Throwable getError() {return error;}public void setError(Throwable error) {this.error = error;}public Object getResult() {return result;}public void setResult(Object result) {this.result = result;}
}

cn.tianjun.rpc.utils.SerializationUtil

package cn.tianjun.rpc.utils;import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;/*** 序列化工具类(基于 Protostuff 实现)**/
public class SerializationUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static Objenesis objenesis = new ObjenesisStd(true);private SerializationUtil() {}/*** 获取类的schema* @param cls* @return*/@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> cls) {Schema<T> schema = (Schema<T>) cachedSchema.get(cls);if (schema == null) {schema = RuntimeSchema.createFrom(cls);if (schema != null) {cachedSchema.put(cls, schema);}}return schema;}/*** 序列化(对象 -> 字节数组)*/@SuppressWarnings("unchecked")public static <T> byte[] serialize(T obj) {Class<T> cls = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(cls);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列�?} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化(字节数组 -> 对象)*/public static <T> T deserialize(byte[] data, Class<T> cls) {try {/** 如果一个类没有参数为空的构造方法时候,那么你直接调用newInstance方法试图得到一个实例对象的时候是会抛出异常的* 通过ObjenesisStd可以完美的避开这个问题* */T message = (T) objenesis.newInstance(cls);//实例化Schema<T> schema = getSchema(cls);//获取类的schemaProtostuffIOUtil.mergeFrom(data, message, schema);return message;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}

3、protocol的实现

这个只是接口的定义,pom.xml里面为空。

cn.tianjun.rpc.protocol.HelloService

package cn.tianjun.rpc.protocol;public interface HelloService {String hello(String name);String hello(Person person);
}

cn.tianjun.rpc.protocol.Person

package cn.tianjun.rpc.protocol;public class Person {private String firstName;private String lastName;public Person() {}public Person(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}public String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}
}

这篇关于自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/873702

相关文章

Linux下删除乱码文件和目录的实现方式

《Linux下删除乱码文件和目录的实现方式》:本文主要介绍Linux下删除乱码文件和目录的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下删除乱码文件和目录方法1方法2总结Linux下删除乱码文件和目录方法1使用ls -i命令找到文件或目录

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Linux在线解压jar包的实现方式

《Linux在线解压jar包的实现方式》:本文主要介绍Linux在线解压jar包的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux在线解压jar包解压 jar包的步骤总结Linux在线解压jar包在 Centos 中解压 jar 包可以使用 u

c++ 类成员变量默认初始值的实现

《c++类成员变量默认初始值的实现》本文主要介绍了c++类成员变量默认初始值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录C++类成员变量初始化c++类的变量的初始化在C++中,如果使用类成员变量时未给定其初始值,那么它将被

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

基于Python实现一个图片拆分工具

《基于Python实现一个图片拆分工具》这篇文章主要为大家详细介绍了如何基于Python实现一个图片拆分工具,可以根据需要的行数和列数进行拆分,感兴趣的小伙伴可以跟随小编一起学习一下... 简单介绍先自己选择输入的图片,默认是输出到项目文件夹中,可以自己选择其他的文件夹,选择需要拆分的行数和列数,可以通过

Python中将嵌套列表扁平化的多种实现方法

《Python中将嵌套列表扁平化的多种实现方法》在Python编程中,我们常常会遇到需要将嵌套列表(即列表中包含列表)转换为一个一维的扁平列表的需求,本文将给大家介绍了多种实现这一目标的方法,需要的朋... 目录python中将嵌套列表扁平化的方法技术背景实现步骤1. 使用嵌套列表推导式2. 使用itert

Python使用pip工具实现包自动更新的多种方法

《Python使用pip工具实现包自动更新的多种方法》本文深入探讨了使用Python的pip工具实现包自动更新的各种方法和技术,我们将从基础概念开始,逐步介绍手动更新方法、自动化脚本编写、结合CI/C... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核