• C++闲谈03——多线程


    C++闲谈03——多线程

    多进程

    优点

    • 每个进程相互独立 不影响主程序的稳定性 子进程崩溃了没关系
    • 通过增加CPU 就可以扩充性能
    • 可以尽量减少线程加锁解锁的影响
    • 每个子进程都有3GB地址空间和相关资源,总体能够达到的性能上限非常大。

    缺点

    • 逻辑控制复杂,需要和主程序交互;
    • 需要跨进程边界,如果有大数据量传送,就不太好,适合小数据量传送、密集运算 多进程调度开销比较大;
    • 最好是多进程和多线程结合,即根据实际的需要,每个CPU开启一个子进程,这个子进程开启多线程可以为若干同类型的数据进行处理。当然你也可以利用多线程+多CPU+轮询方式来解决问题
    • 方法和手段是多样的,关键是自己看起来实现方便有能够满足要求,代价也合适。

    多线程

    优点

    • 无需跨进程边界;
    • 程序逻辑和控制方式简单;
    • 所有线程可以直接共享内存和变量等;
    • 线程方式消耗的总资源比进程方式好。

    缺点

    • 每个线程与主程序共用地址空间,受限于2GB地址空间;
    • 线程之间的同步和加锁控 制比较麻烦;
    • 一个线程的崩溃可能影响到整个程序的稳定性;
    • 到达一定的线程数程度后,即使再增加CPU也无法提高性能,例如Windows Server 2003,大约是1500个左右的线程数就快到极限了(线程堆栈设定为1M),如果设定线程堆栈为2M,还达不到1500个线程总数;
    • 线程能够提高的总性能有限,而且线程多了之后,线程本身的调度也是一个麻烦事儿,需要消耗较多的CPU。

    多线程和多进程的区别

    线程是进程的子集,一个进程可能由多个线程组成。

    多进程的数据是分开的、共享复杂,需要用到IPC通信,但是同步简单

    多线程共享进程的数据,共享简单,但是同步复杂

    C++11 ——thread

    阻塞分离

    join:Join 线程,调用该函数会阻塞当前线程,直到由 *this 所标示的线程执行完毕 join 才返回。

    detach: Detach 线程。 将当前线程对象所代表的执行实例与该线程对象分离,不会阻塞当前线程,使得线程的执行可以单独进行。一旦线程执行完毕,它所分配的资源将会被释放。

    • lock_guard 类似于智能指针里的scoped_ptr

    lock_guard 通常用来管理一个 std::mutex 类型的对象,通过定义一个 lock_guard 一个对象来管理 std::mutex 的上锁和解锁。在 lock_guard 初始化的时候进行上锁,然后在 lock_guard 析构的时候进行解锁。这样避免了人为的对 std::mutex 的上锁和解锁的管理。

    定义如下:

    template class lock_guard;

    它的特点如下:
    (1) 创建即加锁,作用域结束自动析构并解锁,无需手工解锁
    (2) 不能中途解锁,必须等作用域结束才解锁
    (3) 不能复制

    注意:

    lock_guard 并不管理 std::mutex 对象的声明周期,也就是说在使用 lock_guard 的过程中,如果 std::mutex 的对象被释放了,那么在 lock_guard 析构的时候进行解锁就会出现空指针错误。

    • unique_lock

    简单地讲,unique_lock 是 lock_guard 的升级加强版,它具有 lock_guard 的所有功能,同时又具有其他很多方法,使用起来更强灵活方便,能够应对更复杂的锁定需要。

    特点如下:

    创建时可以不锁定(通过指定第二个参数为 std::defer_lock),而在需要时再锁定
    可以随时加锁解锁
    作用域规则同 lock_grard,析构时自动释放锁
    不可复制,可移动
    条件变量需要该类型的锁作为参数(此时必须使用 unique_lock)

    生产者消费者模型

    C++11多线程实现生产者消费者

    单生产者单消费者模型

    #include 
    #include 
    #include 
    #include 
    #include  // C++ STL所有的容器都不是线程安全
    using namespace std;
    
    /*
    C++多线程编程 - 线程间的同步通信机制
    多线程编程两个问题:
    1.线程间的互斥
    竟态条件 =》 临界区代码段 =》 保证原子操作 =》互斥锁mutex  轻量级的无锁实现CAS
    strace ./a.out mutex => pthread_mutex_t
    2.线程间的同步通信
    生产者,消费者线程模型
    */
    
    std::mutex mtx; // 定义互斥锁,做线程间的互斥操作
    std::condition_variable cv; // 定义条件变量,做线程间的同步通信操作
    
    // 生产者生产一个物品,通知消费者消费一个;消费完了,消费者再通知生产者继续生产物品
    class Queue   // 对queue重新封装一下
    {
    public:
    	void put(int val) // 生产物品
    	{
    		//lock_guard guard(mtx); // scoped_ptr   不能同时使用两把锁
    		unique_lock<std::mutex> lck(mtx); // unique_ptr
    		while (!que.empty())
    		{
    			// que不为空,生产者应该通知消费者去消费,消费完了,再继续生产
    			// 生产者线程进入#1等待状态(阻塞状态),并且#2把mtx互斥锁释放掉 消费者线程就能抢到这把锁   不释放锁  无法消费
    			cv.wait(lck);  // lck.lock()  lck.unlock
    		}
    		que.push(val);
    		/* 
    		notify_one:通知另外的一个线程的
    		notify_all:通知其它所有线程的
    		通知其它所有的线程,我生产了一个物品,你们赶紧消费吧
    		其它线程得到该通知,就会从等待状态 =》 阻塞状态 =》 获取互斥锁才能继续执行
    		*/
    		cv.notify_all(); 
    		cout << "生产者 生产:" << val << "号物品" << endl;
    	}
    	int get() // 消费物品
    	{
    		//lock_guard guard(mtx); // scoped_ptr
    		unique_lock<std::mutex> lck(mtx); // unique_ptr
    		while (que.empty())
    		{
    			// 消费者线程发现que是空的,通知生产者线程先生产物品
    			// #1 进入等待状态 # 把互斥锁mutex释放
    			cv.wait(lck);
    		}
    		int val = que.front();
    		que.pop();
    		cv.notify_all(); // 通知其它线程我消费完了,赶紧生产吧
    		cout << "消费者 消费:" << val << "号物品" << endl;
    		return val;
    	}
    private:
    	queue<int> que;
    };
    
    void producer(Queue *que) // 生产者线程  生产10个物品
    {
    	for (int i = 1; i <= 10; ++i)
    	{
    		que->put(i);
    		std::this_thread::sleep_for(std::chrono::milliseconds(100));
    	}
    }
    void consumer(Queue *que) // 消费者线程
    {
    	for (int i = 1; i <= 10; ++i)
    	{
    		que->get();
    		std::this_thread::sleep_for(std::chrono::milliseconds(100));
    	}
    }
    int main()
    {
    	Queue que; // 两个线程共享的队列
    
    	std::thread t1(producer, &que);
    	std::thread t2(consumer, &que);
    
    	t1.join(); // 主线程等到两个子线程执行完  继续执行
    	t2.join();
        
        system("pause");
    	return 0;
    }
    
    /*
    线程间互斥 : 临界区 原子类型  互斥锁  信号量
    
    线程间同步 : 条件变量 信号量
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99

    多生产者多消费者

    #include 
    #include 
    #include 
    #include 
    #include 
    using namespace std;
    
    class Queue
    {
    public:
    	Queue() :pro_counter(1) {}
    	void put(int val)
    	{
    		// lock_guard lock(mtx);
    		unique_lock<mutex> lck(mtx);
    		while (!que.empty())
    		{
    			// que不为空,生产者应该通知消费之去消费,
    			// 使用条件变量,当不为空,就将锁释放掉,
    
    			cv_no_full.wait(lck);
    		}
    		que.push(val);
    		cv_no_empty.notify_all(); // 通知所有的消费线程去消费
    		// cv.notify_one(); // 通知一个线程
    		// 其他线程得到该通知,就从等待状态,编程
    		// 阻塞状态,然后在获取互斥锁继续执行。
    	}
    
    	int get()
    	{
    		int val = 0;
    		{
    			unique_lock<mutex> lck(mtx);
    			while (que.empty())
    			{
    				// 通知生产者生产,
    				// 进入等待状态,释放互斥锁
    				cv_no_empty.wait(lck);
    			}
    			val = que.front();
    			que.pop();
    			cv_no_full.notify_all(); // 通知消费线程,我消费完了,赶紧生产吧
    		}
    		return val;
    	}
    
    
    	mutex producer_mtx; // 互斥生产者之间获取队列
    	mutex comsumer_mtx; // 互斥消费者之间获取队列
    	int pro_counter;  // 记录生产者已生产物品的个数
    	int con_counter;  // 记录消费者已消费物品的个数
    	const int MAX_PRODUCER = 10; // 最多生产的个数
    private:
    	queue<int> que;  // 共享队列
    	mutex mtx;       // 互斥生产者和消费者获取队列
    	condition_variable cv_no_full; // 用于共享队列不满
    	condition_variable cv_no_empty; // 用于共享队列不空
    };
    
    Queue que;
    
    
    void consumer(Queue *que)
    {
    	int flag = false;
    	for (;;)
    	{
    		this_thread::sleep_for(chrono::milliseconds(100));
    		{
    			lock_guard<mutex> lck(que->comsumer_mtx);
    			if (que->con_counter < que->MAX_PRODUCER)
    			{
    				int data = que->get();
    				++que->con_counter;
    				cout << this_thread::get_id() << "消费 : " << data << "商品" << endl;
    			}
    			else
    				flag = true;
    		}
    		if (flag)
    			break;
    	}
    }
    
    void producer(Queue *que)
    {
    	int flag(false);
    	for(;;)
    	{
    		{
    			lock_guard<mutex> lck(que->producer_mtx);
    			if (que->pro_counter <= que->MAX_PRODUCER)
    			{
    				que->put(que->pro_counter);
    				cout << this_thread::get_id() << "生产 : "<< que->pro_counter << "商品" << endl;
    				++que->pro_counter;
    			}
    			else
    				flag = true;
    		}
    		if (flag)
    			break;
    		this_thread::sleep_for(chrono::milliseconds(100));
    	}
    }
    
    
    int main()
    {
    	list<thread> pro_lst; // 生产者线程
    	for (int i = 0; i < 3; ++i)
    		pro_lst.push_back(thread(producer, &que));
    	list<thread> con_lst; // 消费者线程
    	for (int i = 0; i < 3; ++i)
    		pro_lst.push_back(thread(consumer, &que));
    
    	for (auto &it : pro_lst) // 注意这里只能使用引用&it,因为thread是没有左值的提供拷贝构造和赋值
    		it.join();
    	for (auto &it : con_lst)
    		it.join();
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123

    atomic底层实现

    c++11总结23——CAS(无锁队列)

    CAS操作是一条CPU的原子指令

    CAS(addr, old, new)
    
    • 1

    将addr存放的只与old比较,如果等于old,则将new赋值给addr

    //输入一个pAddr的地址,在函数内部判断其的值是否与期望值nExpected相等
    //如果相等那么就将pAddr的值改为nNew并同时返回true;否则就返回false,什么都不做
     
    bool compare_and_swap(int *pAddr, int nExpected, int nNew)
    {
        if(*pAddr == nExpected)
        {
            *pAddr = nNew;
            return true;
        }
        else
            return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    用预期值A1和内存值A2做对比,如果A1等于A2,则内存值修改成B并返回true,

    否则不操作并返回false。

    volatile

    volatile的底层原理与实现

  • 相关阅读:
    【知网检索征稿】第四届大数据经济与信息化管理国际学术会议(BDEIM2023)
    这几种母乳才不能吃,“隔夜”的母乳不能吃?那是对母乳不够了解
    配置sonarQube
    记一次多个Java Agent同时使用的类增强冲突问题及分析
    CAS:1260586-88-6_生物素-C5-叠氮_Biotin-C5-Azide
    Xcode14创建github远程仓库Token
    Java类和对象:类是对象的模板,对象是类的实例化
    如何快速高效压缩图片?
    RabbitMQ+SpringBoot企业版队列实战------【华为云版】
    企业想要做好数据分析,可以试试瓴羊Quick BI
  • 原文地址:https://blog.csdn.net/qq_41945053/article/details/128140525