I. Purpose

Create a thread-pool management module. It only needs to provide the following features:

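  • Create a thread pool that supports N threads
  • Destroy a thread pool
  • Adjust the number of threads in the pool dynamically
    • Add threads to the pool
    • Remove threads from the pool
  • Add tasks to the pool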

II. Overall design

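Each thread is managed by a worker struct, so creating a thread pool amounts to creating a queue of workers. Every CRUD operation on the pool must hold a mutex to protect the shared data.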

We want the created threads to block on a condition named "job add". A thread only actually runs a task when we add one to the pool.

We call a newly created task a job and store it in a job queue. When a task is added to the pool, it is enqueued and the event is broadcast.
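Here is a minimal sketch of this block-then-broadcast pattern. It assumes POSIX threads. The names pool_sketch_t, sketch_worker, sketch_add_jobs and sketch_stop are hypothetical, and a counter stands in for the real job queue. A worker waits on job_add in a loop, and the producer broadcasts after enqueuing:

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical minimal pool state: a counter stands in for the job queue. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t job_add;   /* broadcast whenever jobs are added */
    int pending;              /* jobs waiting to be processed */
    int processed;            /* jobs already processed */
    bool stop;                /* asks the workers to exit */
} pool_sketch_t;

static void *sketch_worker(void *arg)
{
    pool_sketch_t *pool = arg;

    pthread_mutex_lock(&pool->mutex);
    while (true) {
        /* Re-check the predicate in a loop: wakeups can be spurious and
         * another worker may already have taken the job. */
        while (pool->pending == 0 && !pool->stop)
            pthread_cond_wait(&pool->job_add, &pool->mutex);
        if (pool->pending == 0)   /* stop requested and nothing left */
            break;
        pool->pending--;
        pthread_mutex_unlock(&pool->mutex);
        /* ... the real job would run here, without holding the lock ... */
        pthread_mutex_lock(&pool->mutex);
        pool->processed++;
    }
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
}

/* Producer side: enqueue under the lock, then broadcast the event. */
static void sketch_add_jobs(pool_sketch_t *pool, int n)
{
    pthread_mutex_lock(&pool->mutex);
    pool->pending += n;
    pthread_cond_broadcast(&pool->job_add);
    pthread_mutex_unlock(&pool->mutex);
}

/* Ask all workers to exit once the pending jobs are drained. */
static void sketch_stop(pool_sketch_t *pool)
{
    pthread_mutex_lock(&pool->mutex);
    pool->stop = true;
    pthread_cond_broadcast(&pool->job_add);
    pthread_mutex_unlock(&pool->mutex);
}
```

The job itself runs with the mutex released, so other workers can dequeue jobs at the same time.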

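Adjusting the number of threads in the pool is treated as a special kind of job: it wakes the workers through the same "job add" signal.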

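When the pool is destroyed, we want to terminate all jobs and release their resources, so every job needs a cancel() method. When a thread exits, it should send a termination signal to the main thread so that the main thread can reclaim its resources.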

III. Detailed design

3.1 Struct design

We design the module's structs from the smallest to the largest.

1. Job (work item)

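A job describes a task supplied by the user. Every task has its own execution status, execute function, cancel function and destroy function, so we design it as follows.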

The status field records which stage the job has reached. It is also the key parameter when the pool schedules the job.

typedef struct job_t job_t;
typedef enum job_status_t job_status_t;

enum job_status_t
{
JOB_STATUS_QUEUED = 0,
JOB_STATUS_EXECUTING,
JOB_STATUS_CANCELED,
JOB_STATUS_DONE,
};

struct job_t
{
job_status_t status;
bool (*cancel)(job_t *this);
void (*execute)(job_t *this);
void (*destory)(job_t *this);
};
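A user job can "inherit" from job_t by embedding it as the first member. The pool then only ever sees a job_t *, while the callbacks recover the full object. Here is a minimal sketch. counter_job_t and its functions are hypothetical, and execute still returns void as in the struct above:

```c
#include <stdbool.h>
#include <stdlib.h>

typedef struct job_t job_t;
typedef enum job_status_t job_status_t;

enum job_status_t {
    JOB_STATUS_QUEUED = 0,
    JOB_STATUS_EXECUTING,
    JOB_STATUS_CANCELED,
    JOB_STATUS_DONE,
};

struct job_t {
    job_status_t status;
    bool (*cancel)(job_t *this);
    void (*execute)(job_t *this);
    void (*destory)(job_t *this);   /* spelling kept to match the article */
};

/* Hypothetical user job: job_t is the first member, so a job_t * pointing
 * at it can be converted back to a counter_job_t *. */
typedef struct {
    job_t base;
    int *counter;   /* shared counter the job increments */
} counter_job_t;

static void counter_job_execute(job_t *this)
{
    counter_job_t *job = (counter_job_t *)this;
    (*job->counter)++;
}

static bool counter_job_cancel(job_t *this)
{
    (void)this;
    return true;   /* nothing blocking to interrupt in this job */
}

static void counter_job_destroy(job_t *this)
{
    free(this);    /* frees the whole counter_job_t */
}

static job_t *counter_job_create(int *counter)
{
    counter_job_t *job = calloc(1, sizeof(*job));

    if (!job)
        return NULL;
    job->base.status = JOB_STATUS_QUEUED;
    job->base.execute = counter_job_execute;
    job->base.cancel = counter_job_cancel;
    job->base.destory = counter_job_destroy;
    job->counter = counter;
    return &job->base;
}
```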

To manage multiple tasks, we keep the jobs in a queue:

typedef struct job_queue job_q_t;

struct job_entry
{
job_t *job; /* the entry only points to the job */
TAILQ_ENTRY(job_entry) entries; /* the link field needs a name */
};

TAILQ_HEAD(job_queue, job_entry);
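Here is a sketch of pushing and popping job entries with the TAILQ macros. The job_t here is a hypothetical stand-in that holds only an id, and job_q_push/job_q_pop are illustrative names. The pop hands the job to the caller and frees the queue entry:

```c
#include <stdlib.h>
#include <sys/queue.h>

/* Stand-in for the real job_t; only an id is needed here (hypothetical). */
typedef struct job_t { int id; } job_t;

struct job_entry {
    job_t *job;
    TAILQ_ENTRY(job_entry) entries;   /* links to the previous/next entry */
};

TAILQ_HEAD(job_queue, job_entry);
typedef struct job_queue job_q_t;

/* Append a job at the tail (FIFO order). Returns 0 on success. */
static int job_q_push(job_q_t *q, job_t *job)
{
    struct job_entry *entry = calloc(1, sizeof(*entry));

    if (!entry)
        return -1;
    entry->job = job;
    TAILQ_INSERT_TAIL(q, entry, entries);
    return 0;
}

/* Remove the head entry, free it, and hand the job to the caller.
 * Returns NULL when the queue is empty. */
static job_t *job_q_pop(job_q_t *q)
{
    struct job_entry *entry = TAILQ_FIRST(q);
    job_t *job;

    if (!entry)
        return NULL;
    TAILQ_REMOVE(q, entry, entries);
    job = entry->job;
    free(entry);
    return job;
}
```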

2. Asynchronous execution context

We want a struct that describes a single thread:

typedef void *(*thread_main_t)(void *arg);
typedef struct thread_t thread_t;

struct thread_t
{
pthread_t tid;
thread_main_t main;
void *arg;
};

3. Worker queue

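  • Node:
    Each node in the worker queue associates an asynchronous execution context (a thread) with a work item (a job struct that records the work function). The node also keeps a pointer to the pool, so that every thread can easily check the state of the whole pool (the lock, the condition variables, the desired thread count, and so on).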
struct worker_thread_t
{
processor_t *processor;
job_t *job;
thread_t *thread;
};
  • Queue

This uses the TAILQ queue provided by <sys/queue.h>.

typedef struct thread_queue thread_q_t;

struct worker_entry
{
worker_thread_t worker;
TAILQ_ENTRY(worker_entry) entries;
};

TAILQ_HEAD(thread_queue, worker_entry);
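When the worker queue is torn down, the next pointer must be read before an entry is freed. TAILQ_FOREACH_SAFE would do this, but not every <sys/queue.h> defines it (glibc's copy, for instance, does not), so this sketch walks the list by hand. The simplified worker_entry with an int id and the helper names are hypothetical:

```c
#include <stdlib.h>
#include <sys/queue.h>

/* Simplified worker entry (hypothetical): an id replaces worker_thread_t. */
struct worker_entry {
    int id;
    TAILQ_ENTRY(worker_entry) entries;
};

TAILQ_HEAD(thread_queue, worker_entry);
typedef struct thread_queue thread_q_t;

/* Count the workers with the read-only iterator. */
static int thread_q_count(thread_q_t *q)
{
    struct worker_entry *entry;
    int n = 0;

    TAILQ_FOREACH(entry, q, entries)
        n++;
    return n;
}

/* Free every entry. The next pointer is saved before free(), because
 * TAILQ_FOREACH would read entry->entries after it has been freed. */
static void thread_q_free_all(thread_q_t *q)
{
    struct worker_entry *entry = TAILQ_FIRST(q), *next;

    while (entry) {
        next = TAILQ_NEXT(entry, entries);
        TAILQ_REMOVE(q, entry, entries);
        free(entry);
        entry = next;
    }
}
```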

4. Thread pool

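The thread pool needs some counters to record its current state: the current number of threads, the desired number of threads, and the number of threads currently executing jobs. It also needs a queue that manages the threads, a queue that manages the jobs, a mutex that protects the pool, and a condition variable that signals a newly added job.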

typedef struct processor_t processor_t;

struct processor_t
{
/* public interface */


/* private member */
int total_threads;
int desired_threads;
int working_threads;
thread_q_t threads;
job_q_t jobs;
pthread_mutex_t mutex;
pthread_cond_t job_add;
};

3.2 Interface design

This section designs the required interfaces from the top down.

1. Thread pool

Creating the pool, canceling all worker threads, destroying the pool, and configuring the pool:

processor_t *processor_create();

struct processor_t
{
/* public interface */
void (*set_threads)(processor_t *this, int count);
void (*cancel)(processor_t *this);
void (*destory)(processor_t *this);

/* private member */
};

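set_threads() sets desired_threads to count. If count is larger than the current number of threads, it creates the missing worker threads, and each new worker blocks waiting for the job_add signal. If count is smaller, it only lowers desired_threads. In both cases, whether creating workers or releasing threads, it broadcasts job_add. Combined with process_jobs() (described later), this is what adjusts the thread count.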

static void __processor_set_threads(processor_t *this, int count)
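The shrink rule can be checked without any threads. For each worker woken by the broadcast, the hypothetical helper below replays the test that process_jobs() performs: keep running while desired_threads >= total_threads, otherwise exit and decrement total_threads. It returns how many workers leave:

```c
/* Replays the check each woken worker performs in process_jobs():
 * it keeps running while desired >= total, otherwise it leaves and
 * decrements total. Returns how many of the `woken` workers exit. */
static int simulate_shrink(int total, int desired, int woken)
{
    int exited = 0;

    for (int i = 0; i < woken; i++) {
        if (desired >= total)
            continue;      /* this worker goes back to waiting */
        total--;           /* this worker exits */
        exited++;
    }
    return exited;
}
```

Workers that are busy when the broadcast happens perform the same check after their current job finishes, so the pool converges to desired_threads.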


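cancel() sets the desired number of worker threads to zero, asks every running job to cancel itself, and then blocks on a thread-terminated signal. Once the current number of worker threads has dropped to zero, it reclaims the thread resources and releases the worker queue and the job queue.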
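Here is a minimal sketch of this shutdown handshake. It assumes POSIX threads and a stripped-down pool that holds only the counters and condition variables the handshake needs. The names mini_pool_t, mini_worker and mini_cancel are hypothetical:

```c
#include <pthread.h>

/* Hypothetical minimal pool: only the fields the shutdown handshake needs. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t job_add;
    pthread_cond_t thread_terminated;
    int total_threads;
    int desired_threads;
} mini_pool_t;

/* Idle worker: waits on job_add until the pool wants fewer threads. */
static void *mini_worker(void *arg)
{
    mini_pool_t *pool = arg;

    pthread_mutex_lock(&pool->mutex);
    while (pool->desired_threads >= pool->total_threads)
        pthread_cond_wait(&pool->job_add, &pool->mutex);
    pool->total_threads--;
    /* tell the thread running cancel() that one more worker has left */
    pthread_cond_signal(&pool->thread_terminated);
    pthread_mutex_unlock(&pool->mutex);
    return NULL;
}

/* cancel(): drop the target to zero, keep waking workers, and wait for
 * termination notices until no worker is left, then join them. */
static void mini_cancel(mini_pool_t *pool, pthread_t *tids, int n)
{
    pthread_mutex_lock(&pool->mutex);
    pool->desired_threads = 0;
    while (pool->total_threads > 0) {
        pthread_cond_broadcast(&pool->job_add);
        pthread_cond_wait(&pool->thread_terminated, &pool->mutex);
    }
    pthread_mutex_unlock(&pool->mutex);

    for (int i = 0; i < n; i++)
        pthread_join(tids[i], NULL);
}
```

Both sides re-check their predicate under the mutex, so a termination notice sent before cancel() starts waiting is not lost: cancel() simply sees the smaller total_threads.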


2. Creating a thread instance

Each worker_entry represents one thread instance.

static struct worker_entry *worker_create(processor_t *processor, thread_main_t main)

This function initializes the fields of the worker_entry struct. Its core action is creating a thread instance, which calls the callback passed as the second argument, main.

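The worker waits for the job_add signal and processes a job when one is available. This loop is one of the core functions of the pool: it takes a job off the job queue and executes it.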

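Because set_threads() broadcasts job_add, every idle thread wakes up. A thread that finds desired_threads smaller than total_threads exits and decrements total_threads. Threads keep leaving this way until total_threads equals desired_threads, which is how the thread count is adjusted.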

static void *process_jobs(worker_thread_t *worker)


3. thread

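This is only a thin wrapper around pthread_create() that also initializes the thread_t struct. The extra layer gives us better control over threads, for example for registering thread cleanup handlers. Those improvements will come later. For now the wrapper only prints the current thread's TID, and that printf is commented out in the source below.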

static thread_t *thread_create(thread_main_t main, worker_thread_t *worker);
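Here is a sketch of such a wrapper. It uses a trampoline whose signature matches what pthread_create() expects, which avoids casting function pointers. Unlike the source, it takes a plain void * argument. thread_join_free() is a hypothetical helper:

```c
#include <pthread.h>
#include <stdlib.h>

typedef void *(*thread_main_t)(void *arg);

typedef struct thread_t {
    pthread_t tid;
    thread_main_t main;   /* user callback */
    void *arg;            /* argument for the callback */
} thread_t;

/* Trampoline with the exact signature pthread_create() expects. Cleanup
 * handlers could later be pushed here with pthread_cleanup_push(). */
static void *thread_trampoline(void *arg)
{
    thread_t *this = arg;
    return this->main(this->arg);
}

static thread_t *thread_create(thread_main_t main, void *arg)
{
    thread_t *this = calloc(1, sizeof(*this));

    if (!this)
        return NULL;
    this->main = main;
    this->arg = arg;
    if (pthread_create(&this->tid, NULL, thread_trampoline, this) != 0) {
        free(this);
        return NULL;
    }
    return this;
}

/* Hypothetical helper: wait for the thread, free the wrapper, and
 * return the callback's result. */
static void *thread_join_free(thread_t *this)
{
    void *res = NULL;

    pthread_join(this->tid, &res);
    free(this);
    return res;
}
```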

4. job

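Each job represents a task the user wants executed. When creating a job, the user must register its execute, cancel and destroy functions so that the pool can manage it.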

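Here we introduce two pool methods for jobs: queue_job() and execute_job(). The former only enqueues the job. Whether the job runs depends on whether the pool has a free thread to dequeue and execute it. The latter first checks whether there is an idle thread. If so, it inserts the job at the head of the queue so that it runs first. Otherwise, it executes the job in the caller's own context.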

static void __processor_queue_job(processor_t *this, struct job_entry *entry);
static void __processor_execute_job(processor_t *this, struct job_entry *entry);
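The difference between the two methods can be sketched in a single thread. In the pool, both run with the mutex held. The simplified job_t and the sketch_* names are hypothetical, and the condition mirrors the one used by __processor_execute_job in the source below:

```c
#include <stdbool.h>
#include <stdlib.h>
#include <sys/queue.h>

/* Hypothetical simplified job: an id plus a flag set when it runs inline. */
typedef struct job_t {
    int id;
    bool ran_inline;
} job_t;

struct job_entry {
    job_t *job;
    TAILQ_ENTRY(job_entry) entries;
};
TAILQ_HEAD(job_queue, job_entry);

/* queue_job(): plain FIFO append; the job runs whenever a worker is free. */
static void sketch_queue_job(struct job_queue *q, struct job_entry *e)
{
    TAILQ_INSERT_TAIL(q, e, entries);
}

/* execute_job(): if some thread is idle, jump the queue by inserting at
 * the head; otherwise run the job at once in the caller's thread.
 * Returns true if the job was queued. */
static bool sketch_execute_job(struct job_queue *q, struct job_entry *e,
                               int desired_threads, int total_threads,
                               int working_threads)
{
    if (desired_threads && total_threads > working_threads) {
        TAILQ_INSERT_HEAD(q, e, entries);
        return true;
    }
    e->job->ran_inline = true;   /* stands in for job->execute(job) */
    free(e);                     /* not queued, so the entry is released here */
    return false;
}
```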


3.3 Job scheduling

We now have a pool and an interface for enqueuing tasks, so next we look at how tasks are dequeued and executed.

1. Dequeue

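This is simple: the caller already holds the mutex at this point, so the job can be dequeued directly. get_job() stores the job in worker->job, frees the queue entry and returns true, or returns false if the queue is empty.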

static bool get_job(processor_t *processor, worker_thread_t *worker);

2. Scheduling

This is the core function of job scheduling, and the most important one.

static void process_job(processor_t *processor, worker_thread_t *worker);


Before walking through it, we need to explain one property of a job: its scheduling (requeue) behavior.

We refine the job's execute method so that its return value determines how the job is scheduled next.

typedef enum job_requeue_type_t job_requeue_type_t;
typedef struct job_requeue_t job_requeue_t;

enum job_requeue_type_t {
JOB_REQUEUE_TYPE_NONE = 0,
JOB_REQUEUE_TYPE_FAIR,
JOB_REQUEUE_TYPE_DIRECT,
JOB_REQUEUE_TYPE_SCHEDULE,
};

struct job_requeue_t {
job_requeue_type_t type;
unsigned int rel;
};

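We divide job scheduling into four types:
  • No rescheduling: the job is released.
  • Fair rescheduling: the job is appended to the tail of the job queue.
  • Direct re-execution: the job runs again without going through the queue.
  • Periodic (timed) rescheduling: not implemented yet in the source below.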
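Here is a sketch of how the DIRECT case is handled inside process_job(). The job is re-executed in place, but only if it has a cancel() method; otherwise it is downgraded to FAIR. The runs counter, three_step_execute and always_cancel are hypothetical. The comment on SCHEDULE's delay is an assumption, since rel is unused in the source:

```c
#include <stdbool.h>
#include <stddef.h>

typedef enum job_requeue_type_t {
    JOB_REQUEUE_TYPE_NONE = 0,   /* done: release the job */
    JOB_REQUEUE_TYPE_FAIR,       /* append to the tail of the job queue */
    JOB_REQUEUE_TYPE_DIRECT,     /* run again at once, without requeuing */
    JOB_REQUEUE_TYPE_SCHEDULE,   /* requeue later (assumed: after rel) */
} job_requeue_type_t;

typedef struct job_requeue_t {
    job_requeue_type_t type;
    unsigned int rel;
} job_requeue_t;

typedef struct job_t job_t;
struct job_t {
    job_requeue_t (*execute)(job_t *this);
    bool (*cancel)(job_t *this);
    int runs;   /* hypothetical: counts executions for the demo */
};

/* Mirrors the loop in process_job(): DIRECT re-executes in place, but only
 * for jobs that can be cancelled; otherwise it is downgraded to FAIR. */
static job_requeue_t run_job(job_t *job)
{
    job_requeue_t requeue;

    while (true) {
        requeue = job->execute(job);
        if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
            break;
        if (job->cancel == NULL) {
            requeue.type = JOB_REQUEUE_TYPE_FAIR;
            break;
        }
    }
    return requeue;
}

/* Hypothetical job: asks for DIRECT twice, then reports it is done. */
static job_requeue_t three_step_execute(job_t *this)
{
    job_requeue_t r = { JOB_REQUEUE_TYPE_DIRECT, 0 };

    if (++this->runs >= 3)
        r.type = JOB_REQUEUE_TYPE_NONE;
    return r;
}

static bool always_cancel(job_t *this)
{
    (void)this;
    return true;
}
```

The cancelability requirement keeps a DIRECT job from monopolizing a worker in a way cancel() could never interrupt.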


IV. Source code

threads_pool_00.h

#ifndef __THREADS_POOL_00_H__
#define __THREADS_POOL_00_H__

#include <pthread.h>
#include <stdbool.h>
#include <sys/queue.h>

typedef void *(*thread_main_t)(void *arg);

typedef struct processor_t processor_t;
typedef struct thread_t thread_t;
typedef struct worker_thread_t worker_thread_t;
typedef enum job_status_t job_status_t;
typedef enum job_requeue_type_t job_requeue_type_t;
typedef struct job_requeue_t job_requeue_t;
typedef struct job_t job_t;
typedef struct job_queue job_q_t;
typedef struct thread_queue thread_q_t;

struct thread_t
{
pthread_t tid;
thread_main_t main;
void *arg;
};

enum job_status_t
{
JOB_STATUS_QUEUED = 0,
JOB_STATUS_EXECUTING,
JOB_STATUS_CANCELED,
JOB_STATUS_DONE,
};

enum job_requeue_type_t {
JOB_REQUEUE_TYPE_NONE = 0,
JOB_REQUEUE_TYPE_FAIR,
JOB_REQUEUE_TYPE_DIRECT,
JOB_REQUEUE_TYPE_SCHEDULE,
};

struct job_requeue_t {
job_requeue_type_t type;
unsigned int rel;
};

struct job_t
{
job_status_t status;
bool (*cancel)(job_t *this);
job_requeue_t (*execute)(job_t *this);
void (*destory)(job_t *this);
};

struct job_entry
{
job_t *job;
TAILQ_ENTRY(job_entry) entries;
};

TAILQ_HEAD(job_queue, job_entry);

struct worker_thread_t
{
processor_t *processor;
job_t *job;
thread_t *thread;
};

struct worker_entry
{
worker_thread_t worker;
TAILQ_ENTRY(worker_entry) entries;
};

TAILQ_HEAD(thread_queue, worker_entry);

struct processor_t
{
/* public interface */
void (*set_threads)(processor_t *this, int count);
void (*queue_job) (processor_t *this, struct job_entry *job_entry);
void (*execute_job)(processor_t *this, struct job_entry *job_entry);
void (*cancel)(processor_t *this);
void (*destory)(processor_t *this);

/* private member */
int total_threads;
int desired_threads;
int working_threads;
thread_q_t threads;
job_q_t jobs;
pthread_mutex_t mutex;
pthread_cond_t job_add;
pthread_cond_t thread_terminated;
};

processor_t *processor_create(void);

#endif

threads_pool_00.c

#include "threads_pool_00.h"
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/queue.h>
#include <time.h> /* time(), used by __job_exec */
#include <unistd.h>

/* Trampoline with the signature pthread_create() expects, so no
 * function-pointer casts are needed. */
static void *thread_main(void *arg)
{
thread_t *this = arg;
void *res;

// printf( "created thread %ld\n", this->tid);
res = this->main(this->arg);
return res;
}

static thread_t *thread_create(thread_main_t main, worker_thread_t *worker)
{
thread_t *this = calloc(1, sizeof(*this));

if (this == NULL)
{
return NULL;
}
this->main = main;
this->arg = worker;

if (pthread_create(&this->tid, NULL, thread_main, this) != 0)
{
free(this);
return NULL;
}
return this;
}

static struct worker_entry *worker_create(processor_t *processor, thread_main_t main)
{
struct worker_entry *entry = calloc(1, sizeof(*entry));

entry->worker.processor = processor;
entry->worker.thread = thread_create(main, &entry->worker);

if (entry->worker.thread == NULL)
{
fprintf(stderr, "create thread failed!\n");
free(entry);
return NULL;
}

return entry;
}

static void __processor_cancel(processor_t *this)
{
struct worker_entry *worker, *worker_del;
struct job_entry *job, *job_del;

pthread_mutex_lock(&this->mutex);
this->desired_threads = 0;

worker = TAILQ_FIRST(&this->threads);
while (worker)
{
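if (worker->worker.job && worker->worker.job->cancel)
{
/* mark the job so process_job() destroys it instead of requeuing it */
worker->worker.job->status = JOB_STATUS_CANCELED;
if (!worker->worker.job->cancel(worker->worker.job))
{
/* the job cannot stop itself, so cancel the thread explicitly;
 * no cleanup handler is registered yet, so such a thread never
 * decrements total_threads */
pthread_cancel(worker->worker.thread->tid);
}
}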
worker = TAILQ_NEXT(worker, entries);
}

while (this->total_threads > 0)
{
pthread_cond_broadcast(&this->job_add);
pthread_cond_wait(&this->thread_terminated, &this->mutex);
}

worker_del = TAILQ_FIRST(&this->threads);
while (worker_del)
{
worker = TAILQ_NEXT(worker_del, entries);
pthread_join(worker_del->worker.thread->tid, NULL);
free(worker_del->worker.thread);
free(worker_del);
worker_del = worker;
}

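job_del = TAILQ_FIRST(&this->jobs);
while (job_del)
{
job = TAILQ_NEXT(job_del, entries);
job_del->job->destory(job_del->job);
free(job_del);
job_del = job;
}

/* reset both lists so a later cancel()/destory() does not walk freed entries */
TAILQ_INIT(&this->threads);
TAILQ_INIT(&this->jobs);
pthread_mutex_unlock(&this->mutex);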
}

static void __processor_destory(processor_t *this)
{
__processor_cancel(this);
pthread_mutex_destroy(&this->mutex);
pthread_cond_destroy(&this->job_add);
pthread_cond_destroy(&this->thread_terminated);

free(this);
}

static bool get_job(processor_t *processor, worker_thread_t *worker)
{
struct job_entry *job_entry;
job_entry = TAILQ_FIRST(&processor->jobs);

if (job_entry)
{
TAILQ_REMOVE(&processor->jobs, job_entry, entries);
worker->job = job_entry->job;
free(job_entry);
return true;
}
return false;
}

static void process_job(processor_t *processor, worker_thread_t *worker)
{
job_requeue_t requeue;
struct job_entry *job_entry;
job_t *this_job, *to_destory;

this_job = worker->job;
to_destory = NULL;
processor->working_threads++;

this_job->status = JOB_STATUS_EXECUTING;
pthread_mutex_unlock(&processor->mutex);

while (true)
{
requeue = this_job->execute(this_job);
if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
{
break;
}
else if (!this_job->cancel)
{
requeue.type = JOB_REQUEUE_TYPE_FAIR;
break;
}
}

pthread_mutex_lock(&processor->mutex);
processor->working_threads--;
if (this_job->status == JOB_STATUS_CANCELED)
{
to_destory = this_job;
}
else
{
switch (requeue.type)
{
case JOB_REQUEUE_TYPE_NONE:
this_job->status = JOB_STATUS_DONE;
to_destory = this_job;
break;
case JOB_REQUEUE_TYPE_FAIR:
this_job->status = JOB_STATUS_QUEUED;
job_entry = calloc(1, sizeof(*job_entry));
job_entry->job = this_job;
TAILQ_INSERT_TAIL(&processor->jobs, job_entry, entries);
pthread_cond_signal(&processor->job_add);
break;
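case JOB_REQUEUE_TYPE_SCHEDULE:
/* timed rescheduling is not implemented yet; release the job instead of leaking it */
printf("add a timer thread\n");
this_job->status = JOB_STATUS_DONE;
to_destory = this_job;
break;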
default:
break;
}
}

worker->job = NULL;

if (to_destory)
{
pthread_mutex_unlock(&processor->mutex);
to_destory->destory(to_destory);
pthread_mutex_lock(&processor->mutex);
}

return ;
}

static void *process_jobs(void *arg)
{
worker_thread_t *worker = arg;
processor_t *processor = worker->processor;

pthread_mutex_lock(&processor->mutex);
// printf( "started worker thread %ld\n", worker->thread->tid);

while (processor->desired_threads >= processor->total_threads)
{
if (get_job(processor, worker))
{
process_job(processor, worker);
}
else
{
pthread_cond_wait(&processor->job_add, &processor->mutex);
}
}
processor->total_threads--;
pthread_cond_signal(&processor->thread_terminated);
pthread_mutex_unlock(&processor->mutex);
// printf( "end worker thread %ld\n", worker->thread->tid);
return NULL;
}

static void __processor_set_threads(processor_t *this, int count)
{
pthread_mutex_lock(&this->mutex);

if (count > this->total_threads)
{
struct worker_entry *entry;
this->desired_threads = count;

for (int i = this->total_threads; i < count; i++)
{
entry = worker_create(this, (thread_main_t)process_jobs);
if (entry)
{
TAILQ_INSERT_TAIL(&this->threads, entry, entries);
this->total_threads++;
}
}
}
else if (count < this->total_threads)
{
this->desired_threads = count;
}

pthread_cond_broadcast(&this->job_add);
pthread_mutex_unlock(&this->mutex);
}

static void __processor_queue_job(processor_t *this, struct job_entry *entry)
{
pthread_mutex_lock(&this->mutex);
TAILQ_INSERT_TAIL(&this->jobs, entry, entries);
pthread_cond_signal(&this->job_add);
pthread_mutex_unlock(&this->mutex);
}

static void __processor_execute_job(processor_t *this, struct job_entry *entry)
{
bool queued = false;

pthread_mutex_lock(&this->mutex);
if (this->desired_threads && (this->total_threads > this->working_threads))
{
entry->job->status = JOB_STATUS_QUEUED;
TAILQ_INSERT_HEAD(&this->jobs, entry, entries);
queued = true;
}
pthread_cond_signal(&this->job_add);
pthread_mutex_unlock(&this->mutex);

if (!queued)
{
entry->job->execute(entry->job);
entry->job->destory(entry->job);
free(entry);
}
}

processor_t *processor_create(void)
{
processor_t *this = NULL;

this = calloc(1, sizeof(*this));
if (this == NULL)
{
return NULL;
}
this->set_threads = __processor_set_threads;
this->queue_job = __processor_queue_job;
this->execute_job = __processor_execute_job;
this->cancel = __processor_cancel;
this->destory = __processor_destory;

pthread_mutex_init(&this->mutex, NULL);
pthread_cond_init(&this->job_add, NULL);
pthread_cond_init(&this->thread_terminated, NULL);

TAILQ_INIT(&this->threads);
TAILQ_INIT(&this->jobs);

return this;
}

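static job_requeue_t __job_exec(job_t *this)
{
/* FAIR: put the job back at the tail of the queue once this run ends */
job_requeue_t requeue = { .type = JOB_REQUEUE_TYPE_FAIR, .rel = 0 };
/* rand() is not thread-safe; rand_r() uses a per-call seed */
unsigned int seed = (unsigned int)time(NULL) ^ (unsigned int)(unsigned long)pthread_self();
int timeout = rand_r(&seed) % 10;

(void)this;
sleep(timeout < 1 ? 1 : timeout);
printf("%lu [%s]\n", (unsigned long)pthread_self(), __func__);
return requeue;
}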

static void __job_destory(job_t *this)
{
printf("[%s]\n", __func__);
free(this);
}

static bool __job_cancel(job_t *this)
{
printf("[%s]\n", __func__);
return true;
}

int main(void)
{
processor_t *processor = processor_create();

processor->set_threads(processor, 4);

struct job_entry *entry1 = calloc(1, sizeof(*entry1));
job_t *job1 = calloc(1, sizeof(*job1));
job1->execute = __job_exec;
job1->destory = __job_destory;
job1->cancel = __job_cancel;
entry1->job = job1;

struct job_entry *entry2 = calloc(1, sizeof(*entry2));
job_t *job2 = calloc(1, sizeof(*job2));
job2->execute = __job_exec;
job2->destory = __job_destory;
job2->cancel = __job_cancel;
entry2->job = job2;

struct job_entry *entry3 = calloc(1, sizeof(*entry3));
job_t *job3 = calloc(1, sizeof(*job3));
job3->execute = __job_exec;
job3->destory = __job_destory;
job3->cancel = __job_cancel;
entry3->job = job3;

struct job_entry *entry4 = calloc(1, sizeof(*entry4));
job_t *job4 = calloc(1, sizeof(*job4));
job4->execute = __job_exec;
job4->destory = __job_destory;
job4->cancel = __job_cancel;
entry4->job = job4;

processor->queue_job(processor, entry1);
processor->queue_job(processor, entry2);
processor->queue_job(processor, entry3);
processor->execute_job(processor, entry4);

getchar(); /* wait for Enter, then tear the pool down */
processor->destory(processor);

return 0;
}