Introduction
In the blogs leading up to this one, I explained some basic concepts of processes and threads in C++ and how to synchronize threads with a mutex and with mutex wrappers such as lock_guard and unique_lock.
- This blog explains the basics of processes and threads: Thread and Process in C++ - My sky (freewindcode.com). The key property of threads is that they can access one another's data directly. Without a synchronization tool, it is unknown which thread will get CPU time first.
- This blog explains the use of mutex, lock_guard, and unique_lock: Thread synchronization with Mutex, Lock_guard, Unique_lock - My sky (freewindcode.com). A critical section, that is, the code and data that all threads can access, can be protected with the appropriate tools. By using a mutex or a mutex wrapper, threads that run concurrently take turns executing the critical section.
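As a brief recap of the second point, here is a minimal sketch of a critical section protected by lock_guard. The helper name count_with_lock_guard and its parameters are hypothetical and chosen only for illustration; they do not come from the earlier posts.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper for illustration: several threads increment one
// shared counter, and a lock_guard makes each increment exclusive.
int count_with_lock_guard(int num_threads, int increments_per_thread)
{
    std::mutex m;     // protects `counter`
    int counter = 0;  // shared data touched inside the critical section

    std::vector<std::thread> threads;
    for (int t = 0; t < num_threads; ++t) {
        threads.emplace_back([&] {
            for (int i = 0; i < increments_per_thread; ++i) {
                std::lock_guard<std::mutex> guard(m);  // locks the mutex
                ++counter;                             // critical section
            }  // guard is destroyed at the end of each iteration and unlocks
        });
    }
    for (auto& th : threads) {
        th.join();
    }
    return counter;
}
```

Calling count_with_lock_guard(4, 10000) from main() should return 40000, because no two threads can execute the increment at the same time.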
In this post, std::condition_variable is explained. A condition_variable, when used with unique_lock, can block a thread's access to the critical section and unblock it when certain conditions are met.
Concepts
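std::condition_variable is a class that:
- works together with a mutex, which the waiting thread must hold through a unique_lock.
- blocks the calling thread until a condition is met, using the functions wait, wait_for, and wait_until. While the thread is blocked the mutex is released; it is re-acquired before the wait returns.
- lets a thread that is not blocked notify (wake up) waiting threads, using the functions notify_one and notify_all.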
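The points above mention wait_for and notify_all, which the main program in this post does not use. The following is a minimal sketch of a timed wait; the helper name wait_for_flag, the ready flag, and the notify parameter are assumptions made up for this illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: returns true if `ready` became true before the
// timeout expired, false otherwise.
bool wait_for_flag(bool notify, std::chrono::milliseconds timeout)
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    // This thread optionally sets the flag and wakes up all waiters.
    std::thread notifier([&] {
        if (!notify) {
            return;  // simulate a notification that never arrives
        }
        {
            std::lock_guard<std::mutex> guard(m);
            ready = true;
        }
        cv.notify_all();
    });

    bool result = false;
    {
        std::unique_lock<std::mutex> lk(m);
        // The predicate overload of wait_for returns the predicate's value:
        // true if `ready` was set in time, false if the timeout expired first.
        result = cv.wait_for(lk, timeout, [&] { return ready; });
    }
    notifier.join();
    return result;
}
```

With notify set to true the call should return true well before the timeout; with notify set to false it returns false once the timeout has passed, so the waiting thread is never blocked forever.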
Code
The following program demonstrates the use of unique_lock and condition_variable. The program has a main thread and a worker thread:
- Main thread:
  - spawns the worker thread.
  - sends data to the worker thread. The notification is represented by the boolean value data=true.
  - waits for the worker thread to process the data.
  - after receiving the notification from the worker thread, wakes up, prints the data, then exits the program.
- Worker thread:
  - waits for data from the main thread.
  - processes the data and notifies the main thread. The notification is represented by the boolean value processed=true.
  - terminates.
The two global boolean values, both false by default, serve as the conditions checked through the condition_variable:
- bool data: set to true by the main thread. The worker thread is blocked and waits until notified by the main thread. The worker thread then checks data and, if it is true, wakes up.
- bool processed: set to true by the worker thread. The main thread is blocked and waits until notified by the worker thread. The main thread then checks processed and, if it is true, wakes up.
#include <iostream>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <string>

std::mutex m;
std::condition_variable cv;
std::string datastream;
bool data = false;
bool processed = false;

void worker_thread_task()
{
    std::cout<<"Worker threadID: "<<std::this_thread::get_id()<<std::endl;
    // 1. Worker thread waits until main() sends data
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return data==true; });
    // 2. after the notification from main, worker thread wakes up
    std::cout<<" Worker thread is processing data"<<std::endl;
    datastream += " after processing 4,5,6";
    // 3. send data back to main()
    processed = true;
    std::cout<<" Worker thread signals data processing completed"<<std::endl;
    // 4. manual unlocking is done before notifying, to avoid waking up
    //    the waiting thread only to block again
    lk.unlock();
    // 5. notify main thread (waiting thread) to wake up.
    cv.notify_one();
}

void main_thread_sending_data(){
    // 1. acquire the mutex through unique_lock; it stays locked during the sleep below
    std::unique_lock<std::mutex> lk(m);
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout<<"main() signals data ready for processing, threadID "<<std::this_thread::get_id()<<std::endl;
    data = true;
    cv.notify_one();
    // 2. main thread releases the mutex, is blocked, and waits till condition processed==true
    //    worker thread executes its task, worker_thread_task, and sends notification.
    cv.wait(lk, [](){
        return processed == true;
    });
}

void main_thread_wait_for_processed_data(){
    // 1. main thread waits until processed==true; wait() returns immediately
    //    if the worker thread has already set it
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return processed==true; });
    // 2. main thread prints the data and exits this function, which destroys the unique_lock
    std::cout<<"Back in main(), threadID "<<std::this_thread::get_id()<<std::endl;
    std::cout<<" data = "<<datastream<<std::endl;
}

int main()
{
    std::thread worker(worker_thread_task);
    datastream = "Example data 1,2,3...";
    // send data to the worker thread
    main_thread_sending_data();
    cv.notify_one();
    // wait for the worker
    main_thread_wait_for_processed_data();
    worker.join();
}
Output:
Worker threadID: 133442899719744
main() signals data ready for processing, threadID 133442899724096
Worker thread is processing data
Worker thread signals data processing completed
Back in main(), threadID 133442899724096
data = Example data 1,2,3... after processing 4,5,6
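One detail worth illustrating: in the program above, main() may call notify_one before the worker thread has reached cv.wait. The program still works because the predicate form of wait checks the flag first and only blocks while it is false. The sketch below shows the equivalent explicit loop; the helper name early_notification_is_not_lost is hypothetical and the scenario is simplified so that the notification always comes first.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: the flag is set and notified BEFORE the waiter
// starts waiting. Returns true if the waiter still gets past its wait.
bool early_notification_is_not_lost()
{
    std::mutex m;
    std::condition_variable cv;
    bool data = false;

    {
        std::lock_guard<std::mutex> guard(m);
        data = true;
    }
    cv.notify_one();  // nobody is waiting yet, so this wakes no one

    bool woke_up = false;
    std::thread waiter([&] {
        std::unique_lock<std::mutex> lk(m);
        // Equivalent to: cv.wait(lk, [&]{ return data; });
        // The loop also guards against spurious wakeups.
        while (!data) {
            cv.wait(lk);
        }
        woke_up = true;
    });
    waiter.join();
    return woke_up;
}
```

Because the waiter tests data before blocking, it never calls cv.wait here and the function returns true. A bare cv.wait(lk) without the flag check could block forever in the same situation, which is why the boolean flags in the program matter as much as the notifications.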
References
Further reading: Concurrency Programming - Implementing Thread Pool with C++ and Kotlin - My sky (freewindcode.com)