自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现

2024-04-03 18:48

本文主要是介绍自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、zk的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-zk</artifactId><packaging>jar</packaging><name>rpc-zk Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- ZooKeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></dependency></dependencies><build><finalName>rpc-zk</finalName></build>
</project>

cn.tianjun.zk.Constant

package cn.tianjun.zk;/*** 常量*/
public class Constant {public static final int ZK_SESSION_TIMEOUT = 5000;//zk超时时间public static final String ZK_REGISTRY_PATH = "/registry";//注册节点public static final String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";//节点
}

cn.tianjun.zk.ServiceDiscovery

package cn.tianjun.zk;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 本类用于client发现server节点的变化 ,实现负载均衡**/
public class ServiceDiscovery {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);private CountDownLatch latch = new CountDownLatch(1);private volatile List<String> dataList = new ArrayList<String>();private String registryAddress;/*** zk链接** @param registryAddress*/public ServiceDiscovery(String registryAddress) {this.registryAddress = registryAddress;ZooKeeper zk = connectServer();if (zk != null) {watchNode(zk);}}/*** 发现新节点** @return*/public String discover() {String data = null;int size = dataList.size();// 存在新节点,使用即可if (size > 0) {if (size == 1) {data = dataList.get(0);LOGGER.debug("using only data: {}", data);} else {data = dataList.get(ThreadLocalRandom.current().nextInt(size));LOGGER.debug("using random data: {}", data);}}return data;}/*** 链接** @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 监听** @param zk*/private void watchNode(final ZooKeeper zk) {try {// 获取所有子节点List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH,new Watcher() {public void process(WatchedEvent event) {// 节点改变if (event.getType() == Event.EventType.NodeChildrenChanged) {watchNode(zk);}}});List<String> dataList = new ArrayList<String>();// 循环子节点for (String node : nodeList) {// 获取节点中的服务器地址byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/"+ node, false, null);// 存储到list中dataList.add(new String(bytes));}LOGGER.debug("node data: {}", dataList);// 将节点信息记录在成员变量this.dataList = dataList;} catch (Exception e) {LOGGER.error("", e);}}
}

cn.tianjun.zk.ServiceRegistry

package cn.tianjun.zk;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 服务注册 ,ZK 在该架构中扮演了“服务注册表”的角色,用于注册所有服务器的地�?与端口,并对客户端提供服务发现的功能* */
public class ServiceRegistry {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);private CountDownLatch latch = new CountDownLatch(1);private String registryAddress;public ServiceRegistry(String registryAddress) {//zookeeper的地�?this.registryAddress = registryAddress;}/*** 创建zookeeper链接* * @param data*/public void register(String data) {if (data != null) {ZooKeeper zk = connectServer();if (zk != null) {createNode(zk, data);}}}/*** 创建zookeeper链接,监�?* * @return*/private ZooKeeper connectServer() {ZooKeeper zk = null;try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT,new Watcher() {public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {LOGGER.error("", e);}return zk;}/*** 创建节点* * @param zk* @param data*/private void createNode(ZooKeeper zk, String data) {try {byte[] bytes = data.getBytes();if (zk.exists(Constant.ZK_REGISTRY_PATH, null) == null) {zk.create(Constant.ZK_REGISTRY_PATH, null, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}String path = zk.create(Constant.ZK_DATA_PATH, bytes,Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOGGER.debug("create zookeeper node ({} => {})", path, data);} catch (Exception e) {LOGGER.error("", e);}}
}

2、utils的实现

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><parent><artifactId>rpc.demo</artifactId><groupId>tj.cmcc.org</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rpc-utils</artifactId><packaging>jar</packaging><name>rpc-utils Maven Webapp</name><url>http://maven.apache.org</url><dependencies><!-- SLF4J --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><!-- Protostuff --><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-core</artifactId></dependency><dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-runtime</artifactId></dependency><!-- Objenesis --><dependency><groupId>org.objenesis</groupId><artifactId>objenesis</artifactId></dependency></dependencies><build><finalName>rpc-utils</finalName></build>
</project>

cn.tianjun.rpc.utils.RpcDecoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;/*** RPC 解码器**/
public class RpcDecoder extends ByteToMessageDecoder {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcDecoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int dataLength = in.readInt();if (dataLength < 0) {ctx.close();}if (in.readableBytes() < dataLength) {in.resetReaderIndex();}//将ByteBuf转换为byte[]byte[] data = new byte[dataLength];in.readBytes(data);//将data转换成objectObject obj = SerializationUtil.deserialize(data, genericClass);out.add(obj);}
}

cn.tianjun.rpc.utils.RpcEncoder

package cn.tianjun.rpc.utils;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;/*** RPC 编码器**/
public class RpcEncoder extends MessageToByteEncoder<Object> {private Class<?> genericClass;// 构造函数传入向反序列化的classpublic RpcEncoder(Class<?> genericClass) {this.genericClass = genericClass;}@Overridepublic void encode(ChannelHandlerContext ctx, Object inob, ByteBuf out)throws Exception {//序列化if (genericClass.isInstance(inob)) {byte[] data = SerializationUtil.serialize(inob);out.writeInt(data.length);out.writeBytes(data);}}
}

cn.tianjun.rpc.utils.RpcRequest

package cn.tianjun.rpc.utils;/*** 封装 RPC 请求*  封装发送的object的反射属性*/
public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getParameters() {return parameters;}public void setParameters(Object[] parameters) {this.parameters = parameters;}
}

cn.tianjun.rpc.utils.RpcResponse

package cn.tianjun.rpc.utils;/*** 封装 RPC 响应* 封装相应object*/
public class RpcResponse {private String requestId;private Throwable error;private Object result;public boolean isError() {return error != null;}public String getRequestId() {return requestId;}public void setRequestId(String requestId) {this.requestId = requestId;}public Throwable getError() {return error;}public void setError(Throwable error) {this.error = error;}public Object getResult() {return result;}public void setResult(Object result) {this.result = result;}
}

cn.tianjun.rpc.utils.SerializationUtil

package cn.tianjun.rpc.utils;import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;/*** 序列化工具类(基于 Protostuff 实现)**/
public class SerializationUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static Objenesis objenesis = new ObjenesisStd(true);private SerializationUtil() {}/*** 获取类的schema* @param cls* @return*/@SuppressWarnings("unchecked")private static <T> Schema<T> getSchema(Class<T> cls) {Schema<T> schema = (Schema<T>) cachedSchema.get(cls);if (schema == null) {schema = RuntimeSchema.createFrom(cls);if (schema != null) {cachedSchema.put(cls, schema);}}return schema;}/*** 序列化(对象 -> 字节数组)*/@SuppressWarnings("unchecked")public static <T> byte[] serialize(T obj) {Class<T> cls = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(cls);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列�?} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化(字节数组 -> 对象)*/public static <T> T deserialize(byte[] data, Class<T> cls) {try {/** 如果一个类没有参数为空的构造方法时候,那么你直接调用newInstance方法试图得到一个实例对象的时候是会抛出异常的* 通过ObjenesisStd可以完美的避开这个问题* */T message = (T) objenesis.newInstance(cls);//实例化Schema<T> schema = getSchema(cls);//获取类的schemaProtostuffIOUtil.mergeFrom(data, message, schema);return message;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}

3、protocol的实现

这个只是接口的定义,pom.xml里面为空。

cn.tianjun.rpc.protocol.HelloService

package cn.tianjun.rpc.protocol;public interface HelloService {String hello(String name);String hello(Person person);
}

cn.tianjun.rpc.protocol.Person

package cn.tianjun.rpc.protocol;public class Person {private String firstName;private String lastName;public Person() {}public Person(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}public String getFirstName() {return firstName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}
}

这篇关于自定义基于netty的rpc框架(4)---zk和utils以及protocol的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/873702

相关文章

Flutter实现文字镂空效果的详细步骤

《Flutter实现文字镂空效果的详细步骤》:本文主要介绍如何使用Flutter实现文字镂空效果,包括创建基础应用结构、实现自定义绘制器、构建UI界面以及实现颜色选择按钮等步骤,并详细解析了混合模... 目录引言实现原理开始实现步骤1:创建基础应用结构步骤2:创建主屏幕步骤3:实现自定义绘制器步骤4:构建U

SpringBoot中四种AOP实战应用场景及代码实现

《SpringBoot中四种AOP实战应用场景及代码实现》面向切面编程(AOP)是Spring框架的核心功能之一,它通过预编译和运行期动态代理实现程序功能的统一维护,在SpringBoot应用中,AO... 目录引言场景一:日志记录与性能监控业务需求实现方案使用示例扩展:MDC实现请求跟踪场景二:权限控制与

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

Python实现微信自动锁定工具

《Python实现微信自动锁定工具》在数字化办公时代,微信已成为职场沟通的重要工具,但临时离开时忘记锁屏可能导致敏感信息泄露,下面我们就来看看如何使用Python打造一个微信自动锁定工具吧... 目录引言:当微信隐私遇到自动化守护效果展示核心功能全景图技术亮点深度解析1. 无操作检测引擎2. 微信路径智能获

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

Python位移操作和位运算的实现示例

《Python位移操作和位运算的实现示例》本文主要介绍了Python位移操作和位运算的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 位移操作1.1 左移操作 (<<)1.2 右移操作 (>>)注意事项:2. 位运算2.1

如何在 Spring Boot 中实现 FreeMarker 模板

《如何在SpringBoot中实现FreeMarker模板》FreeMarker是一种功能强大、轻量级的模板引擎,用于在Java应用中生成动态文本输出(如HTML、XML、邮件内容等),本文... 目录什么是 FreeMarker 模板?在 Spring Boot 中实现 FreeMarker 模板1. 环

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义