为什么需要池化技术?
池化技术能够减少资源对象的创建次数,提⾼程序的响应性能,特别是在⾼并发下这种提⾼更加明显。使用池化技术缓存的资源对象有如下共同特点:
对象创建时间长;
对象创建需要大量资源;
对象创建后可被重复使用像常见的线程池、内存池、连接池、对象池都具有以上的共同特点。
这里我们以数据库连接池举例,redis等连接池实现都是一样的道理。
定义:数据库连接池(Connection pooling)是程序启动时建立足够的数据库连接,并将这些连接组成一个连接池,由程序动态地对池中的连接进行申请,使用,释放。
1.TCP建立连接的三次握手(客户端与MySQL服务器的连接基于TCP协议)
2. MySQL认证的三次握手
3.真正的SQL执行
4.MySQL的关闭
5.TCP的四次握手关闭

可以看到,每执行⼀条SQL,需要进行TCP三次握手,Mysql认证、Mysql关闭、TCP四次挥手等其他操作,执行SQL操作在所有的操作占比非常低。
优点:实现简单
缺点:
可以看到:第⼀次访问的时候,需要建⽴连接。 但是之后的访问,均会复⽤之前创建的连接,直接执⾏SQL语句。
优点:
缺点:
从连接池获取或创建可⽤连接;
使⽤完毕之后,把连接返回给连接池;
在系统关闭前,断开所有连接并释放连接占⽤的系统资源;

代码文件在文末。
以mysql_pool 为例:
cd build
./test_Threadpool 4 4 1
这三个数字分别表示,线程数量,任务数量, 是否使用连接池。
连接池和线程池的区别:
线程池:主动调⽤任务。当任务队列不为空的时候从队列取任务取执⾏。
连接池:被动被任务使⽤。当某任务需要操作数据库时,只要从连接池中取出⼀个连接对象,当任务使 ⽤完该连接对象后,将该连接对象放回到连接池中。如果连接池中没有连接对象可以⽤,那么该任务就 必须等待。
线程池工作流程框架图:
我们可以看到有一个主循环:
我们可以想象一种情况,当前我们在处理网络相关信息,使用了epoll的方式,通过poll_wait()不断接收事件进行处理,对于一个事件我们有三种处理方式:
1.直接在主循环中处理;
2:直接将fd放入任务队列,从线程池中取出线程(工作线程)去处理。
3:在主循环中先接收数据,调用recv,因为这个recv过程并不耗时,可以接受在主循环中直接执行,再将得到的buffer放到任务队列中,让工作线程去处理。
下边是伪代码:
while(1){
int n = epoll_wait();
for(n){
#if //写法一 网络线程处理解析以及业务逻辑后直接发给客户端(单线程服务端)
recv(fd, buffer, length, 0);
parser();
send();
#elseif //写法二:网络线程把收到fd交给工作线程处理解析以及业务逻辑和发给客户端(多线程服务器)
//该模式有缺点:可能存在多个线程同时对一个fd进行操作!
//场景:同一个客户端短时间内发来多条请求,被分给了多个不同的线程处理,那么就出现多个线程同时对一个fd操作的情况。如果线程一个对fd写,另一个线程对fd进行close,就会引发错误
//因此需要特殊处理。处理方法:加入协程。每个协程处理一个IO。但是底层依然是依赖于epoll管理所有IO
task = fd;
push_tasks(task);
#else //写法三:网络线程解析完信息后,交给工作线程处理业务逻辑和发给客户端(多线程服务器)
recv(fd, buffer, length, 0);
push_task(buffer);
#endif
}
}
整体的工作就是,我们将一个个任务放到任务队列中,然后从线程池中不断取出线程去执行任务队列中的任务,这里我们需要思考以下几点:
1.如何将我们需要完成的事封装成一个任务,放入任务队列中。
2.若此时任务队列为空,线程池中的线程应该怎样处理。
从上图可以看出,一个线程池框架包含了三个主体部分。
工作线程部分:
typedef struct NWORKER {
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue;
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
任务队列部分:
这里我们就可以解决上边的如何封装任务的疑问,一个任务包含了它对应的回调函数,一个参数(user_data),以及前后躯体指针。(单向链表也可以,双向更好操作)
typedef struct NJOB {
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
管理工作线程和任务队列的部分:
typedef struct NWORKQUEUE {
struct NWORKER *workers;
struct NJOB *waiting_jobs;
pthread_mutex_t jobs_mtx;
pthread_cond_t jobs_cond;
} nWorkQueue;
包含了工作线程队列,任务队列,以及一个互斥量和一个条件变量。
当一个线程尝试去任务队列中取任务,但任务队列为空时,可以看到,我们调用了pthread_cond_wait()对这个线程进行了条件变量阻塞,当有新的任务到达时,我们调用pthread_cond_signal(&workqueue->jobs_cond);从阻塞的线程队列中唤醒一个线程来执行工作。这样我们就解决了上边谈到的两个疑问。
static void *ntyWorkerThread(void *ptr) {
nWorker *worker = (nWorker*)ptr;
while (1) {
pthread_mutex_lock(&worker->workqueue->jobs_mtx);
while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}
nJob *job = worker->workqueue->waiting_jobs;
if (job != NULL) {
LL_REMOVE(job, worker->workqueue->waiting_jobs);
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;//为什么这里需要判断,因为当有任务来临时,我们有可能唤醒所有睡眠的工作线程,所以有可能就算被唤醒也拿不到任务。
job->job_function(job);
}
free(worker);
pthread_exit(NULL);
}
通过工作线程队列,和任务队列来提高工作效率,而不是所有任务都在主循环中进行,极大提高工作效率
为什么要用内存池:
策略
内存池的整体架构就是这样,设计结构体如下struct mp_large_s { //对于大块内存
struct mp_large_s *next;
void *alloc;
};
对于小块内存(<4k)
struct mp_node_s {
unsigned char *last;
unsigned char *end;
struct mp_node_s *next;
size_t failed;
};
整个内存池的结构,对大块和小块的管理
struct mp_pool_s {
size_t max;
struct mp_node_s *current;//指向当前用到内存池的具体哪个块。
struct mp_large_s *large; //大块
struct mp_node_s head[0];
};
通过阅读代码你会明白:所有的这些结构体与将要分配出去的内存都在这个内存池的管理当中
struct mp_pool_s *mp_create_pool(size_t size) {
struct mp_pool_s *p;
int ret = posix_memalign((void **)&p, MP_ALIGNMENT, size + sizeof(struct mp_pool_s) + sizeof(struct mp_node_s));
p->max = (size < MP_MAX_ALLOC_FROM_POOL) ? size : MP_MAX_ALLOC_FROM_POOL;
p->current = p->head;
p->large = NULL;
p->head->last = (unsigned char *)p + sizeof(struct mp_pool_s) + sizeof(struct mp_node_s);
//p->head->last 后边表示可以分配的空间
p->head->end = p->head->last + size;
//p->head->end 是这一块能分配空间的最后位置的指针。
p->head->failed = 0;
//尝试次数,因为每一块在分配到最后的时候可能会出现碎片:
举例:一个4k的块还剩8byte可以分配,但我们需要一个12k的空间,显然不够,我们会重新开辟一个4k的空间挂在内存池对应的结点上,同时将p->current 指向还剩8byte空间的内存块,下一次继续判断它能否分配,当累计四次分配失败,就会放弃这块空间(这里对应的是8byte),然后将p->current指向下一个内存块。
return p;
}
总结:我们每次都是从p->current指向的内存块开始分配的,一开始这个p->current是4k的块,如果不够了就根据需要创建新块。这里需要理解一个细节点,就算有了新的内存块,我们也不一定下一次会从它开始分配,我们只认p->current,当这个p->current不满足条件时,再p = p->next。若一直循环到最后一块都不满足,才考虑新创建一个内存块(4k)挂到末尾。
当我们工作中接触到一个陌生服务,可通过htop查看内存状态,若发现虚拟内存持续上涨。
首先检查是否是自己写的内存池出现了内存泄漏问题,若是,在内存池中加入打印信息来排查问题。
若自己写的内存池没有问题,则去检查是否引用了第三方库,第三方库的内存池部分出现了内存泄漏问题。
这里大家可能会想,连接池和异步请求池有什么区别了,异步请求池最大的优势就在于它的异步执行。
首先我们要明白同步和异步的区别:
同步:简单点来说就是,当执行一条请求命令时,需要等到对方返回结果,程序才会继续往下执行,在等待对方返回结果时,程序会阻塞。(连接池就是这样的方式)
异步:当执行一条请求命令时,不必等待对方返回结果,而是继续往下执行,当对方返回结果时,系统再通知我们。(异步请求池就是这样的方式)
那么怎样才能实现异步的效果呢?
我们首先想到epoll。
我们先来理一下异步请求池的架构。
包含四元组:
这上边的四元组最主要的实现异步的过程是commit过程,在建立好网络连接后,用你自己的协议进行封装,然后用send发送到对方服务器(当然,要实现异步,这里的文件描述符(fd)必须设置成非阻塞的),然后将这个fd对应的事件设置成EPOLLIN,使用epoll_ctl加入到内核管理,使用epoll_wait循环检测,当产生可读事件时,通知我们进行相应处理。
通俗点说就是send后,将fd交由epoll进行管理,从而实现异步的目的。
在callback线程中,死循环中epoll_wait检测到EPOLLIN可读事件,然后调用(recv/recvfrom)和callback函数处理请求返回的response事件。
static void* callback(void *arg) {
struct async_context *ctx = (struct async_context*)arg;
int epfd = ctx->epfd;
while (1) {
struct epoll_event events[ASYNC_CLIENT_NUM] = {0};
int nready = epoll_wait(epfd, events, ASYNC_CLIENT_NUM, -1);
if (nready < 0) {
if (errno == EINTR || errno == EAGAIN) {
continue;
} else {
break;
}
} else if (nready == 0) {
continue;
}
printf("nready:%d\n", nready);
int i = 0;
for (i = 0;i < nready;i ++) {
struct ep_arg *data = (struct ep_arg*)events[i].data.ptr;
int sockfd = data->sockfd;
char buffer[1024] = {0};
struct sockaddr_in addr;
size_t addr_len = sizeof(struct sockaddr_in);
int n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&addr, (socklen_t*)&addr_len);
struct dns_item *domain_list = NULL;
int count = dns_parse_response(buffer, &domain_list);
data->cb(domain_list, count); //call cb
int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
//printf("epoll_ctl DEL --> sockfd:%d\n", sockfd);
close(sockfd); /
dns_async_client_free_domains(domain_list, count);
free(data);
}
}
}
不阻塞send,只管发,然后将fd交由epoll管理,这里有个细节,我们再发了之后,再epoll_wait接收到对应事件进行处理时,可能需要关闭对应fd,那么问题来了, 如果没有成功发送到对面,或者,对方返回结果时失败了(网络等原因),那么我们就接收不到对应的读事件,那么不就没办法关闭fd了,等后边需要用的时候就会出现问题,所以我们可以给fd设置一个定时器。实现超时重传的效果。
链接:https://pan.baidu.com/s/14nImn-E65h4Ey9Rdhxzwbg?pwd=m602
提取码:m602