skynet源码分析4:actor生命周期管理

2024-05-07 05:18

本文主要是介绍skynet源码分析4:actor生命周期管理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

skynet是基于多线程的,每个actor都会被单独的线程调度,且每个actor可以杀死其它actor,给其它actor发送消息,创建actor,也就是一个actor可能被多个线程持有,那么就会面临三个问题:

  1. 一个actor被同时使用时,如何安全释放。
  2. actor被释放后,外部使用时如何检测该actor已经无效了,以便流程能继续。
  3. 若信箱里的消息具有请求回应语义,那么如果通知消息源。

框架使用的是handle映射与引用计数的手法,对外暴露sc(skynet_context)的handle,而不是指针。这个模块实现在/skynet-src/skynet_handle.c中,来分析一下其具体原理,从头文件的接口入手:

skynet_handle.h

复制代码
 1 #ifndef SKYNET_CONTEXT_HANDLE_H
 2 #define SKYNET_CONTEXT_HANDLE_H
 3 
 4 #include <stdint.h>
 5 
 6 // reserve high 8 bits for remote id
 7 #define HANDLE_MASK 0xffffff
 8 #define HANDLE_REMOTE_SHIFT 24
 9 
10 struct skynet_context;
11 
12 uint32_t skynet_handle_register(struct skynet_context *);
13 int skynet_handle_retire(uint32_t handle);
14 struct skynet_context * skynet_handle_grab(uint32_t handle);
15 void skynet_handle_retireall();
16 
17 uint32_t skynet_handle_findname(const char * name);
18 const char * skynet_handle_namehandle(uint32_t handle, const char *name);
19 
20 void skynet_handle_init(int harbor);
21 
22 #endif
复制代码

handle是一个uint32_t的整数,高8位表示远程节点(这是框架自带的集群设施,后面的分析都会无视该部分,一来它不是框架核心,二来这个集群设施已经不被推荐使用)。

先来看看它内部的数据结构:

复制代码
#define DEFAULT_SLOT_SIZE 4
#define MAX_SLOT_SIZE 0x40000000struct handle_name {char * name;uint32_t handle;
};struct handle_storage {struct rwlock lock;uint32_t harbor;uint32_t handle_index;int slot_size;struct skynet_context ** slot;int name_cap;int name_count;struct handle_name *name;
};static struct handle_storage *H = NULL;
复制代码

看上去就一个sc的数组,看不出什么,看其它的方法吧,skynet_handle_register:

复制代码
 1 uint32_t
 2 skynet_handle_register(struct skynet_context *ctx) {
 3     struct handle_storage *s = H;
 4 
 5     rwlock_wlock(&s->lock);
 6     
 7     for (;;) {
 8         int i;
 9         for (i=0;i<s->slot_size;i++) {
10             uint32_t handle = (i+s->handle_index) & HANDLE_MASK;
11             int hash = handle & (s->slot_size-1);
12             if (s->slot[hash] == NULL) {
13                 s->slot[hash] = ctx;
14                 s->handle_index = handle + 1;
15 
16                 rwlock_wunlock(&s->lock);
17 
18                 handle |= s->harbor;
19                 return handle;
20             }
21         }
22         assert((s->slot_size*2 - 1) <= HANDLE_MASK);
23         struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *));
24         memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *));
25         for (i=0;i<s->slot_size;i++) {
26             int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1);
27             assert(new_slot[hash] == NULL);
28             new_slot[hash] = s->slot[i];
29         }
30         skynet_free(s->slot);
31         s->slot = new_slot;
32         s->slot_size *= 2;
33     }
34 }
复制代码

这个方法是添加一个sc的handle映射。

从代码看,是一种hash映射,用读写锁来保证线程安全。9-21行是哈希值的选取过程,就是一个自增长的整数,每计算一次就加1,用handle_index做计数器,用取模来映射到sc数组上。用二次探测法来解决冲突,第9行循环保证冲突探测会覆盖整个数组。

到22行,就说明数组满了,此时会成倍扩展原数组,将handle在新数组上重新取模映射一遍。这种hash的规则有两点好处:1、hash值不会重复.2、查找过程是真正O(1)的.

从第22行和handle_index的修改处,可以知道这个函数基于两个前提:1、数组大小不会超过0xffffff.2、handle_index没有处理溢出的情况,可能为0,也就是假定不会溢出。个人觉得handle_index还是处理一下溢出的情况较好,如果大于0xffffff,就设为1。

再来看看skynet_handle_grab:

复制代码
 1 struct skynet_context * 
 2 skynet_handle_grab(uint32_t handle) {
 3     struct handle_storage *s = H;
 4     struct skynet_context * result = NULL;
 5 
 6     rwlock_rlock(&s->lock);
 7 
 8     uint32_t hash = handle & (s->slot_size-1);
 9     struct skynet_context * ctx = s->slot[hash];
10     if (ctx && skynet_context_handle(ctx) == handle) {
11         result = ctx;
12         skynet_context_grab(result);
13     }
14 
15     rwlock_runlock(&s->lock);
16 
17     return result;
18 }
复制代码

这个函数作用是根据handle查找对应的sc,handle无效就返回NULL.上的是读锁,查找过程很简单,将handle取模,然后判断索引处的元素的handle是否一致。引用计数保存在sc里,并没有在本模块中,其实应该放在本模块中更为纯粹,sc里只需要知道如何释放自己就行了。查找成功会增加sc的计数(skynet_context_grab)。

再来看看skynet_handle_retire:

复制代码
 1 int
 2 skynet_handle_retire(uint32_t handle) {
 3     int ret = 0;
 4     struct handle_storage *s = H;
 5 
 6     rwlock_wlock(&s->lock);
 7 
 8     uint32_t hash = handle & (s->slot_size-1);
 9     struct skynet_context * ctx = s->slot[hash];
10 
11     if (ctx != NULL && skynet_context_handle(ctx) == handle) {
12         s->slot[hash] = NULL;
13         ret = 1;
14         int i;
15         int j=0, n=s->name_count;
16         for (i=0; i<n; ++i) {
17             if (s->name[i].handle == handle) {
18                 skynet_free(s->name[i].name);
19                 continue;
20             } else if (i!=j) {
21                 s->name[j] = s->name[i];
22             }
23             ++j;
24         }
25         s->name_count = j;
26     } else {
27         ctx = NULL;
28     }
29 
30     rwlock_wunlock(&s->lock);
31 
32     if (ctx) {
33         // release ctx may call skynet_handle_* , so wunlock first.
34         skynet_context_release(ctx);
35     }
36 
37     return ret;
38 }
复制代码

这个函数的作用是解除handle映射,而不是递减引用计数。

具体实现有两步:1、清空handle对应的槽,调用skynet_context_release.2、如果有注册命名,删除对应的节点。

其实将释放sc的控制放在本模块会更好。

其余的方法就是handle命名的支持,名称映射保存在数组中,按字典序排序,查找时用二分查找法。

现在可以看sc生命周期具体的场景了,看两个地方就行了:

  1. 消息调度处,skynet_context_message_dispatch函数里.
  2. sc的对外接口,主要是skynet_command.

在skynet_context_message_dispatch里可以看到(/skynet-src/skynet_server.c的285行):

struct skynet_context * ctx = skynet_handle_grab(handle);if (ctx == NULL) {struct drop_t d = { handle };skynet_mq_release(q, drop_message, &d);return skynet_globalmq_pop();}

通过skynet_handle_grab做了sc无效的检测,也就解决了开头提出的问题2。sc其它的对外接口也做了这样的判断。

那么剩下就是问题1,安全释放的问题。来看sc的对外释放接口,cmd_exit,cmd_kill,调的都是handle_exit:

复制代码
 1 static void
 2 handle_exit(struct skynet_context * context, uint32_t handle) {
 3     if (handle == 0) {
 4         handle = context->handle;
 5         skynet_error(context, "KILL self");
 6     } else {
 7         skynet_error(context, "KILL :%0x", handle);
 8     }
 9     if (G_NODE.monitor_exit) {
10         skynet_send(context,  handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0);
11     }
12     skynet_handle_retire(handle);
13 }
复制代码

这个函数最终调的skynet_handle_retire,它解除handle映射后调的是skynet_context_release。

来看看skynet_context_release:

复制代码
 1 static void 
 2 delete_context(struct skynet_context *ctx) {
 3     if (ctx->logfile) {
 4         fclose(ctx->logfile);
 5     }
 6     skynet_module_instance_release(ctx->mod, ctx->instance);
 7     skynet_mq_mark_release(ctx->queue);
 8     CHECKCALLING_DESTROY(ctx)
 9     skynet_free(ctx);
10     context_dec();
11 }
12 
13 struct skynet_context * 
14 skynet_context_release(struct skynet_context *ctx) {
15     if (ATOM_DEC(&ctx->ref) == 0) {
16         delete_context(ctx);
17         return NULL;
18     }
19     return ctx;
20 }
复制代码

引用计数为0后就会释放sc,那么问题1是这样来保证的:

调用handle_exit后会有两种情况:

1、其它逻辑流已经获取了sc,那么引用计数一定大于0,此时不会释放sc,当最后一个逻辑流递减引用计数时才会释放,是安全的。

2、sc被释放,其它逻辑流开始skynet_handle_grab,因为handle映射已经解除,所有查找无效,逻辑流可以知晓这一情况作出判断,是安全的。

sc释放时,没有释放信箱(message_queue),仅调用了skynet_mq_mark_release设置了释放标志,那它在哪里释放的呢?先来想想这样一个情况,如果sc释放了,信箱没被释放,那么skynet_handle_grab就会查找失败,而信箱还会在1级队列中,那么释放的地方只可能在skynet_context_message_dispatch里,回过头来看看它,就是在判断sc无效的分支里,调用了skynet_mq_release释放的信箱。

为什么信箱要独立出来分释放,而不和sc一起释放?因为sc是通过引用计数释放的,释放时机不明确,可能在任意一个逻辑流中,那么消息调度中是否应该将它压回1级队列就无法判断了,所以要独立出来。

只剩问题3的解决了,这只需要看信箱释放时是如何处理消息的就行了,在/skynet-src/skynet_server.c的drop_message里:

复制代码
static void
drop_message(struct skynet_message *msg, void *ud) {struct drop_t *d = ud;skynet_free(msg->data);uint32_t source = d->handle;assert(source);// report error to the message sourceskynet_send(NULL, source, msg->source, PTYPE_ERROR, 0, NULL, 0);
}
复制代码

通过向消息源发送一条PTYPE_ERROR来解决,这样期望收到回应的sc就有机会结束这条挂起的流程了。不过有个疑问,为什么回应时不带上session,难道要消息源自己查找信箱么?这点再消息分发的时候再看吧。

 


 

如果没有gc,那么在多线程编程中,如何安全释放资源是一定会面临的问题。通常将它独立到另外的模块中解决,有两种常用的方法:

  1.  本文的handle映射和引用计数。c++中通常用智能指针,通过析构、拷贝构造函数自动来加减引用计数做强制保证。个人觉得前者更为灵活。
  2. 释放时只打上标记,以一定频率定时回收资源。

ps:还是gc好,写代码时没有心里负担。

这篇关于skynet源码分析4:actor生命周期管理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/966429

相关文章

使用jenv工具管理多个JDK版本的方法步骤

《使用jenv工具管理多个JDK版本的方法步骤》jenv是一个开源的Java环境管理工具,旨在帮助开发者在同一台机器上轻松管理和切换多个Java版本,:本文主要介绍使用jenv工具管理多个JD... 目录一、jenv到底是干啥的?二、jenv的核心功能(一)管理多个Java版本(二)支持插件扩展(三)环境隔

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

Python中bisect_left 函数实现高效插入与有序列表管理

《Python中bisect_left函数实现高效插入与有序列表管理》Python的bisect_left函数通过二分查找高效定位有序列表插入位置,与bisect_right的区别在于处理重复元素时... 目录一、bisect_left 基本介绍1.1 函数定义1.2 核心功能二、bisect_left 与

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

Spring中管理bean对象的方式(专业级说明)

《Spring中管理bean对象的方式(专业级说明)》在Spring框架中,Bean的管理是核心功能,主要通过IoC(控制反转)容器实现,下面给大家介绍Spring中管理bean对象的方式,感兴趣的朋... 目录1.Bean的声明与注册1.1 基于XML配置1.2 基于注解(主流方式)1.3 基于Java

基于Python+PyQt5打造一个跨平台Emoji表情管理神器

《基于Python+PyQt5打造一个跨平台Emoji表情管理神器》在当今数字化社交时代,Emoji已成为全球通用的视觉语言,本文主要为大家详细介绍了如何使用Python和PyQt5开发一个功能全面的... 目录概述功能特性1. 全量Emoji集合2. 智能搜索系统3. 高效交互设计4. 现代化UI展示效果

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性