• MessageQueue 深入理解Android卷2 学习笔记


    MessageQueue类封装了与消息队列有关的操作,一个以消息驱动的系统中,最重要部分就是消息队列和消息处理循环。
    MessageQueue的核心代码在native层,可以处理java和native的事件

    1.1MessageQueue创建

    构造方法,调用nativeInit
    frameworks/base/core/java/android/os/MessageQueue.java

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }
    
    • 1
    • 2
    • 3
    • 4

    nativeInit是一个native方法,方法中调用构造初始化了native层的MessageQueue
    frameworks/base/core/jni/android_os_MessageQueue.cpp

    static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
        if (!nativeMessageQueue) {
            jniThrowRuntimeException(env, "Unable to allocate native queue");
            return 0;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    frameworks/base/core/jni/android_os_MessageQueue.cpp

    NativeMessageQueue::NativeMessageQueue() :
            mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
        mLooper = Looper::getForThread();
        if (mLooper == NULL) {
            mLooper = new Looper(false);
            Looper::setForThread(mLooper);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    如果mLooper为空,创建mLooper(是一种以线程为单位的单例模式)。通过setForThread与线程关联
    一个线程会有一个Looper来循环处理消息队列的消息,下列一行代码是获取线程的本地存储空间中的
    mLooper = Looper::getForThread();

    1.2 消息提取

    next()方法用于从消息队列中获取下一条消息,并返回该消息对象
    frameworks/base/core/java/android/os/MessageQueue.java

        @UnsupportedAppUsage
        Message next() {
            // Return here if the message loop has already quit and been disposed.
            // This can happen if the application tries to restart a looper after quit
            // which is not supported.
            final long ptr = mPtr;
            if (ptr == 0) {
                return null;
            }
    
            int pendingIdleHandlerCount = -1; // -1 only during first iteration
            int nextPollTimeoutMillis = 0;
            for (;;) {
                if (nextPollTimeoutMillis != 0) {
                    Binder.flushPendingCommands();
                }
    
                nativePollOnce(ptr, nextPollTimeoutMillis);
    
                synchronized (this) {
                    // Try to retrieve the next message.  Return if found.
                    final long now = SystemClock.uptimeMillis();
                    Message prevMsg = null;
                    Message msg = mMessages;
                    if (msg != null && msg.target == null) {
                        // Stalled by a barrier.  Find the next asynchronous message in the queue.
                        do {
                            prevMsg = msg;
                            msg = msg.next;
                        } while (msg != null && !msg.isAsynchronous());
                    }
                    if (msg != null) {
                        if (now < msg.when) {
                            // Next message is not ready.  Set a timeout to wake up when it is ready.
                            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                        } else {
                            // Got a message.
                            mBlocked = false;
                            if (prevMsg != null) {
                                prevMsg.next = msg.next;
                            } else {
                                mMessages = msg.next;
                            }
                            msg.next = null;
                            if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                            msg.markInUse();
                            return msg;
                        }
                    } else {
                        // No more messages.
                        nextPollTimeoutMillis = -1;
                    }
    
                    // Process the quit message now that all pending messages have been handled.
                    if (mQuitting) {
                        dispose();
                        return null;
                    }
    
                    // If first time idle, then get the number of idlers to run.
                    // Idle handles only run if the queue is empty or if the first message
                    // in the queue (possibly a barrier) is due to be handled in the future.
                    if (pendingIdleHandlerCount < 0
                            && (mMessages == null || now < mMessages.when)) {
                        pendingIdleHandlerCount = mIdleHandlers.size();
                    }
                    if (pendingIdleHandlerCount <= 0) {
                        // No idle handlers to run.  Loop and wait some more.
                        mBlocked = true;
                        continue;
                    }
    
                    if (mPendingIdleHandlers == null) {
                        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                    }
                    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                }
    
                // Run the idle handlers.
                // We only ever reach this code block during the first iteration.
                for (int i = 0; i < pendingIdleHandlerCount; i++) {
                    final IdleHandler idler = mPendingIdleHandlers[i];
                    mPendingIdleHandlers[i] = null; // release the reference to the handler
    
                    boolean keep = false;
                    try {
                        keep = idler.queueIdle();
                    } catch (Throwable t) {
                        Log.wtf(TAG, "IdleHandler threw exception", t);
                    }
    
                    if (!keep) {
                        synchronized (this) {
                            mIdleHandlers.remove(idler);
                        }
                    }
                }
    
                // Reset the idle handler count to 0 so we do not run them again.
                pendingIdleHandlerCount = 0;
    
                // While calling an idle handler, a new message could have been delivered
                // so go back and look again for a pending message without waiting.
                nextPollTimeoutMillis = 0;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    首先,方法会检查消息循环是否已经退出并被销毁,如果是,则直接返回null。这种情况可能发生在应用程序在退出后尝试重新启动消息循环,但这是不被支持的。

    接下来,方法会初始化两个变量pendingIdleHandlerCount和nextPollTimeoutMillis。pendingIdleHandlerCount用于记录待运行的空闲处理器(IdleHandler)的数量,初始值为-1,表示在第一次迭代时。nextPollTimeoutMillis用于设置下一次轮询的超时时间,初始值为0。

    然后,方法进入一个无限循环,不断尝试从消息队列中获取下一条消息。

    在每次循环中,首先检查nextPollTimeoutMillis的值,如果不为0,则调用Binder.flushPendingCommands()方法来刷新待处理的Binder命令。

    接下来,调用nativePollOnce()方法来从底层获取下一条消息。该方法会阻塞当前线程,直到有消息到达或超时。

    然后,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。

    在同步块中,首先尝试从消息队列中获取下一条消息。如果找到了消息,则判断该消息是否已经准备好处理。如果还未准备好,则计算出下一次轮询的超时时间,并更新nextPollTimeoutMillis的值。如果消息已经准备好处理,则将mBlocked标志设置为false,将该消息从消息队列中移除,并返回该消息。

    如果没有找到消息,则将nextPollTimeoutMillis的值设置为-1,表示没有更多的消息。

    接下来,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作,并返回null。

    然后,检查是否是第一次空闲处理。如果是第一次空闲处理,并且消息队列为空或者下一条消息的时间还未到达,则获取待运行的空闲处理器的数量。

    如果待运行的空闲处理器数量小于等于0,则将mBlocked标志设置为true,继续下一次循环。

    如果待运行的空闲处理器数量大于0,则将空闲处理器列表转换为数组,并存储在mPendingIdleHandlers中。

    接下来,运行空闲处理器。在第一次迭代中,会遍历所有待运行的空闲处理器,并调用它们的queueIdle()方法。如果queueIdle()方法返回false,则表示该空闲处理器不再需要运行,需要将其从空闲处理器列表中移除。

    最后,将pendingIdleHandlerCount重置为0,以确保下一次循环不再运行空闲处理器。然后,将nextPollTimeoutMillis的值设置为0,以便立即进行下一次轮询。

    总结起来,这段代码的作用是从消息队列中获取下一条消息,并返回该消息对象。它会根据消息的准备状态和时间戳来确定是否需要等待,同时还会处理空闲处理器的运行。这样,消息循环可以不断地处理消息,并在空闲时运行空闲处理器。

    Java层投递Message
    enqueueMessage()方法用于将消息添加到消息队列中。
    frameworks/base/core/java/android/os/MessageQueue.java

        boolean enqueueMessage(Message msg, long when) {
            if (msg.target == null) {
                throw new IllegalArgumentException("Message must have a target.");
            }
    
            synchronized (this) {
                if (msg.isInUse()) {
                    throw new IllegalStateException(msg + " This message is already in use.");
                }
    
                if (mQuitting) {
                    IllegalStateException e = new IllegalStateException(
                            msg.target + " sending message to a Handler on a dead thread");
                    Log.w(TAG, e.getMessage(), e);
                    msg.recycle();
                    return false;
                }
    
                msg.markInUse();
                msg.when = when;
                Message p = mMessages;
                boolean needWake;
                if (p == null || when == 0 || when < p.when) {
                    // New head, wake up the event queue if blocked.
                    msg.next = p;
                    mMessages = msg;
                    needWake = mBlocked;
                } else {
                    // Inserted within the middle of the queue.  Usually we don't have to wake
                    // up the event queue unless there is a barrier at the head of the queue
                    // and the message is the earliest asynchronous message in the queue.
                    needWake = mBlocked && p.target == null && msg.isAsynchronous();
                    Message prev;
                    for (;;) {
                        prev = p;
                        p = p.next;
                        if (p == null || when < p.when) {
                            break;
                        }
                        if (needWake && p.isAsynchronous()) {
                            needWake = false;
                        }
                    }
                    msg.next = p; // invariant: p == prev.next
                    prev.next = msg;
                }
    
                // We can assume mPtr != 0 because mQuitting is false.
                if (needWake) {
                    nativeWake(mPtr);
                }
            }
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    首先,方法会检查消息的target是否为null,如果是,则抛出IllegalArgumentException异常,表示消息必须具有目标。

    接下来,使用synchronized关键字对this进行同步,以确保多线程环境下的安全访问。

    在同步块中,首先检查消息是否已经在使用中,如果是,则抛出IllegalStateException异常,表示该消息已经在使用中。

    然后,检查mQuitting标志,如果为true,表示消息循环已经退出,需要进行清理操作。在这种情况下,会抛出IllegalStateException异常,并将消息回收后返回false。

    接下来,将消息标记为正在使用,并设置消息的时间戳为when。

    然后,获取消息队列中的第一条消息p。

    接下来,根据消息的时间戳when和当前消息队列中的消息的时间戳进行比较,确定消息的插入位置。

    如果消息队列为空,或者when为0,或者when小于第一条消息的时间戳,则将消息作为新的头部插入到消息队列中,并将needWake标志设置为mBlocked的值。

    如果消息需要插入到队列的中间位置,通常情况下不需要唤醒事件队列,除非队列头部有一个障碍(barrier)并且消息是队列中最早的异步消息。在这种情况下,将needWake标志设置为mBlocked的值,并遍历消息队列,找到合适的插入位置。

    最后,将消息插入到队列中,并更新前后消息的关联关系。

    最后,如果需要唤醒事件队列,则调用nativeWake()方法唤醒事件队列。

    总结起来,这段代码的作用是将消息添加到消息队列中。它会根据消息的时间戳和队列中已有消息的时间戳来确定插入位置,并根据需要唤醒事件队列。这样,消息循环可以按照一定的顺序处理消息,并在需要时唤醒事件队列。

    nativeWake
    nativeWake是一个native函数,用于唤醒队列
    frameworks/base/core/jni/android_os_MessageQueue.cpp

    static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->wake();
    }
    
    • 1
    • 2
    • 3
    • 4

    通过reinterpret_cast获取NativeMessageQueue对象,然后调用wake函数,wake函数调用Looper的wake

    void NativeMessageQueue::wake() {
        mLooper->wake();
    }
    
    • 1
    • 2
    • 3

    wake
    system/core/libutils/Looper.cpp

    void Looper::wake() {
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ wake", this);
    #endif
    
        uint64_t inc = 1;
        ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
        if (nWrite != sizeof(uint64_t)) {
            if (errno != EAGAIN) {
                LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                                 mWakeEventFd.get(), nWrite, strerror(errno));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    向文件描述符写入一个inc的值来实现唤醒操作

    1.3 nativePollOnce分析

    nativePollOnce()方法用来从底层获取下一条消息,该方法会阻塞当前线程,直到有消息到达或超时
    frameworks/base/core/jni/android_os_MessageQueue.cpp

    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
            jlong ptr, jint timeoutMillis) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    调用了pollOnce方法
    frameworks/base/core/jni/android_os_MessageQueue.cpp

    void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
        mPollEnv = env;
        mPollObj = pollObj;
        mLooper->pollOnce(timeoutMillis);
        mPollObj = NULL;
        mPollEnv = NULL;
    
        if (mExceptionObj) {
            env->Throw(mExceptionObj);
            env->DeleteLocalRef(mExceptionObj);
            mExceptionObj = NULL;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    调用了Looper中的pollOnce方法
    system/core/libutils/Looper.cpp

    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
        int result = 0;
        for (;;) {
            while (mResponseIndex < mResponses.size()) {
                const Response& response = mResponses.itemAt(mResponseIndex++);
                int ident = response.request.ident;
                if (ident >= 0) {
                    int fd = response.request.fd;
                    int events = response.events;
                    void* data = response.request.data;
    #if DEBUG_POLL_AND_WAKE
                    ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                            "fd=%d, events=0x%x, data=%p",
                            this, ident, fd, events, data);
    #endif
                    if (outFd != nullptr) *outFd = fd;
                    if (outEvents != nullptr) *outEvents = events;
                    if (outData != nullptr) *outData = data;
                    return ident;
                }
            }
    
            if (result != 0) {
    #if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning result %d", this, result);
    #endif
                if (outFd != nullptr) *outFd = 0;
                if (outEvents != nullptr) *outEvents = 0;
                if (outData != nullptr) *outData = nullptr;
                return result;
            }
    
            result = pollInner(timeoutMillis);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    首先,定义了一个int类型的变量result,并将其初始化为0。

    然后,使用一个无限循环来进行轮询操作。在每次循环中,首先通过while循环遍历mResponses中的响应对象。

    在遍历过程中,获取当前响应对象response,并从中提取出标识符ident、文件描述符fd、事件events和数据data。

    如果标识符ident大于等于0,则表示该响应对象是一个有效的事件。在这种情况下,将文件描述符、事件和数据分别赋值给outFd、outEvents和outData指向的变量,并返回标识符ident。

    如果遍历完所有响应对象后仍未找到有效的事件,则检查result的值。如果result不等于0,则表示在之前的轮询操作中发生了错误,直接返回result。

    如果以上两种情况都不满足,则调用pollInner()方法进行实际的轮询操作,并将返回的结果赋值给result。

    总结起来,这段代码的作用是在事件循环中进行一次轮询操作,等待并处理事件。它通过遍历响应对象来查找有效的事件,并将相关信息返回。如果没有找到有效的事件,则返回之前的轮询结果。

    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)
    
    • 1

    该方法接受四个参数:
    timeoutMillis:超时等待事件,如果为-1,表示无线等到,直到有事件发生。为0表示无需等待立即返回
    outFd:发生事件的那个文件描述符
    outEvents:在该文件描述符上发生了哪些事件,包括可读、可写、错误和终端四个事件。四个事件都是从epoll转化而来
    outData:存储上下文数据,由用户在添加监听句柄时传递,用于传递用户自定义的数据

    该方法的返回值也有特殊意义:
    返回值为ALOOPER_POLL_WAKE:这次返回由wake函数触发,也就是管道写端的那次写事件触发
    ALOOPER_POLL_TIMEOUT:等待超时
    ALOOPER_POLL_ERROR:等待过程中发生错误
    ALOOPER_POLL_CALLBACK:表示某个被监听的句柄因某种原因被触发
    pollInner
    pollinner方法很长,截取关键部分

    int Looper::pollInner(int timeoutMillis) {
    ...
     // Poll.
        int result = POLL_WAKE;
        mResponses.clear();
        mResponseIndex = 0;
    
        // We are about to idle.
        mPolling = true;
    
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    
        // No longer idling.
        mPolling = false;
    
        // Acquire lock.
        mLock.lock();
    
        // Rebuild epoll set if needed.
        if (mEpollRebuildRequired) {
            mEpollRebuildRequired = false;
            rebuildEpollLocked();
            goto Done;
        }
    
        // Check for poll error.
        if (eventCount < 0) {
            if (errno == EINTR) {
                goto Done;
            }
            ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
            result = POLL_ERROR;
            goto Done;
        }
    
        // Check for poll timeout.
        if (eventCount == 0) {
    ...
            result = POLL_TIMEOUT;
            goto Done;
        }
    
        // Handle all events.
    #if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
    #endif
    
        for (int i = 0; i < eventCount; i++) {
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if (fd == mWakeEventFd.get()) {
                if (epollEvents & EPOLLIN) {
                    awoken();
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
                }
            } else {
                ssize_t requestIndex = mRequests.indexOfKey(fd);
                if (requestIndex >= 0) {
                    int events = 0;
                    if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                    if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                    if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                    if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                    pushResponse(events, mRequests.valueAt(requestIndex));
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                            "no longer registered.", epollEvents, fd);
                }
            }
        }
    Done: ;
    ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    在这段代码中,首先调用epoll_wait函数等待事件的发生,然后根据事件的类型进行相应的处理。

    在事件处理过程中,代码会检查是否需要重新构建epoll集合。如果需要重新构建,会调用rebuildEpollLocked方法,并跳转到Done标签所在的位置。

    接下来,代码会检查事件的数量。如果事件数量小于0,表示发生了错误,会输出相应的错误日志,并将结果设置为POLL_ERROR,然后跳转到Done标签。

    如果事件数量为0,表示发生了超时,会将结果设置为POLL_TIMEOUT,然后跳转到Done标签。

    如果事件数量大于0,表示有事件发生,代码会遍历所有的事件,并根据事件的类型进行相应的处理。如果事件是唤醒事件,会调用awoken方法进行处理。如果事件是注册的文件描述符上的事件,会将事件类型和相关的请求信息添加到响应队列中。

    最后,代码会跳转到Done标签所在的位置,继续执行标签后面的代码。
    根据事件类型进行相应的处理后,就该处理事件了,下面是Done之后的代码

    int Looper::pollInner(int timeoutMillis) {
    ...
        // Invoke pending message callbacks.
        mNextMessageUptime = LLONG_MAX;
        while (mMessageEnvelopes.size() != 0) {
            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
            if (messageEnvelope.uptime <= now) {
                // Remove the envelope from the list.
                // We keep a strong reference to the handler until the call to handleMessage
                // finishes.  Then we drop it so that the handler can be deleted *before*
                // we reacquire our lock.
                { // obtain handler
                    sp<MessageHandler> handler = messageEnvelope.handler;
                    Message message = messageEnvelope.message;
                    mMessageEnvelopes.removeAt(0);
                    mSendingMessage = true;
                    mLock.unlock();
    
    #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                    ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                            this, handler.get(), message.what);
    #endif
                    handler->handleMessage(message);
                } // release handler
    
                mLock.lock();
                mSendingMessage = false;
                result = POLL_CALLBACK;
            } else {
                // The last message left at the head of the queue determines the next wakeup time.
                mNextMessageUptime = messageEnvelope.uptime;
                break;
            }
        }
    
        // Release lock.
        mLock.unlock();
    
        // Invoke all response callbacks.
        for (size_t i = 0; i < mResponses.size(); i++) {
            Response& response = mResponses.editItemAt(i);
            if (response.request.ident == POLL_CALLBACK) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
    #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                        this, response.request.callback.get(), fd, events, data);
    #endif
                // Invoke the callback.  Note that the file descriptor may be closed by
                // the callback (and potentially even reused) before the function returns so
                // we need to be a little careful when removing the file descriptor afterwards.
                int callbackResult = response.request.callback->handleEvent(fd, events, data);
                if (callbackResult == 0) {
                    removeFd(fd, response.request.seq);
                }
    
                // Clear the callback reference in the response structure promptly because we
                // will not clear the response vector itself until the next poll.
                response.request.callback.clear();
                result = POLL_CALLBACK;
            }
        }
        return result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    在这段代码中,首先会设置mNextMessageUptime为一个很大的值,以确保在没有新消息到达时不会触发下一次唤醒。
    然后,代码进入一个循环,遍历消息队列中的消息。对于每个消息,代码会检查消息的到达时间是否小于等于当前时间。如果是,表示该消息已经到达,需要处理该消息。

    在处理消息之前,代码会获取消息的处理器(handler)和消息内容,并从消息队列中移除该消息。然后,代码会释放锁,并调用处理器的handleMessage方法来处理该消息。(处理Native的Message,调用Native Handler的handleMessage处理该Message)

    处理完消息后,代码会重新获取锁,并将mSendingMessage设置为false,表示消息处理完成。然后,将结果设置为POLL_CALLBACK,表示有回调发生。

    如果消息的到达时间大于当前时间,表示还有未到达的消息,代码会将下一次唤醒时间设置为该消息的到达时间,并跳出循环。

    接下来,代码会释放锁,并遍历所有的响应(response)。对于每个响应,代码会检查响应的标识是否为POLL_CALLBACK,如果是,表示需要调用回调函数。

    代码会获取响应中的文件描述符(fd)、事件(events)和数据(data),然后调用回调函数的handleEvent方法进行处理。如果回调函数返回值为0,表示需要移除该文件描述符。

    最后,代码会清除响应结构中的回调引用,并将结果设置为POLL_CALLBACK。然后,返回结果。

    总的来说,这段代码实现了处理消息和响应的逻辑。它会遍历消息队列中的消息,处理已到达的消息,并根据响应的标识调用相应的回调函数。处理完消息和响应后,返回相应的结果。
    添加监控请求
    添加监控请求就是调用epoll_ctl添加文件句柄,以Nativce的Activity的代码来分析mRequests
    frameworks/base/core/jni/android_app_NativeActivity.cpp

    loadNativeCode_native(JNIEnv* env, jobject clazz, jstring path, jstring funcName,
            jobject messageQueue, jstring internalDataDir, jstring obbDir,
            jstring externalDataDir, jint sdkVersion, jobject jAssetMgr,
            jbyteArray savedState, jobject classLoader, jstring libraryPath){
    ...        
            
        code->messageQueue->getLooper()->addFd(
                code->mainWorkRead, 0, ALOOPER_EVENT_INPUT, mainWorkCallback, code.get());     
    ...     
            
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    调用Looper的addFd函数,第一个参数表示监听的fd,第二个参数0表示ident,第三个参数表示需要监听的事件,这里只监听可读事件,第四个参数为回调函数,当该fd发生指定事件,looper会回调该函数,第五个参数为回调函数的参数

    system/core/libutils/Looper.cpp

    int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    #if DEBUG_CALLBACKS
        ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
                events, callback.get(), data);
    #endif
    
        if (!callback.get()) {
            if (! mAllowNonCallbacks) {
                ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
                return -1;
            }
    
            if (ident < 0) {
                ALOGE("Invalid attempt to set NULL callback with ident < 0.");
                return -1;
            }
        } else {
            ident = POLL_CALLBACK;
        }
    
        { // acquire lock
            AutoMutex _l(mLock);
    
            Request request;
            request.fd = fd;
            request.ident = ident;
            request.events = events;
            request.seq = mNextRequestSeq++;
            request.callback = callback;
            request.data = data;
            if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
    
            struct epoll_event eventItem;
            request.initEventItem(&eventItem);
    
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex < 0) {
                int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
                if (epollResult < 0) {
                    ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                    return -1;
                }
                mRequests.add(fd, request);
            } else {
                int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
                if (epollResult < 0) {
                    if (errno == ENOENT) {
                        // Tolerate ENOENT because it means that an older file descriptor was
                        // closed before its callback was unregistered and meanwhile a new
                        // file descriptor with the same number has been created and is now
                        // being registered for the first time.  This error may occur naturally
                        // when a callback has the side-effect of closing the file descriptor
                        // before returning and unregistering itself.  Callback sequence number
                        // checks further ensure that the race is benign.
                        //
                        // Unfortunately due to kernel limitations we need to rebuild the epoll
                        // set from scratch because it may contain an old file handle that we are
                        // now unable to remove since its file descriptor is no longer valid.
                        // No such problem would have occurred if we were using the poll system
                        // call instead, but that approach carries others disadvantages.
    #if DEBUG_CALLBACKS
                        ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
                                "being recycled, falling back on EPOLL_CTL_ADD: %s",
                                this, strerror(errno));
    #endif
                        epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
                        if (epollResult < 0) {
                            ALOGE("Error modifying or adding epoll events for fd %d: %s",
                                    fd, strerror(errno));
                            return -1;
                        }
                        scheduleEpollRebuildLocked();
                    } else {
                        ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
                        return -1;
                    }
                }
                mRequests.replaceValueAt(requestIndex, request);
            }
        } // release lock
        return 1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    addFd方法接受一个sp对象作为回调函数。在这段代码中,首先会检查回调函数是否为空。如果为空,会根据mAllowNonCallbacks的值来判断是否允许设置空回调函数。如果不允许,则会返回错误。如果允许,并且ident小于0,则会返回错误。

    如果回调函数不为空,则会将ident设置为POLL_CALLBACK,表示该文件描述符是一个回调文件描述符。

    然后,代码会获取锁,并创建一个Request对象,设置相应的属性。接着,代码会初始化一个epoll_event结构体,并调用epoll_ctl函数将文件描述符添加到epoll事件集合中。

    如果添加成功,则会将Request对象添加到mRequests集合中。如果添加失败,则会根据错误类型进行处理。如果错误类型是ENOENT,表示文件描述符在移除之前已经关闭,并且一个新的文件描述符与相同的编号已经创建并且正在首次注册。在这种情况下,代码会重新将文件描述符添加到epoll事件集合中,并调用scheduleEpollRebuildLocked方法重新构建epoll事件集合。如果是其他错误类型,则表示添加失败,会返回错误。

    最后,代码会释放锁,并返回成功。

    总的来说,这段代码实现了向Looper对象添加文件描述符和回调函数的逻辑。它会检查回调函数是否为空,并根据情况进行处理。在添加文件描述符时,会将文件描述符添加到epoll事件集合中,并将相应的信息保存到mRequests集合中。如果添加失败,则会根据错误类型进行处理。
    处理监控请求
    在pollInner中,当某个监控fd发生事件后,会调用对应的Request,也就是pollInner调用的pushResponse方法
    system/core/libutils/Looper.cpp

    void Looper::pushResponse(int events, const Request& request) {
        Response response;
        response.events = events;
        response.request = request;
        mResponses.push(response);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    该方法将一个Response添加到响应队列中,用于后续处理
    poolInner不是单独处理request,而是先收集request,等到NativeMessage消息处理完后再处理。这表明在处理逻辑上,NativeMessage的优先级高于监控fd的优先级

    添加Native的Message
    system/core/libutils/Looper.cpp

    void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        sendMessageAtTime(now, handler, message);
    }
    void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
            const Message& message) {
    #if DEBUG_CALLBACKS
        ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
                this, uptime, handler.get(), message.what);
    #endif
    
        size_t i = 0;
        { // acquire lock
            AutoMutex _l(mLock);
    
            size_t messageCount = mMessageEnvelopes.size();
            while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
                i += 1;
            }
    
            MessageEnvelope messageEnvelope(uptime, handler, message);
            mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
    
            // Optimization: If the Looper is currently sending a message, then we can skip
            // the call to wake() because the next thing the Looper will do after processing
            // messages is to decide when the next wakeup time should be.  In fact, it does
            // not even matter whether this code is running on the Looper thread.
            if (mSendingMessage) {
                return;
            }
        } // release lock
    
        // Wake the poll loop only when we enqueue a new message at the head.
        if (i == 0) {
            wake();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    sendMessageAtTime方法用于向消息队列中添加一个延迟发送的消息。在这段代码中,首先会打印调试信息(如果定义了DEBUG_CALLBACKS宏),包括uptime、handler和message.what的值。

    然后,代码会获取锁,并根据uptime的值找到消息队列中合适的位置插入新的消息。具体地,代码会遍历消息队列,找到第一个uptime大于等于给定uptime的位置,并将新的消息插入到该位置。

    在插入消息后,代码会进行一个优化判断。如果当前Looper正在发送消息(mSendingMessage为true),则可以跳过调用wake()方法唤醒轮询循环,因为在处理完消息后,Looper会决定下一次唤醒的时间。实际上,这段代码是否在Looper线程上运行并不重要。

    最后,如果新插入的消息是队列中的第一个消息(即i == 0),则调用wake()方法唤醒轮询循环。

    总的来说,这段代码实现了向消息队列中添加延迟发送的消息的逻辑。它会根据给定的uptime值找到合适的位置插入消息,并在必要时唤醒轮询循环。

    1.4 MessageQueue总结

    在这里插入图片描述

    · Java层提供了Looper类和MessageQueue类,其中Looper类提供循环处理消息的机制,MessageQueue类提供一个消息队列,以及插入、删除和提取消息的函数接口。另外,Handler也是在Java层常用的与消息处理相关的类。

    · MessageQueue内部通过mPtr变量保存一个Native层的NativeMessageQueue对象,mMessages保存来自Java层的Message消息。

    · NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。

    · Java层有Message类和Handler类,而Native层对应也有Message类和MessageHandler抽象类。在编码时,一般使用的是MessageHandler的派生类WeakMessageHandler类。

    MessageQueue处理流程总结
    MessageQueue核心逻辑下移到Native层后,极大地拓展了消息处理的范围,总结一下有以下几点:
    1.MessageQueue继续支持来自Java层的Message消息,也就是早期的Message加Handler的处理方式。

    2.MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。

    3.NativeMessageQueue还处理通过addFd添加的Request。在后面分析输入系统时,还会大量碰到这种方式。

    4.从处理逻辑上看,先是Native的Message,然后是Native的Request,最后才是Java的Message。

  • 相关阅读:
    蓝桥杯第三周算法竞赛D题&&E题
    关于#python#的问题:openssl3.0.7,并且我python编译时指定到了openssl的,request等都不行提示我没有ssl模块
    2009-2022年上市公司华证 ESG数据
    “购物返现积分兑换”——区块链思维的购物返利方式
    微信扫一扫抽奖活动怎么做
    SQLite DISTINCT 关键字
    x265参数-early-skip
    人民网_领导留言板data2022年-2023年
    七夕趣味玩法,用 MMGeneration 生成心仪的 TA
    Android--碎片
  • 原文地址:https://blog.csdn.net/qq_56892136/article/details/134060565