Thread synchronization with Conditional Variables and Unique_Lock

Introduction

In a series of blogs up to this one, I have explained some basic concepts of process and threads in C++ and how to perform threads synchronization with mutex, and mutex wrappers such as guard_locks and unique_lock.

In this post, conditional_variable is explained. Conditional_variable, when used with unique_lock can block access to critical section and unblock when certain conditions are met.

Concepts

Conditional_variable is a class that can:

  1. acquire a mutex, usually unique_lock.
  2. block its own thread until a condition is met by function wait, wait_for, wait_until.
  3. if its current thread is not blocked, it can notify (wake up) other threads by function notify_one, notify_all.

Code

The following programs demonstrates the use of unique_lock and conditional_variables. The programs has a main thread and a worker thread:

  • Main thread does:
    • spawns the worker thread.
    • sends data to worker thread.
    • waits for worker thread to process data. Notification is presented as boolean value data=true.
    • after receiving notification from worker threads, wakes up, prints data, then exit program.
  • Worker thread does:
    • waits for data from main threads
    • process data and notify main thread. Notification is presented as boolean value processed=true.
    • terminate the thread.

The 2 global boolean values, default as false, are used by conditional_variables:

  • bool data: set to be true by main thread. Worker thread is blocked and waits until notified by main thread. Worker thread then checks data if it is true and wakes up.
  • bool processes: set to be true by worker threads. Main thread is blocked and waits until notified by main thread. Main thread then checks data if it is true and wakes up.
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <string>
 
std::mutex m;
std::condition_variable cv;
std::string datastream;
bool data = false;
bool processed = false;
 
void worker_thread_task()
{
    std::cout<<"Worker threadID: "<<std::this_thread::get_id()<<std::endl;
    // 1. Worker threads waits until main() sends data
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return data==true; });
 
    // 2. after the notification from main, worker thread wakes up
    std::cout<<"    Worker thread is processing data"<<std::endl;
    datastream += " after processing 4,5,6";

    // 3. send data back to main()
    processed = true;
    std::cout<<"    Worker thread signals data processing completed"<<std::endl;
 
    // 4. manual unlocking is done before notifying, to avoid waking up
    // the waiting thread only to block again
    lk.unlock();
    // 5. notify main thread (waiting thread) to wake up.
    cv.notify_one();
}

void main_thread_sending_data(){
    
    // 1. unique_lock 
    std::unique_lock<std::mutex> lk(m);
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout<<"main() signals data ready for processing, threadID "<<std::this_thread::get_id()<<std::endl;
    data = true; 
    
    cv.notify_one();
    
    // 2. main thread is blocked and waits till condition processed==true
    // worker thread execute its task: worker_thread_task and send notification.
    cv.wait(lk, [](){
        return processed == true;
    });
}

void main_thread_wait_for_processed_data(){

    // 1. main thread is blocked and waits until worker thread sends notification
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return processed==true; });
    // 2. after wake up, main thread exits this function, deletes unique_lock
    std::cout<<"Back in main(), threadID "<<std::this_thread::get_id()<<std::endl;
    std::cout<<"    data = "<<datastream<<std::endl;
}
 
int main()
{
    std::thread worker(worker_thread_task);
 
    datastream = "Example data 1,2,3...";
    // send data to the worker thread
    main_thread_sending_data();
    cv.notify_one();
 
    // wait for the worker
    main_thread_wait_for_processed_data();
    
    worker.join();
}

Output:

Worker threadID: 133442899719744
main() signals data ready for processing, threadID 133442899724096
    Worker thread is processing data
    Worker thread signals data processing completed
Back in main(), threadID 133442899724096
    data = Example data 1,2,3... after processing 4,5,6

References

Further reading: Concurrency Programming – Implementing Thread Pool with C++ and Kotlin – My sky (freewindcode.com)

Leave a Reply

Your email address will not be published. Required fields are marked *