MySQL 数据库连接池在系统中的应用场景:
操作步骤:
#ifndef _DB_H_
#define _DB_H_
#include
#include
#include
#include
#include
using namespace std;
// What the connection pool offers:
//   1. hand out an available connection
//   2. run insert / delete / update / select statements
//   3. release the connection

// One result row, keyed by column name; values are kept as text.
using Record = map<string, string>;
// A result set: rows in the order they were appended.
using TableData = vector<Record>;
// Abstract interface of one database connection that a pool can manage.
// All operations are pure virtual; a concrete backend provides them.
class Dbconnection
{
public:
    Dbconnection() = default;
    virtual ~Dbconnection() = default;

    // Open the connection to database `db` on ip:port with the given credentials.
    virtual int InitConnect(const string& ip,
                            int port,
                            const string& user,
                            const string& passwd,
                            const string& db) = 0;

    // Close the connection.
    virtual int DeInit() = 0;

    // Set the "in use" marker of this connection.
    virtual int Setflag(int flag) = 0;

    // Read the "in use" marker of this connection.
    virtual int Getflag() = 0;

    // Execute a statement that produces no result set.
    virtual int ExecOperation(const string& str) = 0;

    // Execute a query and collect its rows into tableData.
    virtual int ExecQuery(const string& str, TableData& tableData) = 0;

    // Describe the most recent error.
    virtual string GetLastError() = 0;
};
class Dbpool
{
public:
Dbpool(const string& strKey,int nSize,const string& ip,int port,const string& user,const string& passwd,const string& db){
m_strKey = strKey;
m_nSize = nSize;
m_ip = ip;
m_port = port;
m_user = user;
m_passwd = passwd;
m_db = db;
}
~Dbpool(){
}
//获取标识
string GetPoolKey(){
return m_strKey;
}
//创建一个连接池
int DbpoolAdd(Dbconnection* pConnection){
std::lock_guard<mutex> lk(m_mutex);
if(m_vecofConnections.size() >= m_nSize){
return -1;
}
m_vecofConnections.push_back(pConnection);
return 0;
}
//获取一个连接
Dbconnection* GetConnection() {
std::lock_guard<mutex> lk(m_mutex);
if(!m_vecofConnections.empty()){
for(auto& it: m_vecofConnections){
if(it->Setflag(1) == 0){
return it;
}
}
}
return NULL;
}
//释放连接
int FreeConnection(Dbconnection * pCt){
if(pCt != NULL){
pCt->Setflag(0);
}
}
//创建pool
virtual int CreatePool() = 0;
//销毁pool
virtual int DestoryPool() = 0;
protected:
string m_strKey;
string m_ip;
int m_port;
string m_user;
string m_passwd;
string m_db;
int m_nSize;
mutex m_mutex;
vector<Dbconnection*> m_vecofConnections;
};
#endif
#ifndef _DB_MANAGER_H_
#define _DB_MANAGER_H_
#include
#include
#include
#include
#include "db.h"
using namespace std;
// Registry that maps a pool key to a Dbpool so callers can pick a pool by name.
// Pools are held as raw pointers; this class does not delete them.
class Dbpoolmanager
{
public:
    Dbpoolmanager();

    // Forget every registered pool and log the teardown.
    ~Dbpoolmanager()
    {
        m_mapOfpools.clear();
        std::cout << "Dbpoolmanager uninit" << std::endl;
    }

    // Register a pool under its own key.
    int RegisterDbpool(Dbpool* pPool);

    // Look up the pool registered under strKey.
    Dbpool* GetDbpool(const std::string& strKey);

private:
    std::mutex m_mutex;                          // guards m_mapOfpools
    std::map<std::string, Dbpool*> m_mapOfpools; // pool key -> pool
};
#endif
dbpoolmanager.cpp
#include "dbpoolmanager.h"
// Nothing to set up: the registry map and its mutex default-construct themselves.
Dbpoolmanager::Dbpoolmanager() = default;
// Register a pool under the key returned by its GetPoolKey().
// Returns 0 on success, -1 when pPool is null or another pool is already
// registered under the same key.
int Dbpoolmanager::RegisterDbpool(Dbpool* pPool)
{
    if(pPool == nullptr){
        return -1;
    }
    // Copy the key once, before taking the lock.
    const string key = pPool->GetPoolKey();

    std::lock_guard<mutex> lk(m_mutex);
    // One lookup for both the check and the insert. A null entry counts as
    // "not registered", so a placeholder left by an operator[] lookup cannot
    // block the key forever.
    Dbpool*& slot = m_mapOfpools[key];
    if(slot != nullptr){
        return -1;
    }
    slot = pPool;
    return 0;
}
// Look up the pool registered under strKey.
// Returns nullptr when no pool is registered under that key. The lookup uses
// find() so an unknown key no longer inserts a null entry into the registry
// (which operator[] did, making a later registration of that key fail).
Dbpool* Dbpoolmanager::GetDbpool(const string& strKey)
{
    std::lock_guard<mutex> lk(m_mutex);
    const auto it = m_mapOfpools.find(strKey);
    return it == m_mapOfpools.end() ? nullptr : it->second;
}
#ifndef _MYSQL_DB_H_
#define _MYSQL_DB_H_
#include
#include
#include
#include
#include "db.h"
using namespace std;
class MsqlDbconnection : public Dbconnection
{
public:
//初始化连接
virtual int InitConnect(const string& ip,int port,const string& user,const string& passwd,const string& db) override;
//断开连接
virtual int DeInit() override;
//设置占用
virtual int Setflag(int flag) override;
//获取占用
virtual int Getflag() override;
//执行语句
virtual int ExecOperation(const string& str) override;
//查询数据
virtual int ExecQuery(const string& str,TableData& tableData) override;
//获取错误
virtual string GetLastError() override;
private:
MYSQL* m_mYsqlPtr = NULL;
mutex m_mutex;
int m_nUseflag = 0;
};
class MsqlDbpool : public Dbpool
{
public:
MsqlDbpool(const string& strKey,int nSize,const string& ip,int port,const string& user,const string& passwd,const string& db)
: Dbpool(strKey,nSize,ip,port,user,passwd,db){
}
//创建pool
virtual int CreatePool() override;
//销毁pool
virtual int DestoryPool() override;
};
#endif
mysqldb.cpp
#include "mysqldb.h"
//初始化连接
int MsqlDbconnection::InitConnect(const string& ip,int port,const string& user,const string& passwd,const string& db)
{
std::cout<<ip<<","<<port<<","<<user<<","<<passwd<<","<<db<<std::endl;
try {
m_mYsqlPtr = mysql_init(NULL);
if (m_mYsqlPtr == NULL){
return -1;
}
unsigned int timeout = 2*1000;
int ret = mysql_options(m_mYsqlPtr, MYSQL_OPT_CONNECT_TIMEOUT, (const char *)&timeout);
if (!mysql_real_connect(m_mYsqlPtr, ip.c_str(), user.c_str(), passwd.c_str(),db.c_str(), port, NULL, 0)){
return -1;
}
if (mysql_set_character_set(m_mYsqlPtr,"utf8")){
return -1;
}
if (mysql_select_db(m_mYsqlPtr,db.c_str())){
return -1;
}
return 0;
}
catch (...){
return -1;
}
return -1;
}
// Close the connection and forget the handle.
// Safe to call repeatedly or before InitConnect(): a null handle is never
// passed to the client library. Always returns 0.
int MsqlDbconnection::DeInit()
{
    if(m_mYsqlPtr != nullptr){
        mysql_close(m_mYsqlPtr);
        m_mYsqlPtr = nullptr;
    }
    return 0;
}
// Change the "in use" marker under m_mutex.
//   flag == 0 : mark the connection free, returns 0.
//   otherwise : claim the connection; returns -1 when it is already claimed,
//               0 after marking it in use.
int MsqlDbconnection::Setflag(int flag)
{
    std::lock_guard<mutex> guard(m_mutex);

    const bool releasing = (flag == 0);
    if(releasing){
        m_nUseflag = 0;
        return 0;
    }

    // Claim request: refuse when somebody else already holds the connection.
    if(m_nUseflag == 1){
        return -1;
    }

    m_nUseflag = 1;
    return 0;
}
// Read the "in use" marker while holding m_mutex.
int MsqlDbconnection::Getflag()
{
    std::lock_guard<mutex> guard(m_mutex);
    const int current = m_nUseflag;
    return current;
}
//执行语句
int MsqlDbconnection::ExecOperation(const string& str)
{
mysql_ping((MYSQL *)m_mYsqlPtr);
if (mysql_query((MYSQL *)m_mYsqlPtr, str.c_str())){
return 0;
}
return -1;
}
//查询数据
int MsqlDbconnection::ExecQuery(const string& str,TableData& tableData)
{
MYSQL_RES *result = NULL;
MYSQL_ROW row;
std::cout << str<<std::endl;
if (mysql_ping(m_mYsqlPtr) != 0){
return -1;
}
int iRes = mysql_query(m_mYsqlPtr, str.c_str());
if(iRes != 0){
return -1;
}
result = mysql_store_result(m_mYsqlPtr);
MYSQL_FIELD *fields = mysql_fetch_fields(result); //返回所有列w
int num = mysql_num_fields(result);
vector<string> m_vecfields;
for (int j = 0; j<num; j++){
//std::cout << fields[j].name<
m_vecfields.push_back(fields[j].name);
}
//开始遍历每行
while (row = mysql_fetch_row(result)){
Record temp;
for (int i = 0; i < m_vecfields.size(); i++){
temp[m_vecfields[i]] = string(row[i]);
}
tableData.push_back(temp);
}
mysql_free_result(result);
return 0;
}
// Text of the last client error, or an empty string while there is no handle.
string MsqlDbconnection::GetLastError()
{
    if(m_mYsqlPtr == NULL){
        return string();
    }
    const char* message = mysql_error(m_mYsqlPtr);
    return string(message);
}
//创建pool中的各个连接
int MsqlDbpool::CreatePool()
{
//创建5个连接,加入到连接池
for(int i = 0; i < m_nSize; i ++){
MsqlDbconnection * pCt = new MsqlDbconnection();
if(pCt->InitConnect(m_ip.c_str(),m_port,m_user.c_str(),m_passwd.c_str(),m_db.c_str()) == 0){
std::cout <<"connect db ok,"<<pCt<<std::endl;
}else{
std::cout<<"connect db failed"<<std::endl;
std::cout<<pCt->GetLastError()<<std::endl;
}
DbpoolAdd(pCt);
}
return 0;
}
//销毁pool中的各个连接
int MsqlDbpool::DestoryPool()
{
//创建5个连接,加入到连接池
for(int i = 0; i < m_nSize; i ++){
MsqlDbconnection * pCt = (MsqlDbconnection *)m_vecofConnections[i];
pCt->DeInit();
delete pCt;
std::cout <<"delete:"<<pCt<<std::endl;
pCt = NULL;
}
return 0;
}
#include
#include
#include
#include
#include
#include "db.h"
#include "mysqldb.h"
#include "dbpoolmanager.h"
// Guards gData; producer and consumer threads lock it before touching the queue.
std::mutex gMutex;
// Pending SQL statements, filled by processThread() and drained by InsertGroupAsRhread().
std::queue<std::string> gData;
// Consumer loop: take one SQL statement at a time from gData and execute it
// on a connection borrowed from the "group_as" pool.
//
// pManager: registry used to look up the pool on every iteration.
// Returns -1 immediately when pManager is null; otherwise it loops forever.
//
// NOTE(review): when no pool or no free connection is available the statement
// that was already popped is dropped after printing "empty" - confirm that
// this is acceptable for the test.
int InsertGroupAsRhread(Dbpoolmanager * pManager)
{
    if(pManager == nullptr){
        return -1;
    }
    while (true){
        string value;
        bool haveWork = false;
        {
            // Hold the queue lock only while taking the statement out.
            std::lock_guard<std::mutex> lk(gMutex);
            if(!gData.empty()){
                value = gData.front();
                gData.pop();
                haveWork = true;
            }
        }
        if(haveWork){
            // GetDbpool() may return null when the key is not registered.
            Dbpool* pDbpool = pManager->GetDbpool(string("group_as"));
            Dbconnection* pct = (pDbpool != nullptr) ? pDbpool->GetConnection() : nullptr;
            if(pct != nullptr){
                std::cout<<"current thread:"<<",the Connection is:"<<pct<<std::endl;
                pct->ExecOperation(value);
                pDbpool->FreeConnection(pct);
            }else{
                std::cout<<"empty"<<std::endl;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
// Producer loop: every 50 ms queue one INSERT statement with a random id
// (0..999) and log exactly the statement that was queued. Never returns.
int processThread()
{
    while(1){
        {
            // rand(), the queue and the log line all stay under gMutex, as before.
            std::lock_guard<mutex> lck(gMutex);
            char data[100]= {0};
            // Bounded formatting; the text always fits, the bound is a safety net.
            snprintf(data,sizeof(data),"Insert into tbl_test(id,a,b) values(%d,1,2)",rand()%1000);
            gData.push(string(data));
            // Log the real statement instead of an unrelated fixed text.
            std::cout << data <<std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}
//g++ * -std=c++11 -lmysqlclient -pthread -o test
int main(int argc,char * argv[])
{
//创建一个 poolmanager
Dbpoolmanager m_poolmanager;
//创建一个mysql连接池
MsqlDbpool m_dbpool1("group_as",5,"127.0.0.1",3306,"root","123456","group_as");
m_dbpool1.CreatePool();
//将它注册到管理,用于分库
m_poolmanager.RegisterDbpool(&m_dbpool1);
//创建5个处理线程
vector<std::thread> m_threads;
//1. 启动5个线程
for(int i = 0; i < 5; i ++){
m_threads.push_back( std::thread(InsertGroupAsRhread,&m_poolmanager));
}
#if 0
//测试占用连接
Dbpool* pDbpool = m_poolmanager.GetDbpool(string("group_as"));
Dbconnection* pct = pDbpool->GetConnection();
m_poolmanager.GetDbpool(string("group_as"));
pDbpool->GetConnection();
m_poolmanager.GetDbpool(string("group_as"));
pDbpool->GetConnection();
#endif
#if 1
//测试插入数据
m_threads.push_back(std::thread(processThread));
m_threads.push_back(std::thread(processThread));
m_threads.push_back(std::thread(processThread));
m_threads.push_back(std::thread(processThread));
m_threads.push_back(std::thread(processThread));
m_threads.push_back(std::thread(processThread));
#endif
#if 0
//测试读取数据
while(true){
TableData data;
Dbpool* pDbpool = m_poolmanager.GetDbpool("group_as");
Dbconnection* pct = pDbpool->GetConnection();
pct->ExecQuery("select * from tbl_test",data);
for(auto &it : data){
std::cout<<"Record:";
for(auto &it2: it){
std::cout<<it2.first<<"="<<it2.second;
}
std::cout<<std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(3));
}
#endif
std::cout << "start ok"<<std::endl;
for(auto & it: m_threads){
it.join();
}
m_dbpool1.DestoryPool();
return 0;
}