• zfoo增加类似于mydog的路由


    1. /*
    2. * Copyright (C) 2020 The zfoo Authors
    3. *
    4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
    5. * in compliance with the License. You may obtain a copy of the License at
    6. *
    7. * http://www.apache.org/licenses/LICENSE-2.0
    8. *
    9. * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
    10. * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11. * See the License for the specific language governing permissions and limitations under the License.
    12. */
    13. package com.zfoo.net.handler;
    14. import com.zfoo.event.manager.EventBus;
    15. import com.zfoo.net.NetContext;
    16. import com.zfoo.net.consumer.balancer.ConsistentHashConsumerLoadBalancer;
    17. import com.zfoo.net.core.gateway.IGatewayLoadBalancer;
    18. import com.zfoo.net.core.gateway.model.GatewaySessionInactiveEvent;
    19. import com.zfoo.net.packet.common.Heartbeat;
    20. import com.zfoo.net.packet.common.Ping;
    21. import com.zfoo.net.packet.common.Pong;
    22. import com.zfoo.net.packet.model.DecodedPacketInfo;
    23. import com.zfoo.net.router.attachment.GatewayAttachment;
    24. import com.zfoo.net.router.attachment.IAttachment;
    25. import com.zfoo.net.router.attachment.SignalAttachment;
    26. import com.zfoo.net.session.model.AttributeType;
    27. import com.zfoo.net.session.model.Session;
    28. import com.zfoo.net.util.SessionUtils;
    29. import com.zfoo.protocol.IPacket;
    30. import com.zfoo.protocol.ProtocolManager;
    31. import com.zfoo.protocol.registration.IProtocolRegistration;
    32. import com.zfoo.protocol.util.JsonUtils;
    33. import com.zfoo.protocol.util.StringUtils;
    34. import com.zfoo.scheduler.util.TimeUtils;
    35. import io.netty.channel.ChannelHandler;
    36. import io.netty.channel.ChannelHandlerContext;
    37. import org.slf4j.Logger;
    38. import org.slf4j.LoggerFactory;
    39. import java.util.function.BiFunction;
    40. /**
    41. * @author jaysunxiao
    42. * @version 3.0
    43. */
    44. @ChannelHandler.Sharable
    45. public class GatewayRouteHandler extends ServerRouteHandler {
    46. private static final Logger logger = LoggerFactory.getLogger(GatewayRouteHandler.class);
    47. private final BiFunction packetFilter;
    48. private final BiFunction routerFunc;
    49. public GatewayRouteHandler(BiFunction packetFilter) {
    50. this(packetFilter, null);
    51. }
    52. public GatewayRouteHandler(BiFunction packetFilter, BiFunction routerFunc) {
    53. this.packetFilter = packetFilter;
    54. this.routerFunc = routerFunc;
    55. }
    56. // TODO test
    57. public static final BiFunction routerFunc1 = (session, packet) -> {
    58. var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
    59. var name = module.getName();
    60. switch (name) {
    61. case "game":
    62. return (Integer) session.getAttribute(AttributeType.UID);
    63. case "guild":
    64. return (Integer) session.getAttribute(AttributeType.CONSUMER);
    65. }
    66. return null;
    67. };
    68. @Override
    69. public void channelRead(ChannelHandlerContext ctx, Object msg) {
    70. // 请求者的session,一般是serverSession
    71. var session = SessionUtils.getSession(ctx);
    72. if (session == null) {
    73. return;
    74. }
    75. var decodedPacketInfo = (DecodedPacketInfo) msg;
    76. var packet = decodedPacketInfo.getPacket();
    77. if (packet.protocolId() == Heartbeat.PROTOCOL_ID) {
    78. return;
    79. }
    80. if (packet.protocolId() == Ping.PROTOCOL_ID) {
    81. NetContext.getRouter().send(session, Pong.valueOf(TimeUtils.now()), null);
    82. return;
    83. }
    84. // 过滤非法包
    85. if (packetFilter != null && packetFilter.apply(session, packet)) {
    86. throw new IllegalArgumentException(StringUtils.format("[session:{}]发送了一个非法包[{}]"
    87. , SessionUtils.sessionInfo(ctx), JsonUtils.object2String(packet)));
    88. }
    89. var signalAttachment = (SignalAttachment) decodedPacketInfo.getAttachment();
    90. // 把客户端信息包装为一个GatewayAttachment,因此通过这个网关附加包可以得到玩家的uid、sid之类的信息
    91. var gatewayAttachment = new GatewayAttachment(session, signalAttachment);
    92. // 网关优先使用IGatewayLoadBalancer作为一致性hash的计算参数,然后才会使用客户端的session做参数
    93. // 例子:以聊天服务来说,玩家知道自己在哪个群组groupId中,那往这个群发送消息时,会在Packet中带上这个groupId做为一致性hash就可以了。
    94. if (packet instanceof IGatewayLoadBalancer) {
    95. var loadBalancerConsistentHashObject = ((IGatewayLoadBalancer) packet).loadBalancerConsistentHashObject();
    96. gatewayAttachment.useExecutorConsistentHash(loadBalancerConsistentHashObject);
    97. forwardingPacket(packet, gatewayAttachment, loadBalancerConsistentHashObject);
    98. return;
    99. } else {
    100. // 针对当前业务模块,是否session上绑定的有路由信息,从而直接转发给provider
    101. if (routerFunc != null) {
    102. Integer hashId = routerFunc.apply(session, packet);
    103. if (hashId != null) {
    104. forwardingPacket(packet, gatewayAttachment, hashId);
    105. return;
    106. }
    107. }
    108. // 使用用户的uid做一致性hash
    109. var uid = (Long) session.getAttribute(AttributeType.UID);
    110. if (uid != null) {
    111. forwardingPacket(packet, gatewayAttachment, uid);
    112. return;
    113. }
    114. }
    115. // 再使用session的sid做一致性hash,因为每次客户端连接过来sid都会改变,所以客户端重新建立连接的话可能会被路由到其它的服务器
    116. // 如果有特殊需求的话,可以考虑去重写网关的转发策略
    117. // 拿着玩家的sid做一致性hash,那肯定是:一旦重连sid就会一直变化。所以:一般情况下除非自己创建TcpClient,否则逻辑不应该走到这里。 而是走上面的通过UID做一致性hash
    118. var sid = session.getSid();
    119. forwardingPacket(packet, gatewayAttachment, sid);
    120. }
    121. /**
    122. * 转发网关收到的包到Provider
    123. */
    124. private void forwardingPacket(IPacket packet, IAttachment attachment, Object argument) {
    125. try {
    126. var consumerSession = ConsistentHashConsumerLoadBalancer.getInstance().loadBalancer(packet, argument);
    127. NetContext.getRouter().send(consumerSession, packet, attachment);
    128. } catch (Exception e) {
    129. logger.error("网关发生异常", e);
    130. } catch (Throwable t) {
    131. logger.error("网关发生错误", t);
    132. }
    133. }
    134. @Override
    135. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    136. var session = SessionUtils.getSession(ctx);
    137. if (session == null) {
    138. return;
    139. }
    140. var sid = session.getSid();
    141. var uid = (Long) session.getAttribute(AttributeType.UID);
    142. // 连接到网关的客户端断开了连接
    143. EventBus.asyncSubmit(GatewaySessionInactiveEvent.valueOf(sid, uid == null ? 0 : uid.longValue()));
    144. super.channelInactive(ctx);
    145. }
    146. }

    zfoo 的 gateway 相当于 mydog 的 Connector 服务器：它接收客户端连接，并把收到的数据包转发给后端的 Provider。

     

  • 相关阅读:
    扩散模型(Diffusion Model,DDPM,GLIDE,DALLE2,Stable Diffusion)
    Datalogic,50年的成功
    揭秘计算机指令执行的神秘过程:CPU内部的绝密操作
    2023 年KPI (KPI:Key Performance Indicator) review
    一文详解Jenkins教程以及Jenkins中的CI/CD自动化部署机制
    计算机视觉--图像拼接
    spring boot中的标注@Component、@Service等
    前端学习C语言 - 开篇
    WANLSHOP 直播短视频种草多用户电商系统源码自营+多商户+多终端(H5+小程序+APP)
    java基础-第4章-面向对象(二)
  • 原文地址:https://blog.csdn.net/themagickeyjianan/article/details/126020537