Author : LUPG
Page : << Previous 13 Next >>
p_cond_var;
pool->requests = requests;
return pool;
}
/* spawn a new handler thread and add it to the threads pool. */
void
add_handler_thread(struct handler_threads_pool* pool)
{
struct handler_thread* a_thread; /* thread's data */
struct handler_thread_params* params; /* thread's parameters */
/* sanity check */
assert(pool);
/* create the new thread's structure and initialize it */
a_thread = (struct handler_thread*)malloc(sizeof(struct handler_thread));
if (!a_thread) {
fprintf(stderr, "add_handler_thread: out of memory. exiting\n");
exit(1);
}
a_thread->thr_id = pool->max_thr_id++;
a_thread->next = NULL;
/* create the thread's parameters structure */
params = (struct handler_thread_params*)
malloc(sizeof(struct handler_thread_params));
if (!params) {
fprintf(stderr, "add_handler_thread: out of memory. exiting\n");
exit(1);
}
params->thread_id = a_thread->thr_id;
params->request_mutex = pool->p_mutex;
params->got_request = pool->p_cond_var;
params->requests = pool->requests;
/* spawn the thread, and place its ID in the thread's structure */
pthread_create(&a_thread->thread,
NULL,
handle_requests_loop,
(void*)params);
/* add the thread's structure to the end of the pool's list. */
if (pool->num_threads == 0) { /* special case - list is empty */
pool->threads = a_thread;
pool->last_thread = a_thread;
}
else {
pool->last_thread->next = a_thread;
pool->last_thread = a_thread;
}
/* increase total number of threads by one. */
pool->num_threads++;
}
/* remove the first thread from the threads pool (do NOT cancel the thread) */
static struct handler_thread*
remove_first_handler_thread(struct handler_threads_pool* pool)
{
struct handler_thread* a_thread = NULL; /* temporary holder */
/* sanity check */
assert(pool);
if (pool->num_threads > 0 && pool->threads) {
a_thread = pool->threads;
pool->threads = a_thread->next;
a_thread->next = NULL;
pool->num_threads--;
}
return a_thread;
}
/* delete the first thread from the threads pool (and cancel the thread) */
void
delete_handler_thread(struct handler_threads_pool* pool)
{
struct handler_thread* a_thread; /* the thread to cancel */
/* sanity check */
assert(pool);
a_thread = remove_first_handler_thread(pool);
if (a_thread) {
pthread_cancel(a_thread->thread);
free(a_thread);
}
}
/* get the number of handler threads currently in the threads pool */
int
get_handler_threads_number(struct handler_threads_pool* pool)
{
/* sanity check */
assert(pool);
return pool->num_threads;
}
/*
* free the resources taken by the given requests queue,
* and cancel all its threads.
*/
void
delete_handler_threads_pool(struct handler_threads_pool* pool)
{
void* thr_retval; /* thread's return value */
struct handler_thread* a_thread; /* one thread's structure */
/* sanity check */
assert(pool);
/* use pthread_join() to wait for all threads to terminate. */
while (pool->num_threads > 0) {
a_thread = remove_first_handler_thread(pool);
assert(a_thread); /* sanity check */
pthread_join(a_thread->thread, &thr_retval);
free(a_thread);
}
}
#include <stdio.h> /* standard I/O routines */
#include <pthread.h> /* pthread functions and data structures */
#include <stdlib.h> /* malloc() and free() */
#include <assert.h> /* assert() */
#include "requests_queue.h" /* requests queue routines/structs */
#include "handler_thread.h" /* handler thread functions/structs */
extern int done_creating_requests; /* are we done creating new requests? */
/*
* function cleanup_free_mutex(): free the mutex, if it's locked.
* input: pointer to a mutex structure.
* output: none.
*/
static void
cleanup_free_mutex(void* a_mutex)
{
pthread_mutex_t* p_mutex = (pthread_mutex_t*)a_mutex;
if (p_mutex)
pthread_mutex_unlock(p_mutex);
}
/*
* function handle_request(): handle a single given request.
* algorithm: prints a message stating that the given thread handled
* the given request.
* input: request pointer, id of calling thread.
* output: none.
*/
static void
handle_request(struct request* a_request, int thread_id)
{
if (a_request) {
int i;
/*
printf("Thread '%d' handled request '%d'\n",
thread_id, a_request->number);
fflush(stdout);
*/
for (i = 0; i<100000; i++)
;
}
}
/*
* function handle_requests_loop(): infinite loop of requests handling
* algorithm: forever, if there are requests to handle, take the first
* and handle it. Then wait on the given condition variable,
* and when it is signaled, re-do the loop.
* increases number of pending requests by one.
* input: id of thread, for printing purposes.
* output: none.
*/
void*
handle_requests_loop(void* thread_params)
{
int rc; /* return code of pthreads functions. */
struct request* a_request; /* pointer to a request. */
struct handler_thread_params *data;
/* hadler thread's parameters */
/* sanity check -make sure data isn't NULL */
data = (struct handler_thread_params*)thread_params;
assert(data);
printf("Starting thread '%d'\n", data->thread_id);
fflush(stdout);
/* set my cancel state to 'enabled', and cancel type to 'defered'. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
/* set thread cleanup handler */
pthread_cleanup_push(cleanup_free_mutex, (void*)data->request_mutex);
/* lock the mutex, to access the requests list exclusively. */
rc = pthread_mutex_lock(data->request_mutex);
#ifdef DEBUG
printf("thread '%d' after pthread_mutex_lock\n", data->thread_id);
fflush(stdout);
#endif /* DEBUG */
/* do forever.... */
while (1) {
int num_requests = get_requests_number(data->requests);
#ifdef DEBUG
printf("thread '%d', num_requests = %d\n",
data->thread_id, num_requests);
fflush(stdout);
#endif /* DEBUG */
if (num_requests > 0) { /* a request is pending */
a_request = get_request(data->requests);
if (a_request) { /* got a request - handle it and free it */
/* unlock mutex - so other threads would be able to handle */
/* other reqeusts waiting in the queue paralelly. */
rc = pthread_mutex_unlock(data->request_mutex);
handle_request(a_request, data->thread_id);
free(a_request);
/* and lock the mutex again. */
rc = pthread_mutex_lock(data->request_mutex);
}
}
else {
/* the thread checks the flag before waiting */
/* on the condition variable. */
/* if no new requests are going to be generated, exit. */
if (done_creating_requests) {
pthread_mutex_unlock(data->request_mutex);
printf("thread '%d' exiting\n", data->thread_id);
fflush(stdout);
pthread_exit(NULL);
}
/* wait for a request to arrive. note the mutex will be */
/* unlocked here, thus allowing other threads access to */
/* requests list.
Page : << Previous 13 Next >>