Producer-Consumer Problem

aka Bounded Buffer Problem

  • Producer: Generates resources
  • Consumer: Uses up resources
  • Buffers: Fixed-size, used to hold resources between production and consumption

Problem is because producer and consumer can execute at different rates:

  • No serialization of one behind the other
  • There can be multiple producers and multiple consumers
  • Tasks are independent
  • The buffer allows each to run without explicit handoff

Synchronization allows us to ensure that concurrent producers and consumers access the buffer in a correct way.

Theory

Semaphores are a synchronization variable that takes on non-negative integer values, and supports two operations:

  • wait(): An atomic operation that waits for the semaphore to become greater than 0, then decrements it by 1
  • signal(): An atomic operation that increments the semaphore by 1
  • Initialize the semaphore to some value
  • Cannot read the semaphoreโ€™s value directly

Spinning

wait(s) {
	while (s <= 0);
	s--;
}
 
signal(s) {
	s++;
}

Blocking

wait(s) {
	if (s <= 0)
		sleep();
	s--;
}
 
signal(s) {
	if (queued thread)
		wakeup();
	s++;
}

Blocking Semaphores

Each semaphore is associated with a queue of waiting thread.

wait():

  • If a semaphore is open (positive), thread continues
  • If a semaphore is closed (non-positive), thread blocks on queue signal() opens the semaphore:
  • If a thread is waiting on the queue, the thread is unblocked
  • If no threads are waiting on the queue, the signal is remembered for the next thread
  • Has โ€œhistoryโ€, basically a counter to track surplus signals (i.e. the number of available permits/units of resources to be consumed by future wait() calls - see Implementation)

Types

Binary Semaphore

  • Represents access to a single resource
  • Guarantees mutual exclusion to a critical section
  • Behaves like a lock/mutex

Counting Semaphore

  • Represents a resource with many units available
  • Multiple threads can pass the semaphore at once
  • Number of threads determined by the semaphore โ€œcountโ€

Binary has count , Counting has count =

Semaphores vs. Locks

Semaphores have a value, enabling more semantics

  • When at most one, can be used for mutual exclusion (only 1 thread in a critical section)
  • When > 1, can allow multiple threads to access resources

Essentially, locks only provide mutual exclusion while semaphores can provide mutex and coordination, have a counting value, and can remember past signals (unlike ๐Ÿ”ฎ Condition Variables).

Use Cases:

  • Mutual exclusion - Only 1 thread accessing a resource at a time
  • Event sequencing / thread coordination - Permit threads to wait for certain things to happen

Producer-Consumer with Semaphores

  • signal(s) increments s
    • s value is how many items have been produced
  • wait(s) will return without waiting only if s > 0

Constraints:

  • Consumer must wait for the producer to produce items
  • Producer must wait for the consumer to empty spaces
  • Only one thread can manipulate the buffer at once

We use a semaphore for first two constraints (full_count and empty_count), and a lock/semaphore for the third.

We call wait(empty) before producing, wait(mutex) to enter the critical section, signal(mutex) after exiting the critical section, and signal(full) to notify consumers.

Readers-Writers Problem

An object is shared among several threads. Some threads only read the object, others only write it. We can allow multiple readers, but only one writer.

Using semaphores gives to constraints:

  • Writers can only proceed if there no readers or writers
  • Readers can only proceed if there are no writers
int read_count = 0;
semaphore mutex = 1;
semaphore block_write = 1;
 
write() {
	wait(block_write); // wait until no readers or writers
	
	// do the writing
	
	signal(block_write);
}
 
read() {
	wait(mutex);
	read_count++;
	if (read_count == 1)
		wait(block_write); // wait until no writers
	signal(mutex);
	
	// do the reading
	
	wait(mutex);
	read_count--;
	if (read_count == 0)
		signal(block_write); // let a writer run
	signal(mutex);
}

Implementation

struct semaphore {
    int   count = 1;
    bool  guard = False;
    queue Q;
};
 
void wait(semaphore *s) {
    disable_interrupts();
    // acquire guard
    while (test_and_set(&s->guard))
        ;
 
    if (s->count <= 0) {
        // no resources โ€“ block
        enqueue(&s->Q, current_thread());
        s->guard = False;
        block_current_thread();
    }
 
    // consume one unit
    s->count--;
    // release guard
    s->guard = False;
    enable_interrupts();
}
 
void signal(semaphore *s) {
    disable_interrupts();
    // acquire guard
    while (test_and_set(&s->guard))
        ;
 
    if (is_empty(s->Q)) {
        // nobody waiting: increment count
        s->count++;
    } else {
        // wake one waiter
        thread_t *t = dequeue(&s->Q);
        move_to_ready_queue(t);
    }
 
    // release guard
    s->guard = False;
    enable_interrupts();
}

Test-and-Set

An atomic hardware instruction that:

  1. Reads the current value of a lock variable
  2. Sets it to โ€˜lockedโ€™ (true)
  3. Returns the previous value