自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现

2024-04-03 18:48

本文主要是介绍自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、zk的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-zk</artifactId><packaging>jar</packaging><name>rpc-zk Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- ZooKeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></dependency></dependencies><build><finalName>rpc-zk</finalName></build>
</project>

cn.tianjun.zk.Constant

package cn.tianjun.zk;/*** 常量*/
public class Constant {public static final int ZK_SESSION_TIMEOUT = 5000;//zk超时时间public static final String ZK_REGISTRY_PATH = "/registry";//注册节点public static final String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";//节点
}

cn.tianjun.zk.ServiceDiscovery

package cn.tianjun.zk;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 本类用于client发现server节点的变化 ,实现负载均衡**/
public class ServiceDiscovery {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);private CountDownLatch latch = new CountDownLatch(1);private volatile List<String> dataList = new ArrayList<String>();private String registryAddress;/*** zk链接** @param registryAddress*/public ServiceDiscovery(String registryAddress) {this.registryAddress = registryAddress;ZooKeeper zk = connectServer();if (zk != null) {watchNode(zk);}}/*** 发现新节点** @return*/public String discover() {String data = null;int size = dataList.size();// 存在新节点,使用即可if (size > 0) {if (size == 1) {data = dataList.get(0);LOGGER.debug("using only data: {}", data);} else {data = dataList.get(ThreadLocalRandom.current().nextInt(size));LOGGER.debug("using random data: {}", data);}}return data;}/*** 链接** @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 监听** @param zk*/private void watchNode(final ZooKeeper zk) {try {// 获取所有子节点List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH,new Watcher() {public void process(WatchedEvent event) {// 节点改变if (event.getType() == Event.EventType.NodeChildrenChanged) {watchNode(zk);}}});List<String> dataList = new ArrayList<String>();// 循环子节点for (String node : nodeList) {// 获取节点中的服务器地址byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/"+ node, false, null);// 存储到list中dataList.add(new String(bytes));}LOGGER.debug("node data: {}", dataList);// 将节点信息记录在成员变量this.dataList = dataList;} catch (Exception e) {LOGGER.error("", e);}}
}

cn.tianjun.zk.ServiceRegistry

package cn.tianjun.zk;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 服务注册 ,ZK 在该架构中扮演了“服务注册表”的角色,用于注册所有服务器的地�?与端口,并对客户端提供服务发现的功能* */
public class ServiceRegistry {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);private CountDownLatch latch = new CountDownLatch(1);private String registryAddress;public ServiceRegistry(String registryAddress) {//zookeeper的地�?this.registryAddress = registryAddress;}/*** 创建zookeeper链接* * @param data*/public void register(String data) {if (data != null) {ZooKeeper zk = connectServer();if (zk != null) {createNode(zk, data);}}}/*** 创建zookeeper链接,监�?* * @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 创建节点* * @param zk* @param data*/private void createNode(ZooKeeper zk, String data) {try {byte[] bytes = data.getBytes();if (zk.exists(Constant.ZK_REGISTRY_PATH, null) == null) {zk.create(Constant.ZK_REGISTRY_PATH, null, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}String path = zk.create(Constant.ZK_DATA_PATH, bytes,Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOGGER.debug("create zookeeper node ({} => {})", path, data);} catch (Exception e) {LOGGER.error("", e);}}
}

2、utils的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-utils</artifactId><packaging>jar</packaging><name>rpc-utils Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><!-- Protostuff --><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-core</artifactId></dependency><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-runtime</artifactId></dependency><!-- Objenesis --><dependency><groupId>org.objenesis</groupId><artifactId>objenesis</artifactId></dependency></dependencies><build><finalName>rpc-utils</finalName></build>
</project>

cn.tianjun.rpc.utils.RpcDecoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;/*** RPC 解码器**/
public class RpcDecoder extends ByteToMessageDecoder {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcDecoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int dataLength = in.readInt();if (dataLength < 0) {ctx.close();}if (in.readableBytes() < dataLength) {in.resetReaderIndex();}//将ByteBuf转换为byte[]byte[] data = new byte[dataLength];in.readBytes(data);//将data转换成objectObject obj = SerializationUtil.deserialize(data, genericClass);out.add(obj);}
}

cn.tianjun.rpc.utils.RpcEncoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;/*** RPC 编码器**/
public class RpcEncoder extends MessageToByteEncoder<Object> {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcEncoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic void encode(ChannelHandlerContext ctx, Object inob, ByteBuf out)throws Exception {//序列化if (genericClass.isInstance(inob)) {byte[] data = SerializationUtil.serialize(inob);out.writeInt(data.length);out.writeBytes(data);}}
}

cn.tianjun.rpc.utils.RpcRequest

package cn.tianjun.rpc.utils;/*** 封装 RPC 请求*  封装发送的object的反射属性*/
public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getParameters() {return parameters;}public void setParameters(Object[] parameters) {this.parameters = parameters;}
}

cn.tianjun.rpc.utils.RpcResponse

package cn.tianjun.rpc.utils;/*** 封装 RPC 响应* 封装相应object*/
public class RpcResponse {private String requestId;private Throwable error;private Object result;public boolean isError() {return error != null;}public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Throwable getError() {return error;}public void setError(Throwable error) {this.error = error;}public Object getResult() {return result;}public void setResult(Object result) {this.result = result;}
}

cn.tianjun.rpc.utils.SerializationUtil

package cn.tianjun.rpc.utils;import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;/*** 序列化工具类(基于 Protostuff 实现)**/
public class SerializationUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static Objenesis objenesis = new ObjenesisStd(true);private SerializationUtil() {}/*** 获取类的schema* @param cls* @return*/@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> cls) {Schema<T> schema = (Schema<T>) cachedSchema.get(cls);if (schema == null) {schema = RuntimeSchema.createFrom(cls);if (schema != null) {cachedSchema.put(cls, schema);}}return schema;}/*** 序列化(对象 -> 字节数组)*/@SuppressWarnings("unchecked")public static <T> byte[] serialize(T obj) {Class<T> cls = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(cls);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列�?} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化(字节数组 -> 对象)*/public static <T> T deserialize(byte[] data, Class<T> cls) {try {/** 如果一个类没有参数为空的构造方法时候,那么你直接调用newInstance方法试图得到一个实例对象的时候是会抛出异常的* 通过ObjenesisStd可以完美的避开这个问题* */T message = (T) objenesis.newInstance(cls);//实例化Schema<T> schema = getSchema(cls);//获取类的schemaProtostuffIOUtil.mergeFrom(data, message, schema);return message;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}

3、protocol的实现

这个只是接口的定义,pom.xml里面为空。

cn.tianjun.rpc.protocol.HelloService

package cn.tianjun.rpc.protocol;public interface HelloService {String hello(String name);String hello(Person person);
}

cn.tianjun.rpc.protocol.Person

package cn.tianjun.rpc.protocol;public class Person {private String firstName;private String lastName;public Person() {}public Person(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}public String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}
}

这篇关于自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/873702

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S