Concurrency Programming – Implementing Thread Pool with C++ and Kotlin

Introduction

The main goal of this blog is to present different implementation of Thread Pools with C++ and Kotlin. Thread Pool is a way to organize and manage multi threads of a program. It must be explained first by differentiating the 2 concepts, Concurrency vs Parallelism. After that, we will know why there are reasons for Thread Pool. In addition, I attempt to explain Process and Thread. Finally, a Thread Pool is explained in detail.

Kotlin provides built-in Thread Pool, while C++ standard library doesn't, but it has other tools construct a primitive Thread Pool. The implementation by C++ is first explained so that Kotlin's conciseness could be appreciated.

Concepts

Concurrency vs Parallelism

  • Concurrency: executing many tasks in interleaving others, controlled manner. The tasks are handled by multiple threads/processes.
  • Parallelism: executing many tasks by multiple threads/processes that runs simultaneously/at the same time.

Process vs Threads

  • Process: a whole program running. 2 different processes cannot access data from the other unless using interprocess communication mechanism.
    • For C++, when a program runs, a virtual memory layout that includes stacks, heap, data segment etc are created as dedicated resources for the program. When a child process is "forked/created" from the parent process, the data from virtual memory layout of the parent process is copied to child's virtual memory layout.
    • For Java/Kotlin: a process is run an instance of Java Virtual Machine.
  • Thread: a segment of a process. Thus 2 different threads could access the same data, and sometime causes race condition if concurrency programming is not good to manage the threads. For most programming languages, a thread has 4 states: new, runnable, blocked, and terminated.
4 main states of a thread. Resource: LinkedIn Learning.

Thread Pools

  • According to Wikipedia, a thread pool is a design pattern that achieves concurrency execution of a program. A thread pool includes a number of reserved threads for executing task. "By maintaining a pool of threads, the model increases performance and avoids latency in execution due to frequent creation and destruction of threads for short-lived tasks".
    • Creating new threads is done during the creation of the pool so that to reduce overhead caused by creation/destroying individual threads.
    • The number of reserved threads could be dynamically managed, based on number of waiting tasks.
Source: Wikipedia
  • Some requirements of a thread pool design:
    • Thread Management
    • Task Queuing
    • Task Scheduling
    • Thread Lifecycle Management
    • Resource Limitations
    • Error Handling
    • Scalability

C++ Implementation

C++ doesn't use thread pools as part of the standard C++ library. C++ boost library however does.

  • The first program demonstrates creating 100 threads to run 100 tasks, identified by taskId.
  • In the second program, I attempt to use a combination of mutex and condition variables, to imitate a "primitive thread pools".
  • In the third program, I use the demo from LinkedIn Learning to show how C++ boost library could help achieving a Thread Pool.

Running tasks without thread pool

#include <iostream>
#include <thread>
#include <vector>

// Function that represents the running task
void runningTask(int taskId) {
    std::cout << "Thread ID: " << std::this_thread::get_id() << ", Task ID: " << taskId << std::endl;
}

int main() {
    const int numTasks = 100;
    std::vector<std::thread> threads;

    // Main thread creates 100 tasks, running by 100 threads
    for (int taskId = 0; taskId < numTasks; ++taskId) {
        threads.emplace_back(runningTask, taskId);
    }

    // Main thread waits for each thread to finish
    for (auto& thread : threads) {
        thread.join();
    }

        return 0;
}

"Primitive thread pool" with mutex and conditional variables

I call the program running Primitive thread pool for a reason: it is not a good thread pool, and is only used to demonstrate the setback, and complexity of designing your thread pool.

The following programs creates 100 tasks and uses only 3 threads to run the tasks.

  • a queue to adds the 100 tasks, one by one. It pops the task each time a thread finish runningTask.
  • worker represent the actions that a single thread does:
    • acquiring the lock on line 38.
    • waiting for notification and checking the condition that: task is available or all task is done. When notify_one or notify_all is invoked by another thread, the sleeping thread wakes up, lock is open. (line 41)
    • retrieving the task from task queue.
    • unlocking the lock on the line 38.
    • performing the task by invoking runningTask.
  • runningTask represents the execution of a task. Time delay is added to simulate the time taken to work the task.
    • each task aquires the lock and run the task, increment the taskCount. Therefore, we have 3 threads executing runningTask not at the same time!
    • once the task is finish, the thread will notify other waiting threads to check on the condition_variable and conditions on line 41.
    • if conditions on line 41 satisfy, condition_variable wakes the sleep thread and opens the lock on line 38. The thread then runs below code lines.
    • if taskQueue is empty and all task have been done, the thread termitates by breaking out of function worker.

The following diagram illustrates the work flow of the programs. It is worth noting that the thread pool uses 3 threads to run the task running one after another. They still have some overlap upto when a lock is created. Consequently, it is almost the same as having a single thread running the task one after another. This program could still be modified to have the threads running in overlapping, concurrent manner. But I leave it at that for the sake of demonstration a good thread pool vs a bad thread pool.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <vector>

std::mutex mtx;
std::condition_variable cv;
std::deque<int> taskQueue;
int taskCount = 0;

const int numTasks = 100;
const int maxThreads = 3;

void task(int taskId) {
    std::this_thread::sleep_for(std::chrono::seconds(2)); 

    {
        std::lock_guard<std::mutex> lock(mtx);
        std::cout << "Thread ID: " << std::this_thread::get_id() << " Task ID: " << taskId << std::endl;
        taskCount++;
    }   // closing braces, lock is released
    
    // Notify the waiting thread to check for conditions on line 38
    cv.notify_one(); 
}

/* worker retrieves the task (identified by taskId from taskQueue and runs tasks continuously
 * each worker runs on a single thread, perform one task after another.
 * 1. When retrieveing that task from taskQueue, condition_variable locks access to taskQueue
 * 2. When running the task, there is no lock. 
 * 3. After finishing the task, the worker goes back to retrieve another task. 
 */
void worker() {
    while (true) {
        // Mutex locks the access to thread queue when worker retrieves task. 
        std::unique_lock<std::mutex> lock(mtx);
        // condition_variable opens the lock if return condition is true. 
        // meaning waiting threads can now access taskQueue and perfrom task.
        cv.wait(lock, [&]() { return !taskQueue.empty() || taskCount==numTasks ; });

        if (taskQueue.empty() && taskCount ==numTasks){
            // when no task is available on taskQueue or task 
            cv.notify_all();
            break;
        }
        
        // 1. retrieving task from taskID
        int taskId = taskQueue.front();
        taskQueue.pop_front();
        // after retrieving the task, lock is open to allow access to taskQueue
        lock.unlock();
        // 2. running task
        task(taskId);
        // 3. loop back to retrieve another task until task queue is empty. 
    }
}

int main() {
    

    std::vector<std::thread> threads;
    for (int thread = 0; thread < maxThreads; thread++) {
        // create thread object and push to vector. 
        // Thread will run wroker function rightaway.
        threads.emplace_back(worker);
    }

    // main threads continously pushes tasks to queue
    for (int task = 0; task < numTasks; task++) {
        std::lock_guard<std::mutex> lock(mtx);
        taskQueue.push_back(task);
        // Notify the waiting thread to check for conditions on line 38
        cv.notify_one();
    }
    
    // main thread waits for all 3 child threads to finish
    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "All tasks completed!" << std::endl;

    return 0;
}

C++ boost library

The following program runs 100 tasks, identified by taskId. It uses boost library to create a thread pool of only 4 threads, which will consecutively perform the tasks.

Requirements to build this program:

  • include boost library header file boost/asio.hpp
  • build configuration such as makefile must include boost library such as LDLIBS = -lboost_s
  • the thread pool will automatically cycle the threads and does it best to execute the task concurrently, simultaneously.
#include <boost/asio.hpp>
#include <iostream>
using namespace std;

void runningTask (int taskId) {
    cout<<"Running taskId="<<taskId<<"with threadId="<<this_thread::get_id()<<endl; 
}

int main() {
    boost::asio::thread_pool pool(4);

    for (int taskId=0; taskId<100; taskId++){
        boost:asio::post(pool, [i](){ runningTask(taskId); });
    }
    pool.join();
}

Kotlin implementation

Kotlin provides Thread Pool implementation, and thus significantly reduces the developer's work complexity.

In order to complete the program, we need:

  • import java.util.concurrent.Executors
  • a threadPool instance, created by Executors.newFixedThreadPool(3)
  • a instance of the task to be run. Here, I created a class Task that accepts taskId.
  • the task class inherits Runnable interface, which is borrowed from java and represents a task that can be executed concurrently on a thread. It is defined in the java.lang package.

The program will print out the main threads, and threads from the pools, together with taskId. We see that the program generates only 3 threads to run 100 tasks. The threads

import java.util.concurrent.Executors

// Create a class to run task on parallel threads.
class Task (val taskId : Int) : Runnable {
    override fun run(){
        println("Thread=${Thread.currentThread().id} TaskId=${taskId}")
        println(Thread.currentThread().name)
    }
}

// Create a thread pool with a fixed number of threads
val threadPool = Executors.newFixedThreadPool(3)

fun main() {
    // print out main thread name
    println(Thread.currentThread().name)

    // Submit tasks to the thread pool 
    // and run each task on a thread from the thread pool
    for (i in 0..100) {
        threadPool.submit (Task(i))
    }

    // Shutdown the thread pool after all tasks are submitted
    threadPool.shutdown()
}

Leave a Reply

Your email address will not be published. Required fields are marked *