手写线程池

​ 手写线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include<vector>
#include<string>
#include<list>
#include<thread>
#include<condition_variable>
using namespace std;
class ThreadPool {
public:
ThreadPool(int threadnum):started(false),thread_num(threadnum) {
//构造函数声明未启动和线程数量
}
~ThreadPool(){
//析构函数是停止,阻塞所有线程并将其从线程列表剔除后删除,清空线程列表。
stop();
for (int i = 0; i < thread_num; ++i) {
threadlist[i]->join();
}
for (int i = 0; i < thread_num; ++i) {
delete threadlist[i];
}
threadlist.clear(); //清空线程列表
}

void threadFunc(){}//线程执行函数,可自定义。
int getThreadNum() { return thread_num; }
void start() { //启动线程池函数,将num个线程绑定threadFunc自定义函数并执行,加入线程列表
if (thread_num > 0) {
started = true;
for (int i = 0; i < thread_num; ++i) {
thread* pthread = new thread(&threadFunc, this);
threadlist.push_back(pthread);
}
}
}
void stop() { //暂时停止线程,并由条件变量通知所有线程。
started = false;
condition.notify_all(); //用于唤醒所有等待条件变量的线程。
}

private:
int thread_num;
bool started;
vector<thread*> threadlist;
condition_variable condition;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main

import (
"sync"
)

type ThreadPool struct {
threadNum int
started bool
stopChan chan struct{} // 用于通知所有goroutine停止
wg sync.WaitGroup // 用于等待所有goroutine完成
mu sync.Mutex // 保护started字段
}

// NewThreadPool 构造函数声明未启动和线程数量
func NewThreadPool(threadNum int) *ThreadPool {
return &ThreadPool{
threadNum: threadNum,
started: false,
stopChan: make(chan struct{}),
}
}

// Close 析构函数是停止,阻塞所有线程并将其从线程列表剔除后删除,清空线程列表。
func (tp *ThreadPool) Close() {
tp.Stop()
tp.wg.Wait() // 等待所有goroutine完成
close(tp.stopChan)
}

// threadFunc 线程执行函数,可自定义。
func (tp *ThreadPool) threadFunc() {
defer tp.wg.Done()
for {
select {
case <-tp.stopChan:
return
default:
// 自定义线程执行逻辑
}
}
}

// GetThreadNum 获取线程数量
func (tp *ThreadPool) GetThreadNum() int {
return tp.threadNum
}

// Start 启动线程池函数,将num个线程绑定threadFunc自定义函数并执行,加入线程列表
func (tp *ThreadPool) Start() {
tp.mu.Lock()
defer tp.mu.Unlock()

if tp.threadNum > 0 && !tp.started {
tp.started = true
for i := 0; i < tp.threadNum; i++ {
tp.wg.Add(1)
go tp.threadFunc()
}
}
}

// Stop 暂时停止线程,并由条件变量通知所有线程。
func (tp *ThreadPool) Stop() {
tp.mu.Lock()
defer tp.mu.Unlock()

if tp.started {
tp.started = false
close(tp.stopChan) // 用于唤醒所有等待条件变量的线程。
tp.stopChan = make(chan struct{}) // 重新创建channel以便可以再次启动
}
}
-------------本文结束 感谢阅读-------------
0%