Chapter 07 Parallelism and Concurrency

    7.2 Mutex and Critical Section

    We have already learned the basics of concurrency technology in the operating system, or in the database, and mutex is one of the cores. C++11 introduces a class related to mutex, with all related functions in the <mutex> header file.

    std::mutex is the most basic mutex class in C++11, and you can create a mutex by instantiating std::mutex. It can be locked by its member function lock(), and unlock() can be unlocked. But in the process of actually writing the code, it is best not to directly call the member function, Because calling member functions, you need to call unlock() at the exit of each critical section, and of course, exceptions. At this time, C++11 also provides a template class std::lock_gurad for the RAII syntax for the mutex.

    RAII guarantees the exceptional security of the code while losing the simplicity of the code.

    1. #include <iostream>
    2. #include <thread>
    3. int v = 1;
    4. void critical_section(int change_v) {
    5. static std::mutex mtx;
    6. std::lock_guard<std::mutex> lock(mtx);
    7. // execute contention works
    8. v = change_v;
    9. // mtx will be released after leaving the scope
    10. }
    11. int main() {
    12. std::thread t1(critical_section, 2), t2(critical_section, 3);
    13. t1.join();
    14. t2.join();
    15. std::cout << v << std::endl;
    16. return 0;
    17. }

    Because C++ guarantees that all stack objects will be destroyed at the end of the declaration period, such code is also extremely safe. Whether critical_section() returns normally or if an exception is thrown in the middle, a stack rollback is thrown, and unlock() is automatically called.

    And std::unique_lock is more flexible than std::lock_guard, std::unique_lock is more flexible. Objects of std::unique_lock manage the locking and unlocking operations on the mutex object with exclusive ownership (no other unique_lock objects owning the ownership of a mutex object). So in concurrent programming, it is recommended to use std::unique_lock.

    std::lock_guard cannot explicitly call lock and unlock, and std::unique_lock can be called anywhere after the declaration. It can reduce the scope of the lock and provide higher concurrency.

    If you use the condition variable std::condition_variable::wait you must use std::unique_lock as a parameter.

    For instance:

    1. #include <iostream>
    2. #include <thread>
    3. int v = 1;
    4. void critical_section(int change_v) {
    5. static std::mutex mtx;
    6. std::unique_lock<std::mutex> lock(mtx);
    7. // do contention operations
    8. v = change_v;
    9. std::cout << v << std::endl;
    10. // release the lock
    11. lock.unlock();
    12. // during this period,
    13. // others are allowed to acquire v
    14. // start another group of contention operations
    15. // lock again
    16. lock.lock();
    17. v += 1;
    18. std::cout << v << std::endl;
    19. }
    20. int main() {
    21. std::thread t1(critical_section, 2), t2(critical_section, 3);
    22. t1.join();
    23. t2.join();
    24. return 0;
    25. }

    7.3 Future

    The Future is represented by std::future, which provides a way to access the results of asynchronous operations. This sentence is very difficult to understand. In order to understand this feature, we need to understand the multi-threaded behavior before C++11.

    Imagine if our main thread A wants to open a new thread B to perform some of our expected tasks and return me a result. At this time, thread A may be busy with other things, and have no time to take into account the results of B. So we naturally hope to get the result of thread B at a certain time.

    Before the introduction of std::future in C++11, the usual practice is: Create a thread A, start task B in thread A, send an event when it is ready, and save the result in a global variable. The main function thread A is doing other things. When the result is needed, a thread is called to wait for the function to get the result of the execution.

    The std::future provided by C++11 simplifies this process and can be used to get the results of asynchronous tasks. Naturally, we can easily imagine it as a simple means of thread synchronization, namely the barrier.

    To see an example, we use extra std::packaged_task, which can be used to wrap any target that can be called for asynchronous calls. For example:

    1. #include <iostream>
    2. #include <thread>
    3. #include <future>
    4. int main() {
    5. // pack a lambda expression that returns 7 into a std::packaged_task
    6. std::packaged_task<int()> task([](){return 7;});
    7. // get the future of task
    8. std::future<int> result = task.get_future(); // run task in a thread
    9. std::thread(std::move(task)).detach();
    10. std::cout << "waiting...";
    11. result.wait(); // block until future has arrived
    12. // output result
    13. std::cout << "done!" << std:: endl << "future result is " << result.get() << std::endl;
    14. return 0;
    15. }

    After encapsulating the target to be called, you can use get_future() to get a std::future object to implement thread synchronization later.

    The condition variable std::condition_variable was born to solve the deadlock and was introduced when the mutex operation was not enough. For example, a thread may need to wait for a condition to be true to continue execution. A dead wait loop can cause all other threads to fail to enter the critical section so that when the condition is true, a deadlock occurs. Therefore, the condition_variable instance is created primarily to wake up the waiting thread and avoid deadlocks. notd_one() of std::condition_variable is used to wake up a thread; notify_all() is to notify all threads. Below is an example of a producer and consumer model:

    1. #include <queue>
    2. #include <chrono>
    3. #include <mutex>
    4. #include <thread>
    5. #include <iostream>
    6. #include <condition_variable>
    7. int main() {
    8. std::queue<int> produced_nums;
    9. std::mutex mtx;
    10. std::condition_variable cv;
    11. bool notified = false; // notification sign
    12. auto producer = [&]() {
    13. for (int i = 0; ; i++) {
    14. std::this_thread::sleep_for(std::chrono::milliseconds(500));
    15. std::unique_lock<std::mutex> lock(mtx);
    16. std::cout << "producing " << i << std::endl;
    17. produced_nums.push(i);
    18. notified = true;
    19. };
    20. auto consumer = [&]() {
    21. while (true) {
    22. std::unique_lock<std::mutex> lock(mtx);
    23. while (!notified) { // avoid spurious wakeup
    24. cv.wait(lock);
    25. }
    26. // temporal unlock to allow producer produces more rather than
    27. // let consumer hold the lock until its consumed.
    28. lock.unlock();
    29. std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // consumer is slower
    30. lock.lock();
    31. if (!produced_nums.empty()) {
    32. std::cout << "consuming " << produced_nums.front() << std::endl;
    33. produced_nums.pop();
    34. }
    35. notified = false;
    36. }
    37. };
    38. std::thread p(producer);
    39. std::thread cs[2];
    40. for (int i = 0; i < 2; ++i) {
    41. cs[i] = std::thread(consumer);
    42. }
    43. p.join();
    44. for (int i = 0; i < 2; ++i) {
    45. cs[i].join();
    46. }
    47. return 0;
    48. }

    7.5 Atomic Operation and Memory Model

    Careful readers may be tempted by the fact that the example of the producer consumer model in the previous section may have compiler optimizations that cause program errors. For example, the boolean notified is not modified by volatile, and the compiler may have optimizations for this variable, such as the value of a register. As a result, the consumer thread can never observe the change of this value. This is a good question. To explain this problem, we need to further discuss the concept of memory model introduced from C++11. Let’s first look at a question. What is the output of the following code?

    1. #include <thread>
    2. #include <iostream>
    3. int main() {
    4. int a = 0;
    5. volatile int flag = 0;
    6. std::thread t1([&]() {
    7. while (flag != 1);
    8. int b = a;
    9. std::cout << "b = " << b << std::endl;
    10. });
    11. std::thread t2([&]() {
    12. a = 5;
    13. flag = 1;
    14. });
    15. t1.join();
    16. t2.join();
    17. return 0;
    18. }

    Intuitively, ʻa = 5;int2seems to always execute beforeflag = 1;, andwhile (flag != 1)int1seems to guaranteestd ::cout << “b = “ << b << std::endl;will not be executed before the mark is changed. Logically, it seems that the value ofbshould be equal to 5. But the actual situation is much more complicated than this, or the code itself is undefined behavior, because foraandflag, they are read and written in two parallel threads. There has been competition. In addition, even if we ignore competing reading and writing, it is still possible to receive out-of-order execution of the CPU, and the impact of the compiler on the rearrangement of instructions. Causea = 5to occur afterflag = 1. Thusb` may output 0.

    std::mutex can solve the problem of concurrent read and write, but the mutex is an operating system level function. This is because the implementation of a mutex usually contains two basic principles:

    1. Provide automatic state transition between threads, that is, “lock” state
    2. Ensure that the memory of the manipulated variable is isolated from the critical section during the mutex operation

    This is a very strong set of synchronization conditions, in other words, when it is finally compiled into a CPU instruction, it will behave as a lot of instructions (we will look at how to implement a simple mutex later). This seems too harsh for a variable that requires only atomic operations (no intermediate state).

    The research on synchronization conditions has a very long history, and we will not go into details here. Readers should understand that under the modern CPU architecture, atomic operations at the CPU instruction level are provided. Therefore, in the C + + 11 multi-threaded shared variable reading and writing, the introduction of the std::atomic template, so that we instantiate an atomic type, will be a Atomic type read and write operations are minimized from a set of instructions to a single CPU instruction. E.g:

    And provides basic numeric member functions for atomic types of integers or floating-point numbers, for example, Including fetch_add, fetch_sub, etc., and the corresponding +, - version is provided by overload. For example, the following example:

    1. #include <atomic>
    2. #include <thread>
    3. #include <iostream>
    4. std::atomic<int> count = {0};
    5. int main() {
    6. std::thread t1([](){
    7. count.fetch_add(1);
    8. });
    9. std::thread t2([](){
    10. count++; // identical to fetch_add
    11. count += 1; // identical to fetch_add
    12. });
    13. t1.join();
    14. t2.join();
    15. std::cout << count << std::endl;
    16. return 0;
    17. }

    Of course, not all types provide atomic operations because the feasibility of atomic operations depends on the architecture of the CPU and whether the type structure being instantiated satisfies the memory alignment requirements of the architecture, so we can always pass Std::atomic::is_lock_free` to check if the atom type needs to support atomic operations, for example:

    1. #include <atomic>
    2. #include <iostream>
    3. struct A {
    4. float x;
    5. int y;
    6. long long z;
    7. };
    8. std::atomic<A> a;
    9. std::cout << std::boolalpha << a.is_lock_free() << std::endl;
    10. return 0;
    11. }

    Multiple threads executing in parallel, discussed at some macro level, can be roughly considered a distributed system. In a distributed system, any communication or even local operation takes a certain amount of time, and even unreliable communication occurs.

    If we force the operation of a variable v between multiple threads to be atomic, that is, any thread after the operation of v Other threads can synchronize to perceive changes in v, for the variable v, which appears as a sequential execution of the program, it does not have any efficiency gains due to the introduction of multithreading. Is there any way to accelerate this properly? The answer is to weaken the synchronization conditions between processes in atomic operations.

    In principle, each thread can correspond to a cluster node, and communication between threads is almost equivalent to communication between cluster nodes. Weakening the synchronization conditions between processes, usually we will consider four different consistency models:

    1. Linear consistency: Also known as strong consistency or atomic consistency. It requires that any read operation can read the most recent write of a certain data, and the order of operation of all threads is consistent with the order under the global clock.

      1. x.store(1) x.load()
      2. T1 ---------+----------------+------>
      3. T2 -------------------+------------->
      4. x.store(2)

      In this case, thread T1, T2 is twice atomic to x, and x.store(1) is strictly before x.store(2). x.store(2) strictly occurs before x.load(). It is worth mentioning that linear consistency requirements for global clocks are difficult to achieve, which is why people continue to study other consistent algorithms under this weaker consistency.

    2. Sequential consistency: It is also required that any read operation can read the last data written by the data, but it is not required to be consistent with the order of the global clock.

      1. x.store(1) x.store(3) x.load()
      2. T1 ---------+-----------+----------+----->
      3. T2 ---------------+---------------------->
      4. x.store(2)
      5. or
      6. T1 ---------+-----------+----------+----->
      7. x.store(2)

      Under the order consistency requirement, x.load() must read the last written data, so x.store(2) and x.store(1) do not have any guarantees, ie As long as `x.store(2) of T2 occurs before x.store(3).

    3. Causal consistency: its requirements are further reduced, only the sequence of causal operations is guaranteed, and the order of non-causal operations is not required.

      1. a = 1 b = 2
      2. T1 ----+-----------+---------------------------->
      3. T2 ------+--------------------+--------+-------->
      4. x.store(3) c = a + b y.load()
      5. or
      6. a = 1 b = 2
      7. T1 ----+-----------+---------------------------->
      8. T2 ------+--------------------+--------+-------->
      9. x.store(3) y.load() c = a + b
      10. or
      11. b = 2 a = 1
      12. T1 ----+-----------+---------------------------->
      13. T2 ------+--------------------+--------+-------->
      14. y.load() c = a + b x.store(3)
    4. Final Consistency: It is the weakest consistency requirement. It only guarantees that an operation will be observed at a certain point in the future, but does not require the observed time. So we can even strengthen this condition a bit, for example, to specify that the time observed for an operation is always bounded. Of course this is no longer within our discussion.

      ```

      T1 ——+—————-+——————————————————————>

    1. T2 ---------+------------+--------------------+--------+-------->
    2. x.read() x.read() x.read() x.read()
    3. ```
    4. In the above case, if we assume that the initial value of x is 0, then the four times ``x.read()` in `T2` may be but not limited to the following:
    5. ```
    6. 3 4 4 4 // The write operation of x was quickly observed
    7. 0 3 3 4 // There is a delay in the observed time of the x write operation
    8. 0 0 0 4 // The last read read the final value of x, but the previous changes were not observed.
    9. 0 0 0 0 // The write operation of x is not observed in the current time period, but the situation that x is 4 can be observed at some point in the future.
    10. ```

    In order to achieve the ultimate performance and achieve consistency of various strength requirements, C++11 defines six different memory sequences for atomic operations. The option std::memory_order expresses four synchronization models between multiple threads:

    1. Relaxed model: Under this model, atomic operations within a single thread are executed sequentially, and instruction reordering is not allowed, but the order of atomic operations between different threads is arbitrary. The type is specified by std::memory_order_relaxed. Let’s look at an example:

      1. std::atomic<int> counter = {0};
      2. std::vector<std::thread> vt;
      3. for (int i = 0; i < 100; ++i) {
      4. vt.emplace_back([](){
      5. counter.fetch_add(1, std::memory_order_relaxed);
      6. });
      7. }
      8. for (auto& t : vt) {
      9. t.join();
      10. }
      11. std::cout << "current counter:" << counter << std::endl;
    1. Release/consumption model: In this model, we begin to limit the order of operations between processes. If a thread needs to modify a value, but another thread will have a dependency on that operation of the value, that is, the latter depends. former. Specifically, thread A has completed three writes to x, and thread B relies only on the third x write operation, regardless of the first two write behaviors of x, then A When active x.release() (ie using std::memory_order_release), the option std::memory_order_consume ensures that B observes A when calling x.load() Three writes to x. Let’s look at an example:

      1. std::atomic<int*> ptr;
      2. int v;
      3. std::thread producer([&]() {
      4. int* p = new int(42);
      5. v = 1024;
      6. ptr.store(p, std::memory_order_release);
      7. });
      8. std::thread consumer([&]() {
      9. int* p;
      10. while(!(p = ptr.load(std::memory_order_consume)));
      11. std::cout << "p: " << *p << std::endl;
      12. std::cout << "v: " << v << std::endl;
      13. });
      14. producer.join();
      15. consumer.join();
    2. Release/Acquire model: Under this model, we can further tighten the order of atomic operations between different threads, specifying the timing between releasing std::memory_order_release and getting std::memory_order_acquire. All write operations before the release operation are visible to any other thread, ie, happens-before.

      As you can see, std::memory_order_release ensures that the write behavior after it does not occur before the release operation, is a backward barrier, and std::memory_order_acquire ensures the previous write behavior after it, no It will happen after the get operation, it is a forward barrier. For the option std::memory_order_acq_rel, it combines the characteristics of the two, and only determines a memory barrier, so that the current thread reads and writes to the memory. Will not be rearranged before and after this operation.

      Let’s check an example:

      1. std::vector<int> v;
      2. std::atomic<int> flag = {0};
      3. std::thread release([&]() {
      4. v.push_back(42);
      5. flag.store(1, std::memory_order_release);
      6. });
      7. std::thread acqrel([&]() {
      8. int expected = 1; // must before compare_exchange_strong
      9. while(!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) {
      10. expected = 1; // must after compare_exchange_strong
      11. }
      12. // flag has changed to 2
      13. });
      14. std::thread acquire([&]() {
      15. while(flag.load(std::memory_order_acquire) < 2);
      16. std::cout << v.at(0) << std::endl; // must be 42
      17. });
      18. release.join();
      19. acqrel.join();
      20. acquire.join();

      In this case we used compare_exchange_strong, which is the Compare-and-swap primitive, which has a weaker version, compare_exchange_weak, which allows a failure to be returned even if the exchange is successful. The reason is due to a false failure on some platforms, specifically, when the CPU performs a context switch, another thread loads the same address to produce an inconsistency. In addition, the performance of compare_exchange_strong may be slightly worse than compare_exchange_weak, but in most cases, compare_exchange_strong should be limited.

    3. Sequential Consistent Model: Under this model, atomic operations satisfy sequence consistency, which in turn can cause performance loss. It can be specified explicitly by std::memory_order_seq_cst. Let’s look a final example:

      1. std::atomic<int> counter = {0};
      2. std::vector<std::thread> vt;
      3. for (int i = 0; i < 100; ++i) {
      4. vt.emplace_back([](){
      5. counter.fetch_add(1, std::memory_order_seq_cst);
      6. });
      7. }
      8. for (auto& t : vt) {
      9. t.join();
      10. }
      11. std::cout << "current counter:" << counter << std::endl;

      This example is essentially the same as the first loose model example. Just change the memory order of the atomic operation to memory_order_seq_cst. Interested readers can write their own programs to measure the performance difference caused by these two different memory sequences.

    Conclusion

    The C++11 language layer provides support for concurrent programming. This section briefly introduces std::thread/std::mutex/std::future, an important tool that can’t be avoided in concurrent programming. In addition, we also introduced the “memory model” as one of the most important features of C++11. They provide an critical foundation for standardized high performance computing for C++.

    1. Write a simple thread pool that provides the following features:

    2. Use to implement a mutex.

    Further Readings

    • Thread document
    • Herlihy, M. P., & Wing, J. M. (1990). Linearizability: a correctness condition for concurrent objects. ACM Transactions on Programming Languages and Systems, 12(3), 463–492.

    Licenses


    This work was written by and licensed under a Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. The code of this repository is open sourced under the .`