Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)

2024-04-25 12:52

本文主要是介绍Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在服务端使用多线程对同个客户端进行读写,会带来意想不到的问题。

前面的文章中,服务端都是在一个单线程main中,处理所有接收到的IO事件,为了提高效率,会自然的想到,为OP_READ和OP_WRITE事件分配多线程处理。

需求:服务端把接收到的数据,原样返回给客户端

服务端代码如下:

直接在单线程的代码上,把单线程的read和write逻辑,放入一个单独的线程

服务代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2_2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println("read " + Thread.currentThread().getName()+ " " + read);if (read > 0) {client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2_2 service = new SocketMultiplexingSingleThreadv2_2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端,客户端发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:21598
-------------------------------------------
in.....
in.....
read handler.....
in.....
read handler.....
read Thread-0 5
read Thread-1 0
read handler.....
read Thread-2 0
read Thread-0 0
write handler...

客户端日志:

client connected to server
1234
client receive data from consolejava.io.BufferedInputStream@6acfcaf3 : 1234

可以看到,客户端发送数据,没有接收到服务端返回的数据;

服务端接收到数据后,在写数据的时候,buffer中没有数据可写;

再仔细看下服务端的日志,可以同个客户端只发送一条数据的时候,有3个线程来处理,其他两个线程读到的数据都是0;


一个客户端的读事件,分配一个线程处理,但是线程还没处理完,下个读事件就来了,就又分配一个线程处理。。。而同一个客户端共享一个buffer,在register OP_READ的时候attach的。
这样使得buffer中的数据还没来得及写出去,就被其他读线程给冲掉了(read == 0);

tip:read事件来的时候,如果不读取数据,read事件会一直有的

解决方法:不可以并发读同一个client, 在处理一个Client的 OP_READ的时候先取消 OP_READ的注册,读完了后,在注册一个 OP_READ

新的服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");// 同一个Client,读之前先取消OP_READ,防止多线程冲突吹key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println(Thread.currentThread().getName()+ " " + read);if (read > 0) {// 同一个Client,读完数据后,要再次注册OP_READ,读后面发送过来的数据key.interestOps(  SelectionKey.OP_READ);client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端1,客户端1发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8

客户端1日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1

可以看到,客户单和服务端都可以正常接收和发送数据。

再添加一个客户端2,发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8

客户端2的日志:

client connected to server
client2
client receive data from consolejava.io.BufferedInputStream@65231a33 : client2client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client2

可以看到,客户端2和服务端都可以正常接收和发送数据。

客户端1,再次发送数据

客户端日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1clent1_2
client receive data from consolejava.io.BufferedInputStream@65231a33 : clent1_2client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:9: clent1_2

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8
in.....
read handler.....
Thread-4 9
Thread-4 0
write handler...
write Thread-5 9

从服务端日志中,可以看到,每个客户端的读事件,只有一个线程处理。

整个处理流程是服务预期的。

这篇关于Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/934738

相关文章

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

HTTP 与 SpringBoot 参数提交与接收协议方式

《HTTP与SpringBoot参数提交与接收协议方式》HTTP参数提交方式包括URL查询、表单、JSON/XML、路径变量、头部、Cookie、GraphQL、WebSocket和SSE,依据... 目录HTTP 协议支持多种参数提交方式,主要取决于请求方法(Method)和内容类型(Content-Ty

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

Debian 13升级后网络转发等功能异常怎么办? 并非错误而是管理机制变更

《Debian13升级后网络转发等功能异常怎么办?并非错误而是管理机制变更》很多朋友反馈,更新到Debian13后网络转发等功能异常,这并非BUG而是Debian13Trixie调整... 日前 Debian 13 Trixie 发布后已经有众多网友升级到新版本,只不过升级后发现某些功能存在异常,例如网络转

Spring 依赖注入与循环依赖总结

《Spring依赖注入与循环依赖总结》这篇文章给大家介绍Spring依赖注入与循环依赖总结篇,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Spring 三级缓存解决循环依赖1. 创建UserService原始对象2. 将原始对象包装成工

Java中如何正确的停掉线程

《Java中如何正确的停掉线程》Java通过interrupt()通知线程停止而非强制,确保线程自主处理中断,避免数据损坏,线程池的shutdown()等待任务完成,shutdownNow()强制中断... 目录为什么不强制停止为什么 Java 不提供强制停止线程的能力呢?如何用interrupt停止线程s

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

SpringBoot路径映射配置的实现步骤

《SpringBoot路径映射配置的实现步骤》本文介绍了如何在SpringBoot项目中配置路径映射,使得除static目录外的资源可被访问,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一... 目录SpringBoot路径映射补:springboot 配置虚拟路径映射 @RequestMapp

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置