C语言pthread使用互斥锁及条件变量的跨进程多生产多消费模型

本文主要是介绍C语言pthread使用互斥锁及条件变量的跨进程多生产多消费模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创作灵感:

格局要打开,进程也要打开,看看进程外的世界

代码描述:

1 此代码基于一个:

使用互斥锁及条件变量的线程间生产消费模型

该模型使用粗粒度锁 将整个生产或消费过程锁住

直到生产满 或 消费空 才使用条件变量通知对方

在唤醒后执行检查 只有仓库全满 或 仓库全空 才就继续执行生产消费逻辑 

否则依然是通知对方并等待

该模型同时还提供了一个监控线程 用于观察仓库情况

 2 变形记

将此线程间模型 分为两个进程(两个main函数)

使用posix风格的共享内存存放共享资源

使用pthread的互斥锁和条件变量对共享资源进行保护

为每个进程配置一个监视线程

并未实现2号生产者和消费者的逻辑

使用注意事项:

1 编译后在两个独立终端运行

2 不能使用大型IDE集成运行

3 unix-like系统 或 支持gnu_c的系统

4 对生产者按ctrl+c,生产消费都会清理资源并结束,输出all done

#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <signal.h>
#include <sys/mman.h>// 速度基数,0.1秒
int tenth_s = 100000;
// 生产者1的速度
int producer2_speed = 5;
// 生产者2的速度
int producer1_speed = 5;
// 监视刷新速度
int monitor_speed = 5;
// 定义线程执行函数指针类型
typedef void *(*func)(void *);
// 互斥锁属性类型 只被初始化1次
pthread_mutexattr_t mutexattr;
// 条件变量属性类型 只被初始化1次
pthread_condattr_t condattr;
// 能装3个线程的数组
pthread_t tids[3] = {0};
// 用于共享内存中的数据,放到一个结构体里
struct shm_data
{pthread_cond_t cv_prod;pthread_cond_t cv_cons;pthread_mutex_t mutexlock;int warehouse[10];pid_t pid;
};
// 初始化共享内存后,返回的指针,转换成struct shm_data *类型
// 复制一份到全局变量,供整个进程使用
struct shm_data *global_addr;
// 这是生产者2的执行线程
void *producer2(void *p)
{// 现在不需要他,让他睡觉printf("producer2\n");sleep(999);pthread_exit(NULL);
}
// 这是收到信号后生产者1的清理函数
void clean_up(void *p)
{// 释放锁pthread_mutex_unlock(&global_addr->mutexlock);
}
// 这是生产者1的执行线程
void *producer1(void *p)
{printf("producer1\n");// 无限循环 套在最外面while (1){// 压入清理函数 防止意外终止pthread_cleanup_push(clean_up, NULL);// 上锁,注意是共享内存中的锁pthread_mutex_lock(&global_addr->mutexlock);// 用于记录仓库中产品的数量int count_p = 0;for (size_t i = 0; i < 10; i++){// 如果在某个位置上没货if (global_addr->warehouse[i] == 0){// 生产消耗时间usleep(producer1_speed * tenth_s);// 生产完毕global_addr->warehouse[i] += 1;}// 在某个位置有货记录+1if (global_addr->warehouse[i] == 1){count_p += 1;}}// 如果10次循环 每个位置都有货,表示仓库已满if (count_p == 10){// do...while用于检查唤醒后仓库的状态// 如果没有任何产品则继续生产,如果存在产品未消耗则通知消费者并等待do{// 告诉消费者可以消费了pthread_cond_signal(&global_addr->cv_cons);printf("producer %d wait\n", gettid());// 放锁,睡眠pthread_cond_wait(&global_addr->cv_prod, &global_addr->mutexlock);// 被唤醒后检查仓库状态count_p = 0;for (size_t i = 0; i < 10; i++){if (global_addr->warehouse[i] == 0){count_p += 1;}}} while (count_p != 10);// 真正醒来printf("producer %d wake up\n", gettid());}// 放锁,新一轮生产pthread_mutex_unlock(&global_addr->mutexlock);pthread_cleanup_pop(1);}pthread_exit(NULL);
}
// 这是一个人类可读的监视进程
void *show_warehouse(void *p)
{while (1){for (size_t i = 0; i < 10; i++){printf("[%d] ", global_addr->warehouse[i]);}printf("\n");// 刷新速度usleep(monitor_speed * tenth_s);}
}
// ctrl+c 信号处理函数
void sig_handler(int signum)
{// 打印信号描述printf("%s\n", strsignal(signum));
}
// 此函数用于初始化共享内存
void shm_init()
{// 打开共享内存文件描述符int fd = shm_open("/shm1", O_CREAT | O_RDWR, 0700);if (fd == -1){perror("shm_open");}// 扩容if (ftruncate(fd, 1024) == -1){perror("ftruncate");}// 附加到进程,并转换成struct shm_data *类型struct shm_data *addr = (struct shm_data *)mmap(NULL, 1024, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);if (addr == MAP_FAILED){perror("mmap");}else{// 赋值全局变量,全局可用global_addr = addr;}
}
void ips_init()
{// 初始化mutexattrpthread_mutexattr_init(&mutexattr);// 设共享pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);// 初始化mutexpthread_mutex_init(&global_addr->mutexlock, &mutexattr);// 初始化condattrpthread_condattr_init(&condattr);// 设共享pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);// 初始化condpthread_cond_init(&global_addr->cv_cons, &condattr);pthread_cond_init(&global_addr->cv_prod, &condattr);// 初始化仓库memset(global_addr->warehouse, 0, sizeof(global_addr->warehouse));// 创建线程两个生产者,一个监视func fun_c[3] = {producer1, producer2, show_warehouse};int i = 0;for (i = 0; i < 3; i++){pthread_create(&tids[i], NULL, fun_c[i], NULL);}
}
int main()
{// 注册信号处理函数 sig_handlersignal(SIGINT, sig_handler);// 初始化共享内存,赋值给一个struct shm_data 类型的全局指针变量shm_init();// 一些把线程共享改成进程共享的准备工作ips_init();// 暂停主线程pause();// 关闭消费者进程kill(global_addr->pid, SIGINT);// 关闭生产线程int i;for (i = 0; i < 3; i++){pthread_cancel(tids[i]);pthread_join(tids[i], NULL);}// 销毁条件变量属性pthread_condattr_destroy(&condattr);// 销毁互斥锁属性pthread_mutexattr_destroy(&mutexattr);// 销毁互斥锁pthread_mutex_destroy(&global_addr->mutexlock);// 销毁条件变量pthread_cond_destroy(&global_addr->cv_prod);pthread_cond_destroy(&global_addr->cv_cons);// 与共享内存分离munmap(global_addr, 1024);// 删除共享内存shm_unlink("/shm1");// 全剧终printf("all done\n");return 0;
}
#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <signal.h>
#include <sys/mman.h>
// 调速
int tenth_s = 100000;
int consumer_speed = 5;
int producer_speed = 5;
int monitor_speed = 5;pthread_t tids[3] = {0};
typedef void *(*func)(void *);
struct shm_data
{pthread_cond_t cv_prod;pthread_cond_t cv_cons;pthread_mutex_t mutexlock;int warehouse[10];pid_t pid;
};
struct shm_data *global_addr;
// 清理函数
void clean_up(void *p)
{pthread_mutex_unlock(&global_addr->mutexlock);
}
// consumer1线程执行函数
void *consumer1(void *p)
{printf("consumer1\n");while (1){pthread_cleanup_push(clean_up, NULL);pthread_mutex_lock(&global_addr->mutexlock);int count_c = 0;for (size_t i = 0; i < 10; i++){if (global_addr->warehouse[i] == 1){usleep(consumer_speed * tenth_s);global_addr->warehouse[i] -= 1;}if (global_addr->warehouse[i] == 0){count_c += 1;}}if (count_c == 10){do{pthread_cond_signal(&global_addr->cv_prod);printf("consumer %d wait\n", gettid());pthread_cond_wait(&global_addr->cv_cons, &global_addr->mutexlock);// 被唤醒后检查仓库状态count_c = 0;for (size_t i = 0; i < 10; i++){if (global_addr->warehouse[i] == 1){count_c += 1;}}// 如果仓库中所有位置都有商品,则通过检查,继续消费} while (count_c != 10);printf("consumer %d wake up\n", gettid());}pthread_mutex_unlock(&global_addr->mutexlock);pthread_cleanup_pop(1);}pthread_exit(NULL);
}
// consumer1线程执行函数
void *consumer2(void *p)
{printf("consumer2\n");sleep(999);pthread_exit(NULL);
}
// 这是一个人类可读的监视进程
void *show_warehouse(void *p)
{while (1){for (size_t i = 0; i < 10; i++){printf("[%d] ", global_addr->warehouse[i]);}printf("\n");usleep(monitor_speed * tenth_s);}pthread_exit(NULL);
}
// 共享内存初始化函数
void shm_init()
{int fd = shm_open("/shm1", O_CREAT | O_RDWR, 0700);if (fd == -1){perror("shm_open");}struct shm_data *addr = (struct shm_data *)mmap(NULL, 1024, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);if (addr == MAP_FAILED){perror("mmap");}else{// 注意这里赋值了共享资源pid,用于在生产进程按ctrl+c时 同时关闭清理消费进程addr->pid = getpid();global_addr = addr;}
}
void pthread_create_init()
{func fun_c[3] = {consumer1, consumer2, show_warehouse};int i = 0;for (i = 0; i < 3; i++){pthread_create(&tids[i], NULL, fun_c[i], NULL);}
}
// 信号处理函数
void sig_handler(int signum)
{printf("%s\n", strsignal(signum));
}
int main()
{// 注册信号处理函数signal(SIGINT, sig_handler);// 初始化共享内存shm_init();// 创建进程pthread_create_init();// 主线程暂停pause();// 收到信号后运行下面代码int i;for (i = 0; i < 3; i++){pthread_cancel(tids[i]);pthread_join(tids[i], NULL);}// 与共享内存分离munmap(global_addr, 1024);printf("all done\n");return 0;
}

这篇关于C语言pthread使用互斥锁及条件变量的跨进程多生产多消费模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/808623

相关文章

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

MyBatis ParameterHandler的具体使用

《MyBatisParameterHandler的具体使用》本文主要介绍了MyBatisParameterHandler的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参... 目录一、概述二、源码1 关键属性2.setParameters3.TypeHandler1.TypeHa

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

使用docker搭建嵌入式Linux开发环境

《使用docker搭建嵌入式Linux开发环境》本文主要介绍了使用docker搭建嵌入式Linux开发环境,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录1、前言2、安装docker3、编写容器管理脚本4、创建容器1、前言在日常开发全志、rk等不同

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca