手写分布式存储系统v0.1版本

2024-02-01 10:36

本文主要是介绍手写分布式存储系统v0.1版本,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

这是手写分布式存储系统v0.1版本,只有一个目标就是支持通过tcp接收数据并落地到磁盘文件(单机模式),那接下来就开始吧

设计

实现一个系统,设计是最过瘾的过程没有之一,类似你搭积木前在脑海设计构建一副大致的“雏形”,只有有了这个东西之后才能够指导最终实现的方向以及确保不会偏离的太差。这里我对v0.1的预期是如下的,只要客户端能够通过tcp将数据请求到Linux机器的端口,咱们的v0.1版本就能够监听到并且将数据落地到磁盘,只需要实现这个功能就可以了。
在这里插入图片描述

代码实现

这个功能中会跟网络和写磁盘打交道,那直接用Netty现成的包就好了,至于写磁盘的话用JDK原生自带的就够了。大致抽象出两个对应的接口以及实现,如下

public interface NetService {void start();void stop();
}public class NetServiceImpl implements NetService{private static final Logger LOG = LoggerFactory.getLogger(NetServiceImpl.class);private EventLoopGroup bossGroup = null;private EventLoopGroup workerGroup = null;public void start() {//bossGroup就是parentGroup,是负责处理TCP/IP连接的EventLoopGroup bossGroup = new NioEventLoopGroup(1);//workerGroup就是childGroup,是负责处理Channel(通道)EventLoopGroup workerGroup = new NioEventLoopGroup(30);try {ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)//初始化服务端可连接队列,指定了队列的大小128.option(ChannelOption.SO_BACKLOG, 128)//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去.option(ChannelOption.TCP_NODELAY, true)//保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ServerInitializer());ChannelFuture future = bootstrap.bind(8888).sync();future.channel().closeFuture().sync();} catch (Exception e){}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public void stop() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}

再写下数据存储相关的接口和类如下

public interface DataStorage<T> {void save(T t) throws Exception;
}public class LocalDataStorageImpl implements DataStorage<String>{private static MappedByteBuffer mappedByteBuffer;private static Integer _1Gb = 1024*1024*1024;private static Integer _1MB = 1024*1024;public LocalDataStorageImpl() {try {FileChannel fileChannel = new RandomAccessFile("./testWrite", "rw").getChannel();mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, _1MB);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void save(String data) throws Exception{System.out.println("start writeDataToFile data is :"+data);mappedByteBuffer.put(data.getBytes());System.out.println("writeDataToFile end!");}
}

再实现Netty相关的逻辑

@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<String> {private static final Logger LOG = LoggerFactory.getLogger(ServerHandler.class);private DataStorage dataStorage = new LocalDataStorageImpl();@Overridepublic void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {channelHandlerContext.write("Welcome to sherlock home!");channelHandlerContext.write("It is "+ new Date()+"\n");channelHandlerContext.flush();}@Overridepublic void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {LOG.info("========readdata, request is {}=========", request);//异步通过专门的EventLoop线程池进行处理dataStorage.save(request);String response;boolean close = false;if (request.isEmpty()) {response = "Please type something.\r\n";} else if ("bye".equals(request.toLowerCase())) {response = "Have a good day!\r\n";close = true;} else {response = "Did you say '" + request + "'?\r\n";}ChannelFuture future = ctx.write(response);if (close) {future.addListener(ChannelFutureListener.CLOSE);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}public class ServerInitializer extends ChannelInitializer<SocketChannel> {private static final Logger LOG = LoggerFactory.getLogger(ServerInitializer.class);private static final StringDecoder DECODER = new StringDecoder();private static final StringEncoder ENCODER = new StringEncoder();private static final ServerHandler SERVER_HANDLER = new ServerHandler();@Overridepublic void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));pipeline.addLast(DECODER);pipeline.addLast(ENCODER);pipeline.addLast(SERVER_HANDLER);}
}

最后,咱们再来实现主函数逻辑

public class Main {public static void main(String[] args) {NetService netService = new NetServiceImpl();netService.start();}
}

基本上就差不多了,代码优化往后放放,现在嘛,能跑就行☺️

运行调试

启动服务后,咱们通过下列指令往接口插入数据

(echo 'hello'; sleep 2) | telnet 127.0.0.1 8888
(echo 'sherlock'; sleep 2) | telnet 127.0.0.1 8888
(echo 'thanks'; sleep 2) | telnet 127.0.0.1 8888
(echo 'are you ok?'; sleep 2) | telnet 127.0.0.1 8888

通过下面控制台的信息能够看到接收到完整的数据了,说明v0.1版本通过socket端口读取数据的链路是正常的
在这里插入图片描述

再看看本地磁盘文件,通过打印出来能够看到数据是已经落到磁盘的
在这里插入图片描述

小结

以上就是实现的整个过程,代码不可谓不粗糙,不过咱们讲究的就是一个莽,快速闭环看到效果才是最重要的,至于优化嘛,放到后面的版本慢慢优化~

这篇关于手写分布式存储系统v0.1版本的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/667002

相关文章

基于MongoDB实现文件的分布式存储

《基于MongoDB实现文件的分布式存储》分布式文件存储的方案有很多,今天分享一个基于mongodb数据库来实现文件的存储,mongodb支持分布式部署,以此来实现文件的分布式存储,需要的朋友可以参考... 目录一、引言二、GridFS 原理剖析三、Spring Boot 集成 GridFS3.1 添加依赖

Android NDK版本迭代与FFmpeg交叉编译完全指南

《AndroidNDK版本迭代与FFmpeg交叉编译完全指南》在Android开发中,使用NDK进行原生代码开发是一项常见需求,特别是当我们需要集成FFmpeg这样的多媒体处理库时,本文将深入分析A... 目录一、android NDK版本迭代分界线二、FFmpeg交叉编译关键注意事项三、完整编译脚本示例四

查看MySQL数据库版本的四种方法

《查看MySQL数据库版本的四种方法》查看MySQL数据库的版本信息可以通过多种方法实现,包括使用命令行工具、SQL查询语句和图形化管理工具等,以下是详细的步骤和示例代码,需要的朋友可以参考下... 目录方法一:使用命令行工具1. 使用 mysql 命令示例:方法二:使用 mysqladmin 命令示例:方

Java版本不兼容问题详细解决方案步骤

《Java版本不兼容问题详细解决方案步骤》:本文主要介绍Java版本不兼容问题解决的相关资料,详细分析了问题原因,并提供了解决方案,包括统一JDK版本、修改项目配置和清理旧版本残留等步骤,需要的朋... 目录错误原因分析解决方案步骤第一步:统一 JDK 版本第二步:修改项目配置第三步:清理旧版本残留兼容性对

Redis实现分布式锁全解析之从原理到实践过程

《Redis实现分布式锁全解析之从原理到实践过程》:本文主要介绍Redis实现分布式锁全解析之从原理到实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景介绍二、解决方案(一)使用 SETNX 命令(二)设置锁的过期时间(三)解决锁的误删问题(四)Re

Gradle下如何搭建SpringCloud分布式环境

《Gradle下如何搭建SpringCloud分布式环境》:本文主要介绍Gradle下如何搭建SpringCloud分布式环境问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录Gradle下搭建SpringCloud分布式环境1.idea配置好gradle2.创建一个空的gr

Linux搭建单机MySQL8.0.26版本的操作方法

《Linux搭建单机MySQL8.0.26版本的操作方法》:本文主要介绍Linux搭建单机MySQL8.0.26版本的操作方法,本文通过图文并茂的形式给大家讲解的非常详细,感兴趣的朋友一起看看吧... 目录概述环境信息数据库服务安装步骤下载前置依赖服务下载方式一:进入官网下载,并上传到宿主机中,适合离线环境

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

浅谈配置MMCV环境,解决报错,版本不匹配问题

《浅谈配置MMCV环境,解决报错,版本不匹配问题》:本文主要介绍浅谈配置MMCV环境,解决报错,版本不匹配问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录配置MMCV环境,解决报错,版本不匹配错误示例正确示例总结配置MMCV环境,解决报错,版本不匹配在col