内容纲要

多线程学习笔记

1. 了解进程线程的基本概念,能用一种语言在一个平台上实现一个多线程的例子。

进程相互独立,一个进程可以包含多个线程,线程是进程中用于并发执行的基本单位.

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
using namespace::std;
void work(int i){
    mtx.lock();
    cout<<"doing: "<<i<<" thr id:"<<this_thread::get_id()<<endl;
    mtx.unlock();
    return;
}
int main(int argc, const char * argv[]) {
    vector<thread> th;
    for(int i=0;i<8;i++){
        th.push_back(thread(work,i));
    }
    for(int i=0;i<8;i++){
        th[i].join();
    }
    cout<<"main thread"<<cnt<<endl;
    return 0;
}

2. 了解为什么要用Mutex之类的工具做锁来同步和保护资源。弄懂诸如racing condition,死锁之类的概念。50%公司的见面题,用来砍死大无畏。

(1)在并发编程中,多个线程可以同时访问共享的资源,这可能会导致一些问题,例如数据竞争、死锁和饥饿等。为了避免这些问题,需要使用锁来同步和保护资源,确保每个线程在访问共享资源时的顺序和正确性。

Mutex(互斥锁)是一种常用的锁机制,它通过在代码中插入加锁和解锁操作来确保同一时间只有一个线程能够访问共享资源。当一个线程获得了锁之后,其他线程必须等待该线程释放锁之后才能访问共享资源。这种机制可以防止多个线程同时访问共享资源导致数据竞争问题。

(2)条件竞争指多个线程或者进程在读写一个共享数据时结果依赖于它们执行的相对时间的情形。(同个资源)

数据竞争是指在多线程并发访问共享数据时,至少有一个线程对共享数据进行了写操作,而且没有使用同步机制来保护共享数据的访问,从而导致程序的输出结果不可预期的现象。

死锁是指在多个线程并发执行时,由于彼此相互等待对方释放锁资源而陷入无限等待的状态,导致程序无法继续执行的现象。如 因为出错没有释放、ABBA锁等等

3. 了解编译器优化带来的影响,了解cache的影响,了解volatile,memory barrier之类的概念。

(1)编译器优化:编译器可能会在循环中消除重复的计算,但如果这些计算对于程序的正确性非常重要,则可能会导致程序出错。

(2)cache的影响:在多线程编程中,如果多个线程同时访问同一份数据,并且这份数据被存储在Cache中,那么每个线程可能会看到不同的数据,导致程序输出不正确。

(3)volattile : Volatile是一种关键字,用于告诉编译器不要对变量进行优化。在多线程编程中,Volatile关键字还可以用于保证变量的可见性和原子性。Volatile可以保证每次访问变量都是从内存中读取最新的值,而不是从Cache中读取旧的值。

(4)内存屏障:是一种机制,用于保证对共享内存的访问顺序。在多线程编程中,内存屏障可以用于保证对共享内存的读写操作顺序的正确性,从而避免数据竞争和死锁等问题的发生。内存屏障可以分为Acquire Barrier和Release Barrier两种类型。

4. 了解一下你主攻平台+语言所提供的工具库,知道常用的工具的用法和使用场景:Mutex,Semaphore,原子操作。这几个算是比较常用的,在各个平台+语言也都有对应实现。

5. 了解常用的多线程设计范式,比如读写锁(Reader/Writer Lock,非常经典的范式,有偏向读和写的不同变形,至少被要求写过3次),生产消费范式(写过2次),一些常用容器的实现,比如BlockingQueue(写过3次)或者concurrentHashmap(写过2次)。

(1)读写锁:适用于多个线程同时访问一个共享资源的情况,其中大部分线程只读取共享资源而很少修改,只有少数线程会修改共享资源。读写锁允许多个线程同时读取共享资源,但只允许一个线程进行写操作。写的时候不可以读,可以多个一起读。比互斥锁性能好(在符合条件情况下)。
饥饿现象:写线程的数量较多时,可能会出现写饥饿现象。写线程一直等待读线程释放读锁,而读线程一直在读取共享资源,导致写线程一直无法获取写锁

   #include <iostream>
   #include <thread>
   #include <mutex>
   #include <shared_mutex>
   #include <chrono>
   #include <vector>
   // 共享资源
   int data = 0;
   // 读写锁
   std::shared_mutex rw_mutex;
   // 读操作
   void read_data(int reader_id)
   {
       // 获取共享锁
       std::shared_lock<std::shared_mutex> lock(rw_mutex);

       std::cout << "Reader " << reader_id << " starts reading data: " << data << std::endl;

       // 模拟读操作
       std::this_thread::sleep_for(std::chrono::milliseconds(100));

       std::cout << "Reader " << reader_id << " finishes reading data: " << data << std::endl;
   }

   // 写操作
   void write_data(int writer_id)
   {
       // 获取独占锁
       std::unique_lock<std::shared_mutex> lock(rw_mutex);

       std::cout << "Writer " << writer_id << " starts writing data: " << ++data << std::endl;

       // 模拟写操作
       std::this_thread::sleep_for(std::chrono::milliseconds(100));

       std::cout << "Writer " << writer_id << " finishes writing data: " << data << std::endl;
   }

   int main()
   {
       // 创建多个读线程和写线程
       std::vector<std::thread> threads;
       for (int i = 0; i < 5; ++i) {
           threads.push_back(std::thread(read_data, i));
           threads.push_back(std::thread(write_data, i));
       }

       // 等待所有线程结束
       for (auto& t : threads) {
           t.join();
       }

       return 0;
   }

(2)生产者-消费者范式:生产者将数据生成并放入一个共享的缓冲区中,而消费者则从缓冲区中取出数据并进行处理。

   #include 
   #include 
   #include 
   #include 
   #include 

   std::mutex mtx; // 互斥量,保证对共享数据的访问是原子的
   std::condition_variable cv; // 条件变量,用于线程间的通信
   std::queue buffer; // 共享缓冲区
   const int BUFFER_SIZE = 10; // 缓冲区的大小

   // 生产者线程函数
   void producer(int id)
   {
       for (int i = 0; i < 20; ++i) { // 生产20个数据
           std::unique_lock lock(mtx); // 获取互斥锁
           while (buffer.size() == BUFFER_SIZE) { // 如果缓冲区已满
               std::cout << "Producer " << id << " waiting...\n"; // 输出等待信息
               cv.wait(lock); // 等待条件变量的通知,同时释放互斥锁
           }
           buffer.push(i); // 将数据放入缓冲区
           std::cout << "Producer " << id << " produced " << i << "\n"; // 输出生产信息
           cv.notify_all(); // 通知等待的线程
       }
   }

   // 消费者线程函数
   void consumer(int id)
   {
       for (int i = 0; i < 20; ++i) { // 消费20个数据
           std::unique_lock lock(mtx); // 获取互斥锁
           while (buffer.empty()) { // 如果缓冲区为空
               std::cout << "Consumer " << id << " waiting...\n"; // 输出等待信息
               cv.wait(lock); // 等待条件变量的通知,同时释放互斥锁
           }
           int val = buffer.front(); // 取出缓冲区中的数据
           buffer.pop(); // 将数据从缓冲区中删除
           std::cout << "Consumer " << id << " consumed " << val << "\n"; // 输出消费信息
           cv.notify_all(); // 通知等待的线程
       }
   }

   int main()
   {
       std::vector threads;
       for (int i = 0; i < 3; ++i) {
           threads.push_back(std::thread(producer, i)); // 创建生产者线程
           threads.push_back(std::thread(consumer, i)); // 创建消费者线程
       }

       for (auto& t : threads) {
           t.join(); // 等待所有线程执行完毕
       }

       return 0;
   }

(3)阻塞队列:当队列为空时,从队列中获取元素的操作会被阻塞,直到有元素被加入队列;当队列已满时,向队列中加入元素的操作会被阻塞,直到有空闲位置可以加入元素;

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class BlockingQueue {
public:
    // 构造函数
    BlockingQueue() {}

    // 阻塞等待队列非空,然后取出队首元素并弹出
    T take() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notEmpty.wait(lock, [this] { return !m_queue.empty(); });
        T front(std::move(m_queue.front()));
        m_queue.pop();
        return front;
    }

    // 阻塞等待队列非满,然后加入新元素
    void put(const T& x) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notFull.wait(lock, [this] { return m_queue.size() < m_maxSize; });
        m_queue.push(x);
    }

    // 获取当前队列大小
    size_t size() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.size();
    }

private:
    std::queue<T> m_queue;  // 队列
    std::mutex m_mutex;     // 互斥锁
    std::condition_variable m_notEmpty;   // 非空条件变量
    std::condition_variable m_notFull;    // 非满条件变量
    size_t m_maxSize = 100;  // 队列最大容量
};

(4)线程安全哈希表 :可以并发读写的哈希表,多个线程可以同时访问哈希表。

#include <iostream>
#include <vector>
#include <shared_mutex>
#include <functional>

template <typename K, typename V, typename Hash = std::hash<K>>
class ConcurrentHashMap {
private:
    std::vector<std::pair<K, V>> buckets;
    std::vector<std::shared_mutex> locks;
    Hash hash;

public:
    ConcurrentHashMap(size_t num_buckets = 19, const Hash& hash = Hash())
        : buckets(num_buckets), locks(num_buckets), hash(hash) {}

    void put(const K& key, const V& value) {
        auto bucket_index = hash(key) % buckets.size();
        std::unique_lock<std::shared_mutex> lock(locks[bucket_index]);
        for (auto& pair : buckets[bucket_index]) {
            if (pair.first == key) {
                pair.second = value;
                return;
            }
        }
        buckets[bucket_index].emplace_back(key, value);
    }

    bool get(const K& key, V& value) {
        auto bucket_index = hash(key) % buckets.size();
        std::shared_lock<std::shared_mutex> lock(locks[bucket_index]);
        for (const auto& pair : buckets[bucket_index]) {
            if (pair.first == key) {
                value = pair.second;
                return true;
            }
        }
        return false;
    }
};
最后修改日期: 2023年 3月 22日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。