TcpConnection和Acceptor比,要处理的逻辑更多,也更复杂。
Acceptor只需处理listenfd上发生的事件,而TcpConnection不仅要维护和客户端连接的connfd的数据收发和连接的断开,还要处理eventfd触发的新连接,还要保证runInLoop。
在断开TcpConnection的连接时,如果还要数据未发出,需要等待数据发送完毕,然后才能关闭该连接。
关闭连接的函数shutdown并没有立刻就把TcpConnection给Close调,而只是把它的状态设置成了kDisconnecting正在断开连接。然后在ioLoop中检测channel中还是否有数据未发送,如果没有则直接Close,有的话则什么都不做。
在发送数据完成后会检测TcpConnection的状态,如果为kDisconnecting则断开与TcpConnection的连接。
重写TcpConnection.h:
#pragma once
#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include "Timestamp.h"
#include
#include
#include
#include
class Channel;
class EventLoop;
class Socket;
/**
* TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd
* =》 TcpConnection 设置回调 =》 Channel => Poller => Channel的回调操作
*/
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:
TcpConnection(EventLoop* loop, const std::string &name, int sockfd, const InetAddress &localAddr, const InetAddress &peerAddr);
~TcpConnection();
EventLoop* getLoop() const { return loop_; }
const std::string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected; }
//发送数据
//void send(const void* message, int len);
void send(const std::string& buf);
//关闭连接
void shutdown();
void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; }
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t len) { highWaterMark_ = len; highWaterMarkCallback_ = cb;}
void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; }
//建立连接
void connectEstablished();
//销毁连接
void connectDestroyed();
private:
// 已断开连接 正在连接 已连接 正在断开连接
enum StateE{ kDisconnected, kConnecting, kConnected, kDisconnecting};
void setState(StateE state) { state_ = state; }
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();
EventLoop* loop_; //这里绝对不是baseLoop,因为TcpConnection都是在subLoop里边管理的
const std::string name_;
std::atomic_int state_;
bool reading_;
//和Acceptor类似 Acceptor =》 mainLoop TcpConnection =》 subLoop
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
ConnectionCallback connectionCallback_; //新连接回调
MessageCallback messageCallback_; //有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
CloseCallback closeCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
size_t highWaterMark_;
Buffer inputBuffer_; //接收数据的缓冲区
Buffer outputBuffer_; //发送数据的缓冲区
};
重写TcpConnection.cc:
#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"
#include
#include
#include
#include
#include
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
if(loop == nullptr)
{
LOG_FATAL("%s:%s:%d Loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
TcpConnection::TcpConnection(EventLoop* loop, const std::string &nameArg, int sockfd, const InetAddress &localAddr, const InetAddress &peerAddr)
:loop_(CheckLoopNotNull(loop))
,name_(nameArg)
,state_(kConnecting)
,reading_(true)
,socket_(new Socket(sockfd))
,channel_(new Channel(loop, sockfd))
,localAddr_(localAddr)
,peerAddr_(peerAddr)
,highWaterMark_(64*1024*1024) //64M
{
//给Channel设置相应的回调函数,poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, std::placeholders::_1)
);
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this)
);
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this)
);
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this)
);
LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(), sockfd);
socket_->setKeepAlive(true);
}
TcpConnection::~TcpConnection()
{
LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d \n", name_.c_str(), channel_->fd(), (int)state_);
}
void TcpConnection::send(const std::string &buf)
{
if(state_ == kConnected)
{
if(loop_->isInLoopThread())
{
sendInLoop(buf.c_str(), buf.size());
}
else
{
loop_->runInLoop(std::bind(
&TcpConnection::sendInLoop
,this
,buf.c_str()
,buf.size()
));
}
}
}
/**
* 发送数据 应用写的快, 而内核发送数据慢, 需要把待发送数据写入缓冲区, 而且设置水位回调
*/
void TcpConnection::sendInLoop(const void* data, size_t len)
{
ssize_t nwrote = 0;
size_t remainging = len;
bool faultErrno = false;
//之前调过该connection的shutdown,不能再进行发送了
if(state_ == kDisconnected)
{
LOG_ERROR("disconnected, give up writing");
return ;
}
//表示channel_第一次开始写数据,而且缓冲区没有待发送数据
if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(), data, len);
if(nwrote >= 0)
{
remainging = len - nwrote;
if(remainging == 0 && writeCompleteCallback_)
{
//在这数据全部发送完成,就不用给channel设置epollout事件了
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this())
);
}
}
else
{
nwrote = 0;
if(errno != EWOULDBLOCK)
{
LOG_ERROR("TcpConnection::sendInLoop");
if(errno == EPIPE || errno == ECONNRESET) //EPIPE ECONNRESET
{
faultErrno = true;
}
}
}
}
//说明当前这次write,并没有把数据全部发送出去,剩余的数据需要保存到发送缓冲区当中,然后给channel注册epollout事件,
//poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,调用handleWrite回调方法
//也就是嗲用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成
if(!faultErrno && remainging > 0)
{
//目前发送缓冲区剩余的待发送数据的长度
size_t oldLen = outputBuffer_.readableBytes();
if(oldLen + remainging >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(
highWaterMarkCallback_, shared_from_this(), oldLen + remainging
));
}
outputBuffer_.append((char*)data + nwrote, remainging);
if(!channel_->isWriting())
{
channel_->enableWriting(); //这里一定要注册channel的写事件,否则poller不会给channel通知epollout
}
}
}
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if(n > 0)
{
//已连接的用户有可读事件发生了,调用该用户的传入的回调操作
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if( n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnrction::handleRead");
handleError();
}
}
void TcpConnection::handleWrite()
{
if(channel_->isWriting())
{
int saveErrno;
ssize_t n = outputBuffer_.writeFd(channel_->fd(), &saveErrno);
if( n > 0)
{
outputBuffer_.retrieve(n);
if(outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if(writeCompleteCallback_)
{
//唤醒loop_对象的thread线程,执行回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this())
);
}
if(state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down, no more writing \n", channel_->fd());
}
}
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n", channel_->fd(), (int)state_);
setState(kDisconnected);
channel_->disableALL();
TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr); //执行连接关闭的回调
closeCallback_(connPtr); //关闭连接的回调
}
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err = 0;
if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
{
err = errno;
}
else
{
err = optval;
}
LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}
//建立连接
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); //向poller注册channel的epollin事件
//新连接建立,执行回调
connectionCallback_(shared_from_this());
}
//销毁连接
void TcpConnection::connectDestroyed()
{
if(state_ == kConnected)
{
setState(kDisconnected);
channel_->disableALL(); //把channel的所有感兴趣的事件,从poller中delete
connectionCallback_(shared_from_this());
}
channel_->remove(); //把channel从poller中删除掉
}
void TcpConnection::shutdown()
{
if(state_ == kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(
std::bind(&TcpConnection::shutdownInLoop, this)
);
}
}
void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriting()) //说明outputBuffer中的数据已经全部发送完成
{
socket_->shutdownWrite(); //关闭写端
}
}