Apache Mina 源码再读2 IoSession创建过程源代码剖析

2024-02-18 06:58

本文主要是介绍Apache Mina 源码再读2 IoSession创建过程源代码剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!



在调用bind()函数后,AcceptorOperationFuture 被注册到AbstractPollingIoAcceptor 类中的registerQueue 队列。在AbstractPollingIoAcceptor中存在IoProcessor 类。


/**
 * An internal interface to represent an 'I/O processor' that performs
 * actual I/O operations for {@link IoSession}s.  It abstracts existing
 * reactor frameworks such as Java NIO once again to simplify transport
 * implementations.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 * 
 * @param <S> the type of the {@link IoSession} this processor can handle
 */
public interface IoProcessor<S extends IoSession> {}


IoProcessor 是执行IoSession 的I/O操作的内部接口类。  IoProcessor再次 抽象了在Java Nio 内部的reactor  frameworks 框架,并简化传输层的实现。


IoProcessor 实现类为SimpleIoProcessorPool 类。SImpleIoProcessorPool主要把IoSession分配到一个或者多个IoProcessor中。


An IoProcessor pool that distributes IoSessions into one or more IoProcessors. Most current transport implementations use this pool internally to perform better in a multi-core environment, and therefore, you won't need to use this pool directly unless you are running multiple IoServices in the same JVM.

If you are running multiple IoServices, you could want to share the pool among all services. To do so, you can create a new SimpleIoProcessorPool instance by yourself and provide the pool as a constructor parameter when you create the services.

This pool uses Java reflection API to create multiple IoProcessor instances. It tries to instantiate the processor in the following order:

  1. A public constructor with one ExecutorService parameter.
  2. A public constructor with one Executor parameter.
  3. A public default constructor
The following is an example for the NIO socket transport:
// Create a shared pool.SimpleIoProcessorPool<NioSession> pool = new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16);

 // Create two services that share the same pool.SocketAcceptor acceptor = new NioSocketAcceptor(pool);SocketConnector connector = new NioSocketConnector(pool)



在Acceptor 线程中,processHandles 会创建一个新的IoSession.


  /*** This method will process new sessions for the Worker class.  All* keys that have had their status updates as per the Selector.selectedKeys()* method will be processed here.  Only keys that are ready to accept* connections are handled here.* <p/>* Session objects are created by making new instances of SocketSessionImpl* and passing the session object to the SocketIoProcessor class.*/@SuppressWarnings("unchecked")private void processHandles(Iterator<H> handles) throws Exception {while (handles.hasNext()) {H handle = handles.next();handles.remove();// Associates a new created connection to a processor,// and get back a sessionS session = accept(processor, handle);//当有一个新socket链接时,返回一个IoSessionif (session == null) {continue;}initSession(session, null, null);// add the session to the SocketIoProcessor//当Acceptor发现新的socket,把socket和ioproces相关联。所有io事件由ioprocess处理session.getProcessor().add(session);}}}

  protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {SelectionKey key = null;if (handle != null) {key = handle.keyFor(selector);}if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {return null;}// accept the connection from the clientSocketChannel ch = handle.accept();if (ch == null) {return null;}return new NioSocketSession(this, processor, ch);}


当accept事件触发时,会创建一个新NioSocketSession .

public abstract class NioSession extends AbstractIoSession {
    /** The NioSession processor */
    protected final IoProcessor<NioSession> processor;


    /** The communication channel */
    protected final Channel channel;


    /** The SelectionKey used for this session */
    protected SelectionKey key;


    /** The FilterChain created for this session */
    private final IoFilterChain filterChain;


    /**
     * 
     * Creates a new instance of NioSession, with its associated IoProcessor.
     * <br>
     * This method is only called by the inherited class.
     *
     * @param processor The associated IoProcessor
     */
    protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
        super(service);
        this.channel = channel;
        this.processor = processor;
        filterChain = new DefaultIoFilterChain(this);
    }

}

在抽象类NioSession中,每创建一个NioSession就会创建一个DefaultFilterChain对象。也就意味着每一个NioSession与自己独立的DefaultFilterChain相关联。


NioSession创建后,会通过initSession 方法来初始化NioSession. 


在初始化NioSession 之前,先看看一个IoSessionDataStructureFactory 工厂类。Provides data structures to a newly created session. 这个工厂类提供了创建NioSession的数据结构。NioSession中的数据结构主要包括存储自定义对象的IoSessionAttributeMap 类以及IoSession写入到缓冲区的写出队列WriteRequestQueue。


IoSessionDataStructFactory 接口源代码如下:

/*** Provides data structures to a newly created session.* * @author <a href="http://mina.apache.org">Apache MINA Project</a>*/
public interface IoSessionDataStructureFactory {/*** Returns an {@link IoSessionAttributeMap} which is going to be associated* with the specified <tt>session</tt>.  Please note that the returned* implementation must be thread-safe.*/IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;/*** Returns an {@link WriteRequest} which is going to be associated with* the specified <tt>session</tt>.  Please note that the returned* implementation must be thread-safe and robust enough to deal* with various messages types (even what you didn't expect at all),* especially when you are going to implement a priority queue which* involves {@link Comparator}.*/WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}


在DefaultIoSessionDataStructureFactory 默认实现工厂类中实现了IoSessionAttributeMap 和WriteRequestQueue两个数据结构的实现。



IoSessionAttributeMap内部为一个并发安全的HashMap可以存储键值对。

 private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
        private final ConcurrentHashMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);

}



WriteRequestQueue为一个并发安全的队列实现。

    private static class DefaultWriteRequestQueue implements WriteRequestQueue {
        /** A queue to store incoming write requests */
        private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();

}


回头看一下,在initSession()初始化方法中,主要创建IoSessionAttributeMap和WriteRequestQueue队列两个结构。



初始化NioSession完成后,就需要把NioSession分配到指定的Processor线程中。


在分配SimpleIoProcessorPool中提供分配NioSession 与Processor线程相关联的策略。


    private IoProcessor<S> getProcessor(S session) {   //获取在IoSession中PROCESSOR 为键的对象IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);//如果该IoSession尚未与IoProcessor相关联,则把IoProccessor按照如下原则相关联if (processor == null) {if (disposed || disposing) {throw new IllegalStateException("A disposed processor cannot be accessed.");}//把processor和session相关联。根据session id和proceor pool 的大小来决定。 把IoSession的ID与线程池中顺序相关联processor = pool[Math.abs((int) session.getId()) % pool.length];if (processor == null) {throw new IllegalStateException("A disposed processor cannot be accessed.");}//设置PROCESSOR 的对象为IoProccessorsession.setAttributeIfAbsent(PROCESSOR, processor);}return processor;}

然后把NioSession 方法指定Processor线程的newSessions队列中,并启动Processor线程。


同时在内部存在IoServiceStatistics 类来统计IoSession相关事件。



此时,IoSession创建过程就完成了。


小结一下,NioSession创建过程。


1、Acceptor 线程,在select()中,轮询accept事件。

2、当accept事件触发时,创建NioSession对象,每一个NioSession都有一个DefaultFilterChain相关联。

3、当创建NioSession后,需要通过initSession()方法来初始化化NioSession.这里主要涉及IoSessionDataStructFactory工厂类创建IoSessionAttrubteMap 来存储用户自定义数据,创建WriteRequestQueue队列。 

4、当NioSession构建完成后,需要在SimpleIoProcessorPool来把NioSession 分配到指定的Processor线程中。

5、NioSession被添加到指定Processor线程中newSession队列中,然后启动start  Processor线程。






这篇关于Apache Mina 源码再读2 IoSession创建过程源代码剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/720391

相关文章

oracle 11g导入\导出(expdp impdp)之导入过程

《oracle11g导入导出(expdpimpdp)之导入过程》导出需使用SEC.DMP格式,无分号;建立expdir目录(E:/exp)并确保存在;导入在cmd下执行,需sys用户权限;若需修... 目录准备文件导入(impdp)1、建立directory2、导入语句 3、更改密码总结上一个环节,我们讲了

ShardingProxy读写分离之原理、配置与实践过程

《ShardingProxy读写分离之原理、配置与实践过程》ShardingProxy是ApacheShardingSphere的数据库中间件,通过三层架构实现读写分离,解决高并发场景下数据库性能瓶... 目录一、ShardingProxy技术定位与读写分离核心价值1.1 技术定位1.2 读写分离核心价值二

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

Spring创建Bean的八种主要方式详解

《Spring创建Bean的八种主要方式详解》Spring(尤其是SpringBoot)提供了多种方式来让容器创建和管理Bean,@Component、@Configuration+@Bean、@En... 目录引言一、Spring 创建 Bean 的 8 种主要方式1. @Component 及其衍生注解

AOP编程的基本概念与idea编辑器的配合体验过程

《AOP编程的基本概念与idea编辑器的配合体验过程》文章简要介绍了AOP基础概念,包括Before/Around通知、PointCut切入点、Advice通知体、JoinPoint连接点等,说明它们... 目录BeforeAroundAdvise — 通知PointCut — 切入点Acpect — 切面

C++ STL-string类底层实现过程

《C++STL-string类底层实现过程》本文实现了一个简易的string类,涵盖动态数组存储、深拷贝机制、迭代器支持、容量调整、字符串修改、运算符重载等功能,模拟标准string核心特性,重点强... 目录实现框架一、默认成员函数1.默认构造函数2.构造函数3.拷贝构造函数(重点)4.赋值运算符重载函数

MySQ中出现幻读问题的解决过程

《MySQ中出现幻读问题的解决过程》文章解析MySQLInnoDB通过MVCC与间隙锁机制在可重复读隔离级别下解决幻读,确保事务一致性,同时指出性能影响及乐观锁等替代方案,帮助开发者优化数据库应用... 目录一、幻读的准确定义与核心特征幻读 vs 不可重复读二、mysql隔离级别深度解析各隔离级别的实现差异

Nginx添加内置模块过程

《Nginx添加内置模块过程》文章指导如何检查并添加Nginx的with-http_gzip_static模块:确认该模块未默认安装后,需下载同版本源码重新编译,备份替换原有二进制文件,最后重启服务验... 目录1、查看Nginx已编辑的模块2、Nginx官网查看内置模块3、停止Nginx服务4、Nginx