10-信号量和管程

发布时间 2023-11-02 20:10:48作者: Oh,mydream!

10-信号量和管程

背景

  • 并发问题:竞争条件(竞态条件)
    多程序并发存在问题
  • 同步
    多线程共享数据的协调执行
    包括互斥与条件同步
    互斥:在同一时间只有一个线程可以执行临界区
  • 确保同步正确很难?
    需要高层次的编程抽象(如:锁)
    从底层硬件支持编译

多程序理念 临界区
高层次抽象 锁
硬件支持 禁用中断 原子操作(如:test-and-set) 原子(load/store)

信号量

  • 抽象数据类型
    一个整形(sem),两个原子操作
    P():sem减1,如果sem<0,等待,否则继续
    V():sem加1,如果sem<=1,唤醒一个等待的P

P:Prolaag(荷兰语:尝试减少)
V:Verhoog(荷兰语:增加)

  • 信号量是整数

  • 信号量是被保护的变量
    初始化完成后,唯一改变一个信号量的值的方法是通过P()和V()
    操作必须是原子

  • P()能够阻塞,V()不会阻塞

  • 我们假定信号量是“公平的”
    没有线程被阻塞在P()如果V()被无限频繁调用(在同一个信号量)
    在实践中,FIFO经常被使用

  • 两种信号类型
    二进制信号:可以是0或1
    一般、计数信号量:可取任何非负值
    两者相互表现(给定一个可以实现另一个)

  • 信号量可以用在2个方面
    互斥
    条件同步(调度约束--一个线程等待另一个线程的事情发生)

信号量使用

用二进制信号量实现的互斥

mutex = new Semaphore(1);

mutex->P();
...
Critical Section;
...
mutex->V();

用二进制信号量实现的调度约束

condition = new Semaphore(0);

// Thread A
...
condition->P();
...

// Thread B
···
condition->V();
···

P()等待,V()发出信号

  • 一个线程等待另一个线程处理事情
    比如生产东西或者消费东西
    互斥(锁机制)是不够的

  • 例如:有界缓冲区的生产者-消费者问题
    一个或多个生产者产生数据将数据放在一个缓冲区里
    单个消费者每次从缓冲区去取数据
    在任何一个时间只有一个生产者或消费者可以访问该缓冲区

  • 正确性要求
    在任何一个时间只能有一个线程操作缓冲区(互斥)
    当缓冲区为空,消费者必须等待生产者(调度/同步约束)
    当缓存区满,生产者必须等待消费者(调度、同步约束)

  • 每个约束用一个单独的信号量
    二进制信号量互斥
    一般信号量fullBuffers
    一般信号量emptyBuffers

class BounderBuffer {
  mutex = new Semaphore(1);
  fullBuffers = new Semaphore(0);
  emptyBuffers = new Semaphore(n);
}

BounderBuffer::Deposite(c) {
  emptyBuffers->P();
  mutex->P();
  Add c to the buffer;
  mutex->V();
  fullBuffers->V();
}

BounderBuffer::Remove(c) {
    fullBuffers->P();
    mutex->P();
    Remove c from buffer;
    mutex -> V();
    emptyBuffers->V();
}

信号量实现

使用硬件原语
禁用中断
原子指令(test-and-set)

类似锁
例如:使用‘禁用中断’

class Semaphore {
    int sem;
    WaitQueue q;
}

Semaphore::P() {
    sem--;
    if(sem<0) {
        Add this thread t to q;
        block(p);
    }
}

Semaphore::V() {
    sem++;
    if(sem<=0) {
        Remove a thread t from q;
        wakeup(t);
    }
}
  • 信号量的双用途
    互斥和条件同步
    但等待条件是独立的互斥
  • 读/开发代码比较困难
    程序员必须非常精通信号量
  • 容易出错
    使用信号量已经被另一个线程占用
    忘记释放信号量
  • 不能够处理死锁问题

管程(英文:monitor)

  • 目的:分离互斥和条件同步的关注

  • 什么是管程
    一个锁:指定临界区
    0或者多个条件变量:等待/通知信号量用于管理并发访问共享数据

  • 一般方法
    收集在对象/模块中的相关共享数据
    定义方法来访问共享数据

  • Lock
    Lock::Acquire() - 等待直到锁可用,然后抢占锁
    Lock::Release() - 释放锁,唤醒等待者如果有

  • Condition Variable

    • 允许等待状态进入临界区
      允许处于等待(睡眠)的线程进入临界区
      某个时刻原子释放锁进入睡眠
    • Wait() operation
      释放锁,睡眠,重新获得锁后返回
    • Signal() operation (or broadcast() operation)
      唤醒等待者(或者所有等待者),如果有
  • 实现
    需要维持每个条件队列
    现成等待的天剑等待signal()

Class Condition {
    int numWaiting = 0;
    WaitQueue q;
}

Condition::Wait(lock){
    numWaiting++;
    Add this thread t to q;
    release(lock);
    schedule();// need mutex
    require(lock);
}

Condition::Signal(){
    if(numWaiting>0) {
        Remove a thread t from q;
        wakeup(t);
        numWaiting--;
    }
}

class BoundedBuffer {
    ...
    Lock lock;
    int count =0;
    Condition notFull, notEmpty;
}

BoundedBuffer::Deposit(t) {
    LOCK->Acquire();
    while(count ==n)
      notFull.wait(&lock);
    Add c to the buffer;
    count++;
    notEmpty.Signal();
    lock->Release();
}

BoundedBuffer::Remove(c){
    lock->Acquire();
    while(count ==0)
      notEmpty.Wait(&lock);
    Remove c from buffer;
    count--;
    notFull.Signal();
    lock->Release();
}

根据执行完Signal()方法后立即执行新的线程还是执行原有的Signal()所在的线程

Hansen-style(执行原有的Signal()所在的线程, 大部分真实的操作系统,或者java等语言)

l.acquire()
...
x.wait()   T1 blocks
    T2 starts l.acquire()
              ...
              x.signal()
              ...
   T2 fhinish l.release()
...        T1 resumes
l.release()     

Hoare-style(立即执行新的线程,大部分教科书中的做法)

l.acquire()
...
x.wait()   T1 blocks
    T2 starts l.acquire()
              ...
              x.signal()
...         T1 resumes
l.release() T1 finishes
   T2 resumes ...
              l.release()

实现
Hansen-style
信号只是一个条件可能为真的提示
需要再次确认
优点:实现高效

Hansen-style:Deposit(){
    lock->acquire();
    while(count==n) {
        notFull.wait(&lock);
    }
    Add thing;
    count++;
    notEmpty.signal();
    lock->release();
}

Hoare-style
清晰,好证明
当条件变量改变时,算法不变
实现不高效

并发编程: 临界区 Monitor
高层抽象:信号量 锁 条件变量
硬件支持:禁用中断 原子指令(如test-and-set) 原子操作(Load/Store)

Hoare-style:Deposit(){
    lock->acquire();
    if(count==n) {
        notFull.wait(&lock);
    }
    Add thing;
    count++;
    notEmpty.signal();
    lock->release();
}

经典同步问题

读者-写者问题

  • 动机:共享数据的访问

  • 两种类型使用者
    读者:不需要修改数据
    写者:读取和修改数据

  • 问题的约束
    允许同一时间有多个读者,但在任何时候只有一个写者
    当没有写者时读者才能访问数据
    当没有读者和写者时写者才能访问数据
    在任何时候只能有一个线程可以操作共享变量

  • 多个并发进程的数据集共享
    读者-只读数据集;他们不执行任何更新

  • 共享数据
    数据集
    信号量CountMutex初始化为1
    信号量WriteMutex初始化为1
    整数Rcount初始化为0

  • 读者优先策略

# Writer
sem_wait(WriteMutex);
write;
sem_post(WriteMutex);


# Reader
sem_wait(CountMutex);
if(Rcount==0)
  sem_wait(WriteMutex);
++Rcount;
sem_post(CountMutex);

read;
sem_wait(CountMutex);
--Rcount;
if(Rcount==0)
  sem_post(WriteMutex);
sem_post(CountMutex)

基于读者优先策略的方法:只要有一个读者处于活动状态,后来的读者都会被接纳。如果读者源源不断地出现的话,那么写者就始终处于阻塞状态
基于写者优先策略的方法:一旦写者就绪,那么写者会尽可能快的执行写操作。如果写者远远不断地出现的话,那么读者就是始终处于阻塞状态

基于写优先的读者-写者问题

基本结构: 两个方法

DataBase::Read() {
    Wait until no writers;
    read database;
    Check out - wake up waiting writers;
}

DataBase::Write() {
    Wait until no readers/writers;
    write database;
    Check out - wake up waiting readers/writers;
}

管程的状态变量

AR = 0;        // active readers 
AW = 0;        // active writers
WR = 0;        // waiting readers
WW = 0;        // waiting writers
Condition okToRead;
Condition okToWrite;
Lock lock;

总体代码

AR = 0;        // active readers 
AW = 0;        // active writers
WR = 0;        // waiting readers
WW = 0;        // waiting writers
Condition okToRead;
Condition okToWrite;
Lock lock;

public Database::Read() {
    //Wait until no writers;
    StartRead();
    read database;
    //check out - wake up waiting writers;
    DoneRead(); 
}

Private DataBase::StartRead() {
    lock.Acquire();
    while((AW+WW)>0) {
        WR++;
        okToRead.wait(&lock);
        WR--;
    }
    AR++;
    lock.Release();
}

Private DataBase::DoneRead() {
    lock.Acquire();
    AR--;
    if(AR==0 && WW>0) {
        okToWrite.signal();
    }
    lock.Release();
}

public DataBase::Write() {
    //Wait until no readers/writers;
    StartWrite();
    write database;
    //Check out - wake up waiting readers/writers;
    DoneWrite();
}

private Database::StartWrite(){
    lock.Acquire();
    while((AW+AR)>0) {
        WW++;
        okToWrite.wait(&lock);
        WW--;
    }
    AW++;
    lock.Release();
}

private Database::DoneWrite() {
    lock.Acquire();
    AW--;
    if(WW>0) {
        okToWrite.signal();
    } else if(WR>0) {
        okToRead.broadcast();
    }
    lock.Release();
}

哲学家就餐问题

问题描述:(1965年由Gijkstra首先提出并解决)5个哲学家围绕一张圆桌而坐,桌子上放着5支叉子,每两个哲学家之间放一支;哲学家的动作包括思考和进餐,进餐时需要同时拿起他左边和右边的两只叉子,思考时则同时将两只叉子放回原处。如何保证哲学家们的动作有序进行?如:不出现有人永远拿不到叉子

共享数据
Bowl of rice (data set)
Semaphore fork [5] initialized to 1
take_fork(i): P(fork[i]) put_fork(i): V(fork[i])

方法一:不正确,当大家都左手拿着叉子,可能导致死锁

# define N 5 // 哲学家个数
void philosopher(int i) // 哲学家编号:0-4
while(TRUE){
    think(); // 哲学家在思考
    take_fork(i); // 去拿左边的叉子
    take_fork((i+1)%N) // 去拿右边的叉子
    eat(); // 吃面条中
    put_fork(i);  // 放下左边的叉子
    put_fork((i+1)%N) // 放下右边的叉子
}

方法二:对拿叉子的进程进行了改进,但仍然不正确,由于等待时间一致,大家总是一起拿起左叉子,又一起放下

# define N 5 // 哲学家个数
void philosopher(int i) // 哲学家编号:0-4
while(TRUE){
    take_fork(i); // 去拿左边的叉子
    if (fork(i+1)%N){ // 右边的叉子还在吗?
        take_fork((i+1)%N) // 去拿右边的叉子
        break
    } else{
        put_fork(i);      // 放下左边的叉子
        wait_some_time(); // 等待一会儿
    }
}

方法三:等待时间随机变化。可行,但有可能会导致有人一直饥饿

# define N 5 // 哲学家个数
void philosopher(int i) // 哲学家编号:0-4
while(TRUE){
    take_fork(i); // 去拿左边的叉子
    if (fork(i+1)%N){ // 右边的叉子还在吗?
        take_fork((i+1)%N) // 去拿右边的叉子
        break
    } else{
        put_fork(i);      // 放下左边的叉子
        wait_random_time(); // 等待随机时长
    }
}

方法四:互斥范文,正确,但每次只允许一个人,造成资源的浪费

semaphore mutex
void philosopher(int i) // 哲学家编号:0-4
while(TRUE){
    think(); // 哲学家思考
    P(mutex);
        take_fork(i); // 去拿左边的叉子
        take_fork((i+1)%N) // 去拿右边的叉子
        eat(); // 吃面条中
        put_fork(i);  // 放下左边的叉子
        put_fork((i+1)%N) // 放下右边的叉子
    V(mutex);
}

正确解法

思路1:要么不拿,要么就拿两把叉子。
S1 思考中。。。
S2 进入饥饿状态;
S3 如果左邻居或右邻居正在进餐,等待;否则转S4
S4 拿起两把叉子
S5 吃面条
S6 放下左叉子
S7 放下右叉子
S8 新的循环又开始了

思路2:计算机程序怎么来解决这个问题?
指导原则:不能浪费CPU时间;进程间相互通信
S1 思考中
S2 进入饥饿状态;
S3 如果左邻居或右邻居正在进餐,进程进入阻塞状态;否则转S4
S4 拿起两把叉子
S5 吃面条
S6 放下左叉子,看看左邻居现在能否进餐(饥饿状态,两把叉子都在),若能则唤醒之
S7 放下右叉子,看看右邻居现在能否进餐(饥饿状态,两把叉子都在),若能则唤醒之
S8 新的循环又开始了 ,转S1

思路3:怎么样来编写程序?

  1. 必须有数据结构,来描述每个哲学家的当前状态;
  2. 该状态是一个临界资源,各个哲学家对它的访问应该互斥地进行--进程互斥
  3. 一个哲学家吃饱后,可能要唤醒它的左邻右舍,两者之间存在着同步关系--进程同步
// 1. 必须有数据结构,来描述每个哲学家的当前状态;
#define N 5          // 哲学家个数
#define LEFT i       // 第i个哲学家的左邻居
#define RIGHT (i+1)%N // 第i个哲学家的右邻居 
#define THINKING 0   // 思考状态
#define HUNGRY 1     // 饥饿状态
#define EATING 2     // 进餐状态
int state[N];        // 记录每个人的状态
// 2. 该状态是一个临界资源,各个哲学家对它的访问应该互斥地进行--进程互斥
semaphore mutex // 互斥信号量,初值1
// 3. 一个哲学家吃饱后,可能要唤醒它的左邻右舍,两者之间存在着同步关系--进程同步
semaphore s[N] // 同步信号量,初值0

void philosopher(int i) { // i的取值:0到N-1
    while(TRUE){          // 封闭式循环
        think();          // 思考中 S1
        take_forks(i);    // 拿到两把叉子或被阻塞 S2-S4
        eat();            // 吃面条中 S5
        put_forks(i);     // 把两把叉子放回原处 S6-S7
    }
}

// 功能:要么拿到两把叉子,要么被阻塞起来
void take_forks(int i) {             // i的取值:0到N-1
    P(mutex);                        // 进入临界区
    state[i] = HUNGRY;               // 我饿了
    test_take_left_right_forks(i);   // 试图拿两把叉子
    V(mutex);                        // 退出临界区
    P(s[i]);                         // 没有叉子便阻塞
}

void test_take_left_right_forks(int i) { // i:0到N-1
    if(state[i]==HUNGRY && state[LEFT] != EATING && state[RIGHT] != EATING) { //i:我自己,or 其他人
        state[i] =EATING; // 两把叉子到手
        V[s[i];]          // 通知第i人可以吃饭了
    }
}


void put_forks(int i) {               // i的取值:0到N-1
    P(mutex);                         // 进入临界区
    state[i]=THINKING;                // 交出两把刷子
    test_take_left_right_forks(LEFT); // 看左邻居能否进餐
    test_take_left_right_forks(RIGHT);// 看右邻居能否进餐
    V(mutex);                         // 退出临界区
}