Topic : Multi-Threaded Programming
Author : LUPG
Page : << Previous 13  Next >>
Go to page :


p_cond_var;
    pool->requests = requests;

    return pool;
}

/* spawn a new handler thread and add it to the threads pool. */
void
add_handler_thread(struct handler_threads_pool* pool)
{
    struct handler_thread* a_thread;      /* thread's data       */
    struct handler_thread_params* params; /* thread's parameters */

    /* sanity check */
    assert(pool);

    /* create the new thread's structure and initialize it */
    a_thread = (struct handler_thread*)malloc(sizeof(struct handler_thread));
    if (!a_thread) {
 fprintf(stderr, "add_handler_thread: out of memory. exiting\n");
 exit(1);
    }
    a_thread->thr_id = pool->max_thr_id++;
    a_thread->next = NULL;

    /* create the thread's parameters structure */
    params = (struct handler_thread_params*)
                            malloc(sizeof(struct handler_thread_params));
    if (!params) {
 fprintf(stderr, "add_handler_thread: out of memory. exiting\n");
 exit(1);
    }
    params->thread_id = a_thread->thr_id;
    params->request_mutex = pool->p_mutex;
    params->got_request = pool->p_cond_var;
    params->requests = pool->requests;

    /* spawn the thread, and place its ID in the thread's structure */
    pthread_create(&a_thread->thread,
     NULL,
     handle_requests_loop,
     (void*)params);

    /* add the thread's structure to the end of the pool's list. */
    if (pool->num_threads == 0) { /* special case - list is empty */
        pool->threads = a_thread;
        pool->last_thread = a_thread;
    }
    else {
        pool->last_thread->next = a_thread;
        pool->last_thread = a_thread;
    }

    /* increase total number of threads by one. */
    pool->num_threads++;
}

/*
 * function remove_first_handler_thread(): unlink the first thread's
 *            structure from the threads pool. the thread itself is NOT
 *            cancelled, and the structure is NOT freed.
 * input:     pointer to the threads pool (must not be NULL).
 * output:    the unlinked thread structure (its 'next' field reset to
 *            NULL), or NULL if the pool holds no threads. the caller
 *            takes over the returned structure and must free it.
 */
static struct handler_thread*
remove_first_handler_thread(struct handler_threads_pool* pool)
{
    struct handler_thread* a_thread = NULL; /* temporary holder */

    /* sanity check */
    assert(pool);

    if (pool->num_threads > 0 && pool->threads) {
        a_thread = pool->threads;
        pool->threads = a_thread->next;
        /* the list just became empty - do not leave 'last_thread'    */
        /* pointing at a structure the caller is about to free.       */
        if (!pool->threads)
            pool->last_thread = NULL;
        a_thread->next = NULL;
        pool->num_threads--;
    }

    return a_thread;
}

/*
 * function delete_handler_thread(): remove the first thread from the
 *            threads pool, request its cancellation and free its
 *            bookkeeping structure.
 * algorithm: the thread is cancelled and then detached, so that its
 *            system resources are released once it actually terminates,
 *            without blocking the caller.
 * input:     pointer to the threads pool (must not be NULL).
 * output:    none. does nothing if the pool holds no threads.
 */
void
delete_handler_thread(struct handler_threads_pool* pool)
{
    struct handler_thread* a_thread; /* the thread to cancel */

    /* sanity check */
    assert(pool);

    a_thread = remove_first_handler_thread(pool);
    if (a_thread) {
        /* cancel first: the thread is still joinable here, so its ID  */
        /* is guaranteed to be valid for both calls.                   */
        pthread_cancel(a_thread->thread);
        /* nobody will join this thread after its structure is freed;  */
        /* detach it so its resources are reclaimed when it exits.     */
        pthread_detach(a_thread->thread);
        free(a_thread);
    }
}

/*
 * function get_handler_threads_number(): report how many handler threads
 *            the pool currently records.
 * input:     pointer to the threads pool (must not be NULL).
 * output:    the pool's 'num_threads' counter.
 */
int
get_handler_threads_number(struct handler_threads_pool* pool)
{
    int count; /* value of the pool's thread counter */

    /* sanity check */
    assert(pool);

    count = pool->num_threads;
    return count;
}

/*
 * function delete_handler_threads_pool(): empty the given threads pool.
 * algorithm: repeatedly unlinks the first thread's structure, waits with
 *            pthread_join() for that thread to terminate, and frees the
 *            structure. the threads are NOT cancelled here, and the pool
 *            structure itself is not freed by this function.
 * input:     pointer to the threads pool (must not be NULL).
 * output:    none.
 */
void
delete_handler_threads_pool(struct handler_threads_pool* pool)
{
    struct handler_thread* current; /* thread being joined          */
    void* exit_value;               /* joined thread's return value */

    /* sanity check */
    assert(pool);

    for (;;) {
        /* stop once the pool records no more threads. */
        if (pool->num_threads <= 0)
            break;

        current = remove_first_handler_thread(pool);
        assert(current); /* counter says there must be one */

        /* wait for this thread to terminate, then drop its record. */
        pthread_join(current->thread, &exit_value);
        free(current);
    }
}





handler_thread.c


#include <stdio.h>       /* standard I/O routines                     */
#include <pthread.h>     /* pthread functions and data structures     */
#include <stdlib.h>      /* malloc() and free()                       */
#include <assert.h>      /* assert()                                  */

#include "requests_queue.h"   /* requests queue routines/structs      */
#include "handler_thread.h"   /* handler thread functions/structs     */

extern int done_creating_requests;   /* are we done creating new requests? */

/*
 * function cleanup_free_mutex(): thread cleanup handler that unlocks
 *            the given mutex.
 * algorithm: a NULL pointer is ignored; any other pointer is treated as
 *            a pthread_mutex_t and passed to pthread_mutex_unlock().
 *            the result of the unlock is not checked.
 * input:     pointer to a mutex structure (as a void pointer).
 * output:    none.
 */
static void
cleanup_free_mutex(void* a_mutex)
{
    /* nothing to release */
    if (a_mutex == NULL)
        return;

    pthread_mutex_unlock((pthread_mutex_t*)a_mutex);
}

/*
 * function handle_request(): handle a single given request.
 * algorithm: simulates work with an empty counting loop of 100000
 *            iterations. the per-request trace message is currently
 *            disabled, so nothing is printed and 'thread_id' is unused.
 * input:     request pointer (NULL is ignored), id of calling thread.
 * output:    none.
 */
static void
handle_request(struct request* a_request, int thread_id)
{
    int spin; /* busy-loop counter */

    /* no request - nothing to do */
    if (!a_request)
        return;

    /*
    printf("Thread '%d' handled request '%d'\n",
           thread_id, a_request->number);
    fflush(stdout);
    */

    /* stand-in for real work */
    spin = 0;
    while (spin < 100000)
        spin++;
}

/*
* function handle_requests_loop(): infinite loop of requests handling
* algorithm: forever, if there are requests to handle, take the first
*            and handle it. Then wait on the given condition variable,
*            and when it is signaled, re-do the loop.
*            increases number of pending requests by one.
* input:     id of thread, for printing purposes.
* output:    none.
*/
void*
handle_requests_loop(void* thread_params)
{
    int rc;                     /* return code of pthreads functions.  */
    struct request* a_request;      /* pointer to a request.               */
    struct handler_thread_params *data;
        /* hadler thread's parameters */

    /* sanity check -make sure data isn't NULL */
    data = (struct handler_thread_params*)thread_params;
    assert(data);

    printf("Starting thread '%d'\n", data->thread_id);
    fflush(stdout);

    /* set my cancel state to 'enabled', and cancel type to 'defered'. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    /* set thread cleanup handler */
    pthread_cleanup_push(cleanup_free_mutex, (void*)data->request_mutex);

    /* lock the mutex, to access the requests list exclusively. */
    rc = pthread_mutex_lock(data->request_mutex);

#ifdef DEBUG
    printf("thread '%d' after pthread_mutex_lock\n", data->thread_id);
    fflush(stdout);
#endif /* DEBUG */

    /* do forever.... */
    while (1) {
        int num_requests = get_requests_number(data->requests);

#ifdef DEBUG
     printf("thread '%d', num_requests =  %d\n",
        data->thread_id, num_requests);
     fflush(stdout);
#endif /* DEBUG */
 if (num_requests > 0) { /* a request is pending */
     a_request = get_request(data->requests);
     if (a_request) { /* got a request - handle it and free it */
      /* unlock mutex - so other threads would be able to handle */
  /* other reqeusts waiting in the queue paralelly.          */
      rc = pthread_mutex_unlock(data->request_mutex);
  handle_request(a_request, data->thread_id);
  free(a_request);
      /* and lock the mutex again. */
      rc = pthread_mutex_lock(data->request_mutex);
     }
 }
 else {
     /* the thread checks the flag before waiting            */
     /* on the condition variable.                           */
     /* if no new requests are going to be generated, exit.  */
     if (done_creating_requests) {
                pthread_mutex_unlock(data->request_mutex);
  printf("thread '%d' exiting\n", data->thread_id);
  fflush(stdout);
  pthread_exit(NULL);
     }
     /* wait for a request to arrive. note the mutex will be */
     /* unlocked here, thus allowing other threads access to */
     /* requests list.                          


Page : << Previous 13  Next >>