DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析

本文主要是介绍DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

事件和事件队列

DAOS API 函数可以在阻塞或非阻塞模式下使用。 这是通过传递给每个 API 调用的指向 DAOS 事件的指针来确定的:如果 NULL 表示操作将被阻塞。 操作完成后会返回。 所有失败情况的错误码都将通过API函数本身的返回码返回。 如果使用有效的事件,则该操作将以非阻塞模式运行,并在内部调度程序中调度该操作以及将 RPC 提交到底层堆栈后立即返回。 如果调度成功,则操作的返回值为success,但并不表示实际操作成功。 返回时可以捕获的错误要么是无效参数,要么是调度问题。 当事件完成时,操作的实际返回代码将在事件错误代码 (event.ev_error) 中提供。 必须首先通过单独的 API 调用创建要使用的有效事件。 为了允许用户一次跟踪多个事件,可以将事件创建为事件队列的一部分,事件队列基本上是可以一起进行和轮询的事件的集合。 事件队列还在内部为所有 DAOS 任务创建一个单独的任务调度程序以及一个新的网络上下文。 在某些网络提供商上,网络上下文创建是一项昂贵的操作,因此用户应尝试限制在 DAOS 之上的应用程序或 IO 中间件库中创建的事件队列的数量。 或者,可以在没有事件队列的情况下创建事件,并单独跟踪。 在这种情况下,对于阻塞操作,将使用内部全局任务调度程序和网络上下文来代替为事件队列创建的独立任务调度程序和网络上下文。 事件完成后,它可以重新用于另一个 DAOS API 调用,以最大限度地减少 DAOS 库内事件创建和分配的需要

DAOS Task API 提供了一种以非阻塞方式使用 DAOS API 的替代方法,同时在 DAOS API 操作之间构建任务依赖树。 这对于使用 DAOS 并需要构建彼此之间具有依赖关系(N-1、1-N、N-N)的 DAOS 操作计划的应用程序和中间件库非常有用

要利用任务 API,用户需要创建一个调度程序,其中可以创建 DAOS 任务作为其中的一部分。 任务 API 足够通用,允许用户混合 DAOS 特定任务(通过 DAOS 任务 API)和其他用户定义的任务,并在这些任务之间添加依赖关系

有关如何在客户端库中使用 TSE 的更多详细信息,请参阅 TSE 内部文档(https://github.com/ssbandjl/daos/blob/master/src/common/README.md)以获取更多详细信息

事件与事件队列及任务调度引擎流程图

在这里插入图片描述

流程说明(dfuse为例)

以DAOS用户态文件系统dfuse为例

  • 在初始化客户端库中初始化事件队列, 关联全局网络上下文, 设置调度器

  • 启动文件系统中注册了SLAB, 绑定事件队列和事件, 参考: daos_event_init

  • 开启轮训线程dfuse_progress_thread, 参考daos_eq_poll

  • 文件系统执行写

    客户端写数据:xb/write.c -> write(fd, direct_write_buf, BUF_SIZE)
    write -> dfuse_cb_write 回调写 src/client/dfuse/fuse3
    
  • 封装ev, 并将ev传下去: dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev)

    DAOS用户态文件系统, 写流程
    master -> src/client/dfuse/ops/write.c -> dfuse_cb_write(fuse_req_t req, fuse_ino_t ino, struct fuse_bufvec *bufv, off_t position, struct fuse_file_info *fi)
    struct dfuse_projection_info *fs_handle = fuse_req_userdata(req)
    eqt_idx = atomic_fetch_add_relaxed(&fs_handle->di_eqt_idx, 1) -> 原子递增,每次返回+1前的值, 比如: eqt_idx=7
    eqt = &fs_handle->di_eqt[eqt_idx % fs_handle->di_eq_count] -> 取余打散到eq
    ev = d_slab_acquire(eqt->de_write_slab) -> 分配EV, 需要提前注册: d_slab_register(&fs_handle->di_slab, &write_slab, eqt, &eqt->de_write_slab)
    ev->de_complete_cb = dfuse_cb_write_complete
    dfs_write(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_ev) -> dfs_write(dfs_t *dfs, dfs_obj_t *obj, d_sg_list_t *sgl, daos_off_t off, daos_event_t *ev)daos_array_write(obj->oh, DAOS_TX_NONE, &iod, sgl, ev) -> daos_array_write(daos_handle_t oh, daos_handle_t th, daos_array_iod_t *iod, d_sg_list_t *sgl, daos_event_t *ev)dc_task_create(dc_array_write, NULL, ev, &task) -> 关联EV和tasksched = daos_ev2sched(ev) -> 拿到调度器指针, 初始化调度器return dc_task_schedule(task, true)
    sem_post(&eqt->de_sem) -> 唤醒EQ
    d_slab_restock(eqt->de_write_slab) -> 重用slab
    
  • 与tse结合构造task, 调度task

  • 网络回复后, 在轮训线程中trigger到, 拿到ev和task, 逐层向上级执行回调函数, 最终执行业务回调

源码分析

客户端mount, master, gdb --args /opt/daos/bin/dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb -f -> 默认后台启动
dfuse -m /mnt/sxb --pool sxb --cont sxb | dfuse --mountpoint=/tmp/sxb --pool=sxb --cont=sxb
dfuse_main.c -> maindaos_debug_init(DAOS_LOG_DEFAULT)d_log_init_adv 高级日志初始化, 客户端日志文件log_file = getenv(D_LOG_FILE_ENV) export D_LOG_FILE=/tmp/daos_client.logdebug_prio_err_load_envd_log_openfreopen(mst.log_file 重新关联标准输出或错误输出setlinebuf(stderr) 设置错误输出为行缓冲d_log_sync_maskdfuse_info->di_eq_count = 1daos_init -> 初始化客户端库daos_debug_initdaos_hhash_init_featsdc_agent_initdc_job_initdc_mgmt_net_cfgdaos_eq_lib_init -> 初始化事件队列库 -> static tse_sched_t daos_sched_g -> 指向不属于 EQ 一部分的事件的全局调度程序的指针。 作为 EQ 一部分初始化的事件将在该 EQ 调度程序中进行跟踪crt_init_optcrt_context_create(&daos_eq_ctx) -> 全局共享网络上下文, 所有事件队列(eq)共享使用这个上下文tse_sched_init(&daos_sched_g, NULL, daos_eq_ctx) -> 初始化调度器(无事件队列), 为无eq事件设置调度器dc_mgmt_net_cfg_checkpl_initdc_mgmt_initdc_pool_initdc_cont_initdc_obj_initdfuse_fs_init(dfuse_info) -> daos用户态文件信息=文件系统控制器D_ALLOC_ARRAY(fs_handle->di_eqt, fs_handle->di_eq_count) -> eq数组d_hash_table_create_inplace dpi_pool_table 打开的池表, 创建hash表 大小=power2(n次方), 操作方法dpi_iet open inodesfor (i = 0; i < fs_handle->di_eq_count; i++)struct dfuse_eq *eqt = &fs_handle->di_eqt[i] -> 根据传入的EQ数量, 将eq与文件系统句柄中的eq表绑定eqt->de_handle = fs_handle -> 互存指针,双向绑定sem_init(&eqt->de_sem, 0, 0) -> 在 eq 之前创建信号量,因为无法检查 sem_init() 是否已被调用,如果没有调用 sem_destroy 也是无效的。 这样我们就可以避免添加额外的内存来跟踪信号量的状态daos_eq_create(&eqt->de_eq) -> 一个事件队列关联一个网络上下文, 跟踪池的多个事件 -> 创建事件队列。 事件队列用于保存和池化多个事件。 创建的每个事件队列都将创建一个与事件队列关联的网络(cart)上下文。 网络上下文创建是一项昂贵的操作,并且在某些系统上网络上下文的数量可能受到限制。 因此,建议不要在用户应用程序或中间件中创建大量事件队列eq = daos_eq_alloc() -> 分配eqD_INIT_LIST_HEAD(&eq->eq_running)D_INIT_LIST_HEAD(&eq->eq_comp)daos_hhash_hlink_init(&eqx->eqx_hlink, &eq_h_ops)return eqcrt_context_create(&eqx->eqx_ctx)crt_contpext_provider_createcrt_context_initdaos_eq_insert(eqx)daos_hhash_link_insert(&eqx->eqx_hlink, DAOS_HTYPE_EQ) -> 插入全局hash表(struct daos_hhash_table	daos_ht)daos_eq_handle(eqx, eqh)daos_hhash_link_key(&eqx->eqx_hlink, &h->cookie) -> 关联keytse_sched_init(&eqx->eqx_sched, NULL, eqx->eqx_ctx) -> 初始化调度器 -> struct tse_sched_private -> 调度器及队列,属性等struct tse_sched_private	*dsp = tse_sched2priv(sched) -> 设置调度器私有指针dspD_INIT_LIST_HEAD(&dsp->dsp_init_list); -> 初始队列D_INIT_LIST_HEAD(&dsp->dsp_running_list); -> 运行队列D_INIT_LIST_HEAD(&dsp->dsp_complete_list) -> 完成队列D_INIT_LIST_HEAD(&dsp->dsp_sleeping_list) -> 睡眠队列D_INIT_LIST_HEAD(&dsp->dsp_comp_cb_list); -> 完成回调队列tse_sched_register_comp_cb(sched, comp_cb, udata) -> 初始回调为空dsc->dsc_comp_cb = comp_cb -> 设置调度器的完成回调和回调参数(udata)d_list_add(&dsc->dsc_list, &dsp->dsp_comp_cb_list) -> 将调度器的完成回调 -> 调度器的完成回调队列sched->ds_udata = udata -> 将网络上下文 daos_eq_ctx 设置到调度器的用户数据指针上(也可以是回调数据等)daos_eq_putref(eqx) -> 减一次引用计数(ch_rec_decref)duns_resolve_pathdfuse_pool_connectdfuse_cont_opendfuse_fs_start 启动文件系统d_hash_rec_insert(&fs_handle->dpi_iet 将根插入hash表, 在 dfuse_reply_entry 中也会插入: d_hash_rec_find_insert(&fs_handle->dpi_ietd_slab_init(&fs_handle->di_slab, fs_handle)for (i = 0; i < fs_handle->di_eq_count; i++)d_slab_register(&fs_handle->di_slab, &read_slab, eqt, &eqt->de_read_slab)create_many(type)ptr   = create(type) -> create(struct d_slab_type *type)type->st_reg.sr_init(ptr, type->st_arg) -> dfuse_event_init -> ev->de_eqt = handle -> 为ev绑定daos_event_tif (!type->st_reg.sr_reset(ptr)) -> dfuse_read_event_reset(void *arg) -> 重置读事件evD_ALLOC(ev->de_iov.iov_buf, DFUSE_MAX_READ) -> 读最大1MBev->de_sgl.sg_nr       = 1daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL) -> 父事件为空, 也支持父事件,: daos_event_init(child_events[i], DAOS_HDL_INVAL, &event)evx->evx_status	= DAOS_EVS_READYif (daos_handle_is_valid(eqh)) -> 句柄有效eqx = daos_eq_lookup(eqh)evx->evx_ctx = eqx->eqx_ctxevx->evx_sched = &eqx->eqx_sched -> 继承EQ的网络和调度器entry = ptr + type->st_reg.sr_offsetd_list_add_tail(entry, &type->st_free_list) -> 将对象加入空闲列表,计数器+1type->st_free_count++d_list_add_tail(&type->st_type_list, &slab->slab_list) -> 将slab放入列表备用for (i = 0; i < fs_handle->di_eq_count; i++) dfuse_progress_thread pthread_create(&fs_handle->dpi_thread, NULL, dfuse_progress_thread, fs_handle) 异步进度线程,该线程在启动时使用事件队列启动,并阻塞在信号量上,直到创建异步事件,此时线程唤醒并在 daos_eq_poll() 中忙于轮询直到完成sem_waitdaos_eq_poll  从 EQ 中检索完成事件daos_eq_lookup 查找私有事件队列daos_hhash_link_lookupcrt_progress_cond(epa.eqx->eqx_ctx, timeout, eq_progress_cb, &epa)eq_progress_cbdfuse_launch_fuse(fs_handle, &args) 创建fuse文件系统fuse_session_new(args, &dfuse_ops, sizeof(dfuse_ops), fs_handle)fuse_session_mountdfuse_send_to_fgdfuse_loopdfuse_fs_fini

总结

  • DAOS的任务调度引擎结合事件队列和事件, 与网络上下文绑定完成抽象封装, 可作为项目第三方组件引入, 结合业务, 完成同步和异步任务调度(依赖任务处理,如多副本写, EC), 事件, 事件队列, 任务, 调度器, HASH表, SLAB, 各种运行队列, 完成队列, 完成回调队列, 延迟队列…, 可应对复杂的业务调度和管理需求
  • 一个文件系统绑定多个事件队列, IO打散到每个事件队列, 负载均衡
  • 全局HASH表结合cookie作为key, 快速捞回事件队列

参考

DAOS客户端API_事件和事件队列及任务调度引擎

晓兵

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

weixin: ssbandjl

公众号: 云原生云

DAOS IO全路径详解(视频)
DAOS 项目简介(视频)
欢迎对DAOS, SPDK, RDMA等高性能技术感兴趣的朋友加入[DAOS技术交流(群)]

这篇关于DAOS的事件队列(EventQueue)与事件(Event)和任务调度引擎(TSE)及源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/190281

相关文章

LiteFlow轻量级工作流引擎使用示例详解

《LiteFlow轻量级工作流引擎使用示例详解》:本文主要介绍LiteFlow是一个灵活、简洁且轻量的工作流引擎,适合用于中小型项目和微服务架构中的流程编排,本文给大家介绍LiteFlow轻量级工... 目录1. LiteFlow 主要特点2. 工作流定义方式3. LiteFlow 流程示例4. LiteF

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

Python基于微信OCR引擎实现高效图片文字识别

《Python基于微信OCR引擎实现高效图片文字识别》这篇文章主要为大家详细介绍了一款基于微信OCR引擎的图片文字识别桌面应用开发全过程,可以实现从图片拖拽识别到文字提取,感兴趣的小伙伴可以跟随小编一... 目录一、项目概述1.1 开发背景1.2 技术选型1.3 核心优势二、功能详解2.1 核心功能模块2.

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性

MySQL 存储引擎 MyISAM详解(最新推荐)

《MySQL存储引擎MyISAM详解(最新推荐)》使用MyISAM存储引擎的表占用空间很小,但是由于使用表级锁定,所以限制了读/写操作的性能,通常用于中小型的Web应用和数据仓库配置中的只读或主要... 目录mysql 5.5 之前默认的存储引擎️‍一、MyISAM 存储引擎的特性️‍二、MyISAM 的主