• Android性能优化系列-腾讯matrix-流量监控之TrafficPlugin源码分析


    前言

    本篇进行matrix框架的网络流量监控模块的代码分析。你可能想,为什么需要对流量进行监控呢?我们平常进行的网络接口请求都是一些必要的操作,监控它的意义何在?首先我们要明确流量监控的对象是什么,是上行(发请求消耗的流量)和下行(接收到服务器返回的数据流量)这两块消耗的用户流量。通过这个监控,我们可以清晰的看到每个接口在每次调用时所消耗的流量的具体值,有了这个数据之后,我们可以从两个维度来分析流量问题。第一,明确单次接口请求是否存在过多消耗流量的情况,从而促进网络数据包的体积优化;第二,从多次请求的维度来看,也能帮助我们定位是否存在单个接口请求数量异常的问题,从而定位代码存在的业务逻辑问题。笔者在万能钥匙的时候,就曾遇到过类似的情况,某接口在应用启动阶段频繁调用,导致用户流量消耗过多被用户投诉的问题。试想假如当时做了流量监控,那么对开发团队来说,就可以更高效的定位到问题所在。

    言归正传,我们进入今天的代码分析,分析的对象时matrix中的TrafficPlugin,我们从它的几个关键方法入手。

    • 静态代码块
    • start
    • stop

    静态代码块

    static {
        System.loadLibrary("matrix-traffic");
    }
    
    • 1
    • 2
    • 3

    根据加载的名称matrix-traffic找到MatrixTraffic.cc这个c++的class,loadLibrary方法执行的时候会进入到JNI_OnLoad方法,JNI_OnLoad方法通常用来做一些准备性的工作,用于后边c++层和Java层的一个互调,下面是一部分关键代码,可以看到,这里将Java层的TrafficPlugin保存为全局引用,并获取了它的setStackTrace方法备用,并动态注册了一些Java层到native层方法的映射关系。

    JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *) {
        jclass trafficCollectorCls = env->FindClass("com/tencent/matrix/traffic/TrafficPlugin");
        if (!trafficCollectorCls)
            return -1;
        //保存TrafficPlugin的jclass对象为全局引用
        gJ.TrafficPlugin = static_cast(env->NewGlobalRef(trafficCollectorCls));
        //保存TrafficPlugin的setStackTrace方法为全局引用
        gJ.TrafficPlugin_setFdStackTrace =
                env->GetStaticMethodID(trafficCollectorCls, "setStackTrace", "(Ljava/lang/String;Ljava/lang/String;)V");
        //动态注册一些Java层方法到native方法的映射关系
        if (env->RegisterNatives(
                trafficCollectorCls, TRAFFIC_METHODS, static_cast(NELEM(TRAFFIC_METHODS))) != 0)
            return -1;
        return JNI_VERSION_1_6;
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    start

    通过nativeInitMatrixTraffic方法调用进入native层,根据上边JNI_OnLoad动态注册的映射关系找到MatrixTraffic.cc中的nativeInitMatrixTraffic方法。

    @Override
    public void start() {
        //这里可以设置需要过滤的so
        String[] ignoreSoFiles = trafficConfig.getIgnoreSoFiles();
        //进入native层
        nativeInitMatrixTraffic(trafficConfig.isRxCollectorEnable(), trafficConfig.isTxCollectorEnable(), trafficConfig.willDumpStackTrace(), trafficConfig.willDumpNativeBackTrace(), trafficConfig.willLookupIpAddress(), ignoreSoFiles);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    MatrixTraffic.cc中的nativeInitMatrixTraffic方法。

    static void nativeInitMatrixTraffic(JNIEnv *env, jclass, jboolean rxEnable, jboolean txEnable, jboolean dumpStackTrace, jboolean dumpNativeBackTrace, jboolean lookupIpAddress, jobjectArray ignoreSoFiles) {
        //启动loop循环线程
        TrafficCollector::startLoop(dumpStackTrace == JNI_TRUE, lookupIpAddress == JNI_TRUE);
        //是否dump native堆栈
        sDumpNativeBackTrace = (dumpNativeBackTrace == JNI_TRUE);
        //需要过滤的so
        ignoreSo(env, ignoreSoFiles);
        //通过hook socket实现对网络请求的拦截
        hookSocket(rxEnable == JNI_TRUE, txEnable == JNI_TRUE);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    startLoop

    首先startLoop启动循环线程

    void TrafficCollector::startLoop(bool dumpStackTrace, bool lookupIpAddress) {
        thread loopThread(loop);
        loopThread.detach();
    }
    
    • 1
    • 2
    • 3
    • 4

    loop循环线程是作为一个消费者线程出现的,我们先跳过这个方法的具体实现,先把生产者生产数据的过程看一下。

    void loop() {
        while (loopRunning) {
            if (msgQueue.empty()) {
                queueMutex.lock();
            } else {
                ...
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    hookSocket

    网络请求最终都是通过底层socket进行发起的,所以通过hook socket的方式可以拦截到所有的网络请求,这里是用了plt hook的方式,什么是plt hook可以参考 爱奇艺的xhook框架介绍 。为了使代码更简洁,用…省略了部分代码。

    static void hookSocket(bool rxHook, bool txHook) {
        //连接和关闭
        xhook_grouped_register(..., ".*\.so$", "connect",(void *) my_connect, (void **) (&original_connect));
        xhook_grouped_register(..., ".*\.so$", "close",(void *) my_close, (void **) (&original_close));
        //接收的数据监控
        if (rxHook) {
            xhook_grouped_register(..., ".*\.so$", "read",(void *) my_read, (void **) (&original_read));
            xhook_grouped_register(..., ".*\.so$", "recv",(void *) my_recv, (void **) (&original_recv));
            xhook_grouped_register(..., ".*\.so$", "recvfrom",(void *) my_recvfrom, (void **) (&original_recvfrom));
            xhook_grouped_register(..., ".*\.so$", "recvmsg",(void *) my_recvmsg, (void **) (&original_recvmsg));
        }
        //上传的数据监控
        if (txHook) {
            xhook_grouped_register(..., ".*\.so$", "write",(void *) my_write, (void **) (&original_write));
            xhook_grouped_register(..., ".*\.so$", "send",(void *) my_send, (void **) (&original_send));
            xhook_grouped_register(.., ".*\.so$", "sendto",(void *) my_sendto, (void **) (&original_sendto));
            xhook_grouped_register(.., ".*\.so$", "sendmsg",(void *) my_sendmsg, (void **) (&original_sendmsg));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    可以看到这里hook了socket的一些关键方法,这些方法被hook之后,当方法再次被调用的时候,我们就可以拦截到它的执行,从而做一些额外的处理。

    • connect
    • close
    • read
    • recv
    • recvfrom
    • recvmsg
    • write
    • send
    • sendto
    • sendmsg

    connect

    int my_connect(int fd, sockaddr *addr, socklen_t addr_length) {
        TrafficCollector::enQueueConnect(fd, addr, addr_length);
        return original_connect(fd, addr, addr_length);
    }
    
    • 1
    • 2
    • 3
    • 4

    通过调用TrafficCollector的enQueueConnect方法记录本次socket连接的信息,将MSG_TYPE_CONNECT类型,文件描述符,socket地址,调用栈等信息封装成TrafficMsg存入msgQueue队列。

    void TrafficCollector::enQueueConnect(int fd, sockaddr *addr, socklen_t addr_length) {
        //将MSG_TYPE_CONNECT类型,文件描述符,socket地址,调用栈等信息封装成TrafficMsg存入msgQueue队列
        shared_ptr msg = make_shared(MSG_TYPE_CONNECT, fd, addr->sa_family, getKeyAndSaveStack(fd), 0);
        msgQueue.push(msg);
        queueMutex.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    close

    int my_close(int fd) {
        TrafficCollector::enQueueClose(fd);
        return original_close(fd);
    }
    
    • 1
    • 2
    • 3
    • 4

    通过调用TrafficCollector的enQueueClose方法记录本次socket关闭的信息,将MSG_TYPE_CLOSE类型,文件描述符封装成TrafficMsg存入msgQueue队列。

    void TrafficCollector::enQueueClose(int fd) {
        shared_ptr msg = make_shared(MSG_TYPE_CLOSE, fd, 0, "", 0);
        msgQueue.push(msg);
        queueMutex.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    read

    ssize_t my_read(int fd, void *buf, size_t count) {
        ssize_t ret = original_read(fd, buf, count);
        TrafficCollector::enQueueRx(MSG_TYPE_READ, fd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    type为MSG_TYPE_READ,调用TrafficCollector的enQueueRx方法。

    void enQueueMsg(int type, int fd, size_t len) {
        shared_ptr msg = make_shared(type, fd, 0, getKeyAndSaveStack(fd), len);
        msgQueue.push(msg);
        queueMutex.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    recv

    ssize_t my_recv(int sockfd, void *buf, size_t len, int flags) {
        ssize_t ret = original_recv(sockfd, buf, len, flags);
        TrafficCollector::enQueueRx(MSG_TYPE_RECV, sockfd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    type为MSG_TYPE_RECV,调用TrafficCollector的enQueueRx方法。

    void enQueueMsg(int type, int fd, size_t len) {
        shared_ptr msg = make_shared(type, fd, 0, getKeyAndSaveStack(fd), len);
        msgQueue.push(msg);
        queueMutex.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    recvfrom

    ssize_t my_recvfrom(int sockfd, void *buf, size_t len, int flags,
                        struct sockaddr *src_addr, socklen_t *addrlen) {
        ssize_t ret = original_recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
        TrafficCollector::enQueueRx(MSG_TYPE_RECVFROM, sockfd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    type为MSG_TYPE_RECVFROM,调用TrafficCollector的enQueueRx方法。

    recvmsg

    ssize_t my_write(int fd, const void *buf, size_t count) {
        ssize_t ret = original_write(fd, buf, count);
        TrafficCollector::enQueueTx(MSG_TYPE_WRITE, fd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    type为MSG_TYPE_RECVMSG,调用TrafficCollector的enQueueRx方法。

    write

    ssize_t my_send(int sockfd, const void *buf, size_t len, int flags) {
        ssize_t ret = original_send(sockfd, buf, len, flags);
        TrafficCollector::enQueueTx(MSG_TYPE_SEND, sockfd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    type为MSG_TYPE_WRITE,调用TrafficCollector的enQueueRx方法。

    send

    ssize_t my_send(int sockfd, const void *buf, size_t len, int flags) {
        ssize_t ret = original_send(sockfd, buf, len, flags);
        TrafficCollector::enQueueTx(MSG_TYPE_SEND, sockfd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    type为MSG_TYPE_SEND,调用TrafficCollector的enQueueRx方法。

    sendto

    ssize_t my_sendto(int sockfd, const void *buf, size_t len, int flags,
                      const struct sockaddr *dest_addr, socklen_t addrlen) {
        ssize_t ret = original_sendto(sockfd, buf, len, flags, dest_addr, addrlen);
        TrafficCollector::enQueueTx(MSG_TYPE_SENDTO, sockfd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    type为MSG_TYPE_SENDTO,调用TrafficCollector的enQueueRx方法。

    sendmsg

    ssize_t my_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
        ssize_t ret = original_sendmsg(sockfd, msg, flags);
        TrafficCollector::enQueueTx(MSG_TYPE_SENDMSG, sockfd, ret);
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    type为MSG_TYPE_SENDMSG,调用TrafficCollector的enQueueRx方法。

    enQueueRx

    可以看到上边除了connect和close方法外,其他都调用到了TrafficCollector的enQueueRx方法,我们看看这个方法做了什么。

    void TrafficCollector::enQueueTx(int type, int fd, size_t len) {
        enQueueMsg(type, fd, len);
    }
    
    • 1
    • 2
    • 3

    void enQueueMsg(int type, int fd, size_t len) {
        shared_ptr msg = make_shared(type, fd, 0, getKeyAndSaveStack(fd), len);
        msgQueue.push(msg);
        queueMutex.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    enQueueRx方法在不断的封装TrafficMsg对象并存入队列msgQueue,每个方法间区别在于type的定义不同。所以到这里我们可以有一个简单的结论了: 被hook的这些方法,执行时会不断的获取socket此时的信息,封装成TrafficMsg对象存入队列供消费者线程进行消费,所以这里扮演的角色就是生产者线程。此时我们回头再去看loop线程。

    loop

    loop线程就是上边提到的消费者线程,消费线程不断的循环,当msgQueue中有数据时,就开始做进一步的处理。

    void loop() {
        while (loopRunning) {
            if (msgQueue.empty()) {
                queueMutex.lock();
            } else {
                shared_ptr msg = msgQueue.front();
                if (msg->type == MSG_TYPE_CONNECT) {
                    //socket开始连接,以文件描述符为key, 地址为value存入fdFamilyMap中
                    fdFamilyMap[msg->fd] = msg->sa_family;
                } else if (msg->type == MSG_TYPE_READ) {
                    //接收数据,开始read,假如fdFamilyMap存在,这个fd,说明已连接过
                    if (fdFamilyMap.count(msg->fd) > 0) {
                        appendRxTraffic(msg->threadName, msg->len);
                    }
                } else if (msg->type >= MSG_TYPE_RECV && msg->type <= MSG_TYPE_RECVMSG) {
                    //接收数据
                    if (fdFamilyMap[msg->fd] != AF_LOCAL) {
                        appendRxTraffic(msg->threadName, msg->len);
                    }
                } else if (msg->type == MSG_TYPE_WRITE) {
                    //写入数据
                    if (fdFamilyMap.count(msg->fd) > 0) {
                        appendTxTraffic(msg->threadName, msg->len);
                    }
                } else if (msg->type >= MSG_TYPE_SEND && msg->type <= MSG_TYPE_SENDMSG) {
                    //写入数据
                    if (fdFamilyMap[msg->fd] != AF_LOCAL) {
                        appendTxTraffic(msg->threadName, msg->len);
                    }
                } else if (msg->type == MSG_TYPE_CLOSE) {
                    //关闭
                    fdThreadNameMapLock.lock();
                    fdThreadNameMap.erase(msg->fd);
                    fdThreadNameMapLock.unlock();
                    fdFamilyMap.erase(msg->fd);
                }
                msgQueue.pop();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    从上边代码的注释可以看到,关键的两个方法是上传数据时调用的appendTxTraffic用来记录上行的数据流量,接收数据时调用的appendRxTraffic方法用来记录下行的数据流量,所以读或写的过程也就是不断的实时记录流量的过程。

    appendTxTraffic

    以线程名为key, 流量值为value存入txTrafficInfoMap中。

    void appendTxTraffic(const string& threadName, long len) {
        txTrafficInfoMapLock.lock();
        txTrafficInfoMap[threadName] += len;
        txTrafficInfoMapLock.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    appendRxTraffic

    以线程名为key, 流量值为value存入rxTrafficInfoMap中。

    void appendRxTraffic(const string& threadName, long len) {
        rxTrafficInfoMapLock.lock();
        rxTrafficInfoMap[threadName] += len;
        rxTrafficInfoMapLock.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    看到这里感觉有点奇怪了,怎么只是将数据记录到对应的map中,什么时候取的数据?在TrafficPlugin.java中我们可以找到这个方法getTrafficInfoMap,它的返回值是HashMap,其实就是上边提到的存储起来的流量信息。

    getTrafficInfoMap

    public HashMap getTrafficInfoMap(int type) {
        //进入native层的方法
        return nativeGetTrafficInfoMap(type);
    }
    
    • 1
    • 2
    • 3
    • 4

    来到TrafficCollector的getTrafficInfoMap方法。

    static jobject nativeGetTrafficInfoMap(JNIEnv *env, jclass, jint type) {
        return TrafficCollector::getTrafficInfoMap(type);
    }
    
    • 1
    • 2
    • 3

    可以看到,下面的逻辑很清晰,通过构造一个Java层的HashMap对象,并将指定类型的信息从c++层的map对象中转移到HashMap中,这样一来,就实时的拿到了当前流量消耗的数据,数据包含线程名和流量值,拿到线程名后可以通过getStackTraceMap拿到线程名和堆栈信息的映射关系,从而获取到实时的调用堆栈信息。

    jobject TrafficCollector::getTrafficInfoMap(int type) {
        ...
        if (type == TYPE_GET_TRAFFIC_RX) {
            //接收的数据
            for (auto & it : rxTrafficInfoMap) {
                //线程名
                jstring threadName = env->NewStringUTF(it.first.c_str());
                //流量长度,是一个数值型
                jstring traffic = env->NewStringUTF(to_string(it.second).c_str());
                env->CallObjectMethod(jHashMap, mapPut, threadName, traffic);
            }
        } else if (type == TYPE_GET_TRAFFIC_TX) {
            //上传的数据
            for (auto & it : txTrafficInfoMap) {
                jstring threadName = env->NewStringUTF(it.first.c_str());
                jstring traffic = env->NewStringUTF(to_string(it.second).c_str());
                env->CallObjectMethod(jHashMap, mapPut, threadName, traffic);
            }
        }
        return jHashMap;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    看一个效果图

    stop

    stop方法做的就是清理资源的工作了,因为核心功能都在native层,所以清理的工作还是会进入native层做处理,第一跳出循环线程,第二清理内存中的map映射表,至此,流量监控的代码分析完成。

    @Override
    public void stop() {
        nativeReleaseMatrixTraffic();
    }
    
    • 1
    • 2
    • 3
    • 4

    static void nativeReleaseMatrixTraffic(JNIEnv *env, jclass) {
        //停止循环线程
        TrafficCollector::stopLoop();
        //清理所有的映射表
        TrafficCollector::clearTrafficInfo();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    总结

    流量监控的实现方式是通过hook c++层socket的发起和接收相关的方法,拦截到对应方法从而对过程中涉及到的流量信息进行采集,采集到的数据就可以实时的获取到,以做进一步的分析。

    Android 学习笔录

    Android 性能优化篇:https://qr18.cn/FVlo89
    Android Framework底层原理篇:https://qr18.cn/AQpN4J
    Android 车载篇:https://qr18.cn/F05ZCM
    Android 逆向安全学习笔记:https://qr18.cn/CQ5TcL
    Android 音视频篇:https://qr18.cn/Ei3VPD
    Jetpack全家桶篇(内含Compose):https://qr18.cn/A0gajp
    OkHttp 源码解析笔记:https://qr18.cn/Cw0pBD
    Kotlin 篇:https://qr18.cn/CdjtAF
    Gradle 篇:https://qr18.cn/DzrmMB
    Flutter 篇:https://qr18.cn/DIvKma
    Android 八大知识体:https://qr18.cn/CyxarU
    Android 核心笔记:https://qr21.cn/CaZQLo
    Android 往年面试题锦:https://qr18.cn/CKV8OZ
    2023年最新Android 面试题集:https://qr18.cn/CgxrRy
    Android 车载开发岗位面试习题:https://qr18.cn/FTlyCJ
    音视频面试题锦:https://qr18.cn/AcV6Ap

  • 相关阅读:
    备战蓝桥杯---贪心刷题1
    3288S Android11 适配红外遥控功能(超详细)
    JSON.stringify格式化数据美化显示效果(不呆板地一行显示)
    自动化测试—web自动化—selenium初识
    基于51单片机的智能自动感应垃圾桶
    九、MFC控件(一)
    基于ABP实现DDD--实体创建和更新
    iwebsec靶场 SQL注入漏洞通关笔记6- 宽字节注入
    使用 Python 进行自然语言处理第 4 部分:文本表示
    基于JAVA校园超话网站计算机毕业设计源码+数据库+lw文档+系统+部署
  • 原文地址:https://blog.csdn.net/weixin_61845324/article/details/133897252