Gamgee
You miserable little maggot. I'll stove your head in!
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
thread_pool.h
Go to the documentation of this file.
1 /*
2 Copyright (c) 2013 Genome Research Ltd.
3 Author: James Bonfield <jkb@sanger.ac.uk>
4 
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7 
8  1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 
11  2. Redistributions in binary form must reproduce the above copyright notice,
12 this list of conditions and the following disclaimer in the documentation
13 and/or other materials provided with the distribution.
14 
15  3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
16 Institute nor the names of its contributors may be used to endorse or promote
17 products derived from this software without specific prior written permission.
18 
19 THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND
20 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE
23 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30 
31 /*
32  * This file implements a thread pool for multi-threading applications.
33  * It consists of two distinct interfaces: thread pools an results queues.
34  *
35  * The pool of threads is given a function pointer and void* data to pass in.
36  * This means the pool can run jobs of multiple types, albeit first come
37  * first served with no job scheduling.
38  *
39  * Upon completion, the return value from the function pointer is added to
40  * a results queue. We may have multiple queues in use for the one pool.
41  *
42  * An example: reading from BAM and writing to CRAM with 10 threads. We'll
43  * have a pool of 10 threads and two results queues holding decoded BAM blocks
44  * and encoded CRAM blocks respectively.
45  */
46 
47 #ifndef _THREAD_POOL_H_
48 #define _THREAD_POOL_H_
49 
50 #include <pthread.h>
51 
52 struct t_pool;
53 struct t_results_queue;
54 
55 typedef struct t_pool_job {
56  void *(*func)(void *arg);
57  void *arg;
58  struct t_pool_job *next;
59 
60  struct t_pool *p;
61  struct t_results_queue *q;
62  int serial;
63 } t_pool_job;
64 
65 typedef struct t_res {
66  struct t_res *next;
67  int serial; // sequential number for ordering
68  void *data; // result itself
70 
71 struct t_pool;
72 
73 typedef struct {
74  struct t_pool *p;
75  int idx;
76  pthread_t tid;
77  pthread_cond_t pending_c;
78  long long wait_time;
80 
81 typedef struct t_pool {
82  int qsize; // size of queue
83  int njobs; // pending job count
84  int nwaiting; // how many workers waiting for new jobs
85  int shutdown; // true if pool is being destroyed
86 
87  // queue of pending jobs
89 
90  // threads
91  int tsize; // maximum number of jobs
93 
94  // Mutexes
95  pthread_mutex_t pool_m; // used when updating head/tail
96 
97  pthread_cond_t empty_c;
98  pthread_cond_t pending_c; // not empty
99  pthread_cond_t full_c;
100 
101  // array of worker IDs free
103 
104  // Debugging to check wait time
105  long long total_time, wait_time;
106 } t_pool;
107 
108 typedef struct t_results_queue {
113  int queue_len; // number of items in queue
114  int pending; // number of pending items (in progress or in pool list)
115  pthread_mutex_t result_m;
116  pthread_cond_t result_avail_c;
118 
119 
120 /*
121  * Creates a worker pool of length qsize with tsize worker threads.
122  *
123  * Returns pool pointer on success;
124  * NULL on failure
125  */
126 t_pool *t_pool_init(int qsize, int tsize);
127 
128 /*
129  * Adds an item to the work pool.
130  *
131  * FIXME: Maybe return 1,0,-1 and distinguish between job dispathed vs
132  * result returned. Ie rather than blocking on full queue we're permitted
133  * to return early on "result available" event too.
134  * Caller would then have a while loop around t_pool_dispatch.
135  * Or, return -1 and set errno to E_AGAIN to indicate job not yet submitted.
136  *
137  * Returns 0 on success
138  * -1 on failure
139  */
141  void *(*func)(void *arg), void *arg);
143  void *(*func)(void *arg), void *arg, int nonblock);
144 
145 /*
146  * Flushes the pool, but doesn't exit. This simply drains the queue and
147  * ensures all worker threads have finished their current task.
148  *
149  * Returns 0 on success;
150  * -1 on failure
151  */
152 int t_pool_flush(t_pool *p);
153 
154 /*
155  * Destroys a thread pool. If 'kill' is true the threads are terminated now,
156  * otherwise they are joined into the main thread so they will finish their
157  * current work load.
158  *
159  * Use t_pool_destroy(p,0) after a t_pool_flush(p) on a normal shutdown or
160  * t_pool_destroy(p,1) to quickly exit after a fatal error.
161  */
162 void t_pool_destroy(t_pool *p, int kill);
163 
164 /*
165  * Pulls a result off the head of the result queue. Caller should
166  * free it (and any internals as appropriate) after use. This doesn't
167  * wait for a result to be present.
168  *
169  * Results will be returned in strict order.
170  *
171  * Returns t_pool_result pointer if a result is ready.
172  * NULL if not.
173  */
176 
177 /*
178  * Frees a result 'r' and if free_data is true also frees
179  * the internal r->data result too.
180  */
181 void t_pool_delete_result(t_pool_result *r, int free_data);
182 
183 /*
184  * Initialises a results queue.
185  *
186  * Results queue pointer on success;
187  * NULL on failure
188  */
190 
191 /* Deallocates memory for a results queue */
193 
194 /*
195  * Returns true if there are no items on the finished results queue and
196  * also none still pending.
197  */
199 
200 /*
201  * Returns the number of completed jobs on the results queue.
202  */
204 
205 /*
206  * Returns the number of completed jobs plus the number queued up to run.
207  */
209 
210 #endif /* _THREAD_POOL_H_ */
int curr_serial
Definition: thread_pool.h:112
struct t_pool_job t_pool_job
long long wait_time
Definition: thread_pool.h:105
int nwaiting
Definition: thread_pool.h:84
t_pool_result * t_pool_next_result_wait(t_results_queue *q)
Definition: thread_pool.c:175
long long total_time
Definition: thread_pool.h:105
int next_serial
Definition: thread_pool.h:111
int t_pool_results_queue_len(t_results_queue *q)
Definition: thread_pool.c:217
struct t_pool * p
Definition: thread_pool.h:74
int t_pool_dispatch(t_pool *p, t_results_queue *q, void *(*func)(void *arg), void *arg)
Definition: thread_pool.c:488
long long wait_time
Definition: thread_pool.h:78
int serial
Definition: thread_pool.h:67
int t_pool_results_queue_sz(t_results_queue *q)
Definition: thread_pool.c:227
void * arg
Definition: thread_pool.h:57
Definition: thread_pool.h:81
struct t_results_queue t_results_queue
t_pool_worker_t * t
Definition: thread_pool.h:92
pthread_mutex_t result_m
Definition: thread_pool.h:115
pthread_mutex_t pool_m
Definition: thread_pool.h:95
int tsize
Definition: thread_pool.h:91
pthread_t tid
Definition: thread_pool.h:76
struct t_pool t_pool
struct t_res t_pool_result
Definition: thread_pool.h:65
t_results_queue * t_results_queue_init(void)
Definition: thread_pool.c:257
int t_pool_flush(t_pool *p)
Definition: thread_pool.c:634
int queue_len
Definition: thread_pool.h:113
int t_pool_dispatch2(t_pool *p, t_results_queue *q, void *(*func)(void *arg), void *arg, int nonblock)
Definition: thread_pool.c:550
pthread_cond_t pending_c
Definition: thread_pool.h:98
pthread_cond_t pending_c
Definition: thread_pool.h:77
int qsize
Definition: thread_pool.h:82
pthread_cond_t result_avail_c
Definition: thread_pool.h:116
t_pool_job * tail
Definition: thread_pool.h:88
int * t_stack
Definition: thread_pool.h:102
t_pool_result * t_pool_next_result(t_results_queue *q)
Definition: thread_pool.c:157
int idx
Definition: thread_pool.h:75
int t_stack_top
Definition: thread_pool.h:102
int t_pool_results_queue_empty(t_results_queue *q)
Definition: thread_pool.c:203
void * data
Definition: thread_pool.h:68
int njobs
Definition: thread_pool.h:83
t_pool * t_pool_init(int qsize, int tsize)
Definition: thread_pool.c:421
struct t_results_queue * q
Definition: thread_pool.h:61
struct t_pool * p
Definition: thread_pool.h:60
int shutdown
Definition: thread_pool.h:85
int pending
Definition: thread_pool.h:114
pthread_cond_t full_c
Definition: thread_pool.h:99
Definition: thread_pool.h:108
Definition: thread_pool.h:73
struct t_pool_job * next
Definition: thread_pool.h:58
void t_results_queue_destroy(t_results_queue *q)
Definition: thread_pool.c:274
struct t_res * next
Definition: thread_pool.h:66
pthread_cond_t empty_c
Definition: thread_pool.h:97
void t_pool_destroy(t_pool *p, int kill)
Definition: thread_pool.c:670
t_pool_job * head
Definition: thread_pool.h:88
t_pool_result * result_head
Definition: thread_pool.h:109
int serial
Definition: thread_pool.h:62
t_pool_result * result_tail
Definition: thread_pool.h:110
void t_pool_delete_result(t_pool_result *r, int free_data)
Definition: thread_pool.c:241
Definition: thread_pool.h:55