服务消费方发送请求,经过多次调用后,会将请求数据送至 Netty NioClientSocketChannel。这样做的原因是通过 Exchange 层为框架引入 Request 和 Response 语义,首先分析 ReferenceCountExchangeClient 的源码。
final class ReferenceCountExchangeClient implements ExchangeClient {
private final URL url;
private final AtomicInteger referenceCount = new AtomicInteger(0);
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
// 引用计数自增
referenceCount.incrementAndGet();
this.url = client.getUrl();
// ...
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
// 直接调用被装饰对象的同签名方法
return client.request(request);
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
// 直接调用被装饰对象的同签名方法
return client.request(request, timeout);
}
/** 引用计数自增,该方法由外部调用 */
public void incrementAndGetCount() {
// referenceCount 自增
referenceCount.incrementAndGet();
}
@Override
public void close(int timeout) {
// referenceCount 自减
if (referenceCount.decrementAndGet() <= 0) {
if (timeout == 0) {
client.close();
} else {
client.close(timeout);
}
client = replaceWithLazyClient();
}
}
// ......
}
ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。
public class HeaderExchangeClient implements ExchangeClient {
private static final ScheduledThreadPoolExecutor scheduled = new
ScheduledThreadPoolExecutor(2, new NamedThreadFactory
("dubbo-remoting-client-heartbeat", true));
private final Client client;
private final ExchangeChannel channel;
private ScheduledFuture<?> heartbeatTimer;
private int heartbeat;
private int heartbeatTimeout;
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 创建 HeaderExchangeChannel 对象
this.channel = new HeaderExchangeChannel(client);
// 以下代码均与心跳检测逻辑有关
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter
(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ?
Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter
(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout);
}
if (needHeartbeat) {
// 开启心跳检测定时器
startHeartbeatTimer();
}
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
// 直接 HeaderExchangeChannel 对象的同签名方法
return channel.request(request);
}
@Override
public ResponseFuture request(Object request, int timeout) throws
RemotingException {
// 直接 HeaderExchangeChannel 对象的同签名方法
return channel.request(request, timeout);
}
@Override
public void close() {
doClose();
channel.close();
}
private void doClose() {
// 停止心跳检测定时器
stopHeartbeatTimer();
}
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.
<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
private void stopHeartbeatTimer() {
if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
try {
heartbeatTimer.cancel(true);
scheduled.purge();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(e.getMessage(), e);
}
}
}
heartbeatTimer = null;
}
// .....
}
HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那HeaderExchangeClient 封装了一些关于心跳检测的逻辑。
final class HeaderExchangeChannel implements ExchangeChannel {
private final Channel channel;
HeaderExchangeChannel(Channel channel) {
if (channel == null) {
throw new IllegalArgumentException("channel == null");
}
// 这里的 channel 指向的是 NettyClient
this.channel = channel;
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
return request(request,
channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,
Constants.DEFAULT_TIMEOUT));
}
@Override
public ResponseFuture request(Object request, int timeout) throws
RemotingException {
if (closed) {
throw new RemotingException(..., "Failed to send request ...");
}
// 创建 Request 对象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
// 设置双向通信标志为 true
req.setTwoWay(true);
// 这里的 request 变量类型为 RpcInvocation
req.setData(request);
// 创建 DefaultFuture 对象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 调用 NettyClient 的 send 方法发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
// 返回 DefaultFuture 对象
return future;
}
}
首先定义了一个 Request 对象,然后再将该对象传给 NettyClient 的 send 方法,进行后续的调用。需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override
public void send(Object message) throws RemotingException {
// 该方法由 AbstractClient 类实现
send(message, url.getParameter(Constants.SENT_KEY, false));
}
// ....
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
// 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
Channel channel = getChannel();
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send ...");
}
// 继续向下调用
channel.send(message, sent);
}
protected abstract Channel getChannel();
// .....
}
默认情况下,Dubbo 使用 Netty 作为底层的通信框架,因此到 NettyClient 类中看一下 getChannel 方法的实现。
public class NettyClient extends AbstractClient {
// 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
private volatile Channel channel;
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected())
return null;
// 获取一个 NettyChannel 类型对象
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
}
final class NettyChannel extends AbstractChannel {
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel>
channelMap =
new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
/** 私有构造方法 */
private NettyChannel(org.jboss.netty.channel.Channel channel, URL url,
ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url,
ChannelHandler handler) {
if (ch == null) {
return null;
}
// 尝试从集合中获取 NettyChannel 实例
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
// 如果 ret = null,则创建一个新的 NettyChannel 实例
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
// 将 键值对存入 channelMap 集合中
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
}
获取到 NettyChannel 实例后,即可进行后续的调用。看一下 NettyChannel 的 send 方法。
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 发送消息(包含请求和响应消息)
ChannelFuture future = channel.write(message);
//sent 的值源于 中 sent 的配置值,有两种配置值:
// 1. true: 等待消息发出,消息发送失败将抛出异常
// 2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
// 默认情况下 sent = false;
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,
Constants.DEFAULT_TIMEOUT);
// 等待消息发出,若在规定时间没能发出,success 会被置为 false
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message ...");
}
// 若 success 为 false,这里抛出异常
if (!success) {
throw new RemotingException(this, "Failed to send message ...");
}
}
经历多次调用,到这里请求数据的发送过程就结束了。以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。
proxy0#sayHello(String)
—> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation)
—> AbstractClusterInvoker#invoke(Invocation)
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation)
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int)
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean)
—> NioClientSocketChannel#write(Object)
Dubbo 数据包结构
Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。
了解了 Dubbo 数据包格式,接下来就可以探索编码过程了。
public class ExchangeCodec extends TelnetCodec {
// 消息头长度
protected static final int HEADER_LENGTH = 16;
// 魔数内容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() {
return MAGIC;
}
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// 对 Request 对象进行编码
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 对 Response 对象进行编码,后面分析
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// 创建消息头字节数组,长度为 16
byte[] header = new byte[HEADER_LENGTH];
// 设置魔数
Bytes.short2bytes(MAGIC, header);
// 设置数据包类型(Request/Response)和序列化器编号
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 设置通信方式(单向/双向)
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 设置事件标识
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 设置请求编号,8个字节,从第4个字节开始设置
Bytes.long2bytes(req.getId(), header, 4);
// 获取 buffer 当前的写位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,为消息头预留 16 个字节的空间
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 创建序列化器,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 对事件数据进行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 对请求数据进行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 获取写入的字节数,也就是消息体长度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 将消息体长度写入到消息头中
Bytes.int2bytes(len, header, 12);
// 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
buffer.writerIndex(savedWriteIndex);
// 从 savedWriteIndex 下标处写入消息头
buffer.writeBytes(header);
// 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// ......
}
以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。再来看一下 Request 对象的 data 字段序列化过程,也就是 encodeRequestData 方法
public class DubboCodec extends ExchangeCodec implements Codec2 {
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data,
String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
// 依次序列化 dubbo version、path
out.writeUTF(version);
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
// 序列化调用方法名
out.writeUTF(inv.getMethodName());
// 将参数类型转换为字符串,并进行序列化
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++) {
// 对运行时参数进行序列化
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
// 序列化 attachments
out.writeObject(inv.getAttachments());
}
}