• dubbo的disconected问题


    现在在dubbo的provider中,出现了这样的日志:

    [INFO ] 2017-11-15 10:50:07,790--DubboServerHandler-10.255.242.97:20990-thread-517--[com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol]  [DUBBO] disconected from /10.255.242.96:11582,url:dubbo://10.255.242.97:20990/com.tms.express.service.ServiceA?anyhost=true&application=tms-express-service&channel.readonly.sent=true&codec=dubbo&default.accepts=4000&default.buffer=8192&default.connections=20&default.exporter.listener=apiMonitorProviderExporterListener&default.loadbalance=random&default.payload=88388608&default.queues=0&default.retries=0&default.service.filter=apiMonitorProviderFilter,,&default.threadpool=fixed&default.threads=600&default.weight=100&dubbo=2.8.3.2&generic=false&heartbeat=60000&interface=com.tms.express.service.ServiceA&logger=slf4j&methods=handlePassbackDataSf,orderWeightSchedule&owner=nobody&pid=6694&revision=1.0-SNAPSHOT&side=provider×tamp=1510563828892,dubbo version: 2.8.3.2, current host: 10.255.242.97
    
    • 1

    先说结论:

    1. 重启consumer服务。传说中的重启试试
    2. 如果一个服务器上部署了多个dubbo服务,最好修改dubbo缓存文件的位置,否则可能会因为争抢文件锁而导致缓存失败。缓存文件默认是/{user.home}/.dubbo/.dubbo/dubbo-registry-{zookeeper地址}.cache,里面记录了zookeeper中的provider和consumer地址,启动脚本里面加入-Ddubbo.registry.file参数,或者在dubbo配置文件中,都可以修改文件存放位置,具体方法网上一大堆。

    下面是分析过程:

    1. 日志里中说的是10.255.242.96:11582(consumer服务器)到10.255.242.97:20990(本机,provider服务器)的一个连接断开了,而且指定了service的名字是ServiceA。
    2. provider的日志每2秒报一次,甚至每2秒报很多次,每次的端口都不一样,但是能看到端口号是不断叠加的。
    3. 根据端口号11582并不能确定consumer服务器上具体是哪个dubbo服务和provider断开了连接,因为在consumer服务器上查看这个端口的时候,发现这个端口并没有被使用。
    4. 在consumer服务器上查看到provider服务器的连接,除了正常的dubbo连接之外,还有一个奇怪的端口连接了provider服务器,并且处在TIME_WAIT状态,没有pid。多次netstat之后发现每次处在TIME_WAIT状态的端口号都不一样。
    5. 根据service的名字可以知道具体是哪个dubbo服务,这个服务作为consumer连接了provider,在报这个日志的时候,两个服务之间的通信和调用都是正常的,不存在调不通的问题,系统功能并不受影响。

    根据service名字找到的consumer,在日志文件中可以看到这种日志:

    [INFO ] 2017-11-15 11:34:54,056--DubboClientReconnectTimer-thread-1--[com.alibaba.dubbo.remoting.transport.netty.NettyClient]  [DUBBO] Close new netty channel [id:0x56b32429, /10.255.242.96:45723 => /10.255.242.97:20990], because theclient closed., dubbo version: 2.8.3.2, current host: 10.255.242.96
    [INFO ] 2017-11-15 11:34:54,056--DubboSharedHandler-thread-337--[com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol]  [DUBBO] disconected from /10.255.242.97:20990,url:dubbo://10.255.242.97:20990/com.tms.express.service.ServiceA?anyhost=true&application=tms-kafka-producer&check=false&codec=dubbo&default.accepts=4000&default.buffer=8192&default.connections=20&default.exporter.listener=apiMonitorProviderExporterListener&default.loadbalance=random&default.payload=88388608&default.reference.filter=apiMonitorConsumerFilter,,&default.retries=0&default.service.filter=apiMonitorProviderFilter,,&default.weight=100&dubbo=2.8.3.2&generic=false&heartbeat=60000&interface=com.tms.express.service.ServiceA&logger=slf4j&methods=handlePassbackDataSf,OrderWeightSchedule&owner=nobody&pid=43193&protocol=dubbo&retries=0&revision=1.0-SNAPSHOT&side=consumer&timeout=60000×tamp=1502358440938,dubbo version: 2.8.3.2, current host: 10.255.242.96
    
    • 1
    • 2

    这种日志说明,consumer连接了provider,但是又把连接给关了,consumer关连接的时候,provider报了文章最上面的日志。

    很不幸的是有时候consumer里找不到同样service的disconected日志,但是provider里的disconected日志依然在刷,只有这一个consumer用到了ServiceA,最后发现情况是这样的:

    consumer日志中写的Service名字,和provider日志中写的service名字,不一定一样

    我的dubbo服务部署图

    provider服务器上有一个dubbo服务,提供了两个service,分别是ServiceA和ServiceB,consumer服务器上有两个dubbo服务,分别是ServiceA和ServiceB的consumer

    provider在报ServiceA的disconnect日志,而consumer服务1并没有类似日志,反倒是consumer服务2有这样的日志:

    [INFO ] 2017-11-15 11:34:54,056--DubboClientReconnectTimer-thread-1--[com.alibaba.dubbo.remoting.transport.netty.NettyClient]  [DUBBO] Close new netty channel [id:0x56b32429, /10.255.242.96:45723 => /10.255.242.97:20990], because theclient closed., dubbo version: 2.8.3.2, current host: 10.255.242.96
    [INFO ] 2017-11-15 11:34:54,056--DubboSharedHandler-thread-337-- [com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol]  [DUBBO] disconected from /10.255.242.97:20990,url:dubbo://10.255.242.97:20990/com.tms.express.service.ServiceB?anyhost=true&application=tms-kafka-producer&check=false&codec=dubbo&default.accepts=4000&default.buffer=8192&default.connections=20&default.exporter.listener=apiMonitorProviderExporterListener&default.loadbalance=random&default.payload=88388608&default.reference.filter=apiMonitorConsumerFilter,,&default.retries=0&default.service.filter=apiMonitorProviderFilter,,&default.weight=100&dubbo=2.8.3.2&generic=false&heartbeat=60000&interface=com.tms.express.service.ServiceB&logger=slf4j&methods=handlePassbackDataSf,OrderWeightSchedule&owner=nobody&pid=12193&protocol=dubbo&retries=0&revision=1.0-SNAPSHOT&side=consumer&timeout=60000×tamp=1502358440938,dubbo version: 2.8.3.2, current host: 10.255.242.96
    
    • 1
    • 2

    注意这段日志里写的service名字是ServiceB,和provider里提示的ServiceA并不一样,但是产生的原因是一样的。

    重启consumer服务2之后,provider和consumer服务2都没有这种日志了。

    我怀疑这种情况是dubbo搞错了

    下面是相关的dubbo源码:

    一,provider的日志来源:

    provider中disconected日志来源于com.alibaba.dubbo.rpc.protocol.dubbo包的DubboProtocol类,这个类里面有个属性ExchangeHandler,里面的部分方法是在属性定义的时候写的:

    public class DubboProtocol extendsAbstractProtocol {
       ......省略部分代码
       private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
           public Object reply(ExchangeChannel channel, Object message) throwsRemotingException {
               if (message instanceof Invocation) {
                    Invocation inv = (Invocation)message;
                    Invoker invoker =getInvoker(channel, inv);
                    //如果是callback 需要处理高版本调用低版本的问题
                    if(Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                        String methodsStr =invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null ||methodsStr.indexOf(",") == -1) {
                            hasMethod =inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods =methodsStr.split(",");
                            for (String method :methods) {
                                if(inv.getMethodName().equals(method)) {
                                    hasMethod =true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(newIllegalStateException("The methodName " + inv.getMethodName() +" not found in callback service interface ,invoke will be ignored. pleaseupdate the api interface. url is:" + invoker.getUrl()) + ",invocation is :" + inv);
                            return null;
                       }
                    }
                   RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
               }
               throw new RemotingException(channel, "Unsupported request: " +message == null ? null : (message.getClass().getName() + ": " +message) + ", channel: consumer: " + channel.getRemoteAddress() +" --> provider: " + channel.getLocalAddress());
           }
     
           @Override
           public void received(Channel channel, Object message) throwsRemotingException {
               if (message instanceof Invocation) {
                    reply((ExchangeChannel)channel, message);
               } else {
                    super.received(channel,message);
               }
           }
     
           @Override
           public void connected(Channel channel) throws RemotingException {
               invoke(channel, Constants.ON_CONNECT_KEY);
           }
     
           @Override
           public void disconnected(Channel channel) throws RemotingException {
               if (logger.isInfoEnabled()) {
                    logger.info("disconectedfrom " + channel.getRemoteAddress() + ",url:" +channel.getUrl());
               }
               invoke(channel, Constants.ON_DISCONNECT_KEY);
           }
     
           private void invoke(Channel channel, String methodKey) {
               Invocation invocation = createInvocation(channel, channel.getUrl(),methodKey);
               if (invocation != null) {
                    try {
                        received(channel,invocation);
                   } catch (Throwable t) {
                        logger.warn("Failed toinvoke event method " + invocation.getMethodName() + "(), cause:" + t.getMessage(), t);
                    }
               }
           }
     
           private Invocation createInvocation(Channel channel, URL url, StringmethodKey) {
               String method = url.getParameter(methodKey);
               if (method == null || method.length() == 0) {
                    return null;
               }
               RpcInvocation invocation = new RpcInvocation(method, newClass[0], new Object[0]);
               invocation.setAttachment(Constants.PATH_KEY, url.getPath());
               invocation.setAttachment(Constants.GROUP_KEY,url.getParameter(Constants.GROUP_KEY));
               invocation.setAttachment(Constants.INTERFACE_KEY,url.getParameter(Constants.INTERFACE_KEY));
               invocation.setAttachment(Constants.VERSION_KEY,url.getParameter(Constants.VERSION_KEY));
               if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                   invocation.setAttachment(Constants.STUB_EVENT_KEY,Boolean.TRUE.toString());
               }
               return invocation;
           }
        }
     
       ...省略部分代码
     
       private void openServer(URL url) {
           // find server.
            String key = url.getAddress();
           //client 也可以暴露一个只有server可以调用的服务。
           boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
           if (isServer) {
               ExchangeServer server = serverMap.get(key);
               if (server == null) {
                    serverMap.put(key,createServer(url));
               } else {
                    //server支持reset,配合override功能使用
                    server.reset(url);
               }
           }
        }
       private ExchangeServer createServer(URL url) {
            //默认开启server关闭时发送readonly事件
           url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,Boolean.TRUE.toString());
           //默认开启heartbeat
           url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT));
           String str = url.getParameter(Constants.SERVER_KEY,Constants.DEFAULT_REMOTING_SERVER);
     
           if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
               throw new RpcException("Unsupported server type: " + str +", url: " + url);
     
           url = url.addParameter(Constants.CODEC_KEY,Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
           ExchangeServer server;
           try {
               server = Exchangers.bind(url, requestHandler);
           } catch (RemotingException e) {
               throw new RpcException("Fail to start server(url: " + url +") " + e.getMessage(), e);
           }
           str = url.getParameter(Constants.CLIENT_KEY);
           if (str != null && str.length() > 0) {
               Set supportedTypes =ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw newRpcException("Unsupported client type: " + str);
               }
           }
           return server;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127

    requestHandler绑定到了provider的url上(DubboProtocol的openServer方法),用于响应dubbo的连接、断开、调用等请求,如果consumer到这个provider的连接断开了,就输出日志(requestHandler的disconnected方法)。

    二,consumer的日志来源:

    consumer的日志来自com.alibaba.dubbo.remoting.transport.netty包下的NettyClient类,doConnect方法:

       protected void doConnect() throws Throwable {
           long start = System.currentTimeMillis();
           ChannelFuture future = bootstrap.connect(getConnectAddress());
           try {
               boolean ret = future.awaitUninterruptibly(getConnectTimeout(),TimeUnit.MILLISECONDS);
     
               if (ret && future.isSuccess()) {
                    Channel newChannel =future.getChannel();
                   newChannel.setInterestOps(Channel.OP_READ_WRITE);
                    try {
                        // 关闭旧的连接
                        Channel oldChannel =NettyClient.this.channel; // copy reference
                        if (oldChannel != null) {
                            try {
                                if(logger.isInfoEnabled()) {
                                   logger.info("Close old netty channel " + oldChannel + "on create new netty channel " + newChannel);
                               }
                                oldChannel.close();
                            } finally {
                               NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    } finally {
                        if(NettyClient.this.isClosed()) {
                            try {
                                if(logger.isInfoEnabled()) {
                                   logger.info("Close new netty channel " + newChannel + ",because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                               NettyClient.this.channel = null;
                               NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        } else {
                           NettyClient.this.channel = newChannel;
                        }
                    }
               } else if (future.getCause() != null) {
                    throw newRemotingException(this, "client(url: " + getUrl() + ") failed toconnect to server "
                            + getRemoteAddress() +", error message is:" + future.getCause().getMessage(), future.getCause());
               } else {
                    throw newRemotingException(this, "client(url: " + getUrl() + ") failed toconnect to server "
                            + getRemoteAddress() +" client-side timeout "
                            + getConnectTimeout() +"ms (elapsed: " + (System.currentTimeMillis() - start) + "ms)from netty client "
                            +NetUtils.getLocalHost() + " using dubbo version " +Version.getVersion());
               }
           } finally {
               if (!isConnected()) {
                    future.cancel();
               }
           }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    这是consumer发起连接的时候调用的代码,先获取一个新连接,如果原来有个连接,就先把旧连接关掉,替换成新的连接,算是重连。如果新的连接也不能用,就把新的连接也关掉,也就是本文中的情况。

    本文中出现的disconect连接我觉得本不该出现,因为dubbo使用长连接,查看了provider和consumer之间的连接数也充足(配置的20个connection),所以除了这些之外的连接都是多余的,而且超过了20个之后再发起连接就只能失败,额外出现的这些试图发起的连接应该是在provider地址列表更新的时候,consumer的缓存文件缓存失败导致的错误。

  • 相关阅读:
    “蔚来杯“2022牛客暑期多校训练营(补题合集)
    c++ chrono
    会计学期末题库 含WORD版
    网络安全(黑客)——自学2024
    C#/.NET/.NET Core优秀项目框架推荐
    2021年下半年信息安全工程师下午真题及答案解析
    QT中多线程QThread使用解疑实例
    Spark scala编程练习题——通过日志统计网站访问量
    第九天 Python爬虫之Scrapy(框架工作原理 )
    深究防火墙
  • 原文地址:https://blog.csdn.net/bhegi_seg/article/details/126716916