生产者消费者模型

​ 生产者消费者模型

生产者-消费者问题

生产者 ——> 放入数据 ——> 缓冲区 <—— 消费者

生产者-消费者问题描述:

  • ⽣产者在⽣成数据后,放在⼀个缓冲区中;
  • 消费者从缓冲区取出数据处理;
  • 任何时刻,只能有⼀个⽣产者或消费者可以访问缓冲区;

问题分析:

  • 任何时刻只能有⼀个线程操作缓冲区,说明操作缓冲区是临界代码,需要互斥
  • 缓冲区空时,消费者必须等待⽣产者⽣成数据;缓冲区满时,⽣产者必须等待消费者取出数据。说明⽣产者和消费者需要同步

需要三个信号量,分别是:

  • 互斥信号量 mutex :⽤于互斥访问缓冲区,初始化值为 1;
  • 资源信号量 fullBuffer :⽤于消费者询问缓冲区是否有数据,有数据则读取数据,初始化值为 0(表明缓冲区⼀开始为空);
  • 资源信号量 emptyBuffer :⽤于⽣产者询问缓冲区是否有空位,有空位则⽣成数据,初始化值为 n (缓冲区⼤⼩);

具体实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#define N 100
semaphore mutex = 1; //互斥信号量,用于临界区的互斥访问
semaphore emptyBuffer = N; //表示缓冲区 [空槽] 的个数
semaphore fullBuffer = 0; //表示缓冲区 [满槽] 的个数

//生产者线程函数
void producer(){
while(true){
P(emptyBuffer); //将空槽的个数 -1
P(mutex); //进入临界区
将生产的数据放到缓冲区中;
V(mutex); //离开临界区
V(fullBuffer); //将满槽的个数 +1
}
}

//消费者线程函数
void consumer(){
while(true){
P(fullBuffer); //将满槽个数 -1
P(mutex); //进入临界区
从缓冲区读取数据;
V(mutex); //离开临界区
V(emptyBuffer); //将空槽的个数 +1
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"fmt"
"sync"
"time"
)

const N = 100

var (
buffer = make([]int, 0, N) // 缓冲区
mutex sync.Mutex // 互斥锁,用于临界区的互斥访问
emptyBuffer = make(chan struct{}, N) // 表示缓冲区 [空槽] 的个数
fullBuffer = make(chan struct{}, N) // 表示缓冲区 [满槽] 的个数
)

func init() {
// 初始化 emptyBuffer 为 N(缓冲区大小)
for i := 0; i < N; i++ {
emptyBuffer <- struct{}{}
}
// fullBuffer 初始化为 0(缓冲区一开始为空)
}

// 生产者协程函数
func producer(id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 10; i++ {
<-emptyBuffer // 将空槽的个数 -1
mutex.Lock() // 进入临界区
data := id*100 + i
buffer = append(buffer, data)
fmt.Printf("生产者 %d 生产数据: %d\n", id, data)
mutex.Unlock() // 离开临界区
fullBuffer <- struct{}{} // 将满槽的个数 +1
time.Sleep(time.Millisecond * 100)
}
}

// 消费者协程函数
func consumer(id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 10; i++ {
<-fullBuffer // 将满槽个数 -1
mutex.Lock() // 进入临界区
if len(buffer) > 0 {
data := buffer[0]
buffer = buffer[1:]
fmt.Printf("消费者 %d 消费数据: %d\n", id, data)
}
mutex.Unlock() // 离开临界区
emptyBuffer <- struct{}{} // 将空槽的个数 +1
time.Sleep(time.Millisecond * 150)
}
}

func main() {
var wg sync.WaitGroup

// 启动 3 个生产者
for i := 1; i <= 3; i++ {
wg.Add(1)
go producer(i, &wg)
}

// 启动 3 个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, &wg)
}

wg.Wait()
fmt.Println("所有生产者和消费者已完成")
}

如果消费者线程⼀开始执⾏ P(fullBuffers) ,由于信号量 fullBuffers 初始值为 0,则此时 fullBuffers 的值从 0 变为 -1,说明缓冲区⾥没有数据,消费者只能等待。

接着,轮到⽣产者执⾏ P(emptyBuffers) ,表示减少 1 个空槽,如果当前没有其他⽣产者线程在临界区执⾏代码,那么该⽣产者线程就可以把数据放到缓冲区,放完后,执⾏V(fullBuffers) ,信号量 fullBuffers 从 -1 变成 0,表明有「消费者」线程正在阻塞等待数据,于是阻塞等待的消费者线程会被唤醒。

消费者线程被唤醒后,如果此时没有其他消费者线程在读数据,那么就可以直接进⼊临界区,从缓冲区读取数据。最后,离开临界区后,把空槽的个数 + 1。

-------------本文结束 感谢阅读-------------
0%