• 基于C++11的数据库连接池实现


    0.注意

    该篇文章为了让大家尽快看到效果,代码放置比较靠前,看代码前务必看下第4部分的基础知识。

    1.数据库连接池

    1.1 是什么?

    数据库连接池负责分配、管理和释放数据库连接,属于池化机制的一种,类似的还有线程池等。

    1.2 为什么用?

    各种池化技术的使用原因都是类似的,也就是单独操作比较浪费系统资源,利用池提前准备一些资源,在需要时可以重复使用这些预先准备的资源,从而减少系统开销,实现资源重复利用。对于数据库连接来关闭来说,需要经过四步:
    (1)建立通信连接的 TCP 三次握手
    (2)数据库服务器的连接认证
    (3)数据库服务器关闭连接时的资源回收
    (4)断开通信连接的 TCP 四次挥手
    而利用数据库连接池则减少了这几步的系统开销,更加的高效。

    1.3如何设计?

    原理类似于线性池,在数据库连接池中提前创建好多个数据库连接,使用时从数据库连接池中取出,使用完放回数据库连接池。数据库连接池中的数据库连接调度由数据库连接池调度。

    2.基于C++11的实现

    Talk is cheap. Show me the code.

    直接看程序,原理、函数在后面再介绍。

    2.1程序

    (0)全部代码:
    下载之一即可
    百度网盘链接:https://pan.baidu.com/s/1wvcLn0CgZxbDpYdapDVUow?pwd=bnd9 提取码:bnd9
    阿里云盘连接:https://www.aliyundrive.com/s/Emsy9UJLxiv 提取码: h46t
    夸克网盘链接:https://pan.quark.cn/s/58eb69a3f0fb 提取码:iRiw
    (1)MysqlConn.h

    #pragma once
    #include <iostream>
    #include <mysql.h>
    #include <chrono>
    using namespace std;
    using namespace chrono;
    class MysqlConn
    {
    public:
        // 初始化数据库连接
        MysqlConn();
        // 释放数据库连接
        ~MysqlConn();
        // 连接数据库
        bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
        // 更新数据库: insert, update, delete
        bool update(string sql);
        // 查询数据库
        bool query(string sql);
        // 遍历查询得到的结果集
        bool next();
        // 得到结果集中指定位置的字段值
        string value(int index);
        // 事务操作(提交方式)
        bool transaction();
        // 提交事务
        bool commit();
        // 事务回滚 
        bool rollback();
        // 刷新起始的空闲时间点
        void refreshAliveTime();
        // 计算连接存活的总时长
        long long getAliveTime();
    private:
        void freeResult();//释放m_result空间
        MYSQL* m_conn = nullptr;
        MYSQL_RES* m_result = nullptr;
        MYSQL_ROW m_row = nullptr;
        steady_clock::time_point m_alivetime;//当前时间点
    };
    

    (2)MysqlConn.cpp

    #include "MysqlConn.h"
    
    MysqlConn::MysqlConn()
    {
        m_conn = mysql_init(nullptr);//初始化mysql
        mysql_set_character_set(m_conn, "utf8");//设置编码格式维utf8
    }
    
    MysqlConn::~MysqlConn()
    {
        if (m_conn != nullptr)
        {
            mysql_close(m_conn);
        }
        freeResult();
    }
    
    bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port)
    {
        MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
        return ptr != nullptr;
    }
    
    bool MysqlConn::update(string sql)
    {
        if (mysql_query(m_conn, sql.c_str()))
        {
            return false;
        }
        return true;
    }
    
    bool MysqlConn::query(string sql)
    {
        freeResult();
        if (mysql_query(m_conn, sql.c_str()))
        {
            return false;
        }
        m_result = mysql_store_result(m_conn);
        return true;
    }
    
    bool MysqlConn::next()
    {
        if (m_result != nullptr)
        {
            m_row = mysql_fetch_row(m_result); //检索结果集的下一行,如果没有要检索的行,mysql_fetch_row()返回NULL
            if (m_row != nullptr)
            {
                return true;
            }
        }
        return false;
    }
    
    string MysqlConn::value(int index)
    {
        int rowCount = mysql_num_fields(m_result);
        if (index >= rowCount || index < 0)
        {
            return string();
        }
        char* val = m_row[index];
        unsigned long length = mysql_fetch_lengths(m_result)[index];//为了避免下一步在“/0”处被截断
        return string(val, length);
    }
    
    bool MysqlConn::transaction()
    {
        return mysql_autocommit(m_conn, false);//事务提交方式改为手动提交
    }
    
    bool MysqlConn::commit()
    {
        return mysql_commit(m_conn);
    }
    
    bool MysqlConn::rollback()
    {
        return mysql_rollback(m_conn);
    }
    
    void MysqlConn::refreshAliveTime()
    {
        m_alivetime = steady_clock::now();
    }
    
    long long MysqlConn::getAliveTime()
    {
        nanoseconds res = steady_clock::now() - m_alivetime;
        milliseconds millsec = duration_cast<milliseconds>(res);
        return millsec.count();
    }
    
    void MysqlConn::freeResult()
    {
        if (m_result)
        {
            mysql_free_result(m_result);
            m_result = nullptr;
        }
    }
    

    (3)ConnectionPool.h

    #pragma once
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    #include "MysqlConn.h"
    using namespace std;
    class ConnectionPool
    {
    public:
        //单例模式
        static ConnectionPool* getConnectPool();
        ConnectionPool(const ConnectionPool& obj) = delete;
        ConnectionPool& operator=(const ConnectionPool& obj) = delete;
        ~ConnectionPool();
    
        shared_ptr<MysqlConn> getConnection();//任务从连接池中获取一个连接
        
    private:
        ConnectionPool();//单例模式
    
        bool parseJsonFile();//解析Json配置文件
        void produceConnection();//生产新的连接
        void recycleConnection();//回收多余连接
        void addConnection();//添加单个连接
    
        //MysqlConn::connect所需要的参数
        string m_ip;
        string m_user;
        string m_passwd;
        string m_dbName;
        unsigned short m_port;
    
        //连接池的参数
        int m_minSize;//最小连接数
        int m_maxSize;//最大连接数
        int m_timeout;//超时等待时间
        int m_maxIdleTime;//待回收线程的超时时间
    
        queue<MysqlConn*> m_connectionQ;//任务队列
        mutex m_mutexQ;//互斥锁
        condition_variable m_cond;//条件变量
    };
    

    (4)ConnectionPool.cpp

    #include "ConnectionPool.h"
    #include <json/json.h>
    #include <fstream>
    #include <thread>
    using namespace Json;
    ConnectionPool* ConnectionPool::getConnectPool()
    {
        static ConnectionPool pool;
        return &pool;
    }
    
    bool ConnectionPool::parseJsonFile()
    {
        ifstream ifs("dbconf.json");
        Reader rd;
        Value root;
        rd.parse(ifs, root);
        if (root.isObject())
        {
            m_ip = root["ip"].asString();
            m_port = root["port"].asInt();
            m_user = root["userName"].asString();
            m_passwd = root["password"].asString();
            m_dbName = root["dbName"].asString();
            m_minSize = root["minSize"].asInt();
            m_maxSize = root["maxSize"].asInt();
            m_maxIdleTime = root["maxIdleTime"].asInt();
            m_timeout = root["timeout"].asInt();
            return true;
        }
        return false;
    }
    
    void ConnectionPool::produceConnection()
    {
        while (true)
        {
            unique_lock<mutex> locker(m_mutexQ);
            while (m_connectionQ.size() >= m_minSize)
            {
                m_cond.wait(locker);
            }
            addConnection();
            m_cond.notify_all();
        }
    }
    
    void ConnectionPool::recycleConnection()
    {
        while (true)
        {
            this_thread::sleep_for(chrono::milliseconds(500));
            lock_guard<mutex> locker(m_mutexQ);
            while (m_connectionQ.size() > m_minSize)
            {
                MysqlConn* conn = m_connectionQ.front();
                if (conn->getAliveTime() >= m_maxIdleTime)
                {
                    m_connectionQ.pop();
                    delete conn;
                }
                else
                {
                    break;
                }
            }
        }
    }
    
    void ConnectionPool::addConnection()
    {
        MysqlConn* conn = new MysqlConn;
        conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
        conn->refreshAliveTime();
        m_connectionQ.push(conn);
    }
    
    shared_ptr<MysqlConn> ConnectionPool::getConnection()
    {
        unique_lock<mutex> locker(m_mutexQ);
        while (m_connectionQ.empty())
        {
            if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout)))
            {
                if (m_connectionQ.empty())
                {
                    //return nullptr;
                    continue;
                }
            }
        }
        shared_ptr<MysqlConn> connptr(m_connectionQ.front(), [this](MysqlConn* conn) {
            lock_guard<mutex> locker(m_mutexQ);
            conn->refreshAliveTime();
            m_connectionQ.push(conn);
            });//自定义shared_ptr的析构方法
        m_connectionQ.pop();
        m_cond.notify_all();
        return connptr;
    }
    
    ConnectionPool::~ConnectionPool()
    {
        while (!m_connectionQ.empty())
        {
            MysqlConn* conn = m_connectionQ.front();
            m_connectionQ.pop();
            delete conn;
        }
    }
    
    ConnectionPool::ConnectionPool()
    {
        // 加载配置文件
        if (!parseJsonFile())
        {
            return;
        }
    
        for (int i = 0; i < m_minSize; ++i)
        {
            addConnection();
        }
        thread producer(&ConnectionPool::produceConnection, this);
        thread recycler(&ConnectionPool::recycleConnection, this);
        producer.detach();
        recycler.detach();
    }
    

    2.2 测试代码main.cpp

    #include <iostream>
    #include <memory>
    #include "MysqlConn.h"
    #include "ConnectionPool.h"
    using namespace std;
    // 1. 单线程: 使用/不使用连接池
    // 2. 多线程: 使用/不使用连接池
    
    void op1(int begin, int end)
    {
        for (int i = begin; i < end; ++i)
        {
            MysqlConn conn;
            conn.connect("root", "123159", "testdb", "192.168.237.131");
            char sql[1024] = { 0 };
            sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);
            conn.update(sql);
        }
    }
    
    void op2(ConnectionPool* pool, int begin, int end)
    {
        for (int i = begin; i < end; ++i)
        {
            shared_ptr<MysqlConn> conn = pool->getConnection();
            char sql[1024] = { 0 };
            sprintf(sql, "insert into person values(%d, 25, 'man', 'tom')", i);
            conn->update(sql);
        }
    }
    
    void test1()
    {
    #if 1
        // 非连接池, 单线程, 用时: 21037278300 纳秒, 21037 毫秒
        steady_clock::time_point begin = steady_clock::now();
        op1(0, 5000);
        steady_clock::time_point end = steady_clock::now();
        auto length = end - begin;
        cout << "非连接池, 单线程, 用时: " << length.count() << " 纳秒, "
            << length.count() / 1000000 << " 毫秒" << endl;
    #else
        // 连接池, 单线程, 用时: 8838406500 纳秒, 8838 毫秒
        ConnectionPool* pool = ConnectionPool::getConnectPool();
        steady_clock::time_point begin = steady_clock::now();
        op2(pool, 0, 5000);
        steady_clock::time_point end = steady_clock::now();
        auto length = end - begin;
        cout << "连接池, 单线程, 用时: " << length.count() << " 纳秒, "
            << length.count() / 1000000 << " 毫秒" << endl;
    
    #endif
    }
    
    void test2()
    {
    #if 0
        // 非连接池, 多单线程, 用时: 13277417000 纳秒, 13277 毫秒
        MysqlConn conn;
        conn.connect("root", "root", "testdb", "192.168.237.131");
        steady_clock::time_point begin = steady_clock::now();
        thread t1(op1, 0, 1000);
        thread t2(op1, 1000, 2000);
        thread t3(op1, 2000, 3000);
        thread t4(op1, 3000, 4000);
        thread t5(op1, 4000, 5000);
        t1.join();
        t2.join();
        t3.join();
        t4.join();
        t5.join();
        steady_clock::time_point end = steady_clock::now();
        auto length = end - begin;
        cout << "非连接池, 多单线程, 用时: " << length.count() << " 纳秒, "
            << length.count() / 1000000 << " 毫秒" << endl;
    
    #else
        // 连接池, 多单线程, 用时: 3938502100 纳秒, 3938 毫秒
        ConnectionPool* pool = ConnectionPool::getConnectPool();
        steady_clock::time_point begin = steady_clock::now();
        thread t1(op2, pool, 0, 1000);
        thread t2(op2, pool, 1000, 2000);
        thread t3(op2, pool, 2000, 3000);
        thread t4(op2, pool, 3000, 4000);
        thread t5(op2, pool, 4000, 5000);
        t1.join();
        t2.join();
        t3.join();
        t4.join();
        t5.join();
        steady_clock::time_point end = steady_clock::now();
        auto length = end - begin;
        cout << "连接池, 多单线程, 用时: " << length.count() << " 纳秒, "
            << length.count() / 1000000 << " 毫秒" << endl;
    
    #endif
    }
    
    int query()
    {
        MysqlConn conn;
        bool tt = conn.connect("root", "123159", "testdb", "127.0.0.1");
        cout << "tt:  " << tt << endl;
        string sql = "insert into person values(7, 25, 'man', 'tom')";
        bool flag = conn.update(sql);
        cout << "flag value:  " << flag << endl;
    
        sql = "select * from person";
        conn.query(sql);
        while (conn.next())
        {
            cout << conn.value(0) << ", "
                << conn.value(1) << ", "
                << conn.value(2) << ", "
                << conn.value(3) << endl;
        }
        return 0;
    }
    int main()
    {
        test2();
        return 0;
    }
    

    3.配置

    需要配置jsoncpp和mysql
    (1)配置jasoncpp

    (2)配置mysql

    4.基础知识

    4.1MySQL在C语言中的API

    参考:https://www.mysqlzh.com/doc/196/115.html

    4.2 jsoncpp的基本知识

    (1)基本函数

    (2)解析Json格式数据

    5.参考

    https://www.bilibili.com/video/BV1Fr4y1s7w4
    https://subingwen.cn/cpp/dbconnectionPool/

  • 相关阅读:
    Hadoop完全分布式环境搭建
    解码字母到整数映射(Java算法每日一题)
    如何合并多个PDF文件?这几个小妙招快来码住吧
    【StreamSets 】重置管道状态——管道的数据记忆
    【红队】ATT&CK - 自启动 - 注册表运行键、启动文件夹
    本地搭建vulfocus靶场&复现log4j2漏洞
    【前端优化 & Vue项目优化】 如何避免浏览器卡顿,实现性能优化cdn?
    【校招VIP】专业课考点之死锁
    win11 无法登录微软账户 终极解决方案
    LINQ to SQL语句(10)之Insert
  • 原文地址:https://www.cnblogs.com/yunmeng-shi/p/16307049.html