In this problem, we shall redo the bounded buffer problem using a monitor and condition variables. The desired monitor has the buffer and the two pointers as its local variable and provides an initialization function BufferInit(), a function for retrieving data PUT() and a function for depositing data PUT(). The function prototypes are shown below.
Click here to download a copy of this file.
void BufferInit(void); /* initializes the monitor */ int GET(void); /* get an item from buffer */ int PUT(int value); /* add an item to buffer */
The following is the implementation file, buf-m-2.c. Click here to download a copy of this program.
#include <thread.h> #include <synch.h> #include "buf-m-2.h" #define BUF_SIZE 5 /* buffer size */ static int Buf[BUF_SIZE]; /* the actual buffer */ static int in; /* next empty location */ static int out; /* next filled location */ static int count; /* no. of items in buffer */ static mutex_t MonitorLock; /* monitor lock */ static cond_t UntilNotFull; /* wait until full cv */ static cond_t UntilNotEmpty; /* wait until full cv */ void BufferInit(void) { in = out = count = 0; mutex_init(&MonitorLock, USYNC_THREAD, (void *) NULL); cond_init(&UntilNotFull, USYNC_THREAD, (void *) NULL); cond_init(&UntilNotEmpty, USYNC_THREAD, (void *) NULL); } int GET(void) { int value; mutex_lock(&MonitorLock); /* lock the monitor */ while (count == 0) /* while nothing in buffer */ cond_wait(&UntilNotEmpty, &MonitorLock); /* then wait*/ value = Buf[out]; /* retrieve an item */ out = (out + 1) % BUF_SIZE; /* advance out pointer */ count--; /* decrease counter */ cond_signal(&UntilNotFull); /* signal consumers */ mutex_unlock(&MonitorLock); /* release monitor */ return value; /* return retrieved vague */ } int PUT(int value) { mutex_lock(&MonitorLock); /* lock the buffer */ while (count == BUF_SIZE) /* while buffer is full */ cond_wait(&UntilNotFull, &MonitorLock); /* then wait */ Buf[in] = value; /* add the value to buffer */ in = (in + 1) % BUF_SIZE; /* advance in pointer */ count++; /* increase count */ cond_signal(&UntilNotEmpty); /* signal producers */ mutex_unlock(&MonitorLock); /* release the lock */ return value; /* return the value */ }
The main program is similar to a previous one. We have equal number of producer threads and consumer threads. Each producer thread deposits some number of positive integers into the buffer followed by an end-of-data mark EOD. Each consumer keeps retrieving data from the buffer until a EOD is read. Since all relevant programming details have been moved to a monitor, the main program and the producer and consumer threads look simpler.
Click here to download a copy of this file.
#include <stdio.h> #include <thread.h> #include "buf-m-2.h" #define MAX_ITEMS 20 #define MAX_THREADS 5 #define EOD -1 mutex_t ScreenLock; int Max_Run; /* max # of deposit/withdraw*/ void Fill(char x[], int n) { int i; for (i = 0; i < n*2; i++) x[i] = ' '; x[i] = '\0'; } void *Producer(void *voidPTR) { int *intPTR = (int *) voidPTR; int ID = *intPTR; int i, data; char filler[100]; Fill(filler, ID); mutex_lock(&ScreenLock); printf("%sProducer %d started ...\n", filler, ID); mutex_unlock(&ScreenLock); for (i = 0; i < Max_Run; i++) { /* for each iteration */ thr_yield(); /* rest for unspecified time*/ data = ID*10000 + i; /* generate a data item */ PUT(data); mutex_lock(&ScreenLock); /* lock the screen */ printf("%sProducer %d has added %d to the buffer\n", filler, ID, data);/* display a message */ mutex_unlock(&ScreenLock); /* release the screen */ } thr_yield(); /* rest for unspecified time*/ PUT(EOD); mutex_lock(&ScreenLock); /* lock the screen */ printf("%sProducer %d adds EOD and exits\n", filler, ID); mutex_unlock(&ScreenLock); /* release the screen */ thr_exit(0); } void *Consumer(void *voidPTR) { int *intPTR = (int *) voidPTR; int ID = *intPTR; int i, data; char filler[100]; Fill(filler, ID+10); mutex_lock(&ScreenLock); printf("%sConsumer %d started ...\n", filler, ID); mutex_unlock(&ScreenLock); do { /* iterate until EOD */ thr_yield(); /* rest for unspecified time*/ mutex_lock(&ScreenLock); /* lock the screen */ printf("%sConsumer %d is waiting for an item\n", filler, ID); /* display a message */ mutex_unlock(&ScreenLock); /* release the screen */ data = GET(); if (data != EOD) { mutex_lock(&ScreenLock); printf("%sConsumer %d has taken %d from the buffer\n", filler, ID, data); /* display data */ mutex_unlock(&ScreenLock);/* release the buffer */ } } while (data != EOD); /* do until EOD is seen */ mutex_lock(&ScreenLock); printf("%sConsumer %d receives EOD and exits\n", filler, ID); mutex_unlock(&ScreenLock); thr_exit(0); } void main(int argc, char *argv[]) { thread_t pID[MAX_THREADS]; /* producer ID */ thread_t cID[MAX_THREADS]; /* consumer ID */ size_t pStatus[MAX_THREADS]; /* producer status */ size_t cStatus[MAX_THREADS]; /* consumer status */ int pArg[MAX_THREADS]; /* producer argument */ int cArg[MAX_THREADS]; /* consumer argument */ int Threads; /* # of producers/consumers */ int i; if (argc != 3) { printf("Use %s #-of-iterations #-of-producers/consumers\n", argv[0]); exit(0); } Max_Run = abs(atoi(argv[1])); Threads = abs(atoi(argv[2])); if (Threads > MAX_THREADS) { printf("The no. of producers/consumers is too large. Reset to %d\n", MAX_THREADS); Threads = MAX_THREADS; } printf("Parent started ...\n"); mutex_init(&ScreenLock, USYNC_THREAD, (void *) NULL); BufferInit(); for (i = 0; i < Threads; i++) { /* start consumers first */ cArg[i] = i + 1; thr_create(NULL, 0, Consumer, (void *) &(cArg[i]), 0, (void *) &(cID[i])); } for (i = 0; i < Threads; i++) { /* followed by producers */ pArg[i] = i + 1; thr_create(NULL, 0, Producer, (void *) &(pArg[i]), 0, (void *) &(pID[i])); } for (i = 0; i < Threads; i++) /* wait producers/consumers */ thr_join(cID[i], 0, (void *) &(cStatus[i])); for (i = 0; i < Threads; i++) thr_join(pID[i], 0, (void *) &(pStatus[i])); printf("Parent exits ...\n"); }