• 数据库连接池


    前言

            Mysql数据库是一个C/S模型,通过客户端来访问服务端。底层使用的是TCP协议。

            在连接数据库和断开数据库时,需要进行一下几个步骤:

    1. 建立通信连接的TCP三次握手。
    2. 数据库服务器的连接认证。
    3. 数据库服务器关闭连接时的资源回收。
    4. 断开通信连接的TCP四次挥手。

            当在我们在程序中需要频繁的访问数据库,也就是需要频繁的连接数据库和断开数据库时,会需要频繁的进行上面的操作。在高并发的情况下会消耗很多时间和资源。

            使用连接池可以减少这部分的消耗。指挥在创建连接池时,建立连接,在销毁连接池对象时,断开连接。需要连接数据库时,在连接池中取出连接,不需要使用数据库时,将连接放会连接池即可。

    连接池的实现

            1. 技术点

    • 多线程编程
    • 线程同步与互斥。互斥锁和条件变量的使用。
    • 处理时间和日期的chrono库。
    • 智能指针
    • lambda表达式
    • MySql C语言库的使用。
    • 单例模式
    • 生产者消费者模型
    • Jsoncpp库的使用

            2. 连接池的设计

            在网络通信中,服务器可能收到多个客户端的请求。连接池部署在服务器上,并且服务端会部署各种组件,其中数据库组件可能部署在另外一台服务器上,需要操作数据库,就需要连接另外一台服务器。

            使用线程池和连接池可以大大的提高效率。

            多线程,可以用来处理不用的任务,比如:接受客户端的请求,处理客户端的请求,从连接池中取出连接等。

            对于连接池,首先在连接池中,与数据库建立若干个连接,需要使用连接数据库时,从连接池中取,不需要连接时,将连接放入连接池中。

     实现的功能点:

    • 连接池只需要一个实例,所以连接池是一个单例模式的类。
    • 所有数据库连接需要维护到一个安全的队列中
      • 方便数据库的连接的添加和删除
      • 由于连接池是只有一个实例,所以需要是线程安全的,需要使用互斥锁来保护队列数据的压入和弹出。
    • 在需要的时候可以从数据库中得到一个或者多个可用的数据库连接。
      • 如果有可用连接直接取出
      • 如果没有,阻塞等待一段时间,在重试
    • 如果队列中没有多余可用连接,需要动态创建。不能超过最大连接数。
    • 如果队列中空闲连接太多,需要动态销毁一部分。
    • 数据库操作完毕,需要将连接归还到连接池中。

    细节分析:

    • 数据库连接的存储:使用生产者消费者模型,将连接保存到队列中。
      • 生产者:专门生产数据库连接的线程。
      • 消费者:需要访问数据库的线程。
      • 处理生产者消费者模型,需要通过条件变量来使用线程的同步。
    • 连接池连接的动态创建:交给一个单独的线程来处理。
    • 连接池连接的动态销毁:交给一个动态的线程处理。
    • 连接池的默认连接数量:连接池中提供了用连接的最小数量。如果不够,就动态创建,如果太多就动态销毁。
    • 连接池的最大连接数量:能够创建的最大有效数据库的连接数量。
    • 最大空闲时间:创建的数据库连接在指定的一定时间内,未被使用,需要进行销毁。
    • 连接超时:当连接池中没有可用连接,消费者线程无法获取连接,阻塞等待的时间。

            3. 封装MySql API的类

            释放数据库的资源,需要释放创建数据库的资源和结果集的资源。

            使用的C语言MySql接口:用C语言API(常用)操作MySql数据库_两片空白的博客-CSDN博客

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. using namespace std;
    6. using namespace chrono;
    7. class MysqlConn
    8. {
    9. public:
    10. //初始化数据库
    11. MysqlConn();
    12. //释放数据库资源
    13. ~MysqlConn();
    14. //连接数据库
    15. bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
    16. //更新数据库 update delete insert
    17. bool update(string sql);
    18. //查询数据库
    19. bool query(string sql);
    20. //遍历数据库,主要是查找结果集中的下一行数据
    21. bool next();
    22. //得到结果集中需要行的index列的字段
    23. string value(int index);
    24. //事务操作
    25. bool transaction();
    26. //提交事务
    27. bool commit();
    28. //事务回滚
    29. bool roolback();
    30. //刷新连接等待时间
    31. void refreshAliveTime();
    32. //获得连接等待时间
    33. long long getAliveTime();
    34. private:
    35. //释放结果集的资源
    36. void releaseRes();
    37. MYSQL* m_conn = nullptr;
    38. MYSQL_RES* m_res = nullptr;
    39. MYSQL_ROW m_row = nullptr;
    40. steady_clock::time_point m_aliveTime;
    41. };
    1. #include "MysqlConn.h"
    2. MysqlConn::MysqlConn()
    3. {
    4. m_conn = mysql_init(nullptr);
    5. }
    6. MysqlConn::~MysqlConn()
    7. {
    8. if (m_conn)
    9. {
    10. mysql_close(m_conn);
    11. }
    12. releaseRes();
    13. }
    14. bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port)
    15. {
    16. MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
    17. if (ptr)
    18. {
    19. return true;
    20. }
    21. return false;
    22. }
    23. bool MysqlConn::update(string sql)
    24. {
    25. if (mysql_query(m_conn, sql.c_str()))
    26. {
    27. return false;
    28. }
    29. return true;
    30. }
    31. bool MysqlConn::query(string sql)
    32. {
    33. //在查询结果集时,需要清空上一次结果集的内存
    34. releaseRes();
    35. if (mysql_query(m_conn, sql.c_str()) == 0)
    36. {
    37. m_res = mysql_store_result(m_conn);
    38. return true;
    39. }
    40. return false;
    41. }
    42. bool MysqlConn::next()
    43. {
    44. if (m_res != nullptr)
    45. {
    46. m_row = mysql_fetch_row(m_res);
    47. if (m_row)
    48. {
    49. return true;
    50. }
    51. }
    52. return false;
    53. }
    54. //参数时这一行的第几列
    55. string MysqlConn::value(int index)
    56. {
    57. //得到当前行中,列的数量
    58. int resNum = mysql_num_fields(m_res);
    59. if (index < 0 || index >= resNum)
    60. {
    61. return string();
    62. }
    63. char* str = m_row[index];
    64. unsigned long length = mysql_fetch_lengths(m_res)[index];//获得index列字段的长度
    65. return string(str, length);
    66. }
    67. bool MysqlConn::transaction()
    68. {
    69. return mysql_autocommit(m_conn, false);
    70. }
    71. bool MysqlConn::commit()
    72. {
    73. return mysql_commit(m_conn);
    74. }
    75. bool MysqlConn::roolback()
    76. {
    77. return mysql_rollback(m_conn);
    78. }
    79. void MysqlConn::refreshAliveTime()
    80. {
    81. m_aliveTime = steady_clock::now();
    82. }
    83. long long MysqlConn::getAliveTime()
    84. {
    85. nanoseconds res = steady_clock::now() - m_aliveTime;
    86. milliseconds millsec = duration_cast(res);
    87. return millsec.count();
    88. }
    89. void MysqlConn::releaseRes()
    90. {
    91. if (m_res)
    92. {
    93. mysql_free_result(m_res);
    94. }
    95. }

      数据库连接池类

    • 使用队列来保存个连接,创建minSize个连接。
    • 使用两个单独的线程,一个用来创建连接,一个用来销毁连接,使队列中的线程个数维持在minSize个。
    • 队列是临界资源,pop和push时需要使用互斥锁加锁。
    • 队列为空,针对消费者和队列连接个数等于minSize,针对生产者都需要时用条件变量等待。当生产者生产连接到队列,消费者消费连接,需要唤醒消费者和生产者。
    • 当使用连接个数大于maxSize,也需要使用条件变量等待。
    • 销毁连接,当连接个数大于minSize,并且连接空闲时长大于maxIdleTime。
    1. #pragma once
    2. #include "MysqlConn.h"
    3. #include
    4. #include
    5. #include
    6. class ConnectionPool
    7. {
    8. public:
    9. ConnectionPool* getConnectionPool();
    10. ConnectionPool(const ConnectionPool& pool) = delete;
    11. ConnectionPool& operator=(const ConnectionPool& pool) = delete;
    12. shared_ptr getConnection();
    13. private:
    14. ConnectionPool();
    15. ~ConnectionPool();
    16. bool parseJsonFile();
    17. void productConnection();
    18. void recycleConnection();
    19. void addConnection();
    20. string m_ip;
    21. string m_userName;
    22. string m_passwd;
    23. string m_dbName;
    24. unsigned short m_port;
    25. int m_maxSize;//最大使用连接数
    26. int m_minSize;//连接池中最少连接数
    27. int m_timeout;
    28. int m_maxIdleTime;
    29. int m_useCount;//使用的连接数
    30. queue m_pool;
    31. mutex m_lock;
    32. condition_variable m_cond;
    33. };
    1. #include "ConnectionPool.h"
    2. #include
    3. #include
    4. #include
    5. using namespace Json;
    6. ConnectionPool::ConnectionPool()
    7. {
    8. //读取配置
    9. if (!parseJsonFile())
    10. {
    11. return;
    12. }
    13. //生成连接
    14. for (int i = 0; i < m_minSize; ++i)
    15. {
    16. addConnection();
    17. }
    18. //判断连接个数,是否需要创建连接或者销毁连接
    19. //单独用两个线程处理,处理的是连接池中的连接数
    20. thread produtor(&ConnectionPool::productConnection, this);
    21. thread recyclor(&ConnectionPool::recycleConnection, this);
    22. produtor.detach();
    23. recyclor.detach();
    24. }
    25. ConnectionPool::~ConnectionPool()
    26. {
    27. while (!m_pool.empty())
    28. {
    29. MysqlConn* conn = m_pool.front();
    30. m_pool.pop();
    31. delete conn;
    32. }
    33. }
    34. ConnectionPool* ConnectionPool::getConnectionPool()
    35. {
    36. static ConnectionPool pool;
    37. return &pool;
    38. }
    39. bool ConnectionPool::parseJsonFile()
    40. {
    41. ifstream ifs("dbConfig.json");
    42. Reader rd;
    43. Value root;
    44. rd.parse(ifs, root);
    45. if (root.isObject())
    46. {
    47. m_ip = root["ip"].asString();
    48. m_port = root["port"].asInt();
    49. m_userName = root["userName"].asString();
    50. m_passwd = root["passwd"].asString();
    51. m_dbName = root["dbName"].asString();
    52. m_minSize = root["minSize"].asInt();
    53. m_maxSize = root["maxSize"].asInt();
    54. m_maxIdleTime = root["maxIdleTime"].asInt();
    55. m_timeout = root["timeout"].asInt();
    56. return true;
    57. }
    58. return false;
    59. }
    60. void ConnectionPool::productConnection()
    61. {
    62. while (true)
    63. {
    64. //对整个局域加锁
    65. unique_lock locker(m_lock);
    66. while (m_pool.size() >= m_minSize)
    67. {
    68. m_cond.wait(locker);
    69. }
    70. addConnection();
    71. //唤醒消费者,生产者只有一个线程,就是当前线程
    72. m_cond.notify_all();
    73. }
    74. }
    75. //当连接池的连接个数大于minSize并且等待时长大于最大等待时间,需要销毁
    76. void ConnectionPool::recycleConnection()
    77. {
    78. while (true)
    79. {
    80. this_thread::sleep_for(milliseconds(500));
    81. lock_guard locker(m_lock);
    82. while (m_pool.size() > m_minSize)
    83. {
    84. MysqlConn* conn = m_pool.front();
    85. if (conn->getAliveTime() >= m_maxIdleTime)
    86. {
    87. //加锁
    88. m_pool.pop();
    89. delete conn;
    90. conn = nullptr;
    91. }
    92. else
    93. {
    94. break;
    95. }
    96. }
    97. }
    98. }
    99. void ConnectionPool::addConnection()
    100. {
    101. MysqlConn* conn = new MysqlConn;
    102. conn->connect(m_userName, m_passwd, m_dbName, m_ip, m_port);
    103. conn->refreshAliveTime();
    104. m_pool.push(conn);
    105. }
    106. shared_ptr ConnectionPool::getConnection()
    107. {
    108. //加锁
    109. unique_lock locker(m_lock);
    110. while (m_pool.empty() || m_useCount >= m_maxSize)
    111. {
    112. //等待一段时间
    113. if (cv_status::no_timeout == m_cond.wait_for(locker, milliseconds(m_timeout)))
    114. {
    115. //一段时间内没有被唤醒
    116. if (m_pool.empty() || m_useCount >= m_maxSize)
    117. {
    118. continue;
    119. }
    120. }
    121. }
    122. //从连接池中取连接
    123. shared_ptr conn(m_pool.front(), [this](MysqlConn* conn) {
    124. //删除器,将连接放回连接池
    125. lock_guard lock(m_lock);
    126. conn->refreshAliveTime();
    127. m_pool.push(conn);
    128. m_useCount--;
    129. m_cond.notify_all();
    130. });
    131. //lock_guard lock(m_lock);上面加了
    132. m_pool.pop();
    133. m_useCount++;
    134. //唤醒生产者,由于生产者和消费者使用的同一个环境变量,会将消费者也唤醒
    135. //但是不影响,会继续判断
    136. m_cond.notify_all();
    137. return conn;
    138. }

    测试

    1. #include
    2. #include "MysqlConn.h"
    3. #include "ConnectionPool.h"
    4. //测试数据库API
    5. void query()
    6. {
    7. MysqlConn conn;
    8. conn.connect("root", "Root_123", "testdb", "114.55.92.82", 3306);
    9. string sql = "insert into person value(2, 20, \"man\", \"jj\")";
    10. conn.query(sql);
    11. sql = "select * from person";
    12. bool flag = conn.query(sql);
    13. cout << "flag value" << flag << endl;
    14. while (conn.next())
    15. {
    16. cout << conn.value(0) << ","
    17. << conn.value(1) << ","
    18. << conn.value(2) << ","
    19. << conn.value(3) << ","
    20. << endl;
    21. }
    22. }
    23. void op1(int begin, int end)
    24. {
    25. for (int i = begin; i < end; ++i)
    26. {
    27. MysqlConn conn;
    28. conn.connect("root", "Root_123", "testdb", "114.55.92.82", 3306);
    29. //string sql;
    30. char sql[1024] = { 0 };
    31. sprintf(sql, "insert into person value(%d, 20, \"man\", \"jj\")", i);
    32. conn.update(sql);
    33. }
    34. }
    35. void op2(ConnectionPool* pool, int begin, int end)
    36. {
    37. for (int i = begin; i < end; ++i)
    38. {
    39. shared_ptr ptr = pool->getConnection();
    40. char sql[1024] = {0};
    41. sprintf(sql, "insert into person value(%d, 20, \"man\", \"jj\")", i);
    42. ptr->update(sql);
    43. }
    44. }
    45. //单线程
    46. void test1()
    47. {
    48. #if 1
    49. //非连接池 单线程 耗时108848850600纳秒 108848毫秒
    50. steady_clock::time_point begin = steady_clock::now();
    51. op1(0, 1000);
    52. steady_clock::time_point end = steady_clock::now();
    53. auto length = end - begin;
    54. cout << "非连接池 单线程 耗时" << length.count() << "纳秒 "
    55. << length.count() / 1000000 << "毫秒" << endl;
    56. #else
    57. //连接池 单线程 耗时16893997100纳秒 16893毫秒
    58. ConnectionPool* pool = ConnectionPool::getConnectionPool();
    59. steady_clock::time_point begin = steady_clock::now();
    60. op2(pool, 0, 1000);
    61. steady_clock::time_point end = steady_clock::now();
    62. auto length = end - begin;
    63. cout << "连接池 单线程 耗时" << length.count() << "纳秒 "
    64. << length.count() / 1000000 << "毫秒" << endl;
    65. #endif
    66. }
    67. void test2()
    68. {
    69. #if 0
    70. //非连接池 多线程 耗时20575826400纳秒 20575毫秒
    71. //如果同一时间多个线程连接数据库,会导致一些线程连接失败
    72. MysqlConn conn;
    73. conn.connect("root", "Root_123", "testdb", "114.55.92.82", 3306);
    74. steady_clock::time_point begin = steady_clock::now();
    75. thread t1(op1, 0, 200);
    76. thread t2(op1, 200, 400);
    77. thread t3(op1, 400, 600);
    78. thread t4(op1, 600, 800);
    79. thread t5(op1, 800, 1000);
    80. t1.join();
    81. t2.join();
    82. t3.join();
    83. t4.join();
    84. t5.join();
    85. steady_clock::time_point end = steady_clock::now();
    86. auto length = end - begin;
    87. cout << "非连接池 多线程 耗时" << length.count() << "纳秒 "
    88. << length.count() / 1000000 << "毫秒" << endl;
    89. #else
    90. //连接池 多线程 耗时4014272600纳秒 4014毫秒
    91. ConnectionPool* pool = ConnectionPool::getConnectionPool();
    92. steady_clock::time_point begin = steady_clock::now();
    93. thread t1(op2, pool, 0, 200);
    94. thread t2(op2, pool, 200, 400);
    95. thread t3(op2, pool, 400, 600);
    96. thread t4(op2, pool, 600, 800);
    97. thread t5(op2, pool, 800, 1000);
    98. t1.join();
    99. t2.join();
    100. t3.join();
    101. t4.join();
    102. t5.join();
    103. steady_clock::time_point end = steady_clock::now();
    104. auto length = end - begin;
    105. cout << "连接池 多线程 耗时" << length.count() << "纳秒 "
    106. << length.count() / 1000000 << "毫秒" << endl;
    107. #endif
    108. }
    109. int main()
    110. {
    111. test2();
    112. return 0;
    113. }

    在vs中引用第三方库

    当前项目中会需要使用mysql和jsoncpp

    • 进入项目属性
    • 在下面两项中加入头文件路径和库文件路径

    •  在下面加上引用第三方库名

    遇到的问题

    • 编译时报错,找不到libmysql.lib库。

    找到对应动态库,将对应动态库加到项目对应exe文件下。

    • 随后报下面错误

            找到下载的mysql代码,在bin目录下,找到对应库文件,拷贝到项目的对应exe文件下

  • 相关阅读:
    PLC信号发生器(余弦信号)
    目标检测Neck:FPN(Feature Pyramid Network)与PAN(附python代码)
    LCA 的若干种求法
    通过 360 反馈提高团队绩效
    机器学习(公式推导与代码实现)--sklearn机器学习库
    vue3 + ts 项目实站 【二】 vue-router 安装. (后台管理系统)
    Go标准库Context
    MySQL数据库基础知识(一)
    Shell编程
    Fama-French三因子和五因子模型和Stata代码(内附原始数据)
  • 原文地址:https://blog.csdn.net/weixin_57023347/article/details/126694343