根据相关资料,自己实现的线程池

  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <unistd.h>
  4 #include <pthread.h>
  5 #include <sys/types.h>
  6 
  7 typedef struct CThread_worker
  8 {
  9     void *(*process)(void *arg);
 10     void *arg;
 11     struct CThread_worker *next;
 12 }CThread_worker;
 13 
 14 
 15 typedef struct 
 16 {
 17     pthread_mutex_t queue_head;
 18     pthread_cond_t queue_ready;
 19 
 20     struct CThread_worker *queue_worker;
 21     pthread_t *pthread_id;
 22 
 23     int max_task_num;
 24     int cur_task_num;
 25     int shutdown;
 26 
 27 }CThread_pool;
 28 
 29 static CThread_pool *pool = NULL;
 30 
 31 void pool_init(int);
 32 void add_task_to_pool(void *(*)(void *), void *);
 33 void *pthread_fun(void *);
 34 void pool_destroy();
 35 void *my_process(void *);
 36 
 37 int main(int argc, char *argv[])
 38 {
 39     int max_task_num = 0;
 40     int i = 0;
 41     max_task_num = 5;
 42 
 43     pool_init(max_task_num);
 44      
 45     int *worker_num;
 46     worker_num = (int *)malloc(10 * sizeof(int));
 47     for (i=0; i<10; i++)
 48     {
 49         worker_num[i] = i;
 50         add_task_to_pool(my_process, &worker_num[i]);
 51     }
 52     
 53     sleep(12);
 54     pool_destroy();
 55     
 56     free(worker_num);
 57     worker_num = NULL;
 58 
 59     return 0;
 60 }
 61 
 62 
 63 void pool_init(int num)
 64 {
 65     int i = 0;    
 66     pool = (CThread_pool *)malloc(sizeof(CThread_pool));
 67     
 68     pthread_mutex_init(&(pool->queue_head), NULL);
 69     pthread_cond_init(&(pool->queue_ready), NULL);
 70 
 71     pool->queue_worker = NULL;
 72     pool->max_task_num = num;
 73     pool->cur_task_num = 0;
 74     pool->shutdown = 0;
 75     pool->pthread_id = (pthread_t *)malloc(num * sizeof(pthread_t));
 76 
 77     for (i=0; i<num; ++i)
 78     {
 79         pthread_create(&(pool->pthread_id[i]), NULL, pthread_fun, NULL);
 80     }
 81     
 82 }
 83 
 84 void *pthread_fun(void *arg)
 85 {
 86     CThread_worker *worker = NULL;
 87 
 88     printf("pthread %u is starting\n", (unsigned int)pthread_self());
 89     while (1)
 90     {
 91         pthread_mutex_lock(&(pool->queue_head));
 92         while (pool->cur_task_num == 0 && !pool->shutdown)
 93         {
 94             printf("pthread %u is waiting task...\n\n", (unsigned int)(pthread_self()));
 95             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_head));
 96         }
 97         if (pool->shutdown)
 98         {
 99             /*线程退出之前,必须解锁,以让其他的线程得以访问该共享资源.*/
100             pthread_mutex_unlock(&(pool->queue_head));
101             printf("pthread %u is exiting\n", (unsigned int)pthread_self());
102             pthread_exit(NULL);
103         }
104 
105         pool->cur_task_num--;
106         
107         worker = pool->queue_worker;
108         pool->queue_worker = worker->next;
109 /*
110         while (worker->next != NULL)
111         {
112             worker = worker->next;
113         }    
114 */        
115         pthread_mutex_unlock(&(pool->queue_head));
116 
117         (*(worker->process))(worker->arg);
118     }
119     pthread_exit(NULL);
120 }
121 
122 void add_task_to_pool(void *(*my_process)(void *), void *arg)
123 {
124     CThread_worker *worker = NULL;
125     CThread_worker *p = NULL;
126     
127     worker = (CThread_worker *)malloc(sizeof(CThread_worker));
128     worker->process = my_process;
129     worker->arg = arg;
130     worker->next = NULL;
131 
132     pthread_mutex_lock(&(pool->queue_head));
133 
134     p = pool->queue_worker;
135     if ( p == NULL)
136     {
137         pool->queue_worker = worker;
138     }
139     else
140     {
141         while (p->next != NULL)
142         {
143             p = p->next;
144         }
145         p->next = worker;
146     }
147 
148     pool->cur_task_num++;
149 
150     pthread_mutex_unlock(&(pool->queue_head));
151 
152     pthread_cond_signal(&(pool->queue_ready));
153 
154 }
155 
156 
157 void pool_destroy()
158 {
159     int i = 0;
160     CThread_worker *p = NULL;
161 
162     pool->shutdown = 1;
163     /*唤醒等待该条件的所有线程.否则线程将处于阻塞状态,等待条件满足.*/
164     pthread_cond_broadcast(&(pool->queue_ready));    
165     /*阻塞等待线程退出,否则成为僵尸线程.*/
166     for (i=0; i<pool->max_task_num; ++i)
167     {
168         pthread_join(pool->pthread_id[i], NULL);
169     }
170     free (pool->pthread_id);
171 
172     while (pool->queue_worker != NULL)
173     {
174         p = pool->queue_worker;
175         pool->queue_worker = p->next;
176         free(p);
177     }
178     /*信号量和条件变量需要销毁.*/    
179     pthread_mutex_destroy(&(pool->queue_head));
180     pthread_cond_destroy(&(pool->queue_ready));
181 
182     free(pool);
183     /*为避免pool成为野指针,将其赋值为空.*/
184     pool = NULL;    
185 }
186 
187 void *my_process(void *task_id)
188 {
189     printf("thread %u is doing task %d\n",(unsigned int)pthread_self(), *(int *)task_id);
190     sleep(1);
191 }

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。