• 工业级Netty网关,京东是如何架构的?


    说在前面

    在40岁老架构师 尼恩的读者交流群(50+)中,很多小伙伴拿到一线互联网企业如阿里、网易、有赞、希音、百度、滴滴的面试资格。

    最近,尼恩指导一个小伙伴简历,写了一个《高并发网关项目》,此项目帮这个小伙拿到 字节/阿里/微博/汽车之家 面邀, 所以说,这是一个牛逼的项目。

    为了帮助大家拿到更多面试机会,拿到更多大厂offer。

    尼恩决定:给大家出一章视频介绍这个项目的架构和实操,《33章:10Wqps 高并发 Netty网关架构与实操》,预计月底发布。然后,提供一对一的简历指导,这里简历金光闪闪、脱胎换骨。

    《33章:10Wqps 高并发 Netty网关架构与实操》 海报如下:

    配合《33章:10Wqps 高并发 Netty网关架构与实操》, 尼恩会梳理几个工业级、生产级网关案例,作为架构素材、设计的素材。

    前面梳理了

    除了以上的8个案例,这里,尼恩又找到一个漂亮的生产级案例:《工业级Netty网关,京东是如何架构?

    注意,这又一个非常 牛逼的工业级、生产级网关案例

    这些案例,并不是尼恩的原创。这些案例,仅仅是尼恩在《33章:10Wqps 高并发 Netty网关架构与实操》备课的过程中,在互联网查找资料的时候,收集起来的,供大家学习和交流使用。

    《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》的PDF,请到公号【技术自由圈】获取

    工业级Netty网关,京东是如何架构?

    作者:张松然,京东商家研发部架构师

    京麦是京东商城为其商家提供的一款后台管理工具,它能够让商家在不登录后台的情况下生成订单,快速完成订单下载和发货流程。这与淘宝的旺旺商家版(现已更名为淘宝千牛)类似。

    本文主要阐述了京麦 TCP 网关的技术架构以及 Netty 的应用实践。

    京东京麦商家管理平台从 2014 年开始搭建网关,从 HTTP 网关逐步升级为 TCP 网关。到了 2016 年,基于 Netty4.x+Protobuf3.x 技术,京麦构建了一个高可用、高性能和高稳定的 TCP 长连接网关,支持 PC 端和应用程序的上下行通信。

    早期的京麦主要依靠 HTTP 和 TCP 长连接来发送消息通知,而没有将其应用于 API 网关。

    然而,随着对 NIO 技术的深入了解和对 Netty 框架的熟练掌握,以及对系统通信稳定性要求的提高,京麦开始尝试运用 NIO 技术来实现 API 请求调用。这一设想在 2016 年终于实现,并成功支持业务运营。

    得益于采用了 TCP 长连接容器、Protobuf 序列化、服务泛化调用框架等多种优化措施,京麦的 TCP 网关性能比 HTTP 网关提升了 10 倍以上,稳定性也显著超过了 HTTP 网关。

    1、TCP网关的网络结构

    通过 Netty 构建京麦 TCP 网关的长连接容器,作为网关接入层提供服务 API 请求调用。

    客户端通过域名 + 端口访问 TCP 网关,不同域名对应不同运营商的 VIP,VIP 发布在 LVS 上,LVS 将请求转发给后端的 HAProxy,然后由 HAProxy 将请求转发给后端的 Netty 的 IP+Port。

    主要,这个是高并发接入层的标准架构

    LVS 将请求转发给后端的 HAProxy,经过 LVS 的请求,但响应是由 HAProxy 直接返回给客户端,这就是 LVS 的 DR 模式。

    注意:请点击图像以查看清晰的视图!

    LVS+Keepalived(DR模式)的深入学习,尼恩给大家写了一篇非常详细的、全面的文章, 建议大家收藏起,好好掌握:

    10Wqps网关接入层,LVS+Keepalived(DR模式)如何搭建?

    2、TCP网关长连接容器架构

    TCP网关的核心组件是Netty,而Netty的NIO模型是Reactor反应堆模型(Reactor相当于有分发功能的多路复用器Selector)。

    每一个连接对应一个Channel(多路指多个Channel,复用指多个连接复用了一个线程或少量线程,在Netty指EventLoop),一个Channel对应唯一的ChannelPipeline,多个Handler串行的加入到Pipeline中,每个Handler关联唯一的ChannelHandlerContext。TCP网关长连接容器的Handler就是放在Pipeline的中。

    我们知道TCP属于OSI的传输层,所以建立Session管理机制构建会话层来提供应用层服务,可以极大的降低系统复杂度。所以,每一个Channel对应一个Connection,一个Connection又对应一个Session,Session由Session Manager管理,Session与Connection是一一对应,Connection保存着ChannelHandlerContext (ChannelHanderContext可以找到Channel), Session通过心跳机制来保持Channel的Active状态。

    每一次Session的会话请求(ChannelRead)都是通过Proxy代理机制调用Service层,数据请求完毕后通过写入ChannelHandlerConext再传送到Channel中。

    数据下行主动推送也是如此,通过Session Manager找到Active的Session,轮询写入Session中的ChannelHandlerContext,就可以实现广播或点对点的数据推送逻辑。如下图所示。

    注意:请点击图像以查看清晰的视图!

    京麦TCP网关使用Netty Channel进行数据通信,使用Protobuf进行序列化和反序列化,每个请求都将被封装成Byte二进制字节流,在整个生命周期中,Channel保持长连接,而不是每次调用都重新创建Channel,达到链接的复用。

    我们接下来来看看基于Netty的具体技术实践。

    3、TCP网关Netty Server的IO模型

    具体的实现过程如下

    • 1)创建ServerBootstrap,设定BossGroup与WorkerGroup线程池;
    • 2)bind指定的port,开始侦听和接受客户端链接(如果系统只有一个服务端port需要监听,则BossGroup线程组线程数设置为1);
    • 3)在ChannelPipeline注册childHandler,用来处理客户端链接中的请求帧。

    4、TCP网关的线程模型

    TCP网关使用Netty的线程池,共三组线程池,分别为BossGroup、WorkerGroup和ExecutorGroup。

    其中,BossGroup用于接收客户端的TCP连接,WorkerGroup用于处理I/O、执行系统Task和定时任务,ExecutorGroup用于处理网关业务加解密、限流、路由,及将请求转发给后端的抓取服务等业务操作。

    注意:请点击图像以查看清晰的视图!

    NioEventLoop是Netty的Reactor线程,其角色

    • 1)Boss Group:作为服务端Acceptor线程,用于accept客户端链接,并转发给WorkerGroup中的线程;
    • 2)Worker Group:作为IO线程,负责IO的读写,从SocketChannel中读取报文或向SocketChannel写入报文;
    • 3)Task Queue/Delay Task Queu:作为定时任务线程,执行定时任务,例如链路空闲检测和发送心跳消息等。

    5、TCP网关执行时序图

    注意:请点击图像以查看清晰的视图!

    如上图所示,其中步骤一至步骤九是Netty服务端的创建时序,步骤十至步骤十三是TCP网关容器创建的时序。

    步骤一:创建ServerBootstrap实例,ServerBootstrap是Netty服务端的启动辅助类。

    步骤二:设置并绑定Reactor线程池,EventLoopGroup是Netty的Reactor线程池,EventLoop负责所有注册到本线程的Channel。

    步骤三:设置并绑定服务器Channel,Netty Server需要创建NioServerSocketChannel对象。

    步骤四:TCP链接建立时创建ChannelPipeline,ChannelPipeline本质上是一个负责和执行ChannelHandler的职责链。

    步骤五:添加并设置ChannelHandler,ChannelHandler串行的加入ChannelPipeline中。

    步骤六:绑定监听端口并启动服务端,将NioServerSocketChannel注册到Selector上。

    步骤七:Selector轮训,由EventLoop负责调度和执行Selector轮询操作。

    步骤八:执行网络请求事件通知,轮询准备就绪的Channel,由EventLoop执行ChannelPipeline。

    步骤九:执行Netty系统和业务ChannelHandler,依次调度并执行ChannelPipeline的ChannelHandler。

    步骤十:通过Proxy代理调用后端服务,ChannelRead事件后,通过发射调度后端Service。

    步骤十一:创建Session,Session与Connection是相互依赖关系。

    步骤十二:创建Connection,Connection保存ChannelHandlerContext。

    步骤十三:添加SessionListener,SessionListener监听SessionCreate和SessionDestory等事件。

    6、TCP网关源码分析

    6.1 Session管理

    Session是客户端与服务端建立的一次会话链接,会话信息中保存着SessionId、连接创建时间、上次访问事件,以及Connection和SessionListener,在Connection中保存了Netty的ChannelHandlerContext上下文信息。Session会话信息会保存在SessionManager内存管理器中。

    创建Session的源码

    @Override
    public synchronized Session createSession(String sessionId, ChannelHandlerContext ctx){
        Session session = sessions.get(sessionId);
        if (session != null){
            session.close();
        }
        session = new ExchangeSession();
        session.setSessionId(sessionId);
        session.setValid(true);
        session.setMaxInactiveInterval(this.getMaxInactiveInterval());
        session.setCreationTime(System.currentTimeMillis());
        session.setLastAccessedTime(System.currentTimeMillis());
        session.setSessionManager(this);
        session.setConnection(createTcpConnection(session, ctx));
        for (SessionListener listener : essionListeners){
            session.addSessionListener(listener);
        }
        return session;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    通过源码分析,如果Session已经存在销毁Session,但是这个需要特别注意,创建Session一定不要创建那些断线重连的Channel,否则会出现Channel被误销毁的问题。因为如果在已经建立Connection(1)的Channel上,再建立Connection(2),进入session.close方法会将cxt关闭,Connection(1)和Connection(2)的Channel都将会被关闭。在断线之后再建立连接Connection(3),由于Session是有一定延迟,Connection(3)和Connection(1/2)不是同一个,但Channel可能是同一个。

    所以,如何处理是否是断线重练的Channel,具体的方法是在Channel中存入SessionId,每次事件请求判断Channel中是否存在SessionId,如果Channel中存在SessionId则判断为断线重连的Channel,代码如下图所示。

    private String getChannelSessionHook(ChannelHandlerContext ctx){
        return ctx.channel().attr (Constants.SERVER_SESSION_HOOK).get();
    }
    
    private void setChannelSessionHook(ChannelHandlerContext ctx, String sessionId){
        ctx.channel().attr(Constants.SERVER_SESSION_HOOK).set(sessionId);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    6.2 心跳

    心跳用于检测保持连接的客户端是否仍然活跃,客户端每隔一段时间发送一次心跳包到服务端,服务端收到心跳后更新 Session 的最后访问时间。

    在服务端,长连接会话检测通过轮询 Session 集合来判断最后访问时间是否过期,如果过期,则关闭 Session 和 Connection,包括从内存中删除,同时注销 Channel 等。如下面代码所示。

    Session session = tcpSessionManager.createSession(wrapper.getSessionId(), ctx);
    session.addSessionListener(tcpHeartbeatListener);
    session.connect();
    
    tcpSessionManager.addSession(session);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    通过源码分析,在每个Session创建成功之后,都会在Session中添加TcpHeartbeatListener这个心跳检测的监听,TcpHeartbeatListener是一个实现了SessionListener接口的守护线程,通过定时休眠轮询Sessions检查是否存在过期的Session,如果轮训出过期的Session,则关闭Session。如下面代码所示。

    public void checkHeartBeat(){
        Session[] sessions = tcpSessionManager.getSessions();
        for (Session session : sessions){
            if (session.expire()){
                session.close();
                logger.info("heart is expire, clear sessionId:"+ session.getSessionId());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    同时,注意到session.connect方法,在connect方法中会对Session添加的Listeners进行添加时间,它会循环调用所有Listner的sessionCreated事件,其中TcpHeartbeatListener也是在这个过程中被唤起。如下面代码所示。

    private void addSessionEvent(){
        SessionEvent event = new SessionEvent(this);
        for (SessionListener listener : listeners){
            try{
                listener.sessionCreated(event);
                logger.info("SessionListener" + listener + ".sessionCreated() is invoked successfully!");
            } catch (Exception e){
                logger.error("addSessionEvent error.", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    6.3 数据上行

    数据上行特指从客户端发送数据到服务端,数据从ChannelHander的channelRead方法获取数据。数据包括创建会话、发送心跳、数据请求等。这里注意的是,channelRead的数据包括客户端主动请求服务端的数据,以及服务端下行通知客户端的返回数据,所以在处理object数据时,通过数据标识区分是请求-应答,还是通知-回复。如下面代码所示。

    public void channelRead(ChannelHandlerContext ctx, Object o) throws Exception{
        try{
            if (o instanceof MessageBuf.JMTransfer) {
                SystemMessage sMsg = generateSystemMessage(ctx);
                MessageBuf.JMTransfer message = (MessageBuf.JMTransfer) o;
                //inbound
                if(message.getFormat() == SEND) {
                    MessageWrapper wrapper = proxy.invoke(sMsg, message);
                    if (wrapper != null)
                        this.receive(ctx, wrapper);
                }
                // outbound
                if (message.getFormat() == REPLY) {
                    notify.reply(message);
                }
            }else{
                logger. warn("TcpServerHandler channelRead message is not proto.");
            }
        }catch (Exception e) {
            logger.error("TcpServerHandler TcpServerHandler handler error.", e);
            throw e;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    6.4 数据下行

    数据下行通过MQ广播机制到所有服务器,所有服务器收到消息后,获取当前服务器所持有的所有Session会话,进行数据广播下行通知。

    如果是点对点的数据推送下行,数据也是先广播到所有服务器,每个服务器判断推送的端是否是当前服务器持有的会话,如果判断消息数据中的信息是在当前服务,则进行推送,否则抛弃。如下面代码所示。

    private Notifyfuture doSendAsync(long seq, Messagelrapper wrapper, int timeout) throws Exception {
        if (wrapper == null) {
            throw new Exception("wrapper cannot be null.");
        }
        String sessionId = wrapper.getSessionId();
        if (StringUtils.isBlank(sessionId)) {
            throw new Exception("sessionId cannot be null.")
        }
        if (tcpConnector.exist sessionId)) {
            //start.
            final NotifyFuture future = new NotifyFuture(timeout);
            this.futureMap.put(seq, future);
            tcpConnector.send(sessionId, wrapper.getBody());
            future.setSentTime(System.currentTimeMillis()); // 置为已发送return future.
        } else {
            // tcpConnector not exist sessionId
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    通过源码分析,数据下行则通过NotifyProxy的方式发送数据,需要注意的是Netty是NIO,如果下行通知需要获取返回值,则要将异步转同步,所以NotifyFuture是实现java.util.concurrent.Future的方法,通过设置超时时间,在channelRead获取到上行数据之后,通过seq来关联NotifyFuture的方法。如下面代码所示。

    public void reply (MessageBuf.JMTransfer message) throws Exception {
        try {
            long seg = message.getSeq();
            final NotifyFuture future = this.futureMap.get(seg);
            if (future != null){
                future.setSuccess(true);
                futureMap.remove(seg);
            }
        } catch (Exception e) {
            throw e;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    下行的数据通过TcpConnector的send方法发送,send方式则是通过ChannelHandlerContext的writeAndFlush方法写入Channel,并实现数据下行,这里需要注意的是,之前有另一种写法就是cf.await,通过阻塞的方式来判断写入是否成功,这种写法偶发出现BlockingOperationException的异常。如下面代码所示。

    ChannelFuture cf = cxt.writeAndFlush(message);
    cf.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws PushException{
            if (future.isSuccess()) {
                logger.debug("send success.");
            } else {
                throw new PushException("Failed to send message.");
            }
            Throwable cause = future.cause() ;
            if (cause != null) {
                throw new PushException(cause);
            }
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    使用阻塞获取返回值的写法

    boolean success = true;
    boolean sent = true;
    int timeout = 60;
    try {
        ChannelFuture cf = cxt.write(message);
        cxt.flush();
        if (sent){
            success = cf.await(timeout);
        }
        if (cf.isSuccess()) {
            logger.debug("send success.");
        }
        Throwable cause = cf.cause();
        if (cause != null) {
            this.fireError(new PushException(cause));
        }
    } catch (Throwable e) {
        this.fireError(new PushException("Failed to send message, cause:" + e. getMessage(),e));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    关于BlockingOperationException的问题我在StackOverflow进行提问,非常幸运的得到了Norman Maurer(Netty的核心贡献者之一)的解答

    最终结论大致分析出,在执行write方法时,Netty会判断current thread是否就是分给该Channe的EventLoop,如果是则行线程执行IO操作,否则提交executor等待分配。

    当执行await方法时,会从executor里fetch出执行线程,这里就需要checkDeadLock,判断执行线程和current threads是否时同一个线程,如果是就检测为死锁抛出异常BlockingOperationException。

    说在最后:有问题可以找老架构取经

    架构之路,充满了坎坷

    架构和高级开发不一样 , 架构问题是open/开放式的,架构问题是没有标准答案的

    正由于这样,很多小伙伴,尽管耗费很多精力,耗费很多金钱,但是,遗憾的是,一生都没有完成架构升级

    所以,在架构升级/转型过程中,确实找不到有效的方案,可以来找40岁老架构尼恩求助.

    前段时间一个小伙伴,他是跨专业来做Java,现在面临转架构的难题,但是经过尼恩几轮指导,顺利拿到了Java架构师+大数据架构师offer 。所以,如果遇到职业不顺,找老架构师帮忙一下,就顺利多了。

    推荐阅读

    百亿级访问量,如何做缓存架构设计

    多级缓存 架构设计

    消息推送 架构设计

    阿里2面:你们部署多少节点?1000W并发,当如何部署?

    美团2面:5个9高可用99.999%,如何实现?

    网易一面:单节点2000Wtps,Kafka怎么做的?

    字节一面:事务补偿和事务重试,关系是什么?

    网易一面:25Wqps高吞吐写Mysql,100W数据4秒写完,如何实现?

    亿级短视频,如何架构?

    炸裂,靠“吹牛”过京东一面,月薪40K

    太猛了,靠“吹牛”过顺丰一面,月薪30K

    炸裂了…京东一面索命40问,过了就50W+

    问麻了…阿里一面索命27问,过了就60W+

    百度狂问3小时,大厂offer到手,小伙真狠!

    饿了么太狠:面个高级Java,抖这多硬活、狠活

    字节狂问一小时,小伙offer到手,太狠了!

    收个滴滴Offer:从小伙三面经历,看看需要学点啥?

    《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》PDF,请到下面公号【技术自由圈】取↓↓↓

  • 相关阅读:
    webgl 系列 —— 变换矩阵和动画
    12 款小众宝藏AI工具,90% 的开发者不了解
    【34W字CISSP备考笔记】域1:安全与风险管理
    C++求最大公因数(gcd)的六重境界
    第二课 Flink 安装部署、环境配置及运行应用程序(1)
    设计模式六大原则
    脏读、不可重复读、幻读、丢失更新
    JAVA计算机毕业设计扶贫信息管理系统Mybatis+源码+数据库+lw文档+系统+调试部署
    数仓开发之DIM层
    大语言模型助力审计问题自动定性
  • 原文地址:https://blog.csdn.net/crazymakercircle/article/details/133845013