Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)

2024-04-25 12:52

本文主要是介绍Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在服务端使用多线程对同个客户端进行读写,会带来意想不到的问题。

前面的文章中,服务端都是在一个单线程main中,处理所有接收到的IO事件,为了提高效率,会自然的想到,为OP_READ和OP_WRITE事件分配多线程处理。

需求:服务端把接收到的数据,原样返回给客户端

服务端代码如下:

直接在单线程的代码上,把单线程的read和write逻辑,放入一个单独的线程

服务代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2_2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println("read " + Thread.currentThread().getName()+ " " + read);if (read > 0) {client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2_2 service = new SocketMultiplexingSingleThreadv2_2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端,客户端发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:21598
-------------------------------------------
in.....
in.....
read handler.....
in.....
read handler.....
read Thread-0 5
read Thread-1 0
read handler.....
read Thread-2 0
read Thread-0 0
write handler...

客户端日志:

client connected to server
1234
client receive data from consolejava.io.BufferedInputStream@6acfcaf3 : 1234

可以看到,客户端发送数据,没有接收到服务端返回的数据;

服务端接收到数据后,在写数据的时候,buffer中没有数据可写;

再仔细看下服务端的日志,可以同个客户端只发送一条数据的时候,有3个线程来处理,其他两个线程读到的数据都是0;


一个客户端的读事件,分配一个线程处理,但是线程还没处理完,下个读事件就来了,就又分配一个线程处理。。。而同一个客户端共享一个buffer,在register OP_READ的时候attach的。
这样使得buffer中的数据还没来得及写出去,就被其他读线程给冲掉了(read == 0);

tip:read事件来的时候,如果不读取数据,read事件会一直有的

解决方法:不可以并发读同一个client, 在处理一个Client的 OP_READ的时候先取消 OP_READ的注册,读完了后,在注册一个 OP_READ

新的服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");// 同一个Client,读之前先取消OP_READ,防止多线程冲突吹key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println(Thread.currentThread().getName()+ " " + read);if (read > 0) {// 同一个Client,读完数据后,要再次注册OP_READ,读后面发送过来的数据key.interestOps(  SelectionKey.OP_READ);client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端1,客户端1发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8

客户端1日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1

可以看到,客户单和服务端都可以正常接收和发送数据。

再添加一个客户端2,发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8

客户端2的日志:

client connected to server
client2
client receive data from consolejava.io.BufferedInputStream@65231a33 : client2client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client2

可以看到,客户端2和服务端都可以正常接收和发送数据。

客户端1,再次发送数据

客户端日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1clent1_2
client receive data from consolejava.io.BufferedInputStream@65231a33 : clent1_2client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:9: clent1_2

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8
in.....
read handler.....
Thread-4 9
Thread-4 0
write handler...
write Thread-5 9

从服务端日志中,可以看到,每个客户端的读事件,只有一个线程处理。

整个处理流程是服务预期的。

这篇关于Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/934738

相关文章

IDEA实现回退提交的git代码(四种常见场景)

《IDEA实现回退提交的git代码(四种常见场景)》:本文主要介绍IDEA实现回退提交的git代码(四种常见场景),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.已提交commit,还未push到远端(Undo Commit)2.已提交commit并push到

java对接第三方接口的三种实现方式

《java对接第三方接口的三种实现方式》:本文主要介绍java对接第三方接口的三种实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录HttpURLConnection调用方法CloseableHttpClient调用RestTemplate调用总结在日常工作

IDEA下"File is read-only"可能原因分析及"找不到或无法加载主类"的问题

《IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题》:本文主要介绍IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题,具有很好的参... 目录1.File is read-only”可能原因2.“找不到或无法加载主类”问题的解决总结1.File

Spring 缓存在项目中的使用详解

《Spring缓存在项目中的使用详解》Spring缓存机制,Cache接口为缓存的组件规范定义,包扩缓存的各种操作(添加缓存、删除缓存、修改缓存等),本文给大家介绍Spring缓存在项目中的使用... 目录1.Spring 缓存机制介绍2.Spring 缓存用到的概念Ⅰ.两个接口Ⅱ.三个注解(方法层次)Ⅲ.

8种快速易用的Python Matplotlib数据可视化方法汇总(附源码)

《8种快速易用的PythonMatplotlib数据可视化方法汇总(附源码)》你是否曾经面对一堆复杂的数据,却不知道如何让它们变得直观易懂?别慌,Python的Matplotlib库是你数据可视化的... 目录引言1. 折线图(Line Plot)——趋势分析2. 柱状图(Bar Chart)——对比分析3

Spring Boot 整合 Redis 实现数据缓存案例详解

《SpringBoot整合Redis实现数据缓存案例详解》Springboot缓存,默认使用的是ConcurrentMap的方式来实现的,然而我们在项目中并不会这么使用,本文介绍SpringB... 目录1.添加 Maven 依赖2.配置Redis属性3.创建 redisCacheManager4.使用Sp

Spring Cache注解@Cacheable的九个属性详解

《SpringCache注解@Cacheable的九个属性详解》在@Cacheable注解的使用中,共有9个属性供我们来使用,这9个属性分别是:value、cacheNames、key、key... 目录1.value/cacheNames 属性2.key属性3.keyGeneratjavascriptor

redis在spring boot中异常退出的问题解决方案

《redis在springboot中异常退出的问题解决方案》:本文主要介绍redis在springboot中异常退出的问题解决方案,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴... 目录问题:解决 问题根源️ 解决方案1. 异步处理 + 提前ACK(关键步骤)2. 调整Redis消费者组

一文教你Java如何快速构建项目骨架

《一文教你Java如何快速构建项目骨架》在Java项目开发过程中,构建项目骨架是一项繁琐但又基础重要的工作,Java领域有许多代码生成工具可以帮助我们快速完成这一任务,下面就跟随小编一起来了解下... 目录一、代码生成工具概述常用 Java 代码生成工具简介代码生成工具的优势二、使用 MyBATis Gen

C#使用MQTTnet实现服务端与客户端的通讯的示例

《C#使用MQTTnet实现服务端与客户端的通讯的示例》本文主要介绍了C#使用MQTTnet实现服务端与客户端的通讯的示例,包括协议特性、连接管理、QoS机制和安全策略,具有一定的参考价值,感兴趣的可... 目录一、MQTT 协议简介二、MQTT 协议核心特性三、MQTTNET 库的核心功能四、服务端(BR