[Linux] 线程同步分析:什么是条件变量?生产者消费者模型是什么?POSIX信号量怎么用?阻塞队列和环形队列模拟生产者消费者模型
这样合理吗?
只互斥的问题: 饥饿
优先级非常高
的线程, 也存在一部分 优先级非常低(与高优先级线程存在断层)
的线程, 其他线程的优先级也不太低. 那么可能会出现什么问题?所有线程又会抢临界资源、抢锁. 一直如此.
永远抢不到临界资源
、抢不到锁. 也就永远无法被调度
, 永远无法分配到资源
.饥饿 或 饿死
的问题, 即, 执行流长时间无法获得某种资源的情况, 被称为饥饿或饿死
没有错误
, 但是很可能存在类似的线程饥饿的情况, 所以 是不太合理
的.线程同步
在保证临界资源安全的前提下, 让执行流访问临界资源具有一定的顺序性
, 这种机制被称为同步
. 也就是本篇文章主要介绍的内容.虽然同步是指让执行流访问临界资源有一定顺序性的机制, 但是 互斥其实也是同步机制的一种.
虽然只采用互斥 执行流访问资源还是乱序的. 但它还是在一定程度上协调了多个线程的执行, 因为 互斥锁可以保证同一时刻 只有一个执行流访问临界资源.
不过
本篇文章介绍时
会将同步和互斥区别开, 即同步不包括互斥
, 不然非常容易混淆.
条件变量
条件变量
有时候如果临界资源不满足一定的条件, 是不允许线程执行一定的操作的
.想要实现 如果某条件不满足时, 需要让线程等待, 并且如果条件满足时, 可以让线程恢复继续执行的机制
, 就需要用到 条件变量
.pthread
库提供的 一个结构体类型(pthread_cond_t)
的变量, 并且 pthread
库中也提供的操作条件变量的一些接口.cond
及接口
cond
即 condition
, 是条件的意思.pthread_cond_t
即为 定义条件变量的类型
.pthread_cond_t
类型定义的.destroy
了pthread_cond_init()
接口, 来初始化, 第一个参数是条件变量的地址, 第二个参数是条件变量的属性(可以不考虑).init
接口初始化的条件变量, 在不需要使用时, 需要调用 pthread_cond_destroy()
接口进行销毁.pthread_cond_wait()
是 pthread
库提供的 使用条件变量等待的接口. 线程调用此接口, 线程就会立即进入等待.pthread_cond_timedwait()
也是 pthread
库提供的 使用条件变量等待的接口, 只不过 此接口是一种定时让线程等待的接口. 即, 可以通过此接口设置一定的时间, 在此时间内让线程等待. 如果此时间内 条件满足了, 则线程会自动被唤醒, 继续执行代码.需要一个互斥锁
. 从接口就可以反映出来, 条件变量一般是和互斥锁一起使用的. 具体如何一起使用, 由于还没有使用过条件变量, 我们后面再介绍.让线程等待
.pthread_cond_signal()
, 调用此接口, 可以让某个
通过指定条件变量陷入等待的 线程被唤醒
.pthread_cond_broadcast()
, 调用此接口, 则可以让通过指定条件变量陷入等待的所有线程唤被醒
cond
及接口的使用演示
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using std::cin;
using std::cout;
using std::endl;
// 定义并初始化全局互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 定义全局条件变量
pthread_cond_t cond;
void* waitCommand(void* args) {
pthread_detach(pthread_self()); // 先让线程自己分离自己, 我们就不在主线程中回收线程了
// 在此例中, 如果不分离, 线程回收会是个问题. 但具体问题后面再解释和解决.
// 这里我们只是展示一下 接口的最基本的用法和现象
const char* name = (const char*)args;
while (true) {
pthread_cond_wait(&cond, &mutex);
cout << name << ", tid: " << pthread_self() << ", run……" << endl;
}
return nullptr;
}
int main() {
pthread_cond_init(&cond, nullptr);
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, nullptr, waitCommand, (void*)"Thread_1");
pthread_create(&tid2, nullptr, waitCommand, (void*)"Thread_2");
pthread_create(&tid3, nullptr, waitCommand, (void*)"Thread_3");
while (true) {
char c = 'a';
cout << "请输入你的命令(N/Q):: ";
cin >> c;
if (c == 'N' | c == 'n') {
pthread_cond_signal(&cond);
}
else
break;
usleep(1000); // 让主线程usleep一下, 防止线程之间在屏幕上打印干扰
}
pthread_cond_destroy(&cond);
return 0;
}
线程的唤醒是以一定顺序来执行的
.pthread_cond_signal()
来单个唤醒等待的线程.pthread_cond_broadcast()
来广播唤醒所有等待的线程:#include <iostream>
#include <unistd.h>
#include <pthread.h>
using std::cin;
using std::cout;
using std::endl;
// 定义并初始化全局互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 定义全局条件变量
pthread_cond_t cond;
// // 定义一个全局退出变量, 用于判断条件
volatile bool quit = false;
void* waitCommand(void* args) {
pthread_detach(pthread_self()); // 先让线程自己分离自己, 我们就不在主线程中回收线程了
// 在此例中, 如果不分离, 线程回收会是个问题. 但具体问题后面再解释和解决.
// 这里我们只是展示一下 接口的最基本的用法和现象
const char* name = (const char*)args;
while (!quit) {
// 不满足退出条件, 就进来等待
pthread_cond_wait(&cond, &mutex);
cout << name << ", tid: " << pthread_self() << ", run……" << endl;
}
pthread_mutex_unlock(&mutex); // 暂时不解释 这里解锁的原因
cout << name << ", tid: " << pthread_self() << ", end……" << endl;
return nullptr;
}
int main() {
pthread_cond_init(&cond, nullptr);
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, nullptr, waitCommand, (void*)"Thread_1");
pthread_create(&tid2, nullptr, waitCommand, (void*)"Thread_2");
pthread_create(&tid3, nullptr, waitCommand, (void*)"Thread_3");
while (true) {
char c = 'a';
cout << "请输入你的命令(N/Q):: ";
cin >> c;
if (c == 'N' | c == 'n') {
pthread_cond_broadcast(&cond);
}
else {
quit = true; // 修改条件为满足
pthread_cond_broadcast(&cond); // 然后唤醒线程, 再让线程判断条件是否满足
break;
}
usleep(1000); // 让主线程usleep一下, 防止线程之间在屏幕上打印干扰
}
pthread_cond_destroy(&cond);
return 0;
}
非 N 和 n 时, 让退出条件被满足, 并唤醒线程
.为什么条件变量需要与互斥锁一起使用?
pthread_cond_wait()
的使用需要同时用到 条件变量和互斥锁.条件
, 一般就是线程对应的需要访问的临界资源的状态
. 就像我们介绍互斥时的抢票动作, 需要保证票是>0的 才能抢票.临界资源数据的变化
. 所以 需要用互斥锁来保护临界资源
.先上锁
, 然后再判断是否满足条件
, 如果不满足
则条件等待并解锁
. 此时解锁的目的, 是让其他
可以让条件满足的线程获取锁
.等到陷入等待的线程的 条件满足之后
, 再唤醒
刚才等待的线程
并解锁
.被唤醒的线程
再次获取锁
判断条件是否满足, 满足就去执行操作, 否则再次陷入等待.保证整个过程中临界资源是被保护着的
pthread_cond_wait()
完成的.pthread_cond_wait()
解锁并等待. 在线程被唤醒时, 会自动再去竞争锁. 解锁和上锁的操作都是在 pthread_cond_wait()
接口内部实现的.第2行
, 我们让线程分离自己, 不用回收.第13行
, 我们执行了解锁操作. 因为 pthread_cond_wait()
陷入等待时, 会释放锁. 然后被唤醒的时候, 会竞争锁. 如果退出条件满足了. 也就意味着线程将要退出了.pthread_cond_wait()
需要条件变量和互斥锁一起使用?pthread_cond_wait()
接口需要执行释放锁和竞争锁的操作. 所以 需要先看到锁.生产者消费者模型介绍 **
这里的生产者和消费者, 我们
不以生物学的角度
看待.
以学生来代表消费者
.将工厂看作生产者
.通过超市
这个渠道的.提高了效率
解耦
.消费者 看作消费线程
, 将生产者 看作生产线程
.超市
其实就可以看作是 临界资源
.-
消费者与消费者之间是什么关系?
消费者与消费者之间好像是没有关系, 你买你的我买我的, 互不干扰. 但是 仔细想一想,
消费者与消费者之间实际是一种 竞争关系
.互不干扰是因为商品充足, 如果商品不足的话, 是需要竞争的. 其实是 竞争关系
而竞争关系, 其实就是 一种
互斥关系
-
生产者与生产者之间是什么关系?
生产者与生产者之间, 其实也是一种竞争关系. 竞争超市内的空间资源. 竞争谁可以给超市供货. 即
互斥关系
-
消费者与生产者之间需要什么关系?
消费者和生产者, 看似是没有之间的关系的. 但是思考一个问题, 既然超市是临界资源. 那么消费者和生产者是可能在同一时间访问临界资源的.
如果 供应商再给超市供货的时候, 货还没有供完, 货架上的东西还没有放完. 在生活中我们可以直接拿走一个, 然后去超市结账.
但是如果从计算机的角度来看, 生产线程还没有向临界资源内写完数据, 消费线程可以从临界资源中拿走数据吗? 很明显是不可以的. 因为 消费线程可能拿不到完整的资源.
所以, 以计算机的角度来说, 消费者和生产者首先 要
保持
一个互斥关系
而除了互斥之外, 其实还需要 保持
同步
.因为消费者不能在超市没有商品的时候购买商品, 需要等待, 让生产者先向超市供货. 生产者也不能在超市的空间资源已满的情况下继续向超市供货, 需要等待, 让消费者先购买商品.
等到超市有商品了, 再通知消费者来购买. 等到超市有空间了, 再通知生产者来供货.
3
类2
种角色1
个交易场所"交易"
的3、2、1
的思想来理解3、2、1
的模型, 来编写和解决问题. 即, 多线程访问临界资源, 生产线程之间保持互斥、消费线程之间也保持互斥、生产线程和消费线程之间保持互斥、同步. 这样来整理思路, 可以方便解决很多问题.如何让生产线程或消费线程等待?又如何让生产线程和消费线程被唤醒?又如何判断所需条件是否被满足?
条件变量
就可以为我们提供的生产者消费者模型的优点
- 解耦, 可以将两个角色之间的 强耦合关系 变为 松耦合关系.
- 支持并发.
- 支持忙闲不均.
以阻塞队列模拟生产者消费者模型 **
为空时
, 从队列获取元素
的操作将会被阻塞
, 直到队列中被放入了元素; 当队列满时
, 往队列里存放元素
的操作也会被阻塞
, 直到有元素被从队列中取出 (以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)阻塞队列的特点像一个管道
uint32_t _cap
记录阻塞队列的容量queue<T> _bq
, 即为阻塞队列本身_mutex
、_conCond
、_proCond
一个互斥锁, 两个线程分别用的条件变量
blockQueue.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
using std::queue;
using std::cout;
using std::endl;
const uint32_t gDefultCap = 5;
template <class T>
class blockQueue {
public:
// 构造函数
blockQueue(uint32_t cap = gDefultCap)
:_cap(cap) {
pthread_mutex_init(&_mutex, nullptr); // 初始化锁
pthread_cond_init(&_proCond, nullptr); // 初始化生产线程使用的条件变量
pthread_cond_init(&_conCond, nullptr); // 初始化消费线程使用的条件变量
}
// 析构函数
~blockQueue() {
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_conCond);
pthread_cond_destroy(&_proCond);
}
// 生产接口
void push(const T &in) {
// 生产的全过程为
// 1. 上锁
// 2. 判满. 满不生产 条件等待, 不满则生产.
// 3. 生产之后, 解锁
// 4. 唤醒消费接口
lockQueue(); // 上锁
while(isFull()) {
// 满 进入条件等待
condWait(_proCond); // 传入生产线程所用的条件变量, 让生产线程等待
}
// 不满 则生产
pushCore(in);
// 解锁
unlockQueue();
condWakeUp(_conCond); // 传入消费线程所用的条件变量, 唤醒消费线程
}
T pop() {
// 消费的全过程为
// 1. 上锁
// 2. 判空. 空则不消费 条件等待, 不空 则消费
// 3. 消费之后, 解锁
// 4. 唤醒生产接口
lockQueue();
while(isEmpty()) {
condWait(_conCond);
}
T tmp = popCore();
unlockQueue();
condWakeUp(_proCond);
return tmp;
}
private:
// 队列上锁
void lockQueue() {
pthread_mutex_lock(&_mutex);
}
// 队列解锁
void unlockQueue() {
pthread_mutex_unlock(&_mutex);
}
// 判空
bool isEmpty() {
return _bq.empty();
}
// 判满
bool isFull() {
return _bq.size() == _cap;
}
// 条件等待
void condWait(pthread_cond_t &cond) {
pthread_cond_wait(&cond, &_mutex);
}
// 唤醒等待
void condWakeUp(pthread_cond_t &cond) {
pthread_cond_signal(&cond);
}
// 生产任务
void pushCore(const T &in) {
// 即为向队列中添加任务
_bq.push(in);
}
// 消费任务
T popCore() {
// 即从队列中拿出任务
T tmp = _bq.front();
_bq.pop();
return tmp;
}
private:
uint32_t _cap; // 队列容量
queue<T> _bq; // 阻塞队列
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _conCond; // 消费线程使用的条件变量
pthread_cond_t _proCond; // 生产线程使用的条件变量
};
blockQueue.cc:
#include <iostream>
#include <ctime>
#include "blockQueue.hpp"
using std::cout;
using std::endl;
void* productor(void* args) {
blockQueue<int>* pBq = static_cast<blockQueue<int>*>(args);
while (true) {
// 制作数据
int data = rand() % 10;
// 向队列中生产数据
pBq->push(data);
cout << "productor 生产数据完成……" << data << endl;
sleep(2);
}
return nullptr;
}
void* consumer(void* args) {
blockQueue<int>* pBq = static_cast<blockQueue<int>*>(args);
while (true) {
int data = pBq->pop();
cout << "consumer 消费数据完成……" << data << endl;
}
return nullptr;
}
int main() {
// 设置一个随机数种子
srand((unsigned long)time(nullptr) ^ getpid());
// 定义阻塞队列
// 创建两个线程
blockQueue<int> bq;
pthread_t pro, con;
pthread_create(&pro, nullptr, productor, &bq); // 生产线程
pthread_create(&con, nullptr, consumer, &bq); // 消费线程
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
return 0;
}
每2s
push入队列中.问题1:条件判断的语句
while()
而不是 if()
为什么 ?pthread_cond_wait()
之后, 接着向后执行代码, 一定表示条件已经被满足了吗?不一定
. 第一种情况就是, 这个函数因为某种情况调用失败了. 调用失败很可能会继续执行之后的代码. 此时 条件很大概率是没有满足的.伪唤醒
的情况.不能只用一个 if() 判断. 唤醒之后 需要再次判断才能保证是否正确的唤醒
问题2:什么时候唤醒 或者 什么时候解锁?
队列的解锁是在唤醒线程之前的, 即先解锁, 再唤醒线程
可不可以 先唤醒线程, 再解锁呢?
有没有什么影响呢?没有数据安全的影响
理解生产者消费者模型的 并发
解耦
支持忙闲不均
的优点.支持并发的这一特点
并发
体现在哪里呢?Task.hpp:
#pragma once
#include <iostream>
#include <string>
class Task {
public:
Task(int one = 0, int two = 0, char op = 0)
: elemOne_(one)
, elemTwo_(two)
, operator_(op) {}
// 仿函数定义
int operator()() {
return run();
}
int run() {
int result = 0;
switch (operator_) {
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
// 除0处理
if (elemTwo_ == 0) {
std::cout << "div zero, abort" << std::endl;
result = -1;
}
else {
result = elemOne_ / elemTwo_;
}
break;
case '%':
// 除0处理
if (elemTwo_ == 0) {
std::cout << "mod zero, abort" << std::endl;
result = -1;
}
else {
result = elemOne_ % elemTwo_;
}
break;
default:
std::cout << "非法操作: " << operator_ << std::endl;
break;
}
return result;
}
int get(int* e1, int* e2, char* op) {
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
return 0;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
blockQueue.cc:
#include <iostream>
#include <ctime>
#include "blockQueue.hpp"
#include "Task.hpp"
using std::cout;
using std::endl;
const std::string ops = "+-*/%";
// 生产任务接口
void* productor(void* args) {
blockQueue<Task>* pBq = static_cast<blockQueue<Task>*>(args);
while (true) {
// 制作任务
int elemOne = rand() % 50;
int elemTwo = rand() % 10;
char oper = ops[rand() % 4]; // 操作符
Task t(elemOne, elemTwo, oper);
// 生产任务
pBq->push(t);
cout << "producter[" << pthread_self() << "] " <<
(unsigned long)time(nullptr) << " 生产了一个任务: " <<
elemOne << oper << elemTwo << "=?" << endl;
sleep(1);
}
return nullptr;
}
void* consumer(void* args) {
blockQueue<Task>* pBq = static_cast<blockQueue<Task>*>(args);
while (true) {
// 消费任务
Task t = pBq->pop();
// 处理任务
int result = t();
int elemOne, elemTwo;
char oper;
t.get(&elemOne, &elemTwo, &oper);
cout << "consumer[" << pthread_self() << "] " <<
(unsigned long)time(nullptr) << " 消费了一个任务: " <<
elemOne << oper << elemTwo << "=" << result << endl;
}
return nullptr;
}
int main() {
// 设置一个随机数种子
srand((unsigned long)time(nullptr) ^ getpid());
// 定义阻塞队列
// 创建两个线程
blockQueue<Task> bq;
pthread_t pro, con;
pthread_create(&pro, nullptr, productor, &bq); // 生产线程
pthread_create(&con, nullptr, consumer, &bq); // 消费线程
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
return 0;
}
blockQueue.hpp
还是上面的内容.生产和消费加减乘除的任务
消费者从超市消费商品是互斥的, 生产者给超市生产商品也是互斥的. 消费者与生产者也不能同时从超市消费或给超市生产
并不是指 消费者和生产者可以并发的向"超市"消费或生产数据
.生产者制作商品\任务
、消费者处理商品\任务
的这种过程, 其实是可以并发执行
的.制作任务和处理任务的过程其实是需要消耗一定的资源的, 比如时间
.支持并发
的情况一般
情况下并不是在临界区
并发, 而是在 生产前和消费后支持并发
支持忙闲不均
, 同样体现在生产前和消费后
的制作和处理上.POSIX信号量
信号量
也是同步的一种机制.什么是信号量?
有限
的, 不过每一个空座位都可以被任何人选择
.选座位买票
, 每买一张票选一个座位
, 放映厅内的空座位就会少一个
.买到了电影票
, 实际上就是选了放映厅内指定的座位, 让空座位减了1
, 即 预定了放映厅内的座位
.放映厅
看作为一个 临界资源
, 每一个座位
都是临界资源的一小部分资源
, 这所有的座位
就可以 看作是信号量
. 当有人买票时选中了座位
, 接下来可选择的座位就少了一个, 可以看作是 信号量--
; 如果有人退票, 就可以看作是 信号量++
没有了空座位
, 就表示 信号量减到了0
, 其他人再想要买票
, 就需要等有人退票
.互斥锁!
如果信号量只有1, 那么此信号量就可以当互斥锁用.二元信号量
临界资源也可以分为一小部分一小部分的吗
?信号量操作
来让线程选中.如果申请到了信号量, 就表示一定获得了一部分临界资源吗?
只要申请到了信号量, 就一定获得了一部分临界资源
. 因为, 你申请到了, 只要你不释放, 别人就无法申请, 从原则上来说, 已经获得了这部分资源.--
和 ++
. 即 信号量也是一个临界资源. 即 信号量的申请和释放需要时原子性的.--
和 信号量的释放++
都是 原子性的
信号量的接口
sem_t
. 其常用的基本接口有:1. sem_init()初始化:
2. sem_destroy()销毁:
3. sem_wait()等待, 即申请信号量:
4. sem_post()释放信号量:
pthread
库提供的. 不过需要使用的是 semphore.h
头文件以环形队列模拟生产者消费者模型 **
环形队列
先进先出
. 而环形队列实现先进先出的方法是, 可变的队头
从 0位置出
, 然后将后面的元素向前移动一位.环形队列不同
. 环形队列可以看作将数组卷了起来:环形队列的队头是可以变化的
. 什么意思呢?队头指针指向0
, 队尾指针指向3 或是 4
.出队列
时, 需要取到0位置的数据, 然后将 队头指针++, 移动到 1
. 新的队头就是1位置
出队列时, 队头改变, 即 ++
. 那么也就是说, 环形队列的任意位置都可能是队头.不容易直接判断
队列是否 为满或为空
.队列为空
队头和队尾指针指向同一位置
, 队列为满
对头和队尾指针也指向同一位置
. 所以, 环形队列判断为空或未满, 一般通过入队列出队列计数器, 或者恒在队头的前一个位置留空.总结一下
长度是一定
的. 然后 队头和队尾用两个"指针"表示
. 数据入队列
, 队尾指针会向后++
. 数据出队列, 队头指针会向后++
.指针
移动到数组的最后一个元素
时, 再++ 就会回到0 位置
. (此操作, 一般由取模控制)- 如果刚创建的队列, 那就为空.
- 如果队尾指针刚追上队头指针, 那就为满
- 如果队头指针刚追上队尾指针, 那就为空
模拟模型
思路
线程分为生产者和消费者
. 生产者
生产数据, 即为 将数据入队列
. 消费者
消费数据, 即为 将数据出队列
.队列为空
时, 生产者可以生产
, 消费者不能消费
. 当队列为满
时, 生产者不能生产
, 消费者可以消费
. 此时的生产线程和消费线程需要访问同一个位置
. 是互斥与同步
的关系.其他情况
时, 生产者和消费者可以并发的生产和消费
. 因为, 此时生产线程和消费线程访问的不是同一个位置
.生产者不能超越消费者, 消费者也不能超越生产者. 即, 队列满时, 不能再生产, 队列空时, 不能再消费
.都是由信号量来保证的
-
生产者需要的是什么资源?
需要
空间资源
, 因为需要向队列中, 入数据 -
消费者需要的是什么资源?
需要
数据资源
, 因为需要从队列中, 出数据
roomSem
, 另一个是 表示数据资源量 dataSem
--
. 并且, 申请成功就表示获得了一块空间资源, 别人就无法获取. 然后 等到生产数据入队列之后, 数据资源信号量还需要++
, 因为 有数据入队列了.dataSem
, 申请 数据资源信号量, 申请成功 则数据资源信号量--
. 等到数据出队列之后, 空间资源信号量也是需要++
的.roomSem
应该为多少? dataSem
应该为多少?roomSem
应该为 N, dataSem
应该为 0.roomSem++
时, 对应的dataSem
需要--
; dataSem++
时, 对应的roomSem
需要--
. 就可以保证生产者、消费者不会互相超越.生产者和消费者访问的是同一块空间. 所以是需要互斥、同步的
.生产和消费的动作就可以并发
的执行.不同的线程访问的是临界资源的不同部分
使用信号量 模拟 生产者消费者模型
ringQueue.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
using std::cout;
using std::endl;
using std::vector;
const int gDefultCap = 10;
template <class T>
class ringQueue {
public:
// 构造函数
ringQueue(const int cap = gDefultCap)
: _ringQueue(cap)
, _pIndex(0)
, _cIndex(0) {
sem_init(&_roomSem, 0, _ringQueue.size());
sem_init(&_dataSem, 0, 0);
// sem_init() 接口的
// 第一个参数是 需要初始化的信号量,
// 第二个参数是
// 第三个参数是 需要初始化为多少
}
// 析构函数
~ringQueue() {
sem_destroy(&_roomSem);
sem_destroy(&_dataSem);
}
// 生产接口
void push(const T &in) {
// 生产数据
// 先申请空间信号量
sem_wait(&_roomSem); // 申请成功则 _roomSem--, 否则等待
_ringQueue[_pIndex] = in; // 将数据放入 数组
sem_post(&_dataSem); // 数组中数据+1, 那么 dataSem 需要++
_pIndex++; // 生产者下一次生产数据的位置 ++
_pIndex %= _ringQueue.size(); // 跟新下标, 保证环形特性
}
// 消费接口
T pop() {
// 消费数据
// 先申请数据信号量
sem_wait(&_dataSem); // 申请成功则 _dataSem--, 否则等待
T tmp = _ringQueue[_cIndex]; // 存储应拿到的数据
sem_post(&_roomSem); // 拿出了数据, 空间+1, 那么 _roomSem ++
_cIndex++;
_cIndex %= _ringQueue.size();
return tmp;
}
private:
vector<T> _ringQueue; // 模拟循环队列的数组
sem_t _roomSem; // 空间资源信号量, 生产者申请
sem_t _dataSem; // 数据资源信号量, 消费者申请
uint32_t _pIndex; // 生产者生产数据的索引下标, 即插入数据的下标
uint32_t _cIndex; // 消费者消费数据的索引下标, 即获取数据的下标
};
- 一个数组, 用来模拟环形队列
- 一个空间信号量、一个数据信号量. 分别用来控制生产者对空间的申请, 消费者对数据的申请
- 一个生产数据的索引下标, 一个消费数据的索引下标, 分别用来表示插入数据的下标, 和拿出数据的下标
sem_init()
的使用:int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_t* sem
, 需要初始化的信号量int pshared
, 信号量的类型. 我们暂不考虑, 传入0unsigned int value
, 信号量的初始值
_cIndex %= _ringQueue.size()
和 _pIndex %= _ringQueue.size()
来控制环形特性_pIndex
和 _cIndex
需要 ++
. 如果不控制, 迟早超出数组. 所以需要 取模控制一下, 并控制环形特性ringQueue.cc:
#include "ringQueue.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
using std::cout;
// 消费线程调用函数
void* consumer(void* args) {
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
sleep(3);
int data = ringQp->pop();
cout << "consumer_pthread[" << pthread_self() << "]"
<< " 消费了一个数据: " << data << endl;
}
}
// 生产线程调用函数
void* productor(void* args) {
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
int data = rand() % 20;
ringQp->push(data);
cout << "productor_pthread[" << pthread_self() << "]"
<< " 生产了一个数据: " << data << endl;
sleep(1);
}
}
int main() {
srand((unsigned long)time(nullptr) ^ getpid());
ringQueue<int> ringQ;
pthread_t con, pro;
pthread_create(&con, nullptr, consumer, &ringQ);
pthread_create(&pro, nullptr, productor, &ringQ);
pthread_join(con, nullptr);
pthread_join(pro, nullptr);
return 0;
}
按照生产的顺序消费
. 到后面, 由于消费速度不快, 所以队列很快被占满, 但是生产者
也并没有超越消费者去生产数据
, 而是等着消费者消费之后, 在生产.由于信号量在控制着
. 队列被占满时, sem_wait(_roomSem)
是无法申请成功空间信号量的, 因为此时 _roomSem
为0. 队列为空时, 则相反.多生产线程和多消费线程
, 我们当前
模拟的生产者消费者模型的代码会出现错误
吗?对索引下标的保护就不合格
.ringQueue
封装的成员就需要改进一下:#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
using std::cout;
using std::endl;
using std::vector;
const int gDefultCap = 30; // 实现了多线程, 适当的将队列放大
template <class T>
class ringQueue {
public:
// 构造函数
ringQueue(const int cap = gDefultCap)
: _ringQueue(cap)
, _pIndex(0)
, _cIndex(0) {
sem_init(&_roomSem, 0, _ringQueue.size());
sem_init(&_dataSem, 0, 0);
// sem_init() 接口的
// 第一个参数是 需要初始化的信号量,
// 第二个参数是
// 第三个参数是 需要初始化为多少
// 初始化锁
pthread_mutex_init(&_pMutex, nullptr);
pthread_mutex_init(&_cMutex, nullptr);
}
// 析构函数
~ringQueue() {
sem_destroy(&_roomSem);
sem_destroy(&_dataSem);
pthread_mutex_destroy(&_pMutex);
pthread_mutex_destroy(&_cMutex);
}
// 生产接口
void push(const T &in) {
// 生产数据
// 先申请空间信号量
sem_wait(&_roomSem); // 申请成功则 _roomSem--, 否则等待
pthread_mutex_lock(&_pMutex); // 申请信号量成功后, 加锁
_ringQueue[_pIndex] = in; // 将数据放入 数组
_pIndex++; // 生产者下一次生产数据的位置 ++
_pIndex %= _ringQueue.size(); // 跟新下标, 保证环形特性
pthread_mutex_unlock(&_pMutex); // 访问完临界资源, 解锁
sem_post(&_dataSem); // 数组中数据+1, 那么 dataSem 需要++
}
// 消费接口
T pop() {
// 消费数据
// 先申请数据信号量
sem_wait(&_dataSem); // 申请成功则 _dataSem--, 否则等待
pthread_mutex_lock(&_cMutex);
T tmp = _ringQueue[_cIndex]; // 存储应拿到的数据
_cIndex++;
_cIndex %= _ringQueue.size();
pthread_mutex_unlock(&_cMutex);
sem_post(&_roomSem); // 拿出了数据, 空间+1, 那么 _roomSem ++
return tmp;
}
private:
vector<T> _ringQueue; // 模拟循环队列的数组
sem_t _roomSem; // 空间资源信号量, 生产者申请
sem_t _dataSem; // 数据资源信号量, 消费者申请
uint32_t _pIndex; // 生产者生产数据的索引下标, 即插入数据的下标
uint32_t _cIndex; // 消费者消费数据的索引下标, 即获取数据的下标
// 保护索引下标的锁
pthread_mutex_t _cMutex; // 消费数据索引下标的锁
pthread_mutex_t _pMutex; // 生产数据索引下标的锁
};
申请到信号量之后上的锁
. 这是为什么?
申请到信号量 就表示其实可以看成已经获取到资源了
. 所以 申请到信号量之后再上锁, 可以实现让线程先预定资源
再等待的功能. 防止出现, 线程先因为锁 阻塞了一会, 终于抢到锁了 却申请不到资源的情况.#include "ringQueue.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
using std::cout;
// 消费线程调用函数
void* consumer(void* args) {
sleep(10);
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
sleep(1);
int data = ringQp->pop();
cout << "consumer_pthread[" << pthread_self() << "]"
<< " 消费了一个数据: " << data << endl;
}
}
// 生产线程调用函数
void* productor(void* args) {
ringQueue<int>* ringQp = static_cast<ringQueue<int>*>(args);
while (true) {
int data = rand() % 20;
ringQp->push(data);
cout << "productor_pthread[" << pthread_self() << "]"
<< " 生产了一个数据: " << data << endl;
usleep(500000);
}
}
int main() {
srand((unsigned long)time(nullptr) ^ getpid());
ringQueue<int> ringQ;
pthread_t con1, con2, con3, pro1, pro2, pro3;
pthread_create(&con1, nullptr, consumer, &ringQ);
pthread_create(&con2, nullptr, consumer, &ringQ);
pthread_create(&con3, nullptr, consumer, &ringQ);
pthread_create(&pro1, nullptr, productor, &ringQ);
pthread_create(&pro2, nullptr, productor, &ringQ);
pthread_create(&pro3, nullptr, productor, &ringQ);
pthread_join(con1, nullptr);
pthread_join(con2, nullptr);
pthread_join(con3, nullptr);
pthread_join(pro1, nullptr);
pthread_join(pro2, nullptr);
pthread_join(pro3, nullptr);
return 0;
}
没有出现生产或消费出错
的.打印首先是因为屏幕也是临界资源, 但我们没有保护.
其次是因为 cout
作者: 哈米d1ch 发表日期:2023 年 4 月 19 日