漫话Redis源码之一百零七

2024-02-06 09:32

本文主要是介绍漫话Redis源码之一百零七,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

还不懂多线程使用的朋友,大可借鉴下文中多线程的用法:

#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
#include <assert.h>
#include <stdio.h>
#include <pthread.h>#define UNUSED(V) ((void) V)void *sub_worker(void *arg) {// Get Redis module contextRedisModuleCtx *ctx = (RedisModuleCtx *)arg;// Try acquiring GILint res = RedisModule_ThreadSafeContextTryLock(ctx);// GIL is already taken by the calling thread expecting to fail.assert(res != REDISMODULE_OK);return NULL;
}void *worker(void *arg) {// Retrieve blocked clientRedisModuleBlockedClient *bc = (RedisModuleBlockedClient *)arg;// Get Redis module contextRedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);// Acquire GILRedisModule_ThreadSafeContextLock(ctx);// Create another thread which will try to acquire the GILpthread_t tid;int res = pthread_create(&tid, NULL, sub_worker, ctx);assert(res == 0);// Wait for threadpthread_join(tid, NULL);// Release GILRedisModule_ThreadSafeContextUnlock(ctx);// Reply to clientRedisModule_ReplyWithSimpleString(ctx, "OK");// Unblock clientRedisModule_UnblockClient(bc, NULL);// Free the Redis module contextRedisModule_FreeThreadSafeContext(ctx);return NULL;
}int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{UNUSED(argv);UNUSED(argc);int flags = RedisModule_GetContextFlags(ctx);int allFlags = RedisModule_GetContextFlagsAll();if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&(flags & REDISMODULE_CTX_FLAGS_MULTI)) {RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");return REDISMODULE_OK;}if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&(flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");return REDISMODULE_OK;}/* This command handler tries to acquire the GIL twice* once in the worker thread using "RedisModule_ThreadSafeContextLock"* second in the sub-worker thread* using "RedisModule_ThreadSafeContextTryLock"* as the GIL is already locked. */RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);pthread_t tid;int res = pthread_create(&tid, NULL, worker, bc);assert(res == 0);return REDISMODULE_OK;
}typedef struct {RedisModuleString **argv;int argc;RedisModuleBlockedClient *bc;
} bg_call_data;void *bg_call_worker(void *arg) {bg_call_data *bg = arg;// Get Redis module contextRedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);// Acquire GILRedisModule_ThreadSafeContextLock(ctx);// Call the commandconst char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);// Release GILRedisModule_ThreadSafeContextUnlock(ctx);// Reply to clientif (!rep) {RedisModule_ReplyWithError(ctx, "NULL reply returned");} else {RedisModule_ReplyWithCallReply(ctx, rep);RedisModule_FreeCallReply(rep);}// Unblock clientRedisModule_UnblockClient(bg->bc, NULL);/* Free the arguments */for (int i=0; i<bg->argc; i++)RedisModule_FreeString(ctx, bg->argv[i]);RedisModule_Free(bg->argv);RedisModule_Free(bg);// Free the Redis module contextRedisModule_FreeThreadSafeContext(ctx);return NULL;
}int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{UNUSED(argv);UNUSED(argc);/* Make sure we're not trying to block a client when we shouldn't */int flags = RedisModule_GetContextFlags(ctx);int allFlags = RedisModule_GetContextFlagsAll();if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&(flags & REDISMODULE_CTX_FLAGS_MULTI)) {RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");return REDISMODULE_OK;}if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&(flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");return REDISMODULE_OK;}/* Make a copy of the arguments and pass them to the thread. */bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);bg->argc = argc;for (int i=0; i<argc; i++)bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);/* Block the client */bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);/* Start a thread to handle the request */pthread_t tid;int res = pthread_create(&tid, NULL, bg_call_worker, bg);assert(res == 0);return REDISMODULE_OK;
}int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){UNUSED(argv);UNUSED(argc);if(argc < 2){return RedisModule_WrongArity(ctx);}const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2);if(!rep){RedisModule_ReplyWithError(ctx, "NULL reply returned");}else{RedisModule_ReplyWithCallReply(ctx, rep);RedisModule_FreeCallReply(rep);}return REDISMODULE_OK;
}int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {REDISMODULE_NOT_USED(argv);REDISMODULE_NOT_USED(argc);if (RedisModule_Init(ctx, "blockedclient", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)return REDISMODULE_ERR;return REDISMODULE_OK;
}

这篇关于漫话Redis源码之一百零七的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/683861

相关文章

Redis 的 SUBSCRIBE命令详解

《Redis的SUBSCRIBE命令详解》Redis的SUBSCRIBE命令用于订阅一个或多个频道,以便接收发送到这些频道的消息,本文给大家介绍Redis的SUBSCRIBE命令,感兴趣的朋友跟随... 目录基本语法工作原理示例消息格式相关命令python 示例Redis 的 SUBSCRIBE 命令用于订

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca

Redis实现高效内存管理的示例代码

《Redis实现高效内存管理的示例代码》Redis内存管理是其核心功能之一,为了高效地利用内存,Redis采用了多种技术和策略,如优化的数据结构、内存分配策略、内存回收、数据压缩等,下面就来详细的介绍... 目录1. 内存分配策略jemalloc 的使用2. 数据压缩和编码ziplist示例代码3. 优化的

redis-sentinel基础概念及部署流程

《redis-sentinel基础概念及部署流程》RedisSentinel是Redis的高可用解决方案,通过监控主从节点、自动故障转移、通知机制及配置提供,实现集群故障恢复与服务持续可用,核心组件包... 目录一. 引言二. 核心功能三. 核心组件四. 故障转移流程五. 服务部署六. sentinel部署

基于Redis自动过期的流处理暂停机制

《基于Redis自动过期的流处理暂停机制》基于Redis自动过期的流处理暂停机制是一种高效、可靠且易于实现的解决方案,防止延时过大的数据影响实时处理自动恢复处理,以避免积压的数据影响实时性,下面就来详... 目录核心思路代码实现1. 初始化Redis连接和键前缀2. 接收数据时检查暂停状态3. 检测到延时过

Redis实现分布式锁全过程

《Redis实现分布式锁全过程》文章介绍Redis实现分布式锁的方法,包括使用SETNX和EXPIRE命令确保互斥性与防死锁,Redisson客户端提供的便捷接口,以及Redlock算法通过多节点共识... 目录Redis实现分布式锁1. 分布式锁的基本原理2. 使用 Redis 实现分布式锁2.1 获取锁

Redis中哨兵机制和集群的区别及说明

《Redis中哨兵机制和集群的区别及说明》Redis哨兵通过主从复制实现高可用,适用于中小规模数据;集群采用分布式分片,支持动态扩展,适合大规模数据,哨兵管理简单但扩展性弱,集群性能更强但架构复杂,根... 目录一、架构设计与节点角色1. 哨兵机制(Sentinel)2. 集群(Cluster)二、数据分片

redis数据结构之String详解

《redis数据结构之String详解》Redis以String为基础类型,因C字符串效率低、非二进制安全等问题,采用SDS动态字符串实现高效存储,通过RedisObject封装,支持多种编码方式(如... 目录一、为什么Redis选String作为基础类型?二、SDS底层数据结构三、RedisObject

Redis分布式锁中Redission底层实现方式

《Redis分布式锁中Redission底层实现方式》Redission基于Redis原子操作和Lua脚本实现分布式锁,通过SETNX命令、看门狗续期、可重入机制及异常处理,确保锁的可靠性和一致性,是... 目录Redis分布式锁中Redission底层实现一、Redission分布式锁的基本使用二、Red

redis和redission分布式锁原理及区别说明

《redis和redission分布式锁原理及区别说明》文章对比了synchronized、乐观锁、Redis分布式锁及Redission锁的原理与区别,指出在集群环境下synchronized失效,... 目录Redis和redission分布式锁原理及区别1、有的同伴想到了synchronized关键字