Network programming is inseparable from sockets, so here are some of the less commonly used details.
Socket options can be set and read through the following two interfaces:
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen);
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
sockfd: the socket whose option is to be set or queried.
level: the protocol layer the option lives at. It commonly takes one of three values:
SOL_SOCKET: generic socket options.
IPPROTO_IP: IP-level options.
IPPROTO_TCP: TCP-level options.
optname: the name of the option to access (see the tables below).
optval: for getsockopt(), points to the buffer that receives the option value; for setsockopt(), points to the buffer holding the new value.
optlen: for getsockopt(), on input the maximum size of the value buffer and on output the actual size of the returned value; for setsockopt(), the size of the new value.
Return value: 0 on success. On failure, -1 is returned and errno is set to one of the following:
EBADF: sockfd is not a valid file descriptor
EFAULT: optval does not point into a valid part of the process address space
EINVAL: optlen is invalid for setsockopt()
ENOPROTOOPT: the option is unknown at the protocol level indicated
ENOTSOCK: sockfd does not refer to a socket
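As a quick example, here is how an option is read back with getsockopt(); a minimal sketch, assuming fd is an open socket:

int type = 0;
socklen_t optlen = sizeof(type);
if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &optlen) == 0)
printf("socket type = %d (%s)\n", type, type == SOCK_STREAM ? "SOCK_STREAM" : "other");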
Details for the optname parameter:
SOL_SOCKET
Option | Description | Data type |
---|---|---|
SO_BROADCAST | permit sending of broadcast datagrams | int |
SO_DEBUG | enable debugging | int |
SO_DONTROUTE | bypass routing table lookup | int |
SO_ERROR | get and clear the pending socket error | int |
SO_KEEPALIVE | keep the connection alive | int |
SO_LINGER | linger on close | struct linger |
SO_OOBINLINE | leave out-of-band data inline in the normal data stream | int |
SO_RCVBUF | receive buffer size | int |
SO_SNDBUF | send buffer size | int |
SO_RCVLOWAT | receive low-water mark | int |
SO_SNDLOWAT | send low-water mark | int |
SO_RCVTIMEO | receive timeout | struct timeval |
SO_SNDTIMEO | send timeout | struct timeval |
SO_REUSEADDR | allow reuse of the local address and port | int |
SO_TYPE | get the socket type | int |
SO_BSDCOMPAT | BSD compatibility | int |
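A less obvious entry here is SO_ERROR: after a non-blocking connect() becomes writable, reading SO_ERROR fetches (and clears) the pending error, which is how the outcome of the connect is determined. A minimal sketch, assuming fd is a non-blocking socket that select()/poll() just reported writable:

int err = 0;
socklen_t elen = sizeof(err);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &elen) < 0 || err != 0)
printf("connect failed: %s\n", strerror(err ? err : errno));
else
printf("connect succeeded\n");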
IPPROTO_IP
Option | Description | Data type |
---|---|---|
IP_HDRINCL | IP header is included with the data | int |
IP_OPTIONS | IP header options | int |
IP_TOS | type of service | int |
IP_TTL | time to live | int |
IPPROTO_TCP
Option | Description | Data type |
---|---|---|
TCP_MAXSEG | maximum TCP segment size | int |
TCP_NODELAY | disable the Nagle algorithm | int |
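As a quick illustration of the TCP-level options, disabling the Nagle algorithm for latency-sensitive traffic is a one-liner; a minimal sketch, assuming fd is a connected TCP socket:

#include <netinet/tcp.h> /* for TCP_NODELAY */
int flag = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) == -1)
printf("setsockopt TCP_NODELAY failed: %s\n", strerror(errno));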
The tables above are excerpted from reference material.
Let's look at a few of these options in practice.
A timeout can be set for both receiving and sending data.
Implementation:
int set_sock_time(int fd, int read_sec, int write_sec)
{
struct timeval send_timeval;
struct timeval recv_timeval;
if(fd <= 0)
return -1;
send_timeval.tv_sec = write_sec<0?0:write_sec;
send_timeval.tv_usec = 0;
recv_timeval.tv_sec = read_sec<0?0:read_sec;
recv_timeval.tv_usec = 0;
if(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1)
{
return -1;
}
if(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1)
{
return -1;
}
return 0;
}
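Once these timeouts are in place, a read()/recv() that blocks past the limit fails with errno set to EAGAIN or EWOULDBLOCK, which lets the caller tell a timeout apart from a hard error. A minimal sketch, assuming fd has already been through set_sock_time():

char buf[256];
ssize_t n = read(fd, buf, sizeof(buf));
if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
printf("read timed out, connection still usable\n");
else if (n < 0)
printf("read error: %s\n", strerror(errno));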
The receive and send buffer sizes can also be tuned, which in some scenarios reduces resource consumption.
Implementation:
int set_sock_bufsize(int fd,int read_size, int write_size)
{
int nRecvBuf=read_size; // requested receive buffer size, in bytes
int nSendBuf=write_size; // requested send buffer size, in bytes
// receive buffer
if(setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int))== -1)
{
return -1;
}
// send buffer
if(setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))== -1)
{
return -1;
}
return 0;
}
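Note that Linux doubles the value passed to SO_SNDBUF/SO_RCVBUF (the extra space is used for kernel bookkeeping), so it is worth reading the option back to see what was actually granted. A minimal sketch:

int granted = 0;
socklen_t glen = sizeof(granted);
if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &granted, &glen) == 0)
printf("kernel granted SO_RCVBUF = %d bytes\n", granted); /* typically 2x the requested size on Linux */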
The next option, SO_LINGER, is manipulated through the following structure:
/* Structure used to manipulate the SO_LINGER option. */
struct linger
{
int l_onoff; /* Nonzero to linger on close. */
int l_linger; /* Time to linger. */
};
Depending on the values of the two members of struct linger, the close() system call exhibits one of three behaviors:
(1) l_onoff is 0 (the default, option disabled). SO_LINGER has no effect and close() behaves normally: it returns immediately, and if data remains in the send buffer, the kernel's TCP module keeps trying to deliver it in the background. Because we never wait for the peer's ACKs, we get no guarantee that the peer received the data, let alone that the peer application read it. In this case the connection is terminated by the normal four-way handshake, and the side that closed first passes through TIME_WAIT.
(2) l_onoff is nonzero (enabled) and l_linger is 0. close() returns immediately whether or not the send buffer holds data; the TCP module discards whatever remains in the closed socket's send buffer and sends the peer a reset segment (RST). This gives a server a way to abort a connection abnormally, and it also skips TIME_WAIT.
(3) l_onoff is nonzero (enabled) and l_linger is nonzero, giving a grace period for an orderly close. If data remains in the socket's send buffer, the process sleeps until either all the data has been sent and acknowledged, after which the normal termination sequence runs (once the descriptor's reference count reaches 0), or the l_linger time expires. Checking close()'s return value matters here: if l_linger expires before the data is sent and acknowledged, close() returns -1 with errno set to EWOULDBLOCK and any data left in the send buffer is discarded. A successful return only tells us that the data (and the FIN) were acknowledged by the peer's TCP; it cannot tell us whether the peer application read the data. If the socket is non-blocking, close() does not wait at all: it fails with EWOULDBLOCK and the send buffer contents are dropped.
int set_sock_closelater(int fd,int onoff,int linger)
{
struct linger linger_setvalue;
linger_setvalue.l_onoff = onoff;
linger_setvalue.l_linger = linger;
if(setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue))== -1)
{
return -1;
}
return 0;
}
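For example, to make close() abort the connection with an RST (case 2 above) instead of lingering in TIME_WAIT, enable the option with a zero timeout; a usage sketch of the helper above:

set_sock_closelater(fd, 1, 0); /* l_onoff=1, l_linger=0: close() drops unsent data and sends RST */
close(fd);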
When writing servers, the following error used to come up frequently when calling bind():
Address already in use
When the call fails like this, the port state looks as follows:
[root@localhost ~]# netstat -tna | grep 80
tcp 0 0 192.168.32.94:80 192.168.31.2:1061 TIME_WAIT
Generally, a released port cannot be bound again for roughly two minutes (2*MSL), because the side that calls close() first enters the TIME_WAIT state.
SO_REUSEADDR lets the port be reused immediately after release.
More precisely, SO_REUSEADDR allows binding to a local address that is still held by a socket in the TIME_WAIT state. A server program should always set the SO_REUSEADDR option before calling bind().
Implementation:
int set_sock_reuse(int fd)
{
int on=1;
if((setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)))<0)
{
return -1;
}
return 0;
}
That said, online sources generally advise against leaning on this; it is enough to know how it works.
/* Set TCP keep alive option to detect dead peers. The interval option
* is only used for Linux as we are using Linux-specific APIs to set
* the probe send time, interval, and count. */
int anetKeepAlive(char *err, int fd, int interval)
{
int val = 1;
// enable the TCP keepalive mechanism
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1)
{
anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
return ANET_ERR;
}
#ifdef __linux__
/* Default settings are more or less garbage, with the keepalive time
* set to 7200 by default on Linux. Modify settings to make the feature
* actually useful. */
/* Send first probe after interval. */
val = interval;
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) {
anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno));
return ANET_ERR;
}
/* Send next probes after the specified interval. Note that we set the
* delay as interval / 3, as we send three probes before detecting
* an error (see the next setsockopt call). */
val = interval/3;
if (val == 0) val = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) {
anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno));
return ANET_ERR;
}
/* Consider the socket in error state after we send three ACK
* probes without getting a reply. */
val = 3;
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) {
anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno));
return ANET_ERR;
}
#endif
return ANET_OK;
}
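This function is lifted from Redis's anet.c, where anetSetError(), ANET_ERR and ANET_OK are its local helpers and constants. A usage sketch, assuming a 256-byte error buffer as Redis uses: probe after 60 seconds of idle time, then roughly every 20 seconds, and give up after 3 unanswered probes:

char err[256];
if (anetKeepAlive(err, fd, 60) == ANET_ERR)
printf("keepalive setup failed: %s\n", err);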
For multithreaded socket programming, each client's traffic is handled in a separate child thread; the threads are detached, so when processing ends each one releases its own resources automatically.
Server code:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <ctype.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define BACKLOG 13
#define SERVER_PORT 80
static int set_sock_time(int fd, int read_sec, int write_sec)
{
struct timeval send_timeval;
struct timeval recv_timeval;
if(fd <= 0)
{
printf("[ERROR]socket参数错误");
return -1;
}
send_timeval.tv_sec = write_sec<0?0:write_sec;
send_timeval.tv_usec = 0;
recv_timeval.tv_sec = read_sec<0?0:read_sec;
recv_timeval.tv_usec = 0;
if(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1)
{
printf("[ERROR]socket配置发送超时失败[%s]",strerror(errno));
return -1;
}
if(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1)
{
printf("[ERROR]socket配置接收超时失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int set_sock_bufsize(int fd,int read_size, int write_size)
{
int nRecvBuf=read_size; // requested receive buffer size, in bytes
int nSendBuf=write_size; // requested send buffer size, in bytes
// receive buffer
if(setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int))== -1)
{
printf("[ERROR]socket配置接收缓冲区大小失败[%s]",strerror(errno));
return -1;
}
// send buffer
if(setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))== -1)
{
printf("[ERROR]socket配置发送缓冲区大小失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int set_sock_closelater(int fd,int onoff,int linger)
{
struct linger linger_setvalue;
linger_setvalue.l_onoff = onoff;
linger_setvalue.l_linger = linger;
if(setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue))== -1)
{
printf("[ERROR]socket配置延迟关闭失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int set_sock_reuse(int fd)
{
int on=1;
if((setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)))<0)
{
printf("[ERROR]socket配置重复使用失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int socket_init(char *listen_ip,int listen_port)
{
int listenfd;
struct sockaddr_in servaddr;
if((listenfd=socket(AF_INET,SOCK_STREAM,0))<0)
{
printf("[ERROR]服务端创建TCPsocket失败[%s]\n",strerror(errno));
return -1;
}
printf("服务端创建TCPsocket[%d]成功\n",listenfd);
if(set_sock_reuse(listenfd)<0)
{
return -2;
}
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_port=htons(listen_port);
if(!listen_ip)
{
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
}
else
{
servaddr.sin_addr.s_addr=inet_addr(listen_ip);
}
if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0)
{
printf("[ERROR]socket[%d]绑定端口[%d]失败[%s]\n",listenfd,listen_port,strerror(errno));
return -2;
}
printf("socket[%d]绑定端口[%d]成功\n",listenfd,listen_port);
listen(listenfd,BACKLOG);
printf("开始监听端口[%d]\n",listen_port);
return listenfd;
}
static int thread_init(pthread_t *thread_id,void *(*start_routine) (void *),void *arg)/*start_routine is a function pointer to a function that takes a void* and returns void* */
{
pthread_attr_t thread_attr;
int rv = -1;
if( pthread_attr_init(&thread_attr) )/*initialize the thread attributes*/
{
printf("[ERROR]failed to initialize thread attributes[%s]\n", strerror(errno));
return -1;
}
if( pthread_attr_setstacksize(&thread_attr, 120*1024) )
{
printf("[ERROR]设置栈大小失败[%s]\n", strerror(errno));
return -2;
}
if( pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED) )
{
printf("[ERROR]设置分离属性失败[%s]\n", strerror(errno));
return -3;
}
if( pthread_create(thread_id, &thread_attr, start_routine, arg) )/*create the child thread and run start_routine in it*/
{
printf("[ERROR]failed to create thread[%s]\n", strerror(errno));
return -4;
}
return 0;
}
static void *thread_worker(void *ctx)/*child-thread routine that exchanges data with one client*/
{
int cli_fd;
int rv;
char buf[1024];
int i;
if( !ctx )
{
printf("[ERROR]客户端socket无效,线程退出\n");
pthread_exit(NULL);
}
cli_fd = *(int *)ctx;/*cast ctx back to int* and read the client fd out of it*/
printf("child thread starts communicating\n");
while(1)
{
memset(buf, 0, sizeof(buf));
rv=read(cli_fd, buf, sizeof(buf));
if( rv < 0)
{
printf("[ERROR]读取客户端[%d]数据失败[%s],线程退出\n",cli_fd,strerror(errno));
close(cli_fd);
pthread_exit(NULL);
}
else if( rv == 0)
{
printf("客户端[%d]连接结束 ,线程退出\n",cli_fd);
close(cli_fd);
pthread_exit(NULL);
}
else if( rv > 0 )
{
printf("从客户端[%d]读取[%d]字节内容[%s]\n",cli_fd, rv, buf);
}
/* convert letter from lowercase to uppercase */
for(i=0; i<rv; i++)/*收到client发送的数据后把它转换成大写字母并发送给客户端*/
{
buf[i]=toupper(buf[i]);
}
rv=write(cli_fd, buf, rv);
if(rv < 0)
{
printf("[ERROR]客户端[%d]应答失败 ,线程退出\n", cli_fd,strerror(errno));
close(cli_fd);
pthread_exit(NULL);
}
}
}
int main(int argc,char **argv)
{
int listen_fd;
int clifd=-1;
int rv=-2;
int opt;
struct sockaddr_in cliaddr;
socklen_t cliaddr_len =sizeof(cliaddr) ;
pthread_t tid;
if((listen_fd=socket_init(NULL,SERVER_PORT))<0)/*socket initialization returns a listening fd*/
{
printf("socket初始化失败:%s",strerror(errno));
return -2;
}
while(1)
{
printf("开始等待新连接……\n");
clifd=accept(listen_fd, (struct sockaddr *)&cliaddr, &cliaddr_len);/*主线程接收新的客户端的连接*/
if(clifd < 0)
{
printf("连接新客户端失败: %s\n", strerror(errno));
continue;
}
printf("新客户端接入[%s:%d]成功\n", inet_ntoa(cliaddr.sin_addr),ntohs(cliaddr.sin_port));
thread_init(&tid, thread_worker, &clifd);/*创建子线程并让子线程和client进行数据的收发*/
}
}
A ready-made tool was used as the client. The server output:
[root@localhost test]# ./socket_pthread
created server TCP socket[3]
socket[3] bound to port[80]
listening on port[80]
waiting for a new connection...
new client connected [192.168.31.2:5784]
waiting for a new connection...
child thread starts communicating
new client connected [192.168.31.2:5785]
waiting for a new connection...
child thread starts communicating
new client connected [192.168.31.2:5786]
waiting for a new connection...
child thread starts communicating
client[4] sent [4] bytes: [2345]
client[6] sent [4] bytes: [2345]
client[5] sent [4] bytes: [2345]
client[4] sent [4] bytes: [2345]
client[6] sent [4] bytes: [2345]
client[5] sent [4] bytes: [2345]
client[6] closed the connection, thread exiting
client[5] closed the connection, thread exiting
client[4] closed the connection, thread exiting
Everyone says thread pools are the more advanced approach, so let's try combining one with the server.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <ctype.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define BACKLOG 13
#define SERVER_PORT 80
// task descriptor
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
// thread pool
struct ThreadPool
{
// task queue
Task* taskQ;
int queueCapacity; // capacity
int queueSize; // current number of queued tasks
int queueFront; // head -> tasks are taken here
int queueRear; // tail -> tasks are put here
pthread_t managerID; // manager thread ID
pthread_t *threadIDs; // worker thread IDs
int minNum; // minimum number of threads
int maxNum; // maximum number of threads
int busyNum; // number of busy threads
int liveNum; // number of live threads
int exitNum; // number of threads to destroy
pthread_mutex_t mutexPool; // guards the whole pool
pthread_cond_t notFull; // signaled when the task queue is no longer full
pthread_cond_t notEmpty; // signaled when the task queue is no longer empty
pthread_mutex_t mutexBusy; // guards the busyNum counter
int shutdown; // 1 when the pool is being destroyed, 0 otherwise
};
typedef struct ThreadPool ThreadPool;
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// wait while the task queue is empty
while (pool->queueSize == 0 && !pool->shutdown)
{
// block this worker thread
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// check whether this thread was asked to exit
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// check whether the pool is being shut down
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// take a task off the queue
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// advance the queue head
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// signal a blocked producer and release the lock
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// check once every 3 seconds
sleep(3);
// snapshot the number of queued tasks and live threads
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// snapshot the number of busy threads
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
const int NUMBER = 2;
// add threads when:
// queued tasks > live threads && live threads < max threads
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// destroy threads when:
// busy threads * 2 < live threads && live threads > min threads
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// ask surplus worker threads to exit on their own
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
//min: minimum number of threads
//max: maximum number of threads
//queueSize: task queue capacity
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
//allocate room for max pthread_t thread IDs
pool->taskQ = NULL; // so the cleanup path below is safe if a later step fails
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0; //在忙的线程
pool->liveNum = min; //和最小个数相等
pool->exitNum = 0; //需要销毁的
//初始化锁和条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
// task queue
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
if (pool->taskQ == NULL)
{
printf("malloc taskQ fail...\n");
break;
}
pool->queueCapacity = queueSize;
pool->queueSize = 0; //current number of tasks
pool->queueFront = 0; //queue head
pool->queueRear = 0; //queue tail
pool->shutdown = 0; //pool is live, not being destroyed
// create the manager thread
pthread_create(&pool->managerID, NULL, manager, pool);
// create the workers: min threads up front
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
// cleanup on failure
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if (pool == NULL)
{
return -1;
}
// mark the pool as shut down
pool->shutdown = 1;
// join (reap) the manager thread
pthread_join(pool->managerID, NULL);
// wake all blocked consumer (worker) threads
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
// free heap memory
if (pool->taskQ)
{
free(pool->taskQ);
}
if (pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// block the producer while the queue is full
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// enqueue the task
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}
static int set_sock_time(int fd, int read_sec, int write_sec)
{
struct timeval send_timeval;
struct timeval recv_timeval;
if(fd <= 0)
{
printf("[ERROR]socket参数错误");
return -1;
}
send_timeval.tv_sec = write_sec<0?0:write_sec;
send_timeval.tv_usec = 0;
recv_timeval.tv_sec = read_sec<0?0:read_sec;
recv_timeval.tv_usec = 0;
if(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1)
{
printf("[ERROR]socket配置发送超时失败[%s]",strerror(errno));
return -1;
}
if(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1)
{
printf("[ERROR]socket配置接收超时失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int set_sock_bufsize(int fd,int read_size, int write_size)
{
int nRecvBuf=read_size; // requested receive buffer size, in bytes
int nSendBuf=write_size; // requested send buffer size, in bytes
// receive buffer
if(setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int))== -1)
{
printf("[ERROR]socket配置接收缓冲区大小失败[%s]",strerror(errno));
return -1;
}
// send buffer
if(setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))== -1)
{
printf("[ERROR]socket配置发送缓冲区大小失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int set_sock_closelater(int fd,int onoff,int linger)
{
struct linger linger_setvalue;
linger_setvalue.l_onoff = onoff;
linger_setvalue.l_linger = linger;
if(setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue))== -1)
{
printf("[ERROR]socket配置延迟关闭失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int set_sock_reuse(int fd)
{
int on=1;
if((setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)))<0)
{
printf("[ERROR]socket配置重复使用失败[%s]",strerror(errno));
return -1;
}
return 0;
}
static int socket_init(char *listen_ip,int listen_port)
{
int listenfd;
struct sockaddr_in servaddr;
if((listenfd=socket(AF_INET,SOCK_STREAM,0))<0)
{
printf("[ERROR]服务端创建TCPsocket失败[%s]\n",strerror(errno));
return -1;
}
printf("服务端创建TCPsocket[%d]成功\n",listenfd);
if(set_sock_reuse(listenfd)<0)
{
return -2;
}
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_port=htons(listen_port);
if(!listen_ip)
{
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
}
else
{
servaddr.sin_addr.s_addr=inet_addr(listen_ip);
}
if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0)
{
printf("[ERROR]socket[%d]绑定端口[%d]失败[%s]\n",listenfd,listen_port,strerror(errno));
return -2;
}
printf("socket[%d]绑定端口[%d]成功\n",listenfd,listen_port);
listen(listenfd,BACKLOG);
printf("开始监听端口[%d]\n",listen_port);
return listenfd;
}
static int thread_init(pthread_t *thread_id,void *(*start_routine) (void *),void *arg)/*start_routine is a function pointer to a function that takes a void* and returns void* */
{
pthread_attr_t thread_attr;
int rv = -1;
if( pthread_attr_init(&thread_attr) )/*initialize the thread attributes*/
{
printf("[ERROR]failed to initialize thread attributes[%s]\n", strerror(errno));
return -1;
}
if( pthread_attr_setstacksize(&thread_attr, 120*1024) )
{
printf("[ERROR]设置栈大小失败[%s]\n", strerror(errno));
return -2;
}
if( pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED) )
{
printf("[ERROR]设置分离属性失败[%s]\n", strerror(errno));
return -3;
}
if( pthread_create(thread_id, &thread_attr, start_routine, arg) )/*create the child thread and run start_routine in it*/
{
printf("[ERROR]failed to create thread[%s]\n", strerror(errno));
return -4;
}
return 0;
}
static void thread_worker(void *ctx)/*pool task that exchanges data with one client; returns void so its type matches threadPoolAdd()*/
{
int cli_fd;
int rv;
char buf[1024];
int i;
if( !ctx )
{
printf("[ERROR]客户端socket无效,线程退出\n");
pthread_exit(NULL);
}
cli_fd = *(int *)ctx;/*cast ctx back to int* and read the client fd out of it*/
printf("worker starts communicating\n");
while(1)
{
memset(buf, 0, sizeof(buf));
rv=read(cli_fd, buf, sizeof(buf));
if( rv < 0)
{
printf("[ERROR]读取客户端[%d]数据失败[%s],线程退出\n",cli_fd,strerror(errno));
close(cli_fd);
//pthread_exit(NULL);
break;
}
else if(rv == 0)
{
printf("客户端[%d]连接结束 ,线程退出\n",cli_fd);
close(cli_fd);
//pthread_exit(NULL);
break;
}
else if( rv > 0 )
{
printf("从客户端[%d]读取[%d]字节内容[%s]\n",cli_fd, rv, buf);
}
/* convert letter from lowercase to uppercase */
for(i=0; i<rv; i++)/*收到client发送的数据后把它转换成大写字母并发送给客户端*/
{
buf[i]=toupper(buf[i]);
}
rv=write(cli_fd, buf, rv);
if(rv < 0)
{
printf("[ERROR]客户端[%d]应答失败 ,线程退出\n", cli_fd,strerror(errno));
close(cli_fd);
//pthread_exit(NULL);
break;
}
}
}
int main(int argc,char **argv)
{
int listen_fd;
int clifd=-1;
int rv=-2;
int opt;
struct sockaddr_in cliaddr;
socklen_t cliaddr_len =sizeof(cliaddr) ;
pthread_t tid;
// create the thread pool
ThreadPool* pool = threadPoolCreate(3, 10, 100);
if((listen_fd=socket_init(NULL,SERVER_PORT))<0)/*socket initialization returns a listening fd*/
{
printf("socket初始化失败:%s",strerror(errno));
return -2;
}
while(1)
{
printf("开始等待新连接……\n");
clifd=accept(listen_fd, (struct sockaddr *)&cliaddr, &cliaddr_len);/*主线程接收新的客户端的连接*/
if(clifd < 0)
{
printf("连接新客户端失败: %s\n", strerror(errno));
continue;
}
printf("新客户端接入[%s:%d]成功\n", inet_ntoa(cliaddr.sin_addr),ntohs(cliaddr.sin_port));
//thread_init(&tid, thread_worker, &clifd);/*创建子线程并让子线程和client进行数据的收发*/
threadPoolAdd(pool, thread_worker, &clifd);
}
}
With a client that keeps the connection open in a loop like this, the 3 resident worker threads fill up immediately and the fourth connection has to wait on the condition variable. For short-lived connections that send one message and finish, this model is a much better fit.
epoll handles large numbers of socket connections efficiently, so it is naturally a better replacement for a bare accept loop or select; combined with a thread pool for the business logic, it gives high concurrency and fast dispatch.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <ctype.h>
#include <assert.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define MAX 1024
#define listen_port 80
// task descriptor
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
// thread pool
struct ThreadPool
{
// task queue
Task* taskQ;
int queueCapacity; // capacity
int queueSize; // current number of queued tasks
int queueFront; // head -> tasks are taken here
int queueRear; // tail -> tasks are put here
pthread_t managerID; // manager thread ID
pthread_t *threadIDs; // worker thread IDs
int minNum; // minimum number of threads
int maxNum; // maximum number of threads
int busyNum; // number of busy threads
int liveNum; // number of live threads
int exitNum; // number of threads to destroy
pthread_mutex_t mutexPool; // guards the whole pool
pthread_cond_t notFull; // signaled when the task queue is no longer full
pthread_cond_t notEmpty; // signaled when the task queue is no longer empty
pthread_mutex_t mutexBusy; // guards the busyNum counter
int shutdown; // 1 when the pool is being destroyed, 0 otherwise
};
typedef struct ThreadPool ThreadPool;
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// wait while the task queue is empty
while (pool->queueSize == 0 && !pool->shutdown)
{
// block this worker thread
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// check whether this thread was asked to exit
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// check whether the pool is being shut down
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// take a task off the queue
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// advance the queue head
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// signal a blocked producer and release the lock
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
//free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// check once every 3 seconds
sleep(3);
// snapshot the number of queued tasks and live threads
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// snapshot the number of busy threads
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
const int NUMBER = 2;
// add threads when:
// queued tasks > live threads && live threads < max threads
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// destroy threads when:
// busy threads * 2 < live threads && live threads > min threads
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// ask surplus worker threads to exit on their own
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
//min: minimum number of threads
//max: maximum number of threads
//queueSize: task queue capacity
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
//allocate room for max pthread_t thread IDs
pool->taskQ = NULL; // so the cleanup path below is safe if a later step fails
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0; //在忙的线程
pool->liveNum = min; //和最小个数相等
pool->exitNum = 0; //需要销毁的
//初始化锁和条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
// task queue
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
if (pool->taskQ == NULL)
{
printf("malloc taskQ fail...\n");
break;
}
pool->queueCapacity = queueSize;
pool->queueSize = 0; //current number of tasks
pool->queueFront = 0; //queue head
pool->queueRear = 0; //queue tail
pool->shutdown = 0; //pool is live, not being destroyed
// create the manager thread
pthread_create(&pool->managerID, NULL, manager, pool);
// create the workers: min threads up front
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
// cleanup on failure
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if (pool == NULL)
{
return -1;
}
// mark the pool as shut down
pool->shutdown = 1;
// join (reap) the manager thread
pthread_join(pool->managerID, NULL);
// wake all blocked consumer (worker) threads
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
// free heap memory
if (pool->taskQ)
{
free(pool->taskQ);
}
if (pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// block the producer while the queue is full
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// enqueue the task
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}
static void thread_worker(void *ctx)/*pool task that handles one readable client fd; returns void so its type matches threadPoolAdd()*/
{
int cli_fd;
int rv;
char buf[1024];
int i;
if( !ctx )
{
printf("[ERROR]客户端socket无效,线程退出\n");
pthread_exit(NULL);
}
cli_fd = *(int *)ctx;/*cast ctx back to int* and read the client fd out of it*/
printf("worker starts communicating\n");
memset(buf, 0, sizeof(buf));
rv=read(cli_fd, buf, sizeof(buf));
if( rv < 0)
{
printf("[ERROR]读取客户端[%d]数据失败[%s],线程退出\n",cli_fd,strerror(errno));
close(cli_fd);
}
else if(rv == 0)
{
printf("客户端[%d]连接结束 ,线程退出\n",cli_fd);
close(cli_fd);
}
else if( rv > 0 )
{
printf("从客户端[%d]读取[%d]字节内容[%s]\n",cli_fd, rv, buf);
}
}
int main()
{
int listenfd=socket(AF_INET,SOCK_STREAM,0);
assert(listenfd!=-1);
struct sockaddr_in ser,cli;
ser.sin_family=AF_INET;
ser.sin_port=htons(listen_port);
ser.sin_addr.s_addr=htonl(INADDR_ANY);//inet_addr("127.0.0.1");
int res=bind(listenfd,(struct sockaddr*)&ser,sizeof(ser));
assert(res!=-1);
listen(listenfd,5);
int epfd=epoll_create(1); // create the kernel event table epfd
assert(epfd!=-1);
struct epoll_event ev;
ev.events=EPOLLIN;
ev.data.fd=listenfd; // initialize an event struct for listenfd
res=epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); // register listenfd in the kernel event table
assert(res!=-1);
struct epoll_event event[MAX]; // epoll_wait() returns the ready events through this array
// create the thread pool
ThreadPool* pool = threadPoolCreate(10, 100, 100);
while(1)
{
int n=epoll_wait(epfd,event,MAX,-1); // the core call; returns the number of ready fds
if(n==-1)
{
printf("error!\n");
exit(0);
}
if(n==0)
{
printf("timeout\n");
continue;
}
int i=0;
for(;i<n;++i)
{
int fd=event[i].data.fd;
if(event[i].events & EPOLLRDHUP) //cli输入“end”
{
printf("break\n");
close(fd);
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL); // remove fd from the kernel event table
continue;
}
if(event[i].events & EPOLLIN)
{
if(fd==listenfd)
{
socklen_t len=sizeof(cli);
int c=accept(listenfd,(struct sockaddr*)&cli,&len);
assert(c!=-1);
printf("link succese\n");
ev.events= EPOLLIN|EPOLLET;//EPOLLIN|EPOLLRDHUP;
ev.data.fd=c;
res=epoll_ctl(epfd,EPOLL_CTL_ADD,c,&ev);
assert(res!=-1);
}
else
{
threadPoolAdd(pool, thread_worker, &fd);
}
}
}
}
}
Today is the last working day before the National Day holiday.
I have waited a long time for this break, yet suddenly it doesn't feel all that exciting. I can't go home anyway; an outbreak always seems to arrive just before the holidays.
Plenty of capitalist countries have declared the pandemic over, but they must be fooling themselves.
A little more work, then; I'll stand this last shift properly.